Implement NonblockingServer and add specs
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@668999 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/lib/thrift/server/nonblockingserver.rb b/lib/rb/lib/thrift/server/nonblockingserver.rb
new file mode 100644
index 0000000..54dad04
--- /dev/null
+++ b/lib/rb/lib/thrift/server/nonblockingserver.rb
@@ -0,0 +1,129 @@
+require 'thrift/server'
+
+# thrift/server already imports fastthread/thread
+
+module Thrift
+ # this class expects to always use a FramedTransport for reading messages
+ #--
+ # this isn't very pretty, but we're working around the fact that FramedTransport
+ # and the processors are all written in a synchronous manner.
+ # So lets read data off the wire ourselves, check if we have a full frame, and
+ # only then hand it to the transport to parse
+ #
+ # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
+ class NonblockingServer < ThreadPoolServer
+ def serve
+ @server_thread = Thread.current
+ @serverTransport.listen
+
+ begin
+ connections = {}
+ running_connections = {}
+ # the swapping_connections stuff is to ensure the thread doesn't
+ # put the connection back into the regular list, then have the server
+ # thread process it again, then have the first thread remove it from
+ # the running_connections list
+ swapping_connections = {}
+ thread_group = ThreadGroup.new
+ loop do
+ break if @shutdown
+ rd, = select([@serverTransport.handle, *connections.keys])
+ next if rd.nil?
+ rd.each do |socket|
+ if socket == @serverTransport.handle
+ client = @serverTransport.accept
+ buffer = ''
+ outtrans = @transportFactory.get_transport(client)
+ outprot = @protocolFactory.get_protocol(outtrans)
+ connections[client.handle] = [client, buffer, outtrans, outprot]
+ else
+ client, buffer, outtrans, outprot = connections[socket]
+ if socket.eof?
+ client.close
+ connections.delete(socket)
+ else
+ buffer << client.read(4096, true)
+ if has_full_frame?(buffer)
+ running_connections[socket] = connections.delete(socket)
+ @thread_q.push :token
+ t = Thread.new(Thread.current) do |master|
+ begin
+ membuf = MemoryBuffer.new(buffer)
+ intrans = @transportFactory.get_transport(membuf)
+ inprot = @protocolFactory.get_protocol(intrans)
+ @processor.process(inprot, outprot)
+ if @shutdown
+ client.close
+ running_connections.delete(socket)
+ else
+ swapping_connections[socket] = running_connections.delete(socket)
+ master.wakeup
+ end
+ rescue => e
+ outtrans.close
+ @exception_q.push e
+ ensure
+ running_connections.delete(socket)
+ connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket
+ intrans.close
+ @thread_q.pop
+ end
+ end
+ thread_group.add t
+ end
+ end
+ end
+ end
+ end
+ if @shutdown
+ @serverTransport.close
+ connections.merge! running_connections
+ connections.merge! swapping_connections
+ connections.values.each do |client, buffer, outtrans, outprot|
+ # can't close completely or we'll break active messages
+ # but lets at least stop accepting input
+ client.handle.close_read
+ end
+ start = Time.now.to_f
+ until thread_group.list.empty?
+ if @shutdown_timeout
+ now = Time.now.to_f
+ cur_timeout = @shutdown_timeout - (now - start)
+ break if cur_timeout <= 0
+ thread_group.list.first.join(cur_timeout)
+ else
+ thread_group.list.first.join
+ end
+ end
+ thread_group.list.each { |t| t.kill } if @shutdown_kill
+ # now kill connections completely if they still exists
+ connections.values.each do |client, buffer, outtrans, outprot|
+ client.close
+ end
+ end
+ ensure
+ @serverTransport.close
+ end
+ end
+
+ # Stop accepting new messages and wait for active messages to finish
+ # If the given timeout passes without the active messages finishing,
+ # control will exit from #serve and leave the remaining threads active.
+ # If you pass true for kill, the remaining threads will be reaped instead.
+ # A false timeout means wait indefinitely
+ def shutdown(timeout = nil, kill = false)
+ @shutdown_timeout = timeout
+ @shutdown_kill = kill
+ @shutdown = true
+ @server_thread.wakeup
+ end
+
+ private
+
+ def has_full_frame?(buf)
+ return no unless buf.length >= 4
+ size = buf.unpack('N').first
+ size + 4 <= buf.length
+ end
+ end
+end
diff --git a/lib/rb/spec/ThriftSpec.thrift b/lib/rb/spec/ThriftSpec.thrift
index 101caff..1012e51 100644
--- a/lib/rb/spec/ThriftSpec.thrift
+++ b/lib/rb/spec/ThriftSpec.thrift
@@ -16,3 +16,11 @@
struct BoolStruct {
1: bool yesno = 1
}
+
+service NonblockingService {
+ Hello greeting(1:bool english)
+ bool block()
+ async void unblock()
+ async void shutdown()
+ void sleep(1:double seconds)
+}
diff --git a/lib/rb/spec/gen-rb/NonblockingService.rb b/lib/rb/spec/gen-rb/NonblockingService.rb
new file mode 100644
index 0000000..46e7c60
--- /dev/null
+++ b/lib/rb/spec/gen-rb/NonblockingService.rb
@@ -0,0 +1,192 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+
+require 'thrift/protocol'
+require 'thrift'
+require 'ThriftSpec_types'
+
+ module SpecNamespace
+ module NonblockingService
+ class Client
+ include Thrift::Client
+
+ def greeting(english)
+ send_greeting(english)
+ return recv_greeting()
+ end
+
+ def send_greeting(english)
+ send_message('greeting', Greeting_args, :english => english)
+ end
+
+ def recv_greeting()
+ result = receive_message(Greeting_result)
+ return result.success unless result.success.nil?
+ raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'greeting failed: unknown result')
+ end
+
+ def block()
+ send_block()
+ return recv_block()
+ end
+
+ def send_block()
+ send_message('block', Block_args)
+ end
+
+ def recv_block()
+ result = receive_message(Block_result)
+ return result.success unless result.success.nil?
+ raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'block failed: unknown result')
+ end
+
+ def unblock()
+ send_unblock()
+ end
+
+ def send_unblock()
+ send_message('unblock', Unblock_args)
+ end
+ def shutdown()
+ send_shutdown()
+ end
+
+ def send_shutdown()
+ send_message('shutdown', Shutdown_args)
+ end
+ def sleep(seconds)
+ send_sleep(seconds)
+ recv_sleep()
+ end
+
+ def send_sleep(seconds)
+ send_message('sleep', Sleep_args, :seconds => seconds)
+ end
+
+ def recv_sleep()
+ result = receive_message(Sleep_result)
+ return
+ end
+
+ end
+
+ class Processor
+ include Thrift::Processor
+
+ def process_greeting(seqid, iprot, oprot)
+ args = read_args(iprot, Greeting_args)
+ result = Greeting_result.new()
+ result.success = @handler.greeting(args.english)
+ write_result(result, oprot, 'greeting', seqid)
+ end
+
+ def process_block(seqid, iprot, oprot)
+ args = read_args(iprot, Block_args)
+ result = Block_result.new()
+ result.success = @handler.block()
+ write_result(result, oprot, 'block', seqid)
+ end
+
+ def process_unblock(seqid, iprot, oprot)
+ args = read_args(iprot, Unblock_args)
+ @handler.unblock()
+ return
+ end
+
+ def process_shutdown(seqid, iprot, oprot)
+ args = read_args(iprot, Shutdown_args)
+ @handler.shutdown()
+ return
+ end
+
+ def process_sleep(seqid, iprot, oprot)
+ args = read_args(iprot, Sleep_args)
+ result = Sleep_result.new()
+ @handler.sleep(args.seconds)
+ write_result(result, oprot, 'sleep', seqid)
+ end
+
+ end
+
+ # HELPER FUNCTIONS AND STRUCTURES
+
+ class Greeting_args
+ include Thrift::Struct
+ attr_accessor :english
+ FIELDS = {
+ 1 => {:type => Thrift::Types::BOOL, :name => 'english'}
+ }
+ end
+
+ class Greeting_result
+ include Thrift::Struct
+ attr_accessor :success
+ FIELDS = {
+ 0 => {:type => Thrift::Types::STRUCT, :name => 'success', :class => Hello}
+ }
+ end
+
+ class Block_args
+ include Thrift::Struct
+ FIELDS = {
+
+ }
+ end
+
+ class Block_result
+ include Thrift::Struct
+ attr_accessor :success
+ FIELDS = {
+ 0 => {:type => Thrift::Types::BOOL, :name => 'success'}
+ }
+ end
+
+ class Unblock_args
+ include Thrift::Struct
+ FIELDS = {
+
+ }
+ end
+
+ class Unblock_result
+ include Thrift::Struct
+ FIELDS = {
+
+ }
+ end
+
+ class Shutdown_args
+ include Thrift::Struct
+ FIELDS = {
+
+ }
+ end
+
+ class Shutdown_result
+ include Thrift::Struct
+ FIELDS = {
+
+ }
+ end
+
+ class Sleep_args
+ include Thrift::Struct
+ attr_accessor :seconds
+ FIELDS = {
+ 1 => {:type => Thrift::Types::DOUBLE, :name => 'seconds'}
+ }
+ end
+
+ class Sleep_result
+ include Thrift::Struct
+ FIELDS = {
+
+ }
+ end
+
+ end
+
+ end
diff --git a/lib/rb/spec/nonblockingserver_spec.rb b/lib/rb/spec/nonblockingserver_spec.rb
new file mode 100644
index 0000000..5850413
--- /dev/null
+++ b/lib/rb/spec/nonblockingserver_spec.rb
@@ -0,0 +1,166 @@
+require File.dirname(__FILE__) + '/spec_helper'
+require 'thrift/server/nonblockingserver'
+$:.unshift File.dirname(__FILE__) + '/gen-rb'
+require 'NonblockingService'
+
+class ThriftNonblockingServerSpec < Spec::ExampleGroup
+ include Thrift
+ include SpecNamespace
+
+ class Handler
+ def initialize
+ @queue = Queue.new
+ end
+
+ attr_accessor :server
+
+ def greeting(english)
+ if english
+ SpecNamespace::Hello.new
+ else
+ SpecNamespace::Hello.new(:greeting => "Aloha!")
+ end
+ end
+
+ def block
+ @queue.pop
+ end
+
+ def unblock
+ @queue.num_waiting.times { @queue.push true }
+ end
+
+ def sleep(time)
+ Kernel.sleep time
+ end
+
+ def shutdown
+ @server.shutdown
+ end
+ end
+
+ before(:each) do
+ @port = 43251
+ handler = Handler.new
+ processor = NonblockingService::Processor.new(handler)
+ @transport = ServerSocket.new('localhost', @port)
+ transportFactory = FramedTransportFactory.new
+ @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5)
+ handler.server = @server
+ @server_thread = Thread.new do
+ begin
+ @server.serve
+ rescue => e
+ p e
+ puts e.backtrace * "\n"
+ raise e
+ end
+ end
+ Thread.pass
+
+ @clients = []
+ end
+
+ after(:each) do
+ @clients.each { |client, trans| trans.close }
+ @server_thread.kill
+ @transport.close
+ end
+
+ def setup_client
+ transport = FramedTransport.new(Socket.new('localhost', @port))
+ protocol = BinaryProtocol.new(transport)
+ client = NonblockingService::Client.new(protocol)
+ transport.open
+ @clients << [client, transport]
+ client
+ end
+
+ def setup_client_thread(result)
+ queue = Queue.new
+ Thread.new do
+ client = setup_client
+ while (msg = queue.pop)
+ case msg
+ when :block
+ result << client.block
+ when :unblock
+ client.unblock
+ when :hello
+ result << client.greeting(true) # ignore result
+ when :sleep
+ client.sleep(0.5)
+ result << :slept
+ when :shutdown
+ client.shutdown
+ when :exit
+ result << :done
+ break
+ end
+ end
+ @clients.each { |c,t| t.close and break if c == client } #close the transport
+ end
+ queue
+ end
+
+ it "should handle basic message passing" do
+ client = setup_client
+ client.greeting(true).should == Hello.new
+ client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+ end
+
+ it "should handle concurrent clients" do
+ queue = Queue.new
+ 4.times { Thread.new { queue.push setup_client.block } }
+ setup_client.unblock
+ 4.times { queue.pop.should be_true }
+ end
+
+ it "should handle messages from more than 5 long-lived connections" do
+ queues = []
+ result = Queue.new
+ 7.times do |i|
+ queues << setup_client_thread(result)
+ Thread.pass if i == 4 # give the server time to accept connections
+ end
+ client = setup_client
+ # block 4 connections
+ 4.times { |i| queues[i] << :block }
+ queues[4] << :hello
+ queues[5] << :hello
+ queues[6] << :hello
+ 3.times { result.pop.should == Hello.new }
+ client.greeting(true).should == Hello.new
+ queues[5] << :unblock
+ 4.times { result.pop.should be_true }
+ queues[2] << :hello
+ result.pop.should == Hello.new
+ client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+ 7.times { queues.shift << :exit }
+ client.greeting(true).should == Hello.new
+ end
+
+ it "should shut down when asked" do
+ @server.shutdown
+ @server_thread.join(2).should be_an_instance_of(Thread)
+ end
+
+ it "should continue processing active messages when shutting down" do
+ result = Queue.new
+ client = setup_client_thread(result)
+ client << :sleep
+ sleep 0.1 # give the server time to start processing the client's message
+ @server.shutdown
+ @server_thread.join(2).should be_an_instance_of(Thread)
+ result.pop.should == :slept
+ end
+
+ it "should kill active messages when they don't expire while shutting down" do
+ result = Queue.new
+ client = setup_client_thread(result)
+ client << :block
+ sleep 0.1 # start processing the client's message
+ @server.shutdown(1, true)
+ @server_thread.join(3).should_not be_nil
+ end
+end