#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
19
20require File.dirname(__FILE__) + '/spec_helper'
21require File.dirname(__FILE__) + '/gen-rb/nonblocking_service'
22
23class ThriftNonblockingServerSpec < Spec::ExampleGroup
24 include Thrift
25 include SpecNamespace
26
# Service handler used by the specs. It answers greetings, lets callers wait on
# an internal queue until they are released, sleeps on request, and can forward
# a shutdown request to the server it has been attached to.
class Handler
  # The server that #shutdown is forwarded to; assigned from outside.
  attr_accessor :server

  def initialize
    # #block waits on this queue; #unblock feeds it.
    @queue = Queue.new
  end

  # Returns a default Hello when +english+ is truthy, otherwise a Hello built
  # with the greeting "Aloha!".
  def greeting(english)
    return SpecNamespace::Hello.new if english

    SpecNamespace::Hello.new(:greeting => "Aloha!")
  end

  # Waits until an item is available on the internal queue and returns it.
  def block
    @queue.pop
  end

  # Pushes +true+ onto the internal queue +n+ times; each pushed value lets one
  # #block call return.
  def unblock(n)
    n.times { @queue << true }
  end

  # Sleeps for +time+ seconds. Kernel.sleep is named explicitly because this
  # method has the same name as Kernel#sleep.
  def sleep(time)
    Kernel.sleep(time)
  end

  # Calls shutdown(0, false) on the attached server.
  # NOTE(review): presumably "zero timeout, do not block the caller" -- confirm
  # against the server's shutdown signature.
  def shutdown
    server.shutdown(0, false)
  end
end
58
# Transport wrapper used by the specs. Every operation is forwarded to the
# wrapped transport; in addition, the first #flush is announced by pushing
# :flushed onto an optional queue.
class SpecTransport < BaseTransport
  # transport:: the transport every call is delegated to
  # queue::     receives :flushed on the first flush; may be nil
  def initialize(transport, queue)
    @transport = transport
    @queue = queue
    @flushed = false
  end

  def open?
    @transport.open?
  end

  def open
    @transport.open
  end

  def close
    @transport.close
  end

  def read(sz)
    @transport.read(sz)
  end

  def write(buf, sz = nil)
    @transport.write(buf, sz)
  end

  # Pushes :flushed onto the queue on the first call only (and only when a
  # queue was given), then flushes the wrapped transport.
  def flush
    first_flush = !@flushed
    @queue.push(:flushed) if first_flush && !@queue.nil?
    @flushed = true
    @transport.flush
  end
end
92
# ServerSocket that reports on a queue once #listen has run.
class SpecServerSocket < ServerSocket
  # host, port:: passed on to ServerSocket
  # queue::      receives :listen after the inherited #listen returns
  def initialize(host, port, queue)
    super(host, port)
    @queue = queue
  end

  # Runs the inherited #listen, then pushes :listen onto the queue.
  def listen
    super()
    @queue << :listen
  end
end
104
105 describe Thrift::NonblockingServer do
# Builds a fresh server for every example, runs it on a background thread and
# waits until the server socket has reported :listen before the example starts.
before(:each) do
  @port = 43251
  handler = Handler.new
  listening = Queue.new
  @transport = SpecServerSocket.new('localhost', @port, listening)
  log = Logger.new(STDERR)
  log.level = Logger::WARN
  # NOTE(review): the positional nil and 5 are presumably the protocol factory
  # and the worker-thread count -- confirm against NonblockingServer#initialize.
  @server = NonblockingServer.new(
    NonblockingService::Processor.new(handler),
    @transport,
    FramedTransportFactory.new,
    nil,
    5,
    log
  )
  handler.server = @server

  # Errors raised by the server loop are printed and re-raised in the thread
  # that runs the example.
  @server_thread = Thread.new(Thread.current) do |spec_thread|
    begin
      @server.serve
    rescue => error
      p error
      puts error.backtrace.join("\n")
      spec_thread.raise error
    end
  end
  listening.pop

  @clients = []
  @catch_exceptions = false
end
131
# Closes the transport of every client recorded in @clients, then kills the
# server thread and closes the server socket.
after(:each) do
  @clients.each { |pair| pair.last.close }
  # @server.shutdown(1)
  @server_thread.kill
  @transport.close
end
138
# Opens a client connection to localhost:@port and records it in @clients.
#
# queue:: optional queue handed to SpecTransport, which pushes :flushed onto it
#         on the transport's first flush; nil by default
#
# Returns the NonblockingService client.
def setup_client(queue = nil)
  framed = FramedTransport.new(Socket.new('localhost', @port))
  transport = SpecTransport.new(framed, queue)
  client = NonblockingService::Client.new(BinaryProtocol.new(transport))
  transport.open
  @clients << [client, transport]
  client
