blob: 58949e34920d84c8aa4693fafbc43da632c69228 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
Jake Farrella87810f2012-09-28 01:59:04 +000020require 'spec_helper'
Dmytro Shteflyuk697910f2026-02-13 14:12:43 -050021require 'timeout'
Kevin Clarke0fddde2008-06-18 01:16:02 +000022
Jake Farrella87810f2012-09-28 01:59:04 +000023describe 'NonblockingServer' do
# Service implementation handed to the NonblockingService processor in these specs.
class Handler
  # The server under test; assigned after construction so #shutdown can reach it.
  attr_accessor :server

  def initialize
    # Tokens pushed by #unblock are consumed by callers waiting in #block.
    @queue = Queue.new
  end

  # Returns a default Hello when +english+ is truthy, otherwise a Hello whose
  # greeting is "Aloha!".
  def greeting(english)
    return SpecNamespace::Hello.new if english

    SpecNamespace::Hello.new(:greeting => "Aloha!")
  end

  # Waits until a token is available on the internal queue and returns it.
  def block
    @queue.pop
  end

  # Pushes +n+ tokens onto the internal queue, one per pending #block call.
  def unblock(n)
    n.times { @queue << true }
  end

  # Sleeps for +time+ seconds. Goes through Kernel explicitly because this
  # method has the same name as Kernel#sleep.
  def sleep(time)
    Kernel.sleep(time)
  end

  # Asks the server to shut down, passing (0, false) through to Server#shutdown.
  def shutdown
    server.shutdown(0, false)
  end
end
55
# Transport wrapper used by the specs: every call is forwarded to the wrapped
# transport, and the first #flush additionally pushes :flushed onto +queue+
# (when one was given) so an example can tell that a request has been sent.
class SpecTransport < Thrift::BaseTransport
  # transport:: the transport every call is delegated to
  # queue::     a Queue notified on the first flush, or nil for no notification
  def initialize(transport, queue)
    @transport = transport
    @queue = queue
    @flushed = false
  end

  def open?
    @transport.open?
  end

  def open
    @transport.open
  end

  def close
    @transport.close
  end

  def read(sz)
    @transport.read(sz)
  end

  def write(buf, sz = nil)
    @transport.write(buf, sz)
  end

  # Signals the queue once (on the first flush only), then flushes the
  # wrapped transport and returns its result.
  def flush
    # `||` instead of the low-precedence `or`: same result here, but it keeps
    # the condition a plain boolean expression.
    @queue.push :flushed unless @flushed || @queue.nil?
    @flushed = true
    @transport.flush
  end
end
89
# Server socket that reports on +queue+ when it has started listening, so the
# spec setup can wait for the server to come up.
class SpecServerSocket < Thrift::ServerSocket
  # host, port:: forwarded unchanged to Thrift::ServerSocket
  # queue::      receives :listen after every successful #listen
  def initialize(host, port, queue)
    super(host, port)
    @queue = queue
  end

  # Performs the inherited listen, then announces it on the queue.
  def listen
    super()
    @queue << :listen
  end
