blob: ead51d78449b1804af11394696071466e9b46a4d [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
Kevin Clarke0fddde2008-06-18 01:16:02 +000020require File.dirname(__FILE__) + '/spec_helper'
Kevin Clarke0fddde2008-06-18 01:16:02 +000021
22class ThriftNonblockingServerSpec < Spec::ExampleGroup
23 include Thrift
24 include SpecNamespace
25
26 class Handler
27 def initialize
28 @queue = Queue.new
29 end
30
31 attr_accessor :server
32
33 def greeting(english)
34 if english
35 SpecNamespace::Hello.new
36 else
37 SpecNamespace::Hello.new(:greeting => "Aloha!")
38 end
39 end
40
41 def block
42 @queue.pop
43 end
44
Kevin Clark980e4452008-06-18 01:19:59 +000045 def unblock(n)
46 n.times { @queue.push true }
Kevin Clarke0fddde2008-06-18 01:16:02 +000047 end
48
49 def sleep(time)
50 Kernel.sleep time
51 end
52
53 def shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +000054 @server.shutdown(0, false)
Kevin Clarke0fddde2008-06-18 01:16:02 +000055 end
56 end
57
Bryan Duxburyd1d15422009-04-04 00:58:03 +000058 class SpecTransport < BaseTransport
Kevin Clarke45bf592008-06-18 01:17:44 +000059 def initialize(transport, queue)
60 @transport = transport
61 @queue = queue
62 @flushed = false
Kevin Clarke0fddde2008-06-18 01:16:02 +000063 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000064
Kevin Clarke45bf592008-06-18 01:17:44 +000065 def open?
66 @transport.open?
67 end
68
69 def open
70 @transport.open
71 end
72
73 def close
74 @transport.close
75 end
76
77 def read(sz)
78 @transport.read(sz)
79 end
80
81 def write(buf,sz=nil)
82 @transport.write(buf, sz)
83 end
84
85 def flush
86 @queue.push :flushed unless @flushed or @queue.nil?
87 @flushed = true
88 @transport.flush
89 end
Kevin Clarke0fddde2008-06-18 01:16:02 +000090 end
91
Kevin Clark980e4452008-06-18 01:19:59 +000092 class SpecServerSocket < ServerSocket
93 def initialize(host, port, queue)
94 super(host, port)
95 @queue = queue
96 end
97
98 def listen
99 super
100 @queue.push :listen
101 end
102 end
103
Kevin Clarke45bf592008-06-18 01:17:44 +0000104 describe Thrift::NonblockingServer do
105 before(:each) do
106 @port = 43251
107 handler = Handler.new
108 processor = NonblockingService::Processor.new(handler)
Kevin Clark980e4452008-06-18 01:19:59 +0000109 queue = Queue.new
110 @transport = SpecServerSocket.new('localhost', @port, queue)
Bryan Duxburyd1d15422009-04-04 00:58:03 +0000111 transport_factory = FramedTransportFactory.new
Kevin Clarke45bf592008-06-18 01:17:44 +0000112 logger = Logger.new(STDERR)
113 logger.level = Logger::WARN
Bryan Duxburyd1d15422009-04-04 00:58:03 +0000114 @server = NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
Kevin Clarke45bf592008-06-18 01:17:44 +0000115 handler.server = @server
116 @server_thread = Thread.new(Thread.current) do |master_thread|
117 begin
118 @server.serve
119 rescue => e
120 p e
121 puts e.backtrace * "\n"
122 master_thread.raise e
Kevin Clarke0fddde2008-06-18 01:16:02 +0000123 end
124 end
Kevin Clark980e4452008-06-18 01:19:59 +0000125 queue.pop
Kevin Clarke45bf592008-06-18 01:17:44 +0000126
127 @clients = []
128 @catch_exceptions = false
Kevin Clarke0fddde2008-06-18 01:16:02 +0000129 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000130
Kevin Clarke45bf592008-06-18 01:17:44 +0000131 after(:each) do
132 @clients.each { |client, trans| trans.close }
133 # @server.shutdown(1)
134 @server_thread.kill
135 @transport.close
Kevin Clarke0fddde2008-06-18 01:16:02 +0000136 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000137
Kevin Clarke45bf592008-06-18 01:17:44 +0000138 def setup_client(queue = nil)
139 transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
140 protocol = BinaryProtocol.new(transport)
141 client = NonblockingService::Client.new(protocol)
142 transport.open
143 @clients << [client, transport]
144 client
145 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000146
Kevin Clarke45bf592008-06-18 01:17:44 +0000147 def setup_client_thread(result)
148 queue = Queue.new
149 Thread.new do
150 begin
151 client = setup_client
Kevin Clark980e4452008-06-18 01:19:59 +0000152 while (cmd = queue.pop)
153 msg, *args = cmd
Kevin Clarke45bf592008-06-18 01:17:44 +0000154 case msg
155 when :block
156 result << client.block
157 when :unblock
Kevin Clark980e4452008-06-18 01:19:59 +0000158 client.unblock(args.first)
Kevin Clarke45bf592008-06-18 01:17:44 +0000159 when :hello
160 result << client.greeting(true) # ignore result
161 when :sleep
Kevin Clarkf731b492008-06-18 01:20:25 +0000162 client.sleep(args[0] || 0.5)
Kevin Clarke45bf592008-06-18 01:17:44 +0000163 result << :slept
164 when :shutdown
165 client.shutdown
166 when :exit
167 result << :done
168 break
169 end
170 end
171 @clients.each { |c,t| t.close and break if c == client } #close the transport
172 rescue => e
173 raise e unless @catch_exceptions
174 end
175 end
176 queue
177 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000178
Kevin Clarke45bf592008-06-18 01:17:44 +0000179 it "should handle basic message passing" do
180 client = setup_client
181 client.greeting(true).should == Hello.new
182 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
Bryan Duxbury8407cfd2009-04-07 16:35:49 +0000183 @server.shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +0000184 end
Kevin Clark15350782008-06-18 01:16:27 +0000185
Kevin Clarke45bf592008-06-18 01:17:44 +0000186 it "should handle concurrent clients" do
187 queue = Queue.new
188 trans_queue = Queue.new
Kevin Clark980e4452008-06-18 01:19:59 +0000189 4.times do
190 Thread.new(Thread.current) do |main_thread|
191 begin
192 queue.push setup_client(trans_queue).block
193 rescue => e
194 main_thread.raise e
195 end
196 end
197 end
Kevin Clarke45bf592008-06-18 01:17:44 +0000198 4.times { trans_queue.pop }
Kevin Clark980e4452008-06-18 01:19:59 +0000199 setup_client.unblock(4)
Kevin Clarke45bf592008-06-18 01:17:44 +0000200 4.times { queue.pop.should be_true }
Bryan Duxbury8407cfd2009-04-07 16:35:49 +0000201 @server.shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +0000202 end
203
204 it "should handle messages from more than 5 long-lived connections" do
205 queues = []
206 result = Queue.new
207 7.times do |i|
208 queues << setup_client_thread(result)
209 Thread.pass if i == 4 # give the server time to accept connections
210 end
211 client = setup_client
212 # block 4 connections
213 4.times { |i| queues[i] << :block }
214 queues[4] << :hello
215 queues[5] << :hello
216 queues[6] << :hello
217 3.times { result.pop.should == Hello.new }
218 client.greeting(true).should == Hello.new
Kevin Clark980e4452008-06-18 01:19:59 +0000219 queues[5] << [:unblock, 4]
Kevin Clarke45bf592008-06-18 01:17:44 +0000220 4.times { result.pop.should be_true }
221 queues[2] << :hello
222 result.pop.should == Hello.new
223 client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
224 7.times { queues.shift << :exit }
225 client.greeting(true).should == Hello.new
Bryan Duxbury8407cfd2009-04-07 16:35:49 +0000226 @server.shutdown
Kevin Clarke45bf592008-06-18 01:17:44 +0000227 end
228
229 it "should shut down when asked" do
230 # connect first to ensure it's running
231 client = setup_client
232 client.greeting(false) # force a message pass
233 @server.shutdown
234 @server_thread.join(2).should be_an_instance_of(Thread)
235 end
236
237 it "should continue processing active messages when shutting down" do
238 result = Queue.new
239 client = setup_client_thread(result)
240 client << :sleep
241 sleep 0.1 # give the server time to start processing the client's message
242 @server.shutdown
243 @server_thread.join(2).should be_an_instance_of(Thread)
244 result.pop.should == :slept
245 end
246
247 it "should kill active messages when they don't expire while shutting down" do
248 result = Queue.new
249 client = setup_client_thread(result)
Kevin Clarkf731b492008-06-18 01:20:25 +0000250 client << [:sleep, 10]
Kevin Clarke45bf592008-06-18 01:17:44 +0000251 sleep 0.1 # start processing the client's message
252 @server.shutdown(1)
253 @catch_exceptions = true
254 @server_thread.join(3).should_not be_nil
Kevin Clarkf731b492008-06-18 01:20:25 +0000255 result.should be_empty
Kevin Clarke45bf592008-06-18 01:17:44 +0000256 end
257
258 it "should allow shutting down in response to a message" do
259 client = setup_client
260 client.greeting(true).should == Hello.new
261 client.shutdown
262 @server_thread.join(2).should_not be_nil
263 end
Kevin Clark15350782008-06-18 01:16:27 +0000264 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000265end