blob: 712cf45c2c735ccc9cb1ea419c827b1bf5dfb612 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
19
Jake Farrella87810f2012-09-28 01:59:04 +000020require 'spec_helper'
Kevin Clarke0fddde2008-06-18 01:16:02 +000021
Jake Farrella87810f2012-09-28 01:59:04 +000022describe 'NonblockingServer' do
Kevin Clarke0fddde2008-06-18 01:16:02 +000023
# Service handler that the spec hands to the processor under test. It provides
# the operations the examples invoke through a client: greeting, block/unblock,
# sleep and shutdown.
class Handler
  # Server that #shutdown will stop; the spec assigns it once the server
  # object exists.
  attr_accessor :server

  def initialize
    # Tokens pushed by #unblock and consumed by #block.
    @queue = Queue.new
  end

  # Returns a default Hello when +english+ is truthy, otherwise a Hello whose
  # greeting is "Aloha!".
  def greeting(english)
    if english
      SpecNamespace::Hello.new
    else
      SpecNamespace::Hello.new(:greeting => "Aloha!")
    end
  end

  # Waits until a token is available on the internal queue and returns it
  # (#unblock pushes +true+).
  def block
    @queue.pop
  end

  # Releases up to +n+ pending (or future) #block calls by pushing +n+ tokens.
  def unblock(n)
    n.times { @queue.push true }
  end

  # Sleeps for +time+ seconds. Deliberately shadows Kernel#sleep inside this
  # class, hence the explicit Kernel receiver.
  def sleep(time)
    Kernel.sleep time
  end

  # Asks the assigned server to shut down.
  # NOTE(review): the arguments are presumably (timeout = 0, block = false) so
  # the call does not wait on the request it is serving - confirm against
  # Thrift::NonblockingServer#shutdown.
  def shutdown
    @server.shutdown(0, false)
  end
end
55
# Transport decorator used by the spec's clients. Every call is forwarded to the
# wrapped transport; in addition, the first #flush pushes :flushed onto the
# optional queue so an example can tell when a client has sent a request.
class SpecTransport < Thrift::BaseTransport
  # transport:: the transport to delegate to
  # queue::     a Queue notified on the first flush, or nil for no notification
  def initialize(transport, queue)
    @transport = transport
    @queue = queue
    @flushed = false
  end

  def open?
    @transport.open?
  end

  def open
    @transport.open
  end

  def close
    @transport.close
  end

  def read(sz)
    @transport.read(sz)
  end

  def write(buf, sz = nil)
    @transport.write(buf, sz)
  end

  # Signals :flushed once (only when a queue was supplied), then flushes the
  # wrapped transport.
  def flush
    @queue.push :flushed unless @flushed || @queue.nil?
    @flushed = true
    @transport.flush
  end
end
89
# Server socket that announces on a queue when it has started listening, so the
# spec can wait for the server before connecting clients.
class SpecServerSocket < Thrift::ServerSocket
  # queue:: receives :listen after each successful #listen
  def initialize(host, port, queue)
    super(host, port)
    @queue = queue
  end

  # Starts listening via the superclass, then pushes :listen onto the queue.
  def listen
    super
    @queue.push :listen
  end
end
101
Kevin Clarke45bf592008-06-18 01:17:44 +0000102 describe Thrift::NonblockingServer do
# Starts a NonblockingServer on a background thread for every example and
# waits until its socket reports that it is listening.
before(:each) do
  # Initialised first so that after(:each) still finds an array if the setup
  # below is interrupted (for example by the server thread re-raising here).
  @clients = []
  @catch_exceptions = false

  @port = 43251
  handler = Handler.new
  processor = SpecNamespace::NonblockingService::Processor.new(handler)
  listening = Queue.new
  @transport = SpecServerSocket.new('localhost', @port, listening)
  transport_factory = Thrift::FramedTransportFactory.new
  logger = Logger.new(STDERR)
  logger.level = Logger::WARN
  # NOTE(review): the nil/5 arguments are presumably the protocol factory
  # (default) and the worker-thread count - confirm against
  # Thrift::NonblockingServer#initialize.
  @server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
  handler.server = @server
  @server_thread = Thread.new(Thread.current) do |master_thread|
    begin
      @server.serve
    rescue => e
      # Print the failure and forward it to the thread running the example.
      p e
      puts e.backtrace * "\n"
      master_thread.raise e
    end
  end
  # SpecServerSocket#listen pushes :listen; block here until that happens.
  listening.pop
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000128
# Tears down everything the example created: client transports, the server
# thread and the listening socket.
after(:each) do
  @clients.each { |_client, trans| trans.close }
  # @server.shutdown(1)
  @server_thread.kill
  @transport.close
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000135
# Opens a framed, binary-protocol client connection to the server on @port
# and registers it in @clients so after(:each) can close it.
#
# queue:: optional Queue handed to SpecTransport, which pushes :flushed onto
#         it on the first flush
#
# Returns the connected client.
def setup_client(queue = nil)
  socket = Thrift::Socket.new('localhost', @port)
  transport = SpecTransport.new(Thrift::FramedTransport.new(socket), queue)
  protocol = Thrift::BinaryProtocol.new(transport)
  client = SpecNamespace::NonblockingService::Client.new(protocol)
  transport.open
  @clients << [client, transport]
  client
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000144
# Starts a thread that owns one client connection and executes commands sent
# to it through the returned queue.
#
# A command is either a symbol or an array of [symbol, *args]:
#   :block           - calls client.block and pushes its return value to +result+
#   [:unblock, n]    - calls client.unblock(n)
#   :hello           - calls client.greeting(true) and pushes the reply to +result+
#   [:sleep, secs]   - calls client.sleep (default 0.5), then pushes :slept
#   :shutdown        - calls client.shutdown
#   :exit            - pushes :done and ends the loop
# Pushing nil or false also ends the loop.
#
# result:: Queue that receives the outcomes listed above
#
# Returns the command queue. Exceptions raised inside the thread are
# swallowed only while @catch_exceptions is truthy.
def setup_client_thread(result)
  queue = Queue.new
  Thread.new do
    begin
      client = setup_client
      while (cmd = queue.pop)
        msg, *args = cmd
        case msg
        when :block
          result << client.block
        when :unblock
          client.unblock(args.first)
        when :hello
          result << client.greeting(true)
        when :sleep
          client.sleep(args[0] || 0.5)
          result << :slept
        when :shutdown
          client.shutdown
        when :exit
          result << :done
          break
        end
      end
      # Close the transport that belongs to this thread's client.
      _, transport = @clients.find { |c, _| c == client }
      transport.close if transport
    rescue
      raise unless @catch_exceptions
    end
  end
  queue
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000176
# A single client gets the expected reply for both greeting variants.
it "should handle basic message passing" do
  client = setup_client
  client.greeting(true).should == SpecNamespace::Hello.new
  client.greeting(false).should == SpecNamespace::Hello.new(:greeting => 'Aloha!')
  @server.shutdown