end
101
describe Thrift::NonblockingServer do
  # Starts a NonblockingServer on a free local port in a background thread and
  # returns once the server socket is listening.
  before(:each) do
    # Initialise the bookkeeping before anything that can raise: RSpec still
    # runs the after hook when this hook fails, and that hook iterates
    # @clients before it shuts the server down.
    @clients = []
    @catch_exceptions = false

    @port = available_port
    handler = Handler.new
    processor = SpecNamespace::NonblockingService::Processor.new(handler)
    queue = Queue.new
    @transport = SpecServerSocket.new('localhost', @port, queue)
    transport_factory = Thrift::FramedTransportFactory.new
    logger = Logger.new(STDERR)
    logger.level = Logger::WARN
    # NOTE(review): the nil / 5 arguments look like "default protocol factory"
    # and "5 worker threads" -- confirm against NonblockingServer#initialize.
    @server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
    handler.server = @server
    @server_thread = Thread.new(Thread.current) do |master_thread|
      begin
        @server.serve
      rescue => e
        # Print the failure and re-raise it in the example's thread so a
        # crashed server fails the example instead of hanging it.
        p e
        puts e.backtrace * "\n"
        master_thread.raise e
      end
    end
    queue.pop # SpecServerSocket#listen pushes :listen once listening has started
    wait_until_listening(@transport, @server_thread)
  end

  # Closes every client transport, then stops the server; the server thread is
  # killed if it has not finished within the join timeout.
  after(:each) do
    @clients.each { |_client, trans| trans.close }
    @server.shutdown(1, false) if @server
    @server_thread.join(2) if @server_thread
    @server_thread.kill if @server_thread && @server_thread.alive?
    @server_thread.join(2) if @server_thread
    @transport.close if @transport
  end

  # Opens a framed binary-protocol client against the server and records it in
  # @clients for teardown. When +queue+ is given, the client's transport pushes
  # :flushed onto it on its first flush.
  def setup_client(queue = nil)
    transport = SpecTransport.new(Thrift::FramedTransport.new(Thrift::Socket.new('localhost', @port)), queue)
    protocol = Thrift::BinaryProtocol.new(transport)
    client = SpecNamespace::NonblockingService::Client.new(protocol)
    transport.open
    @clients << [client, transport]
    client
  end

  # Starts a thread that owns one client and executes commands popped from the
  # returned queue. A command is either a symbol or [symbol, *args]; replies
  # and completion markers are pushed onto +result+.
  def setup_client_thread(result)
    queue = Queue.new
    Thread.new do
      begin
        client = setup_client
        while (cmd = queue.pop)
          msg, *args = cmd
          case msg
          when :block
            result << client.block
          when :unblock
            client.unblock(args.first)
          when :hello
            result << client.greeting(true) # examples pop this reply from result
          when :sleep
            client.sleep(args[0] || 0.5)
            result << :slept
          when :shutdown
            client.shutdown
          when :exit
            result << :done
            break
          end
        end
        # Close this thread's own transport. The previous
        # `t.close and break` stopped iterating only when #close returned a
        # truthy value; looking the pair up directly does not depend on that.
        _, transport = @clients.find { |c, _t| c == client }
        transport.close if transport
      rescue
        # Examples that kill connections on purpose set @catch_exceptions.
        raise unless @catch_exceptions
      end
    end
    queue
  end

  it "should handle basic message passing" do
    client = setup_client
    expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
    expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!'))
    @server.shutdown
  end

  it "should handle concurrent clients" do
    queue = Queue.new
    trans_queue = Queue.new
    4.times do
      Thread.new(Thread.current) do |main_thread|
        begin
          queue.push setup_client(trans_queue).block
        rescue => e
          main_thread.raise e
        end
      end
    end
    # Wait until all four clients have flushed their request
    # (SpecTransport pushes :flushed on the first flush).
    4.times { trans_queue.pop }
    setup_client.unblock(4)
    4.times { expect(queue.pop).to be_truthy }
    @server.shutdown
  end

  it "should handle messages from more than 5 long-lived connections" do
    queues = []
    result = Queue.new
    7.times do |i|
      queues << setup_client_thread(result)
      Thread.pass if i == 4 # give the server time to accept connections
    end
    client = setup_client
    # block 4 connections
    4.times { |i| queues[i] << :block }
    queues[4] << :hello
    queues[5] << :hello
    queues[6] << :hello
    3.times { expect(result.pop).to eq(SpecNamespace::Hello.new) }
    expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
    queues[5] << [:unblock, 4]
    4.times { expect(result.pop).to be_truthy }
    queues[2] << :hello
    expect(result.pop).to eq(SpecNamespace::Hello.new)
    expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!'))
    7.times { queues.shift << :exit }
    expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
    @server.shutdown
  end

  it "should shut down when asked" do
    # connect first to ensure it's running
    client = setup_client
    client.greeting(false) # force a message pass
    @server.shutdown
    expect(@server_thread.join(2)).to be_an_instance_of(Thread)
  end

  it "should continue processing active messages when shutting down" do
    result = Queue.new
    client = setup_client_thread(result)
    client << :sleep
    sleep 0.1 # give the server time to start processing the client's message
    @server.shutdown
    expect(@server_thread.join(2)).to be_an_instance_of(Thread)
    expect(result.pop).to eq(:slept)
  end

  it "should kill active messages when they don't expire while shutting down" do
    result = Queue.new
    client = setup_client_thread(result)
    client << [:sleep, 10.0]
    sleep 0.1 # start processing the client's message
    # Enable exception swallowing before the shutdown call: the client thread
    # can fail as soon as the server drops its connection, which may happen
    # before the shutdown call returns.
    @catch_exceptions = true
    @server.shutdown(1)
    expect(@server_thread.join(3)).not_to be_nil
    expect(result).to be_empty
  end

  it "should allow shutting down in response to a message" do
    client = setup_client
    expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
    client.shutdown
    expect(@server_thread.join(2)).not_to be_nil
  end
