blob: 22c9280e26b6086170890c529c6f9da2e3447765 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
19
Kevin Clarke0fddde2008-06-18 01:16:02 +000020require File.dirname(__FILE__) + '/spec_helper'
Kevin Clark2bd3a302008-06-26 17:49:49 +000021require File.dirname(__FILE__) + '/gen-rb/NonblockingService'
Kevin Clarke0fddde2008-06-18 01:16:02 +000022
23class ThriftNonblockingServerSpec < Spec::ExampleGroup
24 include Thrift
25 include SpecNamespace
26
27 class Handler
28 def initialize
29 @queue = Queue.new
30 end
31
32 attr_accessor :server
33
34 def greeting(english)
35 if english
36 SpecNamespace::Hello.new
37 else
38 SpecNamespace::Hello.new(:greeting => "Aloha!")
39 end
40 end
41
42 def block
43 @queue.pop
44 end
45
Kevin Clark980e4452008-06-18 01:19:59 +000046 def unblock(n)
47 n.times { @queue.push true }
Kevin Clarke0fddde2008-06-18 01:16:02 +000048 end
49
50 def sleep(time)
51 Kernel.sleep time
52 end
53
54 def shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +000055 @server.shutdown(0, false)
Kevin Clarke0fddde2008-06-18 01:16:02 +000056 end
57 end
58
Bryan Duxburyd1d15422009-04-04 00:58:03 +000059 class SpecTransport < BaseTransport
Kevin Clarke45bf592008-06-18 01:17:44 +000060 def initialize(transport, queue)
61 @transport = transport
62 @queue = queue
63 @flushed = false
Kevin Clarke0fddde2008-06-18 01:16:02 +000064 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000065
Kevin Clarke45bf592008-06-18 01:17:44 +000066 def open?
67 @transport.open?
68 end
69
70 def open
71 @transport.open
72 end
73
74 def close
75 @transport.close
76 end
77
78 def read(sz)
79 @transport.read(sz)
80 end
81
82 def write(buf,sz=nil)
83 @transport.write(buf, sz)
84 end
85
86 def flush
87 @queue.push :flushed unless @flushed or @queue.nil?
88 @flushed = true
89 @transport.flush
90 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000091 end
92
Kevin Clark980e4452008-06-18 01:19:59 +000093 class SpecServerSocket < ServerSocket
94 def initialize(host, port, queue)
95 super(host, port)
96 @queue = queue
97 end
98
99 def listen
100 super
101 @queue.push :listen
102 end
103 end
104
Kevin Clarke45bf592008-06-18 01:17:44 +0000105 describe Thrift::NonblockingServer do
106 before(:each) do
107 @port = 43251
108 handler = Handler.new
109 processor = NonblockingService::Processor.new(handler)
Kevin Clark980e4452008-06-18 01:19:59 +0000110 queue = Queue.new
111 @transport = SpecServerSocket.new('localhost', @port, queue)
Bryan Duxburyd1d15422009-04-04 00:58:03 +0000112 transport_factory = FramedTransportFactory.new
Kevin Clarke45bf592008-06-18 01:17:44 +0000113 logger = Logger.new(STDERR)
114 logger.level = Logger::WARN
Bryan Duxburyd1d15422009-04-04 00:58:03 +0000115 @server = NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
Kevin Clarke45bf592008-06-18 01:17:44 +0000116 handler.server = @server
117 @server_thread = Thread.new(Thread.current) do |master_thread|
118 begin
119 @server.serve
120 rescue => e
121 p e
122 puts e.backtrace * "\n"
123 master_thread.raise e
Kevin Clarke0fddde2008-06-18 01:16:02 +0000124 end
125 end
Kevin Clark980e4452008-06-18 01:19:59 +0000126 queue.pop
Kevin Clarke45bf592008-06-18 01:17:44 +0000127
128 @clients = []
129 @catch_exceptions = false
Kevin Clarke0fddde2008-06-18 01:16:02 +0000130 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000131
Kevin Clarke45bf592008-06-18 01:17:44 +0000132 after(:each) do
133 @clients.each { |client, trans| trans.close }
134 # @server.shutdown(1)
135 @server_thread.kill
136 @transport.close
Kevin Clarke0fddde2008-06-18 01:16:02 +0000137 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000138
Kevin Clarke45bf592008-06-18 01:17:44 +0000139 def setup_client(queue = nil)
140 transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
141 protocol = BinaryProtocol.new(transport)
142 client = NonblockingService::Client.new(protocol)
143 transport.open
144 @clients << [client, transport]
145 client
146 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000147
Kevin Clarke45bf592008-06-18 01:17:44 +0000148 def setup_client_thread(result)
149 queue = Queue.new
150 Thread.new do
151 begin
152 client = setup_client
Kevin Clark980e4452008-06-18 01:19:59 +0000153 while (cmd = queue.pop)
154 msg, *args = cmd
Kevin Clarke45bf592008-06-18 01:17:44 +0000155 case msg
156 when :block
157 result << client.block
158 when :unblock
Kevin Clark980e4452008-06-18 01:19:59 +0000159 client.unblock(args.first)
Kevin Clarke45bf592008-06-18 01:17:44 +0000160 when :hello
161 result << client.greeting(true) # ignore result
162 when :sleep
Kevin Clarkf731b492008-06-18 01:20:25 +0000163 client.sleep(args[0] || 0.5)
Kevin Clarke45bf592008-06-18 01:17:44 +0000164 result << :slept
165 when :shutdown
166 client.shutdown
167 when :exit
168 result << :done
169 break
170 end
171 end
172 @clients.each { |c,t| t.close and break if c == client } #close the transport
173 rescue => e
174 raise e unless @catch_exceptions
175 end
176 end
177 queue
178 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000179
Kevin Clarke45bf592008-06-18 01:17:44 +0000180 it "should handle basic message passing" do
181 client = setup_client
182 client.greeting(true).should == Hello.new
183 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
184 end
Kevin Clark15350782008-06-18 01:16:27 +0000185
Kevin Clarke45bf592008-06-18 01:17:44 +0000186 it "should handle concurrent clients" do
187 queue = Queue.new
188 trans_queue = Queue.new
Kevin Clark980e4452008-06-18 01:19:59 +0000189 4.times do
190 Thread.new(Thread.current) do |main_thread|
191 begin
192 queue.push setup_client(trans_queue).block
193 rescue => e
194 main_thread.raise e
195 end
196 end
197 end
Kevin Clarke45bf592008-06-18 01:17:44 +0000198 4.times { trans_queue.pop }
Kevin Clark980e4452008-06-18 01:19:59 +0000199 setup_client.unblock(4)
Kevin Clarke45bf592008-06-18 01:17:44 +0000200 4.times { queue.pop.should be_true }
201 end
202
203 it "should handle messages from more than 5 long-lived connections" do
204 queues = []
205 result = Queue.new
206 7.times do |i|
207 queues << setup_client_thread(result)
208 Thread.pass if i == 4 # give the server time to accept connections
209 end
210 client = setup_client
211 # block 4 connections
212 4.times { |i| queues[i] << :block }
213 queues[4] << :hello
214 queues[5] << :hello
215 queues[6] << :hello
216 3.times { result.pop.should == Hello.new }
217 client.greeting(true).should == Hello.new
Kevin Clark980e4452008-06-18 01:19:59 +0000218 queues[5] << [:unblock, 4]
Kevin Clarke45bf592008-06-18 01:17:44 +0000219 4.times { result.pop.should be_true }
220 queues[2] << :hello
221 result.pop.should == Hello.new
222 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
223 7.times { queues.shift << :exit }
224 client.greeting(true).should == Hello.new
225 end
226
227 it "should shut down when asked" do
228 # connect first to ensure it's running
229 client = setup_client
230 client.greeting(false) # force a message pass
231 @server.shutdown
232 @server_thread.join(2).should be_an_instance_of(Thread)
233 end
234
235 it "should continue processing active messages when shutting down" do
236 result = Queue.new
237 client = setup_client_thread(result)
238 client << :sleep
239 sleep 0.1 # give the server time to start processing the client's message
240 @server.shutdown
241 @server_thread.join(2).should be_an_instance_of(Thread)
242 result.pop.should == :slept
243 end
244
245 it "should kill active messages when they don't expire while shutting down" do
246 result = Queue.new
247 client = setup_client_thread(result)
Kevin Clarkf731b492008-06-18 01:20:25 +0000248 client << [:sleep, 10]
Kevin Clarke45bf592008-06-18 01:17:44 +0000249 sleep 0.1 # start processing the client's message
250 @server.shutdown(1)
251 @catch_exceptions = true
252 @server_thread.join(3).should_not be_nil
Kevin Clarkf731b492008-06-18 01:20:25 +0000253 result.should be_empty
Kevin Clarke45bf592008-06-18 01:17:44 +0000254 end
255
256 it "should allow shutting down in response to a message" do
257 client = setup_client
258 client.greeting(true).should == Hello.new
259 client.shutdown
260 @server_thread.join(2).should_not be_nil
261 end
Kevin Clark15350782008-06-18 01:16:27 +0000262 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000263end