blob: 04c569a7e6a2f9a1914e411236fb661c60fea48b [file] [log] [blame]
Kevin Clarke0fddde2008-06-18 01:16:02 +00001require File.dirname(__FILE__) + '/spec_helper'
2require 'thrift/server/nonblockingserver'
Kevin Clarke0fddde2008-06-18 01:16:02 +00003require 'NonblockingService'
4
5class ThriftNonblockingServerSpec < Spec::ExampleGroup
6 include Thrift
7 include SpecNamespace
8
# Service handler used by the nonblocking-server specs. The spec setup hands
# an instance to NonblockingService::Processor and assigns the running server
# to #server.
class Handler
  # The server this handler belongs to; must be assigned before #shutdown
  # is called.
  attr_accessor :server

  def initialize
    # Callers of #block wait on this queue until #unblock feeds it.
    @queue = Queue.new
  end

  # Returns a default Hello when +english+ is truthy, otherwise a Hello whose
  # greeting is "Aloha!".
  def greeting(english)
    if english
      SpecNamespace::Hello.new
    else
      SpecNamespace::Hello.new(:greeting => "Aloha!")
    end
  end

  # Blocks the calling thread until a value is pushed onto the internal queue
  # and returns that value.
  def block
    @queue.pop
  end

  # Pushes one +true+ for every thread currently waiting in #block, releasing
  # all of them. Returns the number of waiters that were released.
  def unblock
    @queue.num_waiting.times { @queue.push true }
  end

  # Sleeps for +time+ seconds. Kernel.sleep is called explicitly because this
  # method shadows Kernel#sleep inside the handler.
  def sleep(time)
    Kernel.sleep time
  end

  # Asks the attached server to shut down, passing (0, false).
  # NOTE(review): presumably "zero timeout, do not block" -- confirm against
  # NonblockingServer#shutdown.
  def shutdown
    @server.shutdown(0, false)
  end
end
40
# Transport wrapper used by the specs: every call is forwarded to the wrapped
# transport, and the first #flush is announced on an optional queue so a spec
# can wait until a client has actually sent something.
class SpecTransport < Transport
  # transport:: the transport every call is delegated to
  # queue::     receives :flushed on the first #flush; may be nil
  def initialize(transport, queue)
    @transport = transport
    @queue = queue
    @flushed = false
  end

  # Whether the wrapped transport reports itself open.
  def open?
    @transport.open?
  end

  # Opens the wrapped transport.
  def open
    @transport.open
  end

  # Closes the wrapped transport.
  def close
    @transport.close
  end

  # Reads +sz+ from the wrapped transport.
  def read(sz)
    @transport.read(sz)
  end

  # Writes +buf+ to the wrapped transport; +sz+ is passed through unchanged.
  def write(buf, sz = nil)
    @transport.write(buf, sz)
  end

  # Flushes the wrapped transport. On the first call only, :flushed is pushed
  # onto the queue (when one was given) before the underlying flush happens.
  def flush
    @queue.push :flushed unless @flushed || @queue.nil?
    @flushed = true
    @transport.flush
  end
