blob: ae7fe2ca3e4daa13d3f21eecb41fe220a83deb78 [file] [log] [blame]
Kevin Clarke0fddde2008-06-18 01:16:02 +00001require File.dirname(__FILE__) + '/spec_helper'
2require 'thrift/server/nonblockingserver'
Kevin Clarke0fddde2008-06-18 01:16:02 +00003require 'NonblockingService'
4
5class ThriftNonblockingServerSpec < Spec::ExampleGroup
6 include Thrift
7 include SpecNamespace
8
9 class Handler
10 def initialize
11 @queue = Queue.new
12 end
13
14 attr_accessor :server
15
16 def greeting(english)
17 if english
18 SpecNamespace::Hello.new
19 else
20 SpecNamespace::Hello.new(:greeting => "Aloha!")
21 end
22 end
23
24 def block
25 @queue.pop
26 end
27
Kevin Clark980e4452008-06-18 01:19:59 +000028 def unblock(n)
29 n.times { @queue.push true }
Kevin Clarke0fddde2008-06-18 01:16:02 +000030 end
31
32 def sleep(time)
33 Kernel.sleep time
34 end
35
36 def shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +000037 @server.shutdown(0, false)
Kevin Clarke0fddde2008-06-18 01:16:02 +000038 end
39 end
40
Kevin Clarke45bf592008-06-18 01:17:44 +000041 class SpecTransport < Transport
42 def initialize(transport, queue)
43 @transport = transport
44 @queue = queue
45 @flushed = false
Kevin Clarke0fddde2008-06-18 01:16:02 +000046 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000047
Kevin Clarke45bf592008-06-18 01:17:44 +000048 def open?
49 @transport.open?
50 end
51
52 def open
53 @transport.open
54 end
55
56 def close
57 @transport.close
58 end
59
60 def read(sz)
61 @transport.read(sz)
62 end
63
64 def write(buf,sz=nil)
65 @transport.write(buf, sz)
66 end
67
68 def flush
69 @queue.push :flushed unless @flushed or @queue.nil?
70 @flushed = true
71 @transport.flush
72 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000073 end
74
Kevin Clark980e4452008-06-18 01:19:59 +000075 class SpecServerSocket < ServerSocket
76 def initialize(host, port, queue)
77 super(host, port)
78 @queue = queue
79 end
80
81 def listen
82 super
83 @queue.push :listen
84 end
85 end
86
Kevin Clarke45bf592008-06-18 01:17:44 +000087 describe Thrift::NonblockingServer do
88 before(:each) do
89 @port = 43251
90 handler = Handler.new
91 processor = NonblockingService::Processor.new(handler)
Kevin Clark980e4452008-06-18 01:19:59 +000092 queue = Queue.new
93 @transport = SpecServerSocket.new('localhost', @port, queue)
Kevin Clarke45bf592008-06-18 01:17:44 +000094 transportFactory = FramedTransportFactory.new
95 logger = Logger.new(STDERR)
96 logger.level = Logger::WARN
97 @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger)
98 handler.server = @server
99 @server_thread = Thread.new(Thread.current) do |master_thread|
100 begin
101 @server.serve
102 rescue => e
103 p e
104 puts e.backtrace * "\n"
105 master_thread.raise e
Kevin Clarke0fddde2008-06-18 01:16:02 +0000106 end
107 end
Kevin Clark980e4452008-06-18 01:19:59 +0000108 queue.pop
Kevin Clarke45bf592008-06-18 01:17:44 +0000109
110 @clients = []
111 @catch_exceptions = false
Kevin Clarke0fddde2008-06-18 01:16:02 +0000112 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000113
Kevin Clarke45bf592008-06-18 01:17:44 +0000114 after(:each) do
115 @clients.each { |client, trans| trans.close }
116 # @server.shutdown(1)
117 @server_thread.kill
118 @transport.close
Kevin Clarke0fddde2008-06-18 01:16:02 +0000119 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000120
Kevin Clarke45bf592008-06-18 01:17:44 +0000121 def setup_client(queue = nil)
122 transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
123 protocol = BinaryProtocol.new(transport)
124 client = NonblockingService::Client.new(protocol)
125 transport.open
126 @clients << [client, transport]
127 client
128 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000129
Kevin Clarke45bf592008-06-18 01:17:44 +0000130 def setup_client_thread(result)
131 queue = Queue.new
132 Thread.new do
133 begin
134 client = setup_client
Kevin Clark980e4452008-06-18 01:19:59 +0000135 while (cmd = queue.pop)
136 msg, *args = cmd
Kevin Clarke45bf592008-06-18 01:17:44 +0000137 case msg
138 when :block
139 result << client.block
140 when :unblock
Kevin Clark980e4452008-06-18 01:19:59 +0000141 client.unblock(args.first)
Kevin Clarke45bf592008-06-18 01:17:44 +0000142 when :hello
143 result << client.greeting(true) # ignore result
144 when :sleep
Kevin Clarkf731b492008-06-18 01:20:25 +0000145 client.sleep(args[0] || 0.5)
Kevin Clarke45bf592008-06-18 01:17:44 +0000146 result << :slept
147 when :shutdown
148 client.shutdown
149 when :exit
150 result << :done
151 break
152 end
153 end
154 @clients.each { |c,t| t.close and break if c == client } #close the transport
155 rescue => e
156 raise e unless @catch_exceptions
157 end
158 end
159 queue
160 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000161
Kevin Clarke45bf592008-06-18 01:17:44 +0000162 it "should handle basic message passing" do
163 client = setup_client
164 client.greeting(true).should == Hello.new
165 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
166 end
Kevin Clark15350782008-06-18 01:16:27 +0000167
Kevin Clarke45bf592008-06-18 01:17:44 +0000168 it "should handle concurrent clients" do
169 queue = Queue.new
170 trans_queue = Queue.new
Kevin Clark980e4452008-06-18 01:19:59 +0000171 4.times do
172 Thread.new(Thread.current) do |main_thread|
173 begin
174 queue.push setup_client(trans_queue).block
175 rescue => e
176 main_thread.raise e
177 end
178 end
179 end
Kevin Clarke45bf592008-06-18 01:17:44 +0000180 4.times { trans_queue.pop }
Kevin Clark980e4452008-06-18 01:19:59 +0000181 setup_client.unblock(4)
Kevin Clarke45bf592008-06-18 01:17:44 +0000182 4.times { queue.pop.should be_true }
183 end
184
185 it "should handle messages from more than 5 long-lived connections" do
186 queues = []
187 result = Queue.new
188 7.times do |i|
189 queues << setup_client_thread(result)
190 Thread.pass if i == 4 # give the server time to accept connections
191 end
192 client = setup_client
193 # block 4 connections
194 4.times { |i| queues[i] << :block }
195 queues[4] << :hello
196 queues[5] << :hello
197 queues[6] << :hello
198 3.times { result.pop.should == Hello.new }
199 client.greeting(true).should == Hello.new
Kevin Clark980e4452008-06-18 01:19:59 +0000200 queues[5] << [:unblock, 4]
Kevin Clarke45bf592008-06-18 01:17:44 +0000201 4.times { result.pop.should be_true }
202 queues[2] << :hello
203 result.pop.should == Hello.new
204 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
205 7.times { queues.shift << :exit }
206 client.greeting(true).should == Hello.new
207 end
208
209 it "should shut down when asked" do
210 # connect first to ensure it's running
211 client = setup_client
212 client.greeting(false) # force a message pass
213 @server.shutdown
214 @server_thread.join(2).should be_an_instance_of(Thread)
215 end
216
217 it "should continue processing active messages when shutting down" do
218 result = Queue.new
219 client = setup_client_thread(result)
220 client << :sleep
221 sleep 0.1 # give the server time to start processing the client's message
222 @server.shutdown
223 @server_thread.join(2).should be_an_instance_of(Thread)
224 result.pop.should == :slept
225 end
226
227 it "should kill active messages when they don't expire while shutting down" do
228 result = Queue.new
229 client = setup_client_thread(result)
Kevin Clarkf731b492008-06-18 01:20:25 +0000230 client << [:sleep, 10]
Kevin Clarke45bf592008-06-18 01:17:44 +0000231 sleep 0.1 # start processing the client's message
232 @server.shutdown(1)
233 @catch_exceptions = true
234 @server_thread.join(3).should_not be_nil
Kevin Clarkf731b492008-06-18 01:20:25 +0000235 result.should be_empty
Kevin Clarke45bf592008-06-18 01:17:44 +0000236 end
237
238 it "should allow shutting down in response to a message" do
239 client = setup_client
240 client.greeting(true).should == Hello.new
241 client.shutdown
242 @server_thread.join(2).should_not be_nil
243 end
Kevin Clark15350782008-06-18 01:16:27 +0000244 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000245end