blob: cfc60ff204bbbe1ca2d4709076d7a11d1d7d5384 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
Jake Farrella87810f2012-09-28 01:59:04 +000020require 'spec_helper'
Dmytro Shteflyuk697910f2026-02-13 14:12:43 -050021require 'timeout'
Kevin Clarke0fddde2008-06-18 01:16:02 +000022
Jake Farrella87810f2012-09-28 01:59:04 +000023describe 'NonblockingServer' do
Kevin Clarke0fddde2008-06-18 01:16:02 +000024
# Service handler backing the NonblockingService processor in these specs.
# A private Queue lets one call (#block) wait until another call (#unblock)
# releases it.
class Handler
  # The server under test; assigned by the spec after the server is built so
  # that #shutdown can reach it.
  attr_accessor :server

  def initialize
    @queue = Queue.new
  end

  # Returns a default Hello when +english+ is truthy, otherwise a Hello whose
  # greeting is "Aloha!".
  def greeting(english)
    return SpecNamespace::Hello.new if english

    SpecNamespace::Hello.new(greeting: "Aloha!")
  end

  # Waits until a value is available on the internal queue and returns it.
  def block
    @queue.pop
  end

  # Pushes +n+ values onto the internal queue, releasing up to +n+ pending
  # #block calls. Returns +n+.
  def unblock(n)
    n.times { @queue << true }
  end

  # Sleeps for +time+ seconds. Kernel.sleep is called explicitly because this
  # method shadows the usual Kernel#sleep inside the class.
  def sleep(time)
    Kernel.sleep(time)
  end

  # Asks the attached server to shut down, passing (0, false).
  def shutdown
    @server.shutdown(0, false)
  end
end
56
# Transport wrapper used by the specs. Every operation is forwarded to the
# wrapped transport; in addition, the first #flush pushes :flushed onto the
# supplied queue (when one was given) so a spec can observe it.
class SpecTransport < Thrift::BaseTransport
  # transport:: the transport every call is delegated to
  # queue::     queue that receives :flushed on the first flush, or nil
  def initialize(transport, queue)
    @transport = transport
    @queue = queue
    @flushed = false
  end

  def open?
    @transport.open?
  end

  def open
    @transport.open
  end

  def close
    @transport.close
  end

  def read(sz)
    @transport.read(sz)
  end

  def write(buf, sz = nil)
    @transport.write(buf, sz)
  end

  def flush
    # Notify only once, and only when a queue was supplied.
    @queue.push(:flushed) if !@queue.nil? && !@flushed
    @flushed = true
    @transport.flush
  end
end
90
# Server socket that pushes :listen onto a queue once #listen has returned
# from the parent implementation, so a spec can wait on that queue before
# connecting clients.
class SpecServerSocket < Thrift::ServerSocket
  # host, port:: forwarded unchanged to Thrift::ServerSocket
  # queue::      receives :listen after each #listen call
  def initialize(host, port, queue)
    super(host, port)
    @queue = queue
  end

  def listen
    super
    @queue << :listen
  end