end
147
# Starts a thread that opens its own client connection and executes commands
# popped from a command queue.
#
# result:: queue that receives the outcome of commands that produce one
#
# Returns the command queue. A command is a symbol or [symbol, *args]:
#   :block            - call block and push its return value onto result
#   [:unblock, n]     - call unblock(n)
#   :hello            - call greeting(true) and push the reply onto result
#   [:sleep, seconds] - call sleep (0.5 when no argument is given), then push :slept
#   :shutdown         - call shutdown
#   :exit             - push :done and leave the command loop
# A nil or false command also ends the loop. After the loop the client's
# transport is closed. An error raised inside the thread is re-raised there
# unless @catch_exceptions is set.
def setup_client_thread(result)
  commands = Queue.new
  Thread.new do
    begin
      client = setup_client
      while (command = commands.pop)
        name, *args = command
        case name
        when :block    then result << client.block
        when :unblock  then client.unblock(args.first)
        when :hello    then result << client.greeting(true)
        when :shutdown then client.shutdown
        when :sleep
          client.sleep(args.first || 0.5)
          result << :slept
        when :exit
          result << :done
          break
        end
      end
      # close the transport that belongs to this thread's client
      @clients.each do |known_client, transport|
        next unless known_client == client
        break if transport.close
      end
    rescue => error
      raise error unless @catch_exceptions
    end
  end
  commands
end
179
# One client requests both greeting variants and compares the replies.
it "should handle basic message passing" do
  client = setup_client
  english = client.greeting(true)
  english.should == Hello.new
  other = client.greeting(false)
  other.should == Hello.new(:greeting => 'Aloha!')
  @server.shutdown
end
186
# Four clients call block from their own threads; a fifth client releases them
# all with unblock(4).
it "should handle concurrent clients" do
  replies = Queue.new
  flushes = Queue.new
  4.times do
    Thread.new(Thread.current) do |spec_thread|
      begin
        blocked_client = setup_client(flushes)
        replies.push blocked_client.block
      rescue => error
        # surface client-side failures in the thread running the example
        spec_thread.raise error
      end
    end
  end
  # wait until each of the four transports has announced its first flush
  4.times { flushes.pop }
  setup_client.unblock(4)
  4.times { replies.pop.should be_true }
  @server.shutdown
end
204
# Seven client threads plus one directly held client stay connected while four
# of the connections sit in a blocking call.
it "should handle messages from more than 5 long-lived connections" do
  result = Queue.new
  queues = []
  0.upto(6) do |index|
    queues.push(setup_client_thread(result))
    Thread.pass if index == 4 # give the server time to accept connections
  end
  client = setup_client

  # block 4 connections
  queues[0, 4].each { |commands| commands << :block }
  # the other three connections and the direct client must still get replies
  queues[4, 3].each { |commands| commands << :hello }
  3.times { result.pop.should == Hello.new }
  client.greeting(true).should == Hello.new

  # release the four blocked calls through one of the free connections
  queues[5] << [:unblock, 4]
  4.times { result.pop.should be_true }

  # a connection that was blocked answers again
  queues[2] << :hello
  result.pop.should == Hello.new
  client.greeting(false).should == Hello.new(:greeting => 'Aloha!')

  # tell all seven client threads to exit; the direct client is used once more
  queues.shift << :exit until queues.empty?
  client.greeting(true).should == Hello.new
  @server.shutdown
end
229
# After shutdown the server thread must finish within two seconds.
it "should shut down when asked" do
  # connect first to ensure it's running
  setup_client.greeting(false) # force a message pass
  @server.shutdown
  # Thread#join(limit) returns the thread when it finished in time, nil otherwise
  joined = @server_thread.join(2)
  joined.should be_an_instance_of(Thread)
end
237
# A sleep call that is already running when shutdown is requested must still
# complete and report :slept.
it "should continue processing active messages when shutting down" do
  result = Queue.new
  commands = setup_client_thread(result)
  commands << :sleep
  sleep 0.1 # give the server time to start processing the client's message
  @server.shutdown
  joined = @server_thread.join(2)
  joined.should be_an_instance_of(Thread)
  result.pop.should == :slept
end
247
# A 10 second sleep call is in flight when the server is shut down with a
# 1 second timeout; the server thread must still finish and the client must
# never report :slept.
it "should kill active messages when they don't expire while shutting down" do
  result = Queue.new
  client = setup_client_thread(result)
  client << [:sleep, 10]
  sleep 0.1 # start processing the client's message
  # The client thread is expected to fail once its call is killed. Set the flag
  # before shutting down: the shutdown call may not return until the timeout
  # has elapsed, and by then the client thread could already have re-raised.
  @catch_exceptions = true
  @server.shutdown(1)
  @server_thread.join(3).should_not be_nil
  result.should be_empty
end
258
# The shutdown service call itself must stop the server thread within two
# seconds.
it "should allow shutting down in response to a message" do
  client = setup_client
  reply = client.greeting(true)
  reply.should == Hello.new
  client.shutdown
  joined = @server_thread.join(2)
  joined.should_not be_nil
end
265 end
266end