rb: Completely rewrite Thrift::NonblockingServer
It now has a much better and cleaner architecture, a proper persistent thread pool,
a dedicated acceptor thread, and no concurrency issues
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669012 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/lib/thrift/server/nonblockingserver.rb b/lib/rb/lib/thrift/server/nonblockingserver.rb
index 920175b..283d5f5 100644
--- a/lib/rb/lib/thrift/server/nonblockingserver.rb
+++ b/lib/rb/lib/thrift/server/nonblockingserver.rb
@@ -1,161 +1,269 @@
require 'thrift/server'
-require 'sync'
-# thrift/server already imports fastthread/thread
+require 'logger'
+require 'thread'
module Thrift
# this class expects to always use a FramedTransport for reading messages
- #--
- # this isn't very pretty, but we're working around the fact that FramedTransport
- # and the processors are all written in a synchronous manner.
- # So lets read data off the wire ourselves, check if we have a full frame, and
- # only then hand it to the transport to parse
- #
- # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
- class NonblockingServer < ThreadPoolServer
- def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20)
- super
- @sync = Sync.new
+ class NonblockingServer < Server
+ def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20, logger = nil)
+ super(processor, serverTransport, transportFactory, protocolFactory)
+ @num_threads = num
+ if logger.nil?
+ @logger = Logger.new(STDERR)
+ @logger.level = Logger::WARN
+ else
+ @logger = logger
+ end
+ @shutdown_semaphore = Mutex.new
end
def serve
- @server_thread = Thread.current
+ @logger.info "Starting #{self}"
@serverTransport.listen
+ @io_manager = start_io_manager
begin
- connections = {}
- running_connections = {}
- # the swapping_connections stuff is to ensure the thread doesn't
- # put the connection back into the regular list, then have the server
- # thread process it again, then have the first thread remove it from
- # the running_connections list
- swapping_connections = {}
- thread_group = ThreadGroup.new
loop do
- break if @shutdown
- handles = [@serverTransport.handle]
- @sync.synchronize(Sync::SH) do
- handles.concat connections.keys
- end
- rd, = select(handles)
- next if rd.nil?
- rd.each do |socket|
- if socket == @serverTransport.handle
- client = @serverTransport.accept
- buffer = ''
- outtrans = @transportFactory.get_transport(client)
- outprot = @protocolFactory.get_protocol(outtrans)
- @sync.synchronize(Sync::EX) do
- connections[client.handle] = [client, buffer, outtrans, outprot]
- end
- else
- client, buffer, outtrans, outprot = nil # for scope
- @sync.synchronize(Sync::SH) do
- client, buffer, outtrans, outprot = connections[socket]
- end
- if socket.eof?
- client.close
- @sync.synchronize(Sync::EX) do
- connections.delete(socket)
- end
- else
- buffer << client.read(4096, true)
- if has_full_frame?(buffer)
- @sync.synchronize(Sync::EX) do
- running_connections[socket] = connections.delete(socket)
- end
- @thread_q.push :token
- t = Thread.new(Thread.current) do |master|
- begin
- membuf = MemoryBuffer.new(buffer)
- intrans = @transportFactory.get_transport(membuf)
- inprot = @protocolFactory.get_protocol(intrans)
- @processor.process(inprot, outprot)
- if @shutdown
- client.close
- @sync.synchronize(Sync::EX) do
- running_connections.delete(socket)
- end
- else
- @sync.synchronize(Sync::EX) do
- swapping_connections[socket] = running_connections.delete(socket)
- end
- end
- rescue => e
- outtrans.close
- @exception_q.push e
- ensure
- should_wakeup = false
- @sync.synchronize(Sync::EX) do
- running_connections.delete(socket)
- if swapping_connections.include? socket
- connections[socket] = swapping_connections.delete(socket)
- should_wakeup = true
- end
- end
- master.wakeup if should_wakeup
- intrans.close
- @thread_q.pop
- end
- end
- thread_group.add t
- end
- end
- end
- end
+ socket = @serverTransport.accept
+ @logger.debug "Accepted socket: #{socket.inspect}"
+ @io_manager.add_connection socket
end
- if @shutdown
- @serverTransport.close
- handles = []
- @sync.synchronize(Sync::SH) do
- handles = connections
- handles.merge! running_connections
- handles.merge! swapping_connections
- end
- handles.values.each do |client, buffer, outtrans, outprot|
- # can't close completely or we'll break active messages
- # but lets at least stop accepting input
- client.handle.close_read
- end
- start = Time.now.to_f
- until thread_group.list.empty?
- if @shutdown_timeout
- now = Time.now.to_f
- cur_timeout = @shutdown_timeout - (now - start)
- break if cur_timeout <= 0
- thread_group.list.first.join(cur_timeout)
- else
- thread_group.list.first.join
- end
- end
- thread_group.list.each { |t| t.kill } if @shutdown_kill
- # now kill connections completely if they still exists
- handles.values.each do |client, buffer, outtrans, outprot|
- client.close
- end
- end
- ensure
- @serverTransport.close
+ rescue IOError => e
+ # we must be shutting down
+ @logger.info "#{self} is shutting down, goodbye"
end
+ ensure
+ @serverTransport.close
+ @io_manager.ensure_closed unless @io_manager.nil?
end
- # Stop accepting new messages and wait for active messages to finish
- # If the given timeout passes without the active messages finishing,
- # control will exit from #serve and leave the remaining threads active.
- # If you pass true for kill, the remaining threads will be reaped instead.
- # A false timeout means wait indefinitely
- def shutdown(timeout = nil, kill = false)
- @shutdown_timeout = timeout
- @shutdown_kill = kill
- @shutdown = true
- @server_thread.wakeup
+ def shutdown(timeout = 0, block = true)
+ @shutdown_semaphore.synchronize do
+ return if @is_shutdown
+ @is_shutdown = true
+ end
+ # nonblocking is intended for calling from within a Handler
+ # but we can't change the order of operations here, so lets thread
+ shutdown_proc = lambda do
+ @io_manager.shutdown(timeout)
+ @serverTransport.close # this will break the accept loop
+ end
+ if block
+ shutdown_proc.call
+ else
+ Thread.new &shutdown_proc
+ end
end
private
- def has_full_frame?(buf)
- return no unless buf.length >= 4
- size = buf.unpack('N').first
- size + 4 <= buf.length
+ def start_io_manager
+ iom = IOManager.new(@processor, @serverTransport, @transportFactory, @protocolFactory, @num_threads, @logger)
+ iom.spawn
+ iom
+ end
+
+ class IOManager # :nodoc:
+ def initialize(processor, serverTransport, transportFactory, protocolFactory, num, logger)
+ @processor = processor
+ @serverTransport = serverTransport
+ @transportFactory = transportFactory
+ @protocolFactory = protocolFactory
+ @num_threads = num
+ @logger = logger
+ @connections = []
+ @buffers = Hash.new { |h,k| h[k] = '' }
+ @signal_queue = Queue.new
+ @signal_pipes = IO.pipe
+ @signal_pipes[1].sync = true
+ @worker_queue = Queue.new
+ @shutdown_queue = Queue.new
+ end
+
+ def add_connection(socket)
+ signal [:connection, socket]
+ end
+
+ def spawn
+ @iom_thread = Thread.new do
+ @logger.debug "Starting #{self}"
+ run
+ end
+ end
+
+ def shutdown(timeout = 0)
+ @logger.debug "#{self} is shutting down workers"
+ @worker_queue.clear
+ @num_threads.times { @worker_queue.push [:shutdown] }
+ signal [:shutdown, timeout]
+ @shutdown_queue.pop
+ @signal_pipes[0].close
+ @signal_pipes[1].close
+ @logger.debug "#{self} is shutting down, goodbye"
+ end
+
+ def ensure_closed
+ kill_worker_threads if @worker_threads
+ @iom_thread.kill
+ end
+
+ private
+
+ def run
+ spin_worker_threads
+
+ loop do
+ rd, = select([@signal_pipes[0], *@connections])
+ if rd.delete @signal_pipes[0]
+ break if read_signals == :shutdown
+ end
+ rd.each do |fd|
+ if fd.handle.eof?
+ remove_connection fd
+ else
+ read_connection fd
+ end
+ end
+ end
+ join_worker_threads(@shutdown_timeout)
+ ensure
+ @shutdown_queue.push :shutdown
+ end
+
+ def read_connection(fd)
+ buffer = ''
+ begin
+ buffer << fd.read_nonblock(4096) while true
+ rescue Errno::EAGAIN, EOFError
+ @buffers[fd] << buffer
+ end
+ frame = slice_frame!(@buffers[fd])
+ if frame
+ @worker_queue.push [:frame, fd, frame]
+ end
+ end
+
+ def spin_worker_threads
+ @logger.debug "#{self} is spinning up worker threads"
+ @worker_threads = []
+ @num_threads.times do
+ @worker_threads << spin_thread
+ end
+ end
+
+ def spin_thread
+ Worker.new(@processor, @transportFactory, @protocolFactory, @logger, @worker_queue).spawn
+ end
+
+ def signal(msg)
+ @signal_queue << msg
+ @signal_pipes[1].write " "
+ end
+
+ def read_signals
+ # clear the signal pipe
+ begin
+ @signal_pipes[0].read_nonblock(1024) while true
+ rescue Errno::EAGAIN
+ end
+ # now read the signals
+ begin
+ loop do
+ signal, obj = @signal_queue.pop(true)
+ case signal
+ when :connection
+ @connections << obj
+ when :shutdown
+ @shutdown_timeout = obj
+ return :shutdown
+ end
+ end
+ rescue ThreadError
+ # out of signals
+ end
+ end
+
+ def remove_connection(fd)
+ # don't explicitly close it, a thread may still be writing to it
+ @connections.delete fd
+ @buffers.delete fd
+ end
+
+ def join_worker_threads(shutdown_timeout)
+ start = Time.now
+ @worker_threads.each do |t|
+ if shutdown_timeout > 0
+ timeout = Time.now - (start + shutdown_timeout)
+ break if timeout <= 0
+ t.join(timeout)
+ else
+ t.join
+ end
+ end
+ kill_worker_threads
+ end
+
+ def kill_worker_threads
+ @worker_threads.each do |t|
+ t.kill if t.status
+ end
+ @worker_threads.clear
+ end
+
+ def slice_frame!(buf)
+ if buf.length >= 4
+ size = buf.unpack('N').first
+ if buf.length >= size + 4
+ buf.slice!(0, size + 4)
+ else
+ nil
+ end
+ else
+ nil
+ end
+ end
+
+ class Worker # :nodoc:
+ def initialize(processor, transportFactory, protocolFactory, logger, queue)
+ @processor = processor
+ @transportFactory = transportFactory
+ @protocolFactory = protocolFactory
+ @logger = logger
+ @queue = queue
+ end
+
+ def spawn
+ Thread.new do
+ @logger.debug "#{self} is spawning"
+ run
+ end
+ end
+
+ private
+
+ def run
+ loop do
+ cmd, *args = @queue.pop
+ case cmd
+ when :shutdown
+ @logger.debug "#{self} is shutting down, goodbye"
+ break
+ when :frame
+ fd, frame = args
+ begin
+ otrans = @transportFactory.get_transport(fd)
+ oprot = @protocolFactory.get_protocol(otrans)
+ membuf = MemoryBuffer.new(frame)
+ itrans = @transportFactory.get_transport(membuf)
+ iprot = @protocolFactory.get_protocol(itrans)
+ @processor.process(iprot, oprot)
+ rescue => e
+ @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"
+ end
+ end
+ end
+ end
+ end
end
end
-end
+end
\ No newline at end of file
diff --git a/lib/rb/spec/nonblockingserver_spec.rb b/lib/rb/spec/nonblockingserver_spec.rb
index 773fc03..bf390bd 100644
--- a/lib/rb/spec/nonblockingserver_spec.rb
+++ b/lib/rb/spec/nonblockingserver_spec.rb
@@ -35,139 +35,189 @@
end
def shutdown
- @server.shutdown
+ @server.shutdown(0, false)
end
end
- before(:each) do
- @port = 43251
- handler = Handler.new
- processor = NonblockingService::Processor.new(handler)
- @transport = ServerSocket.new('localhost', @port)
- transportFactory = FramedTransportFactory.new
- @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5)
- handler.server = @server
- @server_thread = Thread.new do
- begin
- @server.serve
- rescue => e
- p e
- puts e.backtrace * "\n"
- raise e
- end
+ class SpecTransport < Transport
+ def initialize(transport, queue)
+ @transport = transport
+ @queue = queue
+ @flushed = false
end
- Thread.pass
- @clients = []
+ def open?
+ @transport.open?
+ end
+
+ def open
+ @transport.open
+ end
+
+ def close
+ @transport.close
+ end
+
+ def read(sz)
+ @transport.read(sz)
+ end
+
+ def write(buf,sz=nil)
+ @transport.write(buf, sz)
+ end
+
+ def flush
+ @queue.push :flushed unless @flushed or @queue.nil?
+ @flushed = true
+ @transport.flush
+ end
end
- after(:each) do
- @clients.each { |client, trans| trans.close }
- @server_thread.kill
- @transport.close
- end
-
- def setup_client
- transport = FramedTransport.new(Socket.new('localhost', @port))
- protocol = BinaryProtocol.new(transport)
- client = NonblockingService::Client.new(protocol)
- transport.open
- @clients << [client, transport]
- client
- end
-
- def setup_client_thread(result)
- queue = Queue.new
- Thread.new do
- client = setup_client
- while (msg = queue.pop)
- case msg
- when :block
- result << client.block
- when :unblock
- client.unblock
- when :hello
- result << client.greeting(true) # ignore result
- when :sleep
- client.sleep(0.5)
- result << :slept
- when :shutdown
- client.shutdown
- when :exit
- result << :done
- break
+ describe Thrift::NonblockingServer do
+ before(:each) do
+ @port = 43251
+ handler = Handler.new
+ processor = NonblockingService::Processor.new(handler)
+ @transport = ServerSocket.new('localhost', @port)
+ transportFactory = FramedTransportFactory.new
+ logger = Logger.new(STDERR)
+ logger.level = Logger::WARN
+ @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger)
+ handler.server = @server
+ @server_thread = Thread.new(Thread.current) do |master_thread|
+ begin
+ @server.serve
+ rescue => e
+ p e
+ puts e.backtrace * "\n"
+ master_thread.raise e
end
end
- @clients.each { |c,t| t.close and break if c == client } #close the transport
+ Thread.pass
+
+ @clients = []
+ @catch_exceptions = false
end
- queue
- end
- it "should handle basic message passing" do
- client = setup_client
- client.greeting(true).should == Hello.new
- client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
- end
-
- it "should handle concurrent clients" do
- queue = Queue.new
- 4.times { Thread.new { queue.push setup_client.block } }
- setup_client.unblock
- 4.times { queue.pop.should be_true }
- end
-
- it "should handle messages from more than 5 long-lived connections" do
- queues = []
- result = Queue.new
- 7.times do |i|
- queues << setup_client_thread(result)
- Thread.pass if i == 4 # give the server time to accept connections
+ after(:each) do
+ @clients.each { |client, trans| trans.close }
+ # @server.shutdown(1)
+ @server_thread.kill
+ @transport.close
end
- client = setup_client
- # block 4 connections
- 4.times { |i| queues[i] << :block }
- queues[4] << :hello
- queues[5] << :hello
- queues[6] << :hello
- 3.times { result.pop.should == Hello.new }
- client.greeting(true).should == Hello.new
- queues[5] << :unblock
- 4.times { result.pop.should be_true }
- queues[2] << :hello
- result.pop.should == Hello.new
- client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
- 7.times { queues.shift << :exit }
- client.greeting(true).should == Hello.new
- end
- it "should shut down when asked" do
- @server.shutdown
- @server_thread.join(2).should be_an_instance_of(Thread)
- end
+ def setup_client(queue = nil)
+ transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
+ protocol = BinaryProtocol.new(transport)
+ client = NonblockingService::Client.new(protocol)
+ transport.open
+ @clients << [client, transport]
+ client
+ end
- it "should continue processing active messages when shutting down" do
- result = Queue.new
- client = setup_client_thread(result)
- client << :sleep
- sleep 0.1 # give the server time to start processing the client's message
- @server.shutdown
- @server_thread.join(2).should be_an_instance_of(Thread)
- result.pop.should == :slept
- end
+ def setup_client_thread(result)
+ queue = Queue.new
+ Thread.new do
+ begin
+ client = setup_client
+ while (msg = queue.pop)
+ case msg
+ when :block
+ result << client.block
+ when :unblock
+ client.unblock
+ when :hello
+ result << client.greeting(true) # ignore result
+ when :sleep
+ client.sleep(0.5)
+ result << :slept
+ when :shutdown
+ client.shutdown
+ when :exit
+ result << :done
+ break
+ end
+ end
+ @clients.each { |c,t| t.close and break if c == client } #close the transport
+ rescue => e
+ raise e unless @catch_exceptions
+ end
+ end
+ queue
+ end
- it "should kill active messages when they don't expire while shutting down" do
- result = Queue.new
- client = setup_client_thread(result)
- client << :block
- sleep 0.1 # start processing the client's message
- @server.shutdown(1, true)
- @server_thread.join(3).should_not be_nil
- end
+ it "should handle basic message passing" do
+ client = setup_client
+ client.greeting(true).should == Hello.new
+ client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+ end
- it "should allow shutting down in response to a message" do
- client = setup_client
- client.greeting(true).should == Hello.new
- client.shutdown
- @server_thread.join(2).should_not be_nil
+ it "should handle concurrent clients" do
+ queue = Queue.new
+ trans_queue = Queue.new
+ 4.times { Thread.new { queue.push setup_client(trans_queue).block } }
+ 4.times { trans_queue.pop }
+ setup_client.unblock
+ 4.times { queue.pop.should be_true }
+ end
+
+ it "should handle messages from more than 5 long-lived connections" do
+ queues = []
+ result = Queue.new
+ 7.times do |i|
+ queues << setup_client_thread(result)
+ Thread.pass if i == 4 # give the server time to accept connections
+ end
+ client = setup_client
+ # block 4 connections
+ 4.times { |i| queues[i] << :block }
+ queues[4] << :hello
+ queues[5] << :hello
+ queues[6] << :hello
+ 3.times { result.pop.should == Hello.new }
+ client.greeting(true).should == Hello.new
+ queues[5] << :unblock
+ 4.times { result.pop.should be_true }
+ queues[2] << :hello
+ result.pop.should == Hello.new
+ client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+ 7.times { queues.shift << :exit }
+ client.greeting(true).should == Hello.new
+ end
+
+ it "should shut down when asked" do
+ # connect first to ensure it's running
+ client = setup_client
+ client.greeting(false) # force a message pass
+ @server.shutdown
+ @server_thread.join(2).should be_an_instance_of(Thread)
+ end
+
+ it "should continue processing active messages when shutting down" do
+ result = Queue.new
+ client = setup_client_thread(result)
+ client << :sleep
+ sleep 0.1 # give the server time to start processing the client's message
+ @server.shutdown
+ @server_thread.join(2).should be_an_instance_of(Thread)
+ result.pop.should == :slept
+ end
+
+ it "should kill active messages when they don't expire while shutting down" do
+ result = Queue.new
+ client = setup_client_thread(result)
+ client << :block
+ sleep 0.1 # start processing the client's message
+ @server.shutdown(1)
+ @catch_exceptions = true
+ @server_thread.join(3).should_not be_nil
+ end
+
+ it "should allow shutting down in response to a message" do
+ client = setup_client
+ client.greeting(true).should == Hello.new
+ client.shutdown
+ @server_thread.join(2).should_not be_nil
+ end
end
end