end
74
Kevin Clarke45bf592008-06-18 01:17:44 +000075 describe Thrift::NonblockingServer do
# Builds a NonblockingServer around a fresh Handler and starts serving on a
# background thread before every example.
before(:each) do
  @port = 43251
  handler = Handler.new
  processor = NonblockingService::Processor.new(handler)
  @transport = ServerSocket.new('localhost', @port)
  transport_factory = FramedTransportFactory.new
  logger = Logger.new(STDERR)
  logger.level = Logger::WARN
  # NOTE(review): the 5 is presumably the worker-thread count (one example
  # talks about "more than 5" connections) -- confirm against NonblockingServer.
  @server = NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
  handler.server = @server
  @server_thread = Thread.new(Thread.current) do |master_thread|
    begin
      @server.serve
    rescue => e
      # Print the failure and re-raise it in the thread that started the
      # server so it is not lost on the background thread.
      p e
      puts e.backtrace.join("\n")
      master_thread.raise e
    end
  end
  # Yield so the server thread gets a chance to start.
  Thread.pass

  # [client, transport] pairs registered by setup_client; closed in after(:each).
  @clients = []
  # When true, client threads swallow exceptions instead of re-raising them.
  @catch_exceptions = false
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000100
# Tears down after every example: closes every client transport registered in
# @clients, kills the server thread, and closes the server transport.
after(:each) do
  @clients.each { |client, trans| trans.close }
  # @server.shutdown(1)
  @server_thread.kill
  @transport.close
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000107
# Opens a framed socket connection to the server on @port and returns a
# NonblockingService client for it. The [client, transport] pair is recorded
# in @clients so after(:each) can close it.
#
# queue:: optional queue handed to SpecTransport, which pushes :flushed onto
#         it the first time the transport is flushed
def setup_client(queue = nil)
  transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
  protocol = BinaryProtocol.new(transport)
  client = NonblockingService::Client.new(protocol)
  transport.open
  @clients << [client, transport]
  client
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000116
# Starts a thread that owns one client and executes commands sent to it.
# Returns the command queue; push symbols onto it to drive the client:
#
# :block::    calls client.block and pushes its return value onto +result+
# :unblock::  calls client.unblock
# :hello::    calls client.greeting(true) and pushes the reply onto +result+
# :sleep::    calls client.sleep(0.5), then pushes :slept onto +result+
# :shutdown:: calls client.shutdown
# :exit::     pushes :done onto +result+ and leaves the loop
#
# Unknown commands are ignored; pushing nil or false also ends the loop. When
# the loop ends normally the client's transport is closed. Exceptions raised
# in the thread are re-raised unless @catch_exceptions is set.
def setup_client_thread(result)
  queue = Queue.new
  Thread.new do
    begin
      client = setup_client
      while (msg = queue.pop)
        case msg
        when :block
          result << client.block
        when :unblock
          client.unblock
        when :hello
          result << client.greeting(true)
        when :sleep
          client.sleep(0.5)
          result << :slept
        when :shutdown
          client.shutdown
        when :exit
          result << :done
          break
        end
      end
      # Close the transport that setup_client registered for this client.
      _, transport = @clients.find { |c, _| c == client }
      transport.close if transport
    rescue
      raise unless @catch_exceptions
    end
  end
  queue
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000147
# A single client gets the expected reply for both greeting variants.
it "should handle basic message passing" do
  client = setup_client
  client.greeting(true).should == Hello.new
  client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
end
Kevin Clark15350782008-06-18 01:16:27 +0000153
# Four clients block at the same time and are all released by one unblock call.
it "should handle concurrent clients" do
  queue = Queue.new
  trans_queue = Queue.new
  4.times { Thread.new { queue.push setup_client(trans_queue).block } }
  # SpecTransport pushes :flushed on each client's first flush, so after four
  # pops every client has flushed its block request.
  4.times { trans_queue.pop }
  setup_client.unblock
  4.times { queue.pop.should be_true }
end
162
# Seven threaded clients plus one direct client stay connected; requests keep
# being answered while four of the connections are blocked.
it "should handle messages from more than 5 long-lived connections" do
  queues = []
  result = Queue.new
  7.times do |i|
    queues << setup_client_thread(result)
    Thread.pass if i == 4 # give the server time to accept connections
  end
  client = setup_client
  # block 4 connections
  4.times { |i| queues[i] << :block }
  queues[4] << :hello
  queues[5] << :hello
  queues[6] << :hello
  3.times { result.pop.should == Hello.new }
  client.greeting(true).should == Hello.new
  # release the four blocked connections; each pushes its block result
  queues[5] << :unblock
  4.times { result.pop.should be_true }
  # a previously blocked connection can be used again
  queues[2] << :hello
  result.pop.should == Hello.new
  client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
  7.times { queues.shift << :exit }
  client.greeting(true).should == Hello.new
end
186
# Calling shutdown on a running server ends the serve thread within 2 seconds.
it "should shut down when asked" do
  # connect first to ensure it's running
  client = setup_client
  client.greeting(false) # force a message pass
  @server.shutdown
  # Thread#join(limit) returns the thread when it finished in time, nil otherwise.
  @server_thread.join(2).should be_an_instance_of(Thread)
end
194
# A request that is already in progress still completes when the server is
# shut down while it is being processed.
it "should continue processing active messages when shutting down" do
  result = Queue.new
  client = setup_client_thread(result)
  client << :sleep
  sleep 0.1 # give the server time to start processing the client's message
  @server.shutdown
  @server_thread.join(2).should be_an_instance_of(Thread)
  # the client thread pushes :slept only after its sleep call returned
  result.pop.should == :slept
end
204
# A request that never finishes does not keep the server alive: shutdown(1)
# still lets the serve thread end within 3 seconds.
it "should kill active messages when they don't expire while shutting down" do
  result = Queue.new
  client = setup_client_thread(result)
  client << :block
  sleep 0.1 # start processing the client's message
  @server.shutdown(1)
  # let the client thread swallow the error caused by its call being cut off
  @catch_exceptions = true
  @server_thread.join(3).should_not be_nil
end
214
# A client can stop the server through the service's own shutdown call, which
# Handler#shutdown forwards to the server.
it "should allow shutting down in response to a message" do
  client = setup_client
  client.greeting(true).should == Hello.new
  client.shutdown
  @server_thread.join(2).should_not be_nil
end
Kevin Clark15350782008-06-18 01:16:27 +0000221 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000222end