end
102
Kevin Clarke45bf592008-06-18 01:17:44 +0000103 describe Thrift::NonblockingServer do
# Builds a NonblockingServer on a fixed local port, runs it in a background
# thread and waits until the server socket has signalled :listen.
before(:each) do
  @port = 43251
  handler = Handler.new
  processor = SpecNamespace::NonblockingService::Processor.new(handler)
  listen_signal = Queue.new
  @transport = SpecServerSocket.new('localhost', @port, listen_signal)
  framed_factory = Thrift::FramedTransportFactory.new
  logger = Logger.new(STDERR)
  logger.level = Logger::WARN
  @server = Thrift::NonblockingServer.new(processor, @transport, framed_factory, nil, 5, logger)
  handler.server = @server

  @server_thread = Thread.new(Thread.current) do |master_thread|
    begin
      @server.serve
    rescue => e
      # Print the failure, then re-raise it in the thread running the example.
      p e
      puts e.backtrace.join("\n")
      master_thread.raise e
    end
  end

  # SpecServerSocket#listen pushes :listen; block here until it has done so.
  listen_signal.pop

  @clients = []
  @catch_exceptions = false
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000129
# Closes every client transport recorded in @clients, then kills the server
# thread and closes the listening socket.
after(:each) do
  @clients.each { |pair| pair.last.close }
  # @server.shutdown(1)
  @server_thread.kill
  @transport.close
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000136
# Opens a framed client connection to the server on @port and returns the
# service client. The [client, transport] pair is recorded in @clients.
#
# queue:: optional queue handed to SpecTransport, which pushes :flushed onto
#         it on the transport's first flush
def setup_client(queue = nil)
  socket = Thrift::Socket.new('localhost', @port)
  framed = Thrift::FramedTransport.new(socket)
  transport = SpecTransport.new(framed, queue)
  client = SpecNamespace::NonblockingService::Client.new(Thrift::BinaryProtocol.new(transport))
  transport.open
  @clients << [client, transport]
  client
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000145
# Starts a thread that owns one client connection and executes commands sent
# through the returned queue.
#
# Each command is either a bare symbol or an array of [symbol, *args]:
#   :block          - calls client.block and pushes its return value to +result+
#   [:unblock, n]   - calls client.unblock(n)
#   :hello          - calls client.greeting(true) and pushes the reply to +result+
#   [:sleep, secs]  - calls client.sleep(secs), 0.5 when secs is omitted, then
#                     pushes :slept to +result+
#   :shutdown       - calls client.shutdown
#   :exit           - pushes :done to +result+ and leaves the command loop
# Popping nil or false from the queue also ends the loop. Unknown commands
# are ignored.
#
# When the loop ends, the transport recorded for this client in @clients is
# closed exactly once. Exceptions raised in the thread are re-raised there
# unless @catch_exceptions is set.
#
# result:: queue that receives the replies described above
#
# Returns the command queue.
def setup_client_thread(result)
  commands = Queue.new
  Thread.new do
    begin
      client = setup_client
      while (command = commands.pop)
        name, *args = command
        case name
        when :block
          result << client.block
        when :unblock
          client.unblock(args.first)
        when :hello
          result << client.greeting(true)
        when :sleep
          client.sleep(args.first || 0.5)
          result << :slept
        when :shutdown
          client.shutdown
        when :exit
          result << :done
          break
        end
      end

      # Close this client's transport. Looking the pair up first keeps the
      # outcome independent of what #close happens to return.
      _, transport = @clients.find { |candidate, _| candidate == client }
      transport.close if transport
    rescue => e
      raise e unless @catch_exceptions
    end
  end
  commands
end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000177
# One client, two request/response round trips, then a server shutdown.
it "should handle basic message passing" do
  client = setup_client
  default_hello = SpecNamespace::Hello.new
  aloha_hello = SpecNamespace::Hello.new(greeting: 'Aloha!')

  expect(client.greeting(true)).to eq(default_hello)
  expect(client.greeting(false)).to eq(aloha_hello)
  @server.shutdown
end
Kevin Clark15350782008-06-18 01:16:27 +0000184
# Four clients call #block from their own threads; a fifth client then
# releases all of them with a single #unblock(4).
it "should handle concurrent clients" do
  replies = Queue.new
  flushes = Queue.new

  4.times do
    Thread.new(Thread.current) do |main_thread|
      begin
        replies.push(setup_client(flushes).block)
      rescue => e
        # Surface failures from the helper thread in the example's thread.
        main_thread.raise e
      end
    end
  end

  # SpecTransport pushes :flushed on each transport's first flush; wait until
  # all four clients have flushed before unblocking them.
  4.times { flushes.pop }
  setup_client.unblock(4)
  4.times { expect(replies.pop).to be_truthy }
  @server.shutdown
