blob: bf390bd3833159d0edbce0bb211755880819a6a5 [file] [log] [blame]
Kevin Clarke0fddde2008-06-18 01:16:02 +00001require File.dirname(__FILE__) + '/spec_helper'
2require 'thrift/server/nonblockingserver'
3$:.unshift File.dirname(__FILE__) + '/gen-rb'
4require 'NonblockingService'
5
6class ThriftNonblockingServerSpec < Spec::ExampleGroup
7 include Thrift
8 include SpecNamespace
9
10 class Handler
11 def initialize
12 @queue = Queue.new
13 end
14
15 attr_accessor :server
16
17 def greeting(english)
18 if english
19 SpecNamespace::Hello.new
20 else
21 SpecNamespace::Hello.new(:greeting => "Aloha!")
22 end
23 end
24
25 def block
26 @queue.pop
27 end
28
29 def unblock
30 @queue.num_waiting.times { @queue.push true }
31 end
32
33 def sleep(time)
34 Kernel.sleep time
35 end
36
37 def shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +000038 @server.shutdown(0, false)
Kevin Clarke0fddde2008-06-18 01:16:02 +000039 end
40 end
41
Kevin Clarke45bf592008-06-18 01:17:44 +000042 class SpecTransport < Transport
43 def initialize(transport, queue)
44 @transport = transport
45 @queue = queue
46 @flushed = false
Kevin Clarke0fddde2008-06-18 01:16:02 +000047 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000048
Kevin Clarke45bf592008-06-18 01:17:44 +000049 def open?
50 @transport.open?
51 end
52
53 def open
54 @transport.open
55 end
56
57 def close
58 @transport.close
59 end
60
61 def read(sz)
62 @transport.read(sz)
63 end
64
65 def write(buf,sz=nil)
66 @transport.write(buf, sz)
67 end
68
69 def flush
70 @queue.push :flushed unless @flushed or @queue.nil?
71 @flushed = true
72 @transport.flush
73 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000074 end
75
Kevin Clarke45bf592008-06-18 01:17:44 +000076 describe Thrift::NonblockingServer do
77 before(:each) do
78 @port = 43251
79 handler = Handler.new
80 processor = NonblockingService::Processor.new(handler)
81 @transport = ServerSocket.new('localhost', @port)
82 transportFactory = FramedTransportFactory.new
83 logger = Logger.new(STDERR)
84 logger.level = Logger::WARN
85 @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger)
86 handler.server = @server
87 @server_thread = Thread.new(Thread.current) do |master_thread|
88 begin
89 @server.serve
90 rescue => e
91 p e
92 puts e.backtrace * "\n"
93 master_thread.raise e
Kevin Clarke0fddde2008-06-18 01:16:02 +000094 end
95 end
Kevin Clarke45bf592008-06-18 01:17:44 +000096 Thread.pass
97
98 @clients = []
99 @catch_exceptions = false
Kevin Clarke0fddde2008-06-18 01:16:02 +0000100 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000101
Kevin Clarke45bf592008-06-18 01:17:44 +0000102 after(:each) do
103 @clients.each { |client, trans| trans.close }
104 # @server.shutdown(1)
105 @server_thread.kill
106 @transport.close
Kevin Clarke0fddde2008-06-18 01:16:02 +0000107 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000108
Kevin Clarke45bf592008-06-18 01:17:44 +0000109 def setup_client(queue = nil)
110 transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
111 protocol = BinaryProtocol.new(transport)
112 client = NonblockingService::Client.new(protocol)
113 transport.open
114 @clients << [client, transport]
115 client
116 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000117
Kevin Clarke45bf592008-06-18 01:17:44 +0000118 def setup_client_thread(result)
119 queue = Queue.new
120 Thread.new do
121 begin
122 client = setup_client
123 while (msg = queue.pop)
124 case msg
125 when :block
126 result << client.block
127 when :unblock
128 client.unblock
129 when :hello
130 result << client.greeting(true) # ignore result
131 when :sleep
132 client.sleep(0.5)
133 result << :slept
134 when :shutdown
135 client.shutdown
136 when :exit
137 result << :done
138 break
139 end
140 end
141 @clients.each { |c,t| t.close and break if c == client } #close the transport
142 rescue => e
143 raise e unless @catch_exceptions
144 end
145 end
146 queue
147 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000148
Kevin Clarke45bf592008-06-18 01:17:44 +0000149 it "should handle basic message passing" do
150 client = setup_client
151 client.greeting(true).should == Hello.new
152 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
153 end
Kevin Clark15350782008-06-18 01:16:27 +0000154
Kevin Clarke45bf592008-06-18 01:17:44 +0000155 it "should handle concurrent clients" do
156 queue = Queue.new
157 trans_queue = Queue.new
158 4.times { Thread.new { queue.push setup_client(trans_queue).block } }
159 4.times { trans_queue.pop }
160 setup_client.unblock
161 4.times { queue.pop.should be_true }
162 end
163
164 it "should handle messages from more than 5 long-lived connections" do
165 queues = []
166 result = Queue.new
167 7.times do |i|
168 queues << setup_client_thread(result)
169 Thread.pass if i == 4 # give the server time to accept connections
170 end
171 client = setup_client
172 # block 4 connections
173 4.times { |i| queues[i] << :block }
174 queues[4] << :hello
175 queues[5] << :hello
176 queues[6] << :hello
177 3.times { result.pop.should == Hello.new }
178 client.greeting(true).should == Hello.new
179 queues[5] << :unblock
180 4.times { result.pop.should be_true }
181 queues[2] << :hello
182 result.pop.should == Hello.new
183 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
184 7.times { queues.shift << :exit }
185 client.greeting(true).should == Hello.new
186 end
187
188 it "should shut down when asked" do
189 # connect first to ensure it's running
190 client = setup_client
191 client.greeting(false) # force a message pass
192 @server.shutdown
193 @server_thread.join(2).should be_an_instance_of(Thread)
194 end
195
196 it "should continue processing active messages when shutting down" do
197 result = Queue.new
198 client = setup_client_thread(result)
199 client << :sleep
200 sleep 0.1 # give the server time to start processing the client's message
201 @server.shutdown
202 @server_thread.join(2).should be_an_instance_of(Thread)
203 result.pop.should == :slept
204 end
205
206 it "should kill active messages when they don't expire while shutting down" do
207 result = Queue.new
208 client = setup_client_thread(result)
209 client << :block
210 sleep 0.1 # start processing the client's message
211 @server.shutdown(1)
212 @catch_exceptions = true
213 @server_thread.join(3).should_not be_nil
214 end
215
216 it "should allow shutting down in response to a message" do
217 client = setup_client
218 client.greeting(true).should == Hello.new
219 client.shutdown
220 @server_thread.join(2).should_not be_nil
221 end
Kevin Clark15350782008-06-18 01:16:27 +0000222 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000223end