end
Dmytro Shteflyuk697910f2026-02-13 14:12:43 -0500266
describe Thrift::NonblockingServer::IOManager do
  # Builds an IOManager wired to test doubles, with a logger that writes to
  # IO::NULL so nothing is printed during the examples.
  def build_io_manager
    silent_logger = Logger.new(IO::NULL).tap { |log| log.level = Logger::FATAL }
    constructor_args = [
      double('processor'),
      double('server_transport'),
      Thrift::BaseTransportFactory.new,
      Thrift::BinaryProtocolFactory.new,
      1,
      silent_logger
    ]
    Thrift::NonblockingServer::IOManager.new(*constructor_args)
  end

  it "closes tracked connections and signal pipes during forced cleanup" do
    manager = build_io_manager
    conn = double('connection', :close => nil)
    first_pipe = double('pipe_a', :closed? => false, :close => nil)
    second_pipe = double('pipe_b', :closed? => false, :close => nil)

    # Seed the manager's internal state directly; no real sockets are involved.
    manager.instance_variable_set(:@connections, [conn])
    manager.instance_variable_set(:@buffers, { conn => 'frame' })
    manager.instance_variable_set(:@signal_pipes, [first_pipe, second_pipe])
    manager.instance_variable_set(:@worker_threads, [])

    manager.ensure_closed

    [conn, first_pipe, second_pipe].each do |closable|
      expect(closable).to have_received(:close)
    end
    [:@connections, :@buffers].each do |ivar|
      expect(manager.instance_variable_get(ivar)).to be_empty
    end
  end

  it "continues closing remaining signal pipes when one close raises" do
    manager = build_io_manager
    failing_pipe = double('pipe_a', :closed? => false)
    healthy_pipe = double('pipe_b', :closed? => false, :close => nil)

    # The first pipe blows up on close; the second must still be closed.
    allow(failing_pipe).to receive(:close).and_raise(IOError)

    manager.instance_variable_set(:@signal_pipes, [failing_pipe, healthy_pipe])
    manager.instance_variable_set(:@worker_threads, [])

    manager.send(:close_signal_pipes)

    [failing_pipe, healthy_pipe].each do |pipe|
      expect(pipe).to have_received(:close)
    end
  end

  it "drops removed connections from bookkeeping" do
    manager = build_io_manager
    conn = double('connection', :close => nil)

    manager.instance_variable_set(:@connections, [conn])
    manager.instance_variable_set(:@buffers, { conn => 'frame' })

    manager.send(:remove_connection, conn)

    [:@connections, :@buffers].each do |ivar|
      expect(manager.instance_variable_get(ivar)).to be_empty
    end
  end