end
202
# Keeps eight connections open at once (seven threaded clients plus one
# direct client) and checks that requests are still answered while four of
# them are parked in #block.
# NOTE(review): the server is constructed with 5 as its fifth argument,
# presumably the worker thread count the example title refers to - confirm.
it "should handle messages from more than 5 long-lived connections" do
  result = Queue.new
  queues = Array.new(7) do |i|
    commands = setup_client_thread(result)
    Thread.pass if i == 4 # give the server time to accept connections
    commands
  end
  client = setup_client
  hello = SpecNamespace::Hello.new

  # Park the first four connections in a blocking call.
  queues.first(4).each { |commands| commands << :block }

  # The remaining connections must still get answers.
  queues[4..6].each { |commands| commands << :hello }
  3.times { expect(result.pop).to eq(hello) }
  expect(client.greeting(true)).to eq(hello)

  # Release the four blocked calls through one of the free connections.
  queues[5] << [:unblock, 4]
  4.times { expect(result.pop).to be_truthy }

  # A previously blocked connection is usable again.
  queues[2] << :hello
  expect(result.pop).to eq(hello)
  expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(greeting: 'Aloha!'))

  # Tell every client thread to exit; the direct client must keep working.
  queues.shift << :exit until queues.empty?
  expect(client.greeting(true)).to eq(hello)
  @server.shutdown
end
227
# After a shutdown request the serving thread must finish within 2 seconds.
it "should shut down when asked" do
  # A completed round trip shows the server is up before it is shut down.
  setup_client.greeting(false)
  @server.shutdown
  joined = @server_thread.join(2)
  expect(joined).to be_an_instance_of(Thread)
end
235
# A :sleep request that is in flight when shutdown is requested must still
# complete and report :slept.
it "should continue processing active messages when shutting down" do
  result = Queue.new
  commands = setup_client_thread(result)
  commands << :sleep
  sleep 0.1 # give the server time to start processing the client's message
  @server.shutdown
  joined = @server_thread.join(2)
  expect(joined).to be_an_instance_of(Thread)
  expect(result.pop).to eq(:slept)
end
245
# A 10 second :sleep request outlives shutdown(1): the server thread must
# still finish within 3 seconds and the client must never report :slept.
it "should kill active messages when they don't expire while shutting down" do
  result = Queue.new
  commands = setup_client_thread(result)
  commands << [:sleep, 10.0]
  sleep 0.1 # start processing the client's message
  @server.shutdown(1)
  # From here on the client thread swallows its exceptions instead of
  # re-raising them (see the rescue in setup_client_thread).
  @catch_exceptions = true
  joined = @server_thread.join(3)
  expect(joined).not_to be_nil
  expect(result).to be_empty
end
256
# The shutdown request arrives as a service call, which Handler#shutdown
# forwards to the server.
it "should allow shutting down in response to a message" do
  client = setup_client
  reply = client.greeting(true)
  expect(reply).to eq(SpecNamespace::Hello.new)
  client.shutdown
  joined = @server_thread.join(2)
  expect(joined).not_to be_nil
end
Kevin Clark15350782008-06-18 01:16:27 +0000263 end
Dmytro Shteflyuk697910f2026-02-13 14:12:43 -0500264
265 describe "#{Thrift::NonblockingServer} with TLS transport" do
# Builds a NonblockingServer behind an SSL server socket on a free local
# port, runs it in a background thread and waits until the socket has a
# handle.
before(:each) do
  @port = available_port
  handler = Handler.new
  processor = SpecNamespace::NonblockingService::Processor.new(handler)
  ssl_context = create_server_ssl_context
  @transport = Thrift::SSLServerSocket.new('localhost', @port, ssl_context)
  framed_factory = Thrift::FramedTransportFactory.new
  logger = Logger.new(STDERR)
  logger.level = Logger::WARN
  @server = Thrift::NonblockingServer.new(processor, @transport, framed_factory, nil, 5, logger)
  handler.server = @server

  @server_thread = Thread.new(Thread.current) do |master_thread|
    begin
      @server.serve
    rescue => e
      # Surface server failures in the thread running the example.
      master_thread.raise e
    end
  end

  @clients = []
  wait_until_listening
end
288
# Tears down whatever before(:each) managed to set up. Every step is guarded
# because this hook also runs when setup failed part-way, in which case some
# of the instance variables are still nil.
after(:each) do
  # @clients is nil if setup failed before it was assigned; Array() turns
  # that into an empty list instead of raising NoMethodError.
  Array(@clients).each(&:close)
  @server.shutdown if @server
  # Thread#join returns nil on timeout; do not leave a stuck server thread
  # running into the next example.
  @server_thread.kill if @server_thread && !@server_thread.join(2)
  @transport.close if @transport
