#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
19
require 'spec_helper'
require 'socket'
require 'timeout'
Kevin Clarke0fddde2008-06-18 01:16:02 +000022
Jake Farrella87810f2012-09-28 01:59:04 +000023describe 'NonblockingServer' do
# Service handler given to the NonblockingService processor in these specs.
# #block waits on an internal Queue until #unblock pushes a token, which lets
# an example keep server-side calls pending for as long as it needs.
class Handler
  # The server under test; the spec setup assigns it so #shutdown can reach it.
  attr_accessor :server

  def initialize
    @queue = Queue.new
  end

  # Returns a default Hello when +english+ is truthy, otherwise a Hello whose
  # greeting is "Aloha!".
  def greeting(english)
    return SpecNamespace::Hello.new if english

    SpecNamespace::Hello.new(:greeting => "Aloha!")
  end

  # Waits until a token is available on the internal queue and returns it.
  def block
    @queue.pop
  end

  # Pushes +n+ tokens (the value true), one per pending or future #block call.
  def unblock(n)
    n.times { @queue.push true }
  end

  # This method shadows Kernel#sleep for the handler, so the real sleep has to
  # be called through Kernel explicitly.
  def sleep(time)
    Kernel.sleep time
  end

  # Forwards (0, false) to the server's #shutdown.
  # NOTE(review): presumably "zero timeout, do not block" - confirm against
  # Thrift::NonblockingServer#shutdown.
  def shutdown
    @server.shutdown(0, false)
  end
end
55
# Client-side transport wrapper used by these specs. Every call is forwarded
# to the wrapped transport; in addition, the first #flush pushes :flushed onto
# the optional queue so an example can observe that a flush has happened.
class SpecTransport < Thrift::BaseTransport
  # transport:: the transport every call is delegated to
  # queue::     receives :flushed on the first flush; pass nil to disable
  def initialize(transport, queue)
    @transport = transport
    @queue = queue
    @flushed = false
  end

  def open?
    @transport.open?
  end

  def open
    @transport.open
  end

  def close
    @transport.close
  end

  def read(sz)
    @transport.read(sz)
  end

  def write(buf, sz = nil)
    @transport.write(buf, sz)
  end

  def flush
    # Signal at most once per transport, and only when a queue was supplied.
    @queue.push :flushed unless @flushed || @queue.nil?
    @flushed = true
    @transport.flush
  end
end
89
# Server socket that reports when it has started listening: after the parent
# class's #listen returns, :listen is pushed onto the queue given at
# construction so the spec setup can wait for it.
class SpecServerSocket < Thrift::ServerSocket
  def initialize(host, port, queue)
    super(host, port)
    @queue = queue
  end

  def listen
    super
    @queue.push :listen
  end
end
101
Kevin Clarke45bf592008-06-18 01:17:44 +0000102 describe Thrift::NonblockingServer do
before(:each) do
  # Bind to port 0 so the OS hands out a currently free port, then release it
  # for the server to claim. A fixed port number makes the whole group fail
  # whenever another process (or a parallel spec run) already holds it.
  @port = TCPServer.open('localhost', 0) { |probe| probe.addr[1] }
  handler = Handler.new
  processor = SpecNamespace::NonblockingService::Processor.new(handler)
  listening = Queue.new
  @transport = SpecServerSocket.new('localhost', @port, listening)
  transport_factory = Thrift::FramedTransportFactory.new
  logger = Logger.new(STDERR)
  logger.level = Logger::WARN
  # NOTE(review): the 5 is presumably the worker-thread count (see the
  # "more than 5 long-lived connections" example) - confirm against
  # Thrift::NonblockingServer#initialize.
  @server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
  handler.server = @server
  @server_thread = Thread.new(Thread.current) do |master_thread|
    begin
      @server.serve
    rescue => e
      # Print the failure, then re-raise it in the thread running the example.
      p e
      puts e.backtrace * "\n"
      master_thread.raise e
    end
  end
  # SpecServerSocket#listen pushes :listen once the socket is listening.
  listening.pop

  @clients = []
  @catch_exceptions = false
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000128
after(:each) do
  # @clients holds [client, transport] pairs; only the transport is closed.
  @clients.each { |_client, transport| transport.close }
  # @server.shutdown(1)
  @server_thread.kill
  @transport.close
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000135
# Opens a framed connection to the server on @port and returns a
# NonblockingService client for it. The [client, transport] pair is recorded
# in @clients so the after hook can close the transport.
#
# queue:: optional queue handed to SpecTransport, which pushes :flushed onto
#         it on the first flush
def setup_client(queue = nil)
  socket = Thrift::Socket.new('localhost', @port)
  transport = SpecTransport.new(Thrift::FramedTransport.new(socket), queue)
  protocol = Thrift::BinaryProtocol.new(transport)
  client = SpecNamespace::NonblockingService::Client.new(protocol)
  transport.open
  @clients << [client, transport]
  client
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000144
# Starts a thread that owns one client connection and executes commands
# popped from the returned queue. A command is either a symbol or an array
# of [symbol, *args]:
#
#   :block      calls client.block and pushes its return value onto +result+
#   :unblock    calls client.unblock with the first argument
#   :hello      calls client.greeting(true) and pushes the reply onto +result+
#   :sleep      calls client.sleep (0.5 unless an argument is given), then
#               pushes :slept onto +result+
#   :shutdown   calls client.shutdown
#   :exit       pushes :done onto +result+ and stops the loop
#
# A nil or false command also stops the loop. When the loop ends, the
# client's transport is closed. Errors raised in the thread are re-raised
# unless @catch_exceptions is set.
def setup_client_thread(result)
  commands = Queue.new
  Thread.new do
    begin
      client = setup_client
      while (cmd = commands.pop)
        msg, *args = cmd
        case msg
        when :block
          result << client.block
        when :unblock
          client.unblock(args.first)
        when :hello
          result << client.greeting(true)
        when :sleep
          client.sleep(args[0] || 0.5)
          result << :slept
        when :shutdown
          client.shutdown
        when :exit
          result << :done
          break
        end
      end
      # Close the transport that setup_client registered for this client.
      entry = @clients.find { |known, _transport| known == client }
      entry.last.close if entry
    rescue => e
      raise e unless @catch_exceptions
    end
  end
  commands
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000176
it "should handle basic message passing" do
  client = setup_client
  expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
  expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!'))
  @server.shutdown