end
Kevin Clark15350782008-06-18 01:16:27 +0000183
# Four clients call the blocking operation at once; a fifth client releases
# them all, and every blocked call must then return a truthy value.
it "should handle concurrent clients" do
  queue = Queue.new
  trans_queue = Queue.new
  4.times do
    Thread.new(Thread.current) do |main_thread|
      begin
        queue.push setup_client(trans_queue).block
      rescue => e
        main_thread.raise e
      end
    end
  end
  # Wait until each of the four clients has flushed its request.
  4.times { trans_queue.pop }
  setup_client.unblock(4)
  4.times { queue.pop.should be_true }
  @server.shutdown
end
201
# Seven threaded clients plus one direct client stay connected while four of
# them are parked in a blocking call; the remaining ones must still be served.
it "should handle messages from more than 5 long-lived connections" do
  queues = []
  result = Queue.new
  7.times do |i|
    queues << setup_client_thread(result)
    Thread.pass if i == 4 # give the server time to accept connections
  end
  client = setup_client
  # block 4 connections
  4.times { |i| queues[i] << :block }
  queues[4] << :hello
  queues[5] << :hello
  queues[6] << :hello
  3.times { result.pop.should == SpecNamespace::Hello.new }
  client.greeting(true).should == SpecNamespace::Hello.new
  # release the four blocked connections through one of the idle ones
  queues[5] << [:unblock, 4]
  4.times { result.pop.should be_true }
  # a previously blocked connection must be usable again
  queues[2] << :hello
  result.pop.should == SpecNamespace::Hello.new
  client.greeting(false).should == SpecNamespace::Hello.new(:greeting => 'Aloha!')
  queues.each { |q| q << :exit }
  client.greeting(true).should == SpecNamespace::Hello.new
  @server.shutdown
end
226
# After #shutdown the serving thread must finish within two seconds
# (Thread#join returns the thread on completion and nil on timeout).
it "should shut down when asked" do
  # connect first to ensure it's running
  client = setup_client
  client.greeting(false) # force a message pass
  @server.shutdown
  @server_thread.join(2).should be_an_instance_of(Thread)
end
234
# A request that is already in progress when #shutdown is called must still
# complete: the client thread reports :slept after its sleep call returns.
it "should continue processing active messages when shutting down" do
  result = Queue.new
  client = setup_client_thread(result)
  client << :sleep
  sleep 0.1 # give the server time to start processing the client's message
  @server.shutdown
  @server_thread.join(2).should be_an_instance_of(Thread)
  result.pop.should == :slept
end
244
# A request that outlives the shutdown timeout must not complete: the server
# thread ends and the client thread never reports :slept.
it "should kill active messages when they don't expire while shutting down" do
  result = Queue.new
  client = setup_client_thread(result)
  client << [:sleep, 10]
  sleep 0.1 # start processing the client's message
  # Enable exception swallowing before shutting down: the client thread is
  # expected to fail once its request is cut off, and that can happen while
  # the shutdown call below is still running.
  @catch_exceptions = true
  @server.shutdown(1)
  @server_thread.join(3).should_not be_nil
  result.should be_empty
end
255
# The handler's own shutdown operation, invoked through a client, must stop
# the serving thread within two seconds.
it "should allow shutting down in response to a message" do
  client = setup_client
  client.greeting(true).should == SpecNamespace::Hello.new
  client.shutdown
  @server_thread.join(2).should_not be_nil
end
Kevin Clark15350782008-06-18 01:16:27 +0000262 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000263end