end
295
# One request/response round trip over a TLS connection, followed by a
# shutdown that must complete within 2 seconds.
it "should handle requests over TLS" do
  expect(@server_thread).to be_alive

  reply = setup_tls_client.greeting(true)
  expect(reply).to eq(SpecNamespace::Hello.new)

  @server.shutdown
  joined = @server_thread.join(2)
  expect(joined).to be_an_instance_of(Thread)
end
305
# Opens a framed TLS client connection to the server on @port and returns
# the service client. The opened transport is recorded in @clients.
def setup_tls_client
  ssl_socket = Thrift::SSLSocket.new('localhost', @port, nil, create_client_ssl_context)
  transport = Thrift::FramedTransport.new(ssl_socket)
  client = SpecNamespace::NonblockingService::Client.new(Thrift::BinaryProtocol.new(transport))
  transport.open
  @clients << transport
  client
end
316
# Polls every 10ms until @transport reports a handle.
#
# Raises RuntimeError if the server thread is no longer alive while waiting,
# and Timeout::Error if no handle appears within 2 seconds. Returns nil.
def wait_until_listening
  Timeout.timeout(2) do
    loop do
      break if @transport.handle
      raise "Server thread exited unexpectedly" unless @server_thread.alive?

      sleep 0.01
    end
  end
end
325
# Binds a temporary TCP server to port 0 on localhost, reads back the port
# number that was assigned, and closes the server before returning it.
def available_port
  probe = TCPServer.new('localhost', 0)
  probe.addr[1]
ensure
  probe.close if probe
end
329
# Absolute path of the test/keys directory, three levels above this file's
# directory.
def ssl_keys_dir
  relative_to_here = File.join(__dir__, '../../../test/keys')
  File.expand_path(relative_to_here)
end
333
# Builds the SSLContext used by the server socket: peer verification on,
# TLS 1.2 as the minimum version when the OpenSSL binding supports setting
# one, server.crt / server.key as the identity, CA.pem as the CA file and a
# certificate store loaded from client.pem. All files come from ssl_keys_dir.
def create_server_ssl_context
  keys = ssl_keys_dir
  ctx = OpenSSL::SSL::SSLContext.new
  ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER

  can_set_min_version = ctx.respond_to?(:min_version=) && OpenSSL::SSL.const_defined?(:TLS1_2_VERSION)
  ctx.min_version = OpenSSL::SSL::TLS1_2_VERSION if can_set_min_version

  ctx.ca_file = File.join(keys, 'CA.pem')
  ctx.cert = OpenSSL::X509::Certificate.new(File.read(File.join(keys, 'server.crt')))

  store = OpenSSL::X509::Store.new
  ctx.cert_store = store
  store.add_file(File.join(keys, 'client.pem'))

  ctx.key = OpenSSL::PKey::RSA.new(File.read(File.join(keys, 'server.key')))
  ctx
end
347
# Builds the SSLContext used by client sockets: peer verification on,
# TLS 1.2 as the minimum version when the OpenSSL binding supports setting
# one, client.crt / client.key as the identity, CA.pem as the CA file and a
# certificate store loaded from server.pem. All files come from ssl_keys_dir.
def create_client_ssl_context
  keys = ssl_keys_dir
  ctx = OpenSSL::SSL::SSLContext.new
  ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER

  can_set_min_version = ctx.respond_to?(:min_version=) && OpenSSL::SSL.const_defined?(:TLS1_2_VERSION)
  ctx.min_version = OpenSSL::SSL::TLS1_2_VERSION if can_set_min_version

  ctx.ca_file = File.join(keys, 'CA.pem')
  ctx.cert = OpenSSL::X509::Certificate.new(File.read(File.join(keys, 'client.crt')))

  store = OpenSSL::X509::Store.new
  ctx.cert_store = store
  store.add_file(File.join(keys, 'server.pem'))

  ctx.key = OpenSSL::PKey::RSA.new(File.read(File.join(keys, 'client.key')))
  ctx
end
361 end
Kevin Clarke0fddde2008-06-18 01:16:02 +0000362end