end
Kevin Clark15350782008-06-18 01:16:27 +0000183
it "should handle concurrent clients" do
  replies = Queue.new
  flushes = Queue.new
  4.times do
    Thread.new(Thread.current) do |main_thread|
      begin
        replies.push setup_client(flushes).block
      rescue => e
        main_thread.raise e
      end
    end
  end
  # Each SpecTransport pushes :flushed on its first flush, so four pops mean
  # all four block requests have been flushed by their clients.
  4.times { flushes.pop }
  setup_client.unblock(4)
  4.times { expect(replies.pop).to be_truthy }
  @server.shutdown
end
201
it "should handle messages from more than 5 long-lived connections" do
  queues = []
  result = Queue.new
  7.times do |i|
    queues << setup_client_thread(result)
    Thread.pass if i == 4 # give the server time to accept connections
  end
  client = setup_client
  # block 4 connections
  4.times { |i| queues[i] << :block }
  # the remaining three connections, and the direct client, must still be served
  queues[4] << :hello
  queues[5] << :hello
  queues[6] << :hello
  3.times { expect(result.pop).to eq(SpecNamespace::Hello.new) }
  expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
  # release the four blocked calls; each one pushes its return value
  queues[5] << [:unblock, 4]
  4.times { expect(result.pop).to be_truthy }
  # a previously blocked connection is usable again
  queues[2] << :hello
  expect(result.pop).to eq(SpecNamespace::Hello.new)
  expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!'))
  7.times { queues.shift << :exit }
  expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
  @server.shutdown
end
226
it "should shut down when asked" do
  # connect first to ensure it's running
  client = setup_client
  client.greeting(false) # force a message pass
  @server.shutdown
  # Thread#join(limit) returns the thread if it finished within the limit
  expect(@server_thread.join(2)).to be_an_instance_of(Thread)
end
234
it "should continue processing active messages when shutting down" do
  result = Queue.new
  commands = setup_client_thread(result)
  commands << :sleep
  sleep 0.1 # give the server time to start processing the client's message
  @server.shutdown
  expect(@server_thread.join(2)).to be_an_instance_of(Thread)
  # the in-flight sleep call was allowed to complete
  expect(result.pop).to eq(:slept)
end
244
it "should kill active messages when they don't expire while shutting down" do
  result = Queue.new
  commands = setup_client_thread(result)
  commands << [:sleep, 10.0]
  sleep 0.1 # start processing the client's message
  @server.shutdown(1)
  # the client thread is expected to fail once its call is cut off; make
  # setup_client_thread swallow that error instead of re-raising it
  @catch_exceptions = true
  expect(@server_thread.join(3)).not_to be_nil
  # :slept is never pushed because the sleep call did not complete
  expect(result).to be_empty
end
255
it "should allow shutting down in response to a message" do
  client = setup_client
  expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
  # Handler#shutdown calls @server.shutdown(0, false) from inside the server
  client.shutdown
  expect(@server_thread.join(2)).not_to be_nil
end
Kevin Clark15350782008-06-18 01:16:27 +0000262 end
Dmytro Shteflyuk697910f2026-02-13 14:12:43 -0500263
264 describe "#{Thrift::NonblockingServer} with TLS transport" do
before(:each) do
  @port = available_port
  handler = Handler.new
  processor = SpecNamespace::NonblockingService::Processor.new(handler)
  @transport = Thrift::SSLServerSocket.new('localhost', @port, create_server_ssl_context)
  transport_factory = Thrift::FramedTransportFactory.new
  logger = Logger.new(STDERR)
  logger.level = Logger::WARN
  @server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
  handler.server = @server

  @server_thread = Thread.new(Thread.current) do |master_thread|
    begin
      @server.serve
    rescue => e
      # surface server failures in the thread running the example
      master_thread.raise e
    end
  end

  # assigned before waiting so the after hook can iterate it even if the
  # wait below raises
  @clients = []
  wait_until_listening
end
287
after(:each) do
  # Every step is guarded: this hook also runs when the before hook failed
  # part-way, in which case some of these variables were never assigned.
  Array(@clients).each(&:close)
  @server.shutdown if @server
  @server_thread.join(2) if @server_thread
  @transport.close if @transport
end
294
it "should handle requests over TLS" do
  expect(@server_thread).to be_alive

  client = setup_tls_client
  expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)

  @server.shutdown
  # Thread#join(limit) returns the thread if it finished within the limit
  expect(@server_thread.join(2)).to be_an_instance_of(Thread)
end
304
# Opens a framed TLS connection to the server on @port and returns a
# NonblockingService client for it. The transport is recorded in @clients so
# the after hook can close it.
def setup_tls_client
  socket = Thrift::SSLSocket.new('localhost', @port, nil, create_client_ssl_context)
  transport = Thrift::FramedTransport.new(socket)
  protocol = Thrift::BinaryProtocol.new(transport)
  client = SpecNamespace::NonblockingService::Client.new(protocol)
  transport.open
  @clients << transport
  client
end
315
# Polls every 10 ms until @transport.handle is truthy.
#
# Raises RuntimeError if @server_thread is found dead while waiting, and
# Timeout::Error if the handle has not appeared within 2 seconds.
def wait_until_listening
  Timeout.timeout(2) do
    until @transport.handle
      # fail fast rather than sit out the full timeout when the server died
      raise "Server thread exited unexpectedly" unless @server_thread.alive?

      sleep 0.01
    end
  end
end
324
# Returns a port number the OS reported as free: a TCPServer is opened on
# port 0, its assigned port is read, and the block form closes it again.
# The port is released before the caller binds it, so another process could
# take it in between.
def available_port
  TCPServer.open('localhost', 0) { |server| server.addr[1] }
end
328
# Absolute path of the key/certificate fixture directory: test/keys, three
# levels above the directory containing this file.
def ssl_keys_dir
  File.expand_path('../../../test/keys', __dir__)
end
332
# SSL context for the server side: presents server.crt / server.key and puts
# client.pem in its certificate store.
def create_server_ssl_context
  build_ssl_context('server', 'client')
end

# SSL context for the client side: presents client.crt / client.key and puts
# server.pem in its certificate store.
def create_client_ssl_context
  build_ssl_context('client', 'server')
end

# Builds a peer-verifying SSL context from the fixtures in ssl_keys_dir.
#
# identity:: basename of this side's own <identity>.crt and <identity>.key
# peer::     basename of the <peer>.pem file added to the certificate store
def build_ssl_context(identity, peer)
  OpenSSL::SSL::SSLContext.new.tap do |ctx|
    ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
    # min_version= / TLS1_2_VERSION are not defined on every OpenSSL binding;
    # only enforce the floor where both are available.
    if ctx.respond_to?(:min_version=) && OpenSSL::SSL.const_defined?(:TLS1_2_VERSION)
      ctx.min_version = OpenSSL::SSL::TLS1_2_VERSION
    end
    ctx.ca_file = File.join(ssl_keys_dir, 'CA.pem')
    ctx.cert = OpenSSL::X509::Certificate.new(File.read(File.join(ssl_keys_dir, "#{identity}.crt")))
    ctx.cert_store = OpenSSL::X509::Store.new
    ctx.cert_store.add_file(File.join(ssl_keys_dir, "#{peer}.pem"))
    ctx.key = OpenSSL::PKey::RSA.new(File.read(File.join(ssl_keys_dir, "#{identity}.key")))
  end
end
360 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000361end