end
330
describe "#{Thrift::NonblockingServer} with TLS transport" do
  # Starts a NonblockingServer behind an SSLServerSocket on a free local port
  # and returns once the server transport is listening.
  before(:each) do
    # Initialise first: RSpec still runs the after hook when this hook raises
    # (e.g. while loading the key material), and that hook iterates @clients.
    @clients = []

    @port = available_port
    handler = Handler.new
    processor = SpecNamespace::NonblockingService::Processor.new(handler)
    @transport = Thrift::SSLServerSocket.new('localhost', @port, create_server_ssl_context)
    transport_factory = Thrift::FramedTransportFactory.new
    logger = Logger.new(STDERR)
    logger.level = Logger::WARN
    @server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
    handler.server = @server

    @server_thread = Thread.new(Thread.current) do |master_thread|
      begin
        @server.serve
      rescue => e
        # Re-raise server failures in the example's thread.
        master_thread.raise e
      end
    end

    wait_until_listening(@transport, @server_thread)
  end

  # Same teardown sequence as the plain-socket group: bounded, non-blocking
  # shutdown, then kill the server thread if it is still alive so a hung
  # server cannot leak into the following examples.
  after(:each) do
    @clients.each(&:close)
    @server.shutdown(1, false) if @server
    @server_thread.join(2) if @server_thread
    @server_thread.kill if @server_thread && @server_thread.alive?
    @server_thread.join(2) if @server_thread
    @transport.close if @transport
  end

  it "should handle requests over TLS" do
    expect(@server_thread).to be_alive

    client = setup_tls_client
    expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)

    @server.shutdown
    expect(@server_thread.join(2)).to be_an_instance_of(Thread)
  end

  # Opens a framed binary-protocol client over an SSLSocket and records its
  # transport in @clients so the after hook can close it.
  def setup_tls_client
    transport = Thrift::FramedTransport.new(
      Thrift::SSLSocket.new('localhost', @port, nil, create_client_ssl_context)
    )
    protocol = Thrift::BinaryProtocol.new(transport)
    client = SpecNamespace::NonblockingService::Client.new(protocol)
    transport.open
    @clients << transport
    client
  end

  # Directory holding the key and certificate files, resolved relative to this file.
  def ssl_keys_dir
    File.expand_path('../../../test/keys', __dir__)
  end

  # SSL context for the server: presents server.crt/server.key and trusts client.pem.
  def create_server_ssl_context
    build_ssl_context('server', 'client.pem')
  end

  # SSL context for the client: presents client.crt/client.key and trusts server.pem.
  def create_client_ssl_context
    build_ssl_context('client', 'server.pem')
  end

  # Shared builder for both ends of the connection.
  #
  # identity::         basename of the "<identity>.crt" / "<identity>.key" pair to present
  # trusted_peer_pem:: file name of the peer certificate added to the cert store
  #
  # Peer verification is always enabled, and TLS 1.2 is set as the minimum
  # version when the installed OpenSSL binding supports it.
  def build_ssl_context(identity, trusted_peer_pem)
    OpenSSL::SSL::SSLContext.new.tap do |ctx|
      ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
      if ctx.respond_to?(:min_version=) && OpenSSL::SSL.const_defined?(:TLS1_2_VERSION)
        ctx.min_version = OpenSSL::SSL::TLS1_2_VERSION
      end
      ctx.ca_file = File.join(ssl_keys_dir, 'CA.pem')
      ctx.cert = OpenSSL::X509::Certificate.new(File.read(File.join(ssl_keys_dir, "#{identity}.crt")))
      ctx.cert_store = OpenSSL::X509::Store.new
      ctx.cert_store.add_file(File.join(ssl_keys_dir, trusted_peer_pem))
      ctx.key = OpenSSL::PKey::RSA.new(File.read(File.join(ssl_keys_dir, "#{identity}.key")))
    end
  end
end
Dmytro Shteflyuk269396a2026-03-27 11:25:04 -0400415
# Polls +server_transport+ until it reports a handle, i.e. until the server
# socket has been opened.
#
# server_transport:: object responding to #handle (nil/false until listening)
# server_thread::    thread running the server; checked on every poll
# timeout::          seconds to wait before giving up (default 2)
# poll_interval::    seconds to sleep between polls (default 0.01)
#
# Returns nil once the transport has a handle.
# Raises RuntimeError if the server thread dies before the transport is
# ready, and Timeout::Error if +timeout+ elapses first.
def wait_until_listening(server_transport, server_thread, timeout = 2, poll_interval = 0.01)
  Timeout.timeout(timeout) do
    until server_transport.handle
      # A dead server thread will never start listening; fail immediately
      # instead of waiting for the timeout.
      raise "Server thread exited unexpectedly" unless server_thread.alive?
      sleep poll_interval
    end
  end
end
424
# Returns a TCP port number that was free on +host+ at the time of the call.
#
# Binds a throwaway server socket to port 0 so the operating system picks an
# unused port, reads the chosen port, and closes the socket again when the
# block returns. Because the socket is released before the caller binds, the
# port could in principle be taken by another process in between.
#
# host:: interface to probe (default 'localhost')
def available_port(host = 'localhost')
  TCPServer.open(host, 0) { |server| server.addr[1] }
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000428end