blob: 02666dba7ddbaa311581f15b67a5a8824b7eaf5f [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
Kevin Clarke0fddde2008-06-18 01:16:02 +000020require File.dirname(__FILE__) + '/spec_helper'
21require 'thrift/server/nonblockingserver'
Kevin Clark2bd3a302008-06-26 17:49:49 +000022require File.dirname(__FILE__) + '/gen-rb/NonblockingService'
Kevin Clarke0fddde2008-06-18 01:16:02 +000023
24class ThriftNonblockingServerSpec < Spec::ExampleGroup
25 include Thrift
26 include SpecNamespace
27
28 class Handler
29 def initialize
30 @queue = Queue.new
31 end
32
33 attr_accessor :server
34
35 def greeting(english)
36 if english
37 SpecNamespace::Hello.new
38 else
39 SpecNamespace::Hello.new(:greeting => "Aloha!")
40 end
41 end
42
43 def block
44 @queue.pop
45 end
46
Kevin Clark980e4452008-06-18 01:19:59 +000047 def unblock(n)
48 n.times { @queue.push true }
Kevin Clarke0fddde2008-06-18 01:16:02 +000049 end
50
51 def sleep(time)
52 Kernel.sleep time
53 end
54
55 def shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +000056 @server.shutdown(0, false)
Kevin Clarke0fddde2008-06-18 01:16:02 +000057 end
58 end
59
Kevin Clarke45bf592008-06-18 01:17:44 +000060 class SpecTransport < Transport
61 def initialize(transport, queue)
62 @transport = transport
63 @queue = queue
64 @flushed = false
Kevin Clarke0fddde2008-06-18 01:16:02 +000065 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000066
Kevin Clarke45bf592008-06-18 01:17:44 +000067 def open?
68 @transport.open?
69 end
70
71 def open
72 @transport.open
73 end
74
75 def close
76 @transport.close
77 end
78
79 def read(sz)
80 @transport.read(sz)
81 end
82
83 def write(buf,sz=nil)
84 @transport.write(buf, sz)
85 end
86
87 def flush
88 @queue.push :flushed unless @flushed or @queue.nil?
89 @flushed = true
90 @transport.flush
91 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000092 end
93
Kevin Clark980e4452008-06-18 01:19:59 +000094 class SpecServerSocket < ServerSocket
95 def initialize(host, port, queue)
96 super(host, port)
97 @queue = queue
98 end
99
100 def listen
101 super
102 @queue.push :listen
103 end
104 end
105
Kevin Clarke45bf592008-06-18 01:17:44 +0000106 describe Thrift::NonblockingServer do
107 before(:each) do
108 @port = 43251
109 handler = Handler.new
110 processor = NonblockingService::Processor.new(handler)
Kevin Clark980e4452008-06-18 01:19:59 +0000111 queue = Queue.new
112 @transport = SpecServerSocket.new('localhost', @port, queue)
Kevin Clarke45bf592008-06-18 01:17:44 +0000113 transportFactory = FramedTransportFactory.new
114 logger = Logger.new(STDERR)
115 logger.level = Logger::WARN
116 @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger)
117 handler.server = @server
118 @server_thread = Thread.new(Thread.current) do |master_thread|
119 begin
120 @server.serve
121 rescue => e
122 p e
123 puts e.backtrace * "\n"
124 master_thread.raise e
Kevin Clarke0fddde2008-06-18 01:16:02 +0000125 end
126 end
Kevin Clark980e4452008-06-18 01:19:59 +0000127 queue.pop
Kevin Clarke45bf592008-06-18 01:17:44 +0000128
129 @clients = []
130 @catch_exceptions = false
Kevin Clarke0fddde2008-06-18 01:16:02 +0000131 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000132
Kevin Clarke45bf592008-06-18 01:17:44 +0000133 after(:each) do
134 @clients.each { |client, trans| trans.close }
135 # @server.shutdown(1)
136 @server_thread.kill
137 @transport.close
Kevin Clarke0fddde2008-06-18 01:16:02 +0000138 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000139
Kevin Clarke45bf592008-06-18 01:17:44 +0000140 def setup_client(queue = nil)
141 transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
142 protocol = BinaryProtocol.new(transport)
143 client = NonblockingService::Client.new(protocol)
144 transport.open
145 @clients << [client, transport]
146 client
147 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000148
Kevin Clarke45bf592008-06-18 01:17:44 +0000149 def setup_client_thread(result)
150 queue = Queue.new
151 Thread.new do
152 begin
153 client = setup_client
Kevin Clark980e4452008-06-18 01:19:59 +0000154 while (cmd = queue.pop)
155 msg, *args = cmd
Kevin Clarke45bf592008-06-18 01:17:44 +0000156 case msg
157 when :block
158 result << client.block
159 when :unblock
Kevin Clark980e4452008-06-18 01:19:59 +0000160 client.unblock(args.first)
Kevin Clarke45bf592008-06-18 01:17:44 +0000161 when :hello
162 result << client.greeting(true) # ignore result
163 when :sleep
Kevin Clarkf731b492008-06-18 01:20:25 +0000164 client.sleep(args[0] || 0.5)
Kevin Clarke45bf592008-06-18 01:17:44 +0000165 result << :slept
166 when :shutdown
167 client.shutdown
168 when :exit
169 result << :done
170 break
171 end
172 end
173 @clients.each { |c,t| t.close and break if c == client } #close the transport
174 rescue => e
175 raise e unless @catch_exceptions
176 end
177 end
178 queue
179 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000180
Kevin Clarke45bf592008-06-18 01:17:44 +0000181 it "should handle basic message passing" do
182 client = setup_client
183 client.greeting(true).should == Hello.new
184 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
185 end
Kevin Clark15350782008-06-18 01:16:27 +0000186
Kevin Clarke45bf592008-06-18 01:17:44 +0000187 it "should handle concurrent clients" do
188 queue = Queue.new
189 trans_queue = Queue.new
Kevin Clark980e4452008-06-18 01:19:59 +0000190 4.times do
191 Thread.new(Thread.current) do |main_thread|
192 begin
193 queue.push setup_client(trans_queue).block
194 rescue => e
195 main_thread.raise e
196 end
197 end
198 end
Kevin Clarke45bf592008-06-18 01:17:44 +0000199 4.times { trans_queue.pop }
Kevin Clark980e4452008-06-18 01:19:59 +0000200 setup_client.unblock(4)
Kevin Clarke45bf592008-06-18 01:17:44 +0000201 4.times { queue.pop.should be_true }
202 end
203
204 it "should handle messages from more than 5 long-lived connections" do
205 queues = []
206 result = Queue.new
207 7.times do |i|
208 queues << setup_client_thread(result)
209 Thread.pass if i == 4 # give the server time to accept connections
210 end
211 client = setup_client
212 # block 4 connections
213 4.times { |i| queues[i] << :block }
214 queues[4] << :hello
215 queues[5] << :hello
216 queues[6] << :hello
217 3.times { result.pop.should == Hello.new }
218 client.greeting(true).should == Hello.new
Kevin Clark980e4452008-06-18 01:19:59 +0000219 queues[5] << [:unblock, 4]
Kevin Clarke45bf592008-06-18 01:17:44 +0000220 4.times { result.pop.should be_true }
221 queues[2] << :hello
222 result.pop.should == Hello.new
223 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
224 7.times { queues.shift << :exit }
225 client.greeting(true).should == Hello.new
226 end
227
228 it "should shut down when asked" do
229 # connect first to ensure it's running
230 client = setup_client
231 client.greeting(false) # force a message pass
232 @server.shutdown
233 @server_thread.join(2).should be_an_instance_of(Thread)
234 end
235
236 it "should continue processing active messages when shutting down" do
237 result = Queue.new
238 client = setup_client_thread(result)
239 client << :sleep
240 sleep 0.1 # give the server time to start processing the client's message
241 @server.shutdown
242 @server_thread.join(2).should be_an_instance_of(Thread)
243 result.pop.should == :slept
244 end
245
246 it "should kill active messages when they don't expire while shutting down" do
247 result = Queue.new
248 client = setup_client_thread(result)
Kevin Clarkf731b492008-06-18 01:20:25 +0000249 client << [:sleep, 10]
Kevin Clarke45bf592008-06-18 01:17:44 +0000250 sleep 0.1 # start processing the client's message
251 @server.shutdown(1)
252 @catch_exceptions = true
253 @server_thread.join(3).should_not be_nil
Kevin Clarkf731b492008-06-18 01:20:25 +0000254 result.should be_empty
Kevin Clarke45bf592008-06-18 01:17:44 +0000255 end
256
257 it "should allow shutting down in response to a message" do
258 client = setup_client
259 client.greeting(true).should == Hello.new
260 client.shutdown
261 @server_thread.join(2).should_not be_nil
262 end
Kevin Clark15350782008-06-18 01:16:27 +0000263 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000264end