| require File.dirname(__FILE__) + '/spec_helper' |
| require 'thrift/server/nonblockingserver' |
| require File.dirname(__FILE__) + '/gen-rb/NonblockingService' |
| |
| class ThriftNonblockingServerSpec < Spec::ExampleGroup |
| include Thrift |
| include SpecNamespace |
| |
# Service handler wired into NonblockingService::Processor by these specs.
# Every public method below is one of the calls a client can make.
class Handler
  # The server this handler belongs to. It is assigned after the server has
  # been constructed so that #shutdown has something to call.
  attr_accessor :server

  def initialize
    # Hand-off point between #block (consumer) and #unblock (producer).
    @queue = Queue.new
  end

  # Returns a default Hello when +english+ is truthy; otherwise a Hello whose
  # greeting is "Aloha!".
  def greeting(english)
    return SpecNamespace::Hello.new if english

    SpecNamespace::Hello.new(:greeting => "Aloha!")
  end

  # Waits for a value to appear on the internal queue and returns it.
  def block
    @queue.pop
  end

  # Puts +n+ +true+ values on the internal queue, one per #block call that
  # should be released. Returns +n+.
  def unblock(n)
    n.times { @queue << true }
  end

  # Pauses the calling thread for +time+ seconds. Kernel.sleep is spelled out
  # because this method itself is named +sleep+.
  def sleep(time)
    Kernel.sleep(time)
  end

  # Asks the attached server to shut down, forwarding the arguments 0 and
  # false to its shutdown method.
  def shutdown
    @server.shutdown(0, false)
  end
end
| |
# Transport wrapper used by the specs. Every operation is forwarded to the
# wrapped transport; in addition, the first call to #flush pushes :flushed
# onto an optional queue so a spec can tell that a request has been sent.
class SpecTransport < Transport
  # transport - the transport every call is delegated to.
  # queue     - queue that receives :flushed on the first flush, or nil to
  #             skip the notification.
  def initialize(transport, queue)
    @transport = transport
    @queue = queue
    @flushed = false
  end

  # Whether the wrapped transport reports itself as open.
  def open?
    @transport.open?
  end

  # Opens the wrapped transport.
  def open
    @transport.open
  end

  # Closes the wrapped transport.
  def close
    @transport.close
  end

  # Reads +sz+ from the wrapped transport and returns what it returns.
  def read(sz)
    @transport.read(sz)
  end

  # Writes +buf+ to the wrapped transport; +sz+ is passed through unchanged
  # (nil when the caller omits it).
  def write(buf, sz = nil)
    @transport.write(buf, sz)
  end

  # Flushes the wrapped transport. Only the first flush is announced on the
  # queue, and only when a queue was supplied.
  def flush
    first_flush = !@flushed
    @queue.push(:flushed) if first_flush && !@queue.nil?
    @flushed = true
    @transport.flush
  end
end
| |
# ServerSocket that reports on a queue once it has started listening, so the
# spec setup can wait for the server instead of guessing when it is ready.
class SpecServerSocket < ServerSocket
  # host, port - forwarded to ServerSocket#initialize.
  # queue      - queue that receives :listen after #listen has run.
  def initialize(host, port, queue)
    super(host, port)
    @queue = queue
  end

  # Runs the inherited listen first, then signals :listen on the queue.
  def listen
    super
    @queue << :listen
  end
end
| |
| describe Thrift::NonblockingServer do |
# Starts a fresh NonblockingServer on a background thread before each example
# and does not return until the listening socket has signalled :listen.
before(:each) do
  @port = 43251
  service_handler = Handler.new
  processor = NonblockingService::Processor.new(service_handler)
  listen_signal = Queue.new
  @transport = SpecServerSocket.new('localhost', @port, listen_signal)
  transport_factory = FramedTransportFactory.new
  log = Logger.new(STDERR)
  log.level = Logger::WARN
  # NOTE(review): the literal 5 is presumably the server's worker/connection
  # limit (compare the "more than 5 long-lived connections" example) --
  # confirm against NonblockingServer#initialize.
  @server = NonblockingServer.new(processor, @transport, transport_factory, nil, 5, log)
  # The handler needs the server so its shutdown call can reach it.
  service_handler.server = @server
  @server_thread = Thread.new(Thread.current) do |spec_thread|
    begin
      @server.serve
    rescue => error
      # Print the failure and re-raise it in the thread running the spec so
      # it is not lost inside the server thread.
      p error
      puts error.backtrace.join("\n")
      spec_thread.raise error
    end
  end
  # Wait for SpecServerSocket#listen to report that the socket is listening.
  listen_signal.pop

  @clients = []
  @catch_exceptions = false
end
| |
# Cleans up after each example: closes every client transport registered in
# @clients, kills the server thread and closes the listening socket.
after(:each) do
  @clients.each do |_client, client_transport|
    client_transport.close
  end
  # @server.shutdown(1)
  @server_thread.kill
  @transport.close
end
| |
# Builds a client connected to the server on @port, opens its transport and
# records the [client, transport] pair in @clients.
#
# queue - optional queue handed to SpecTransport, which pushes :flushed onto
#         it on the first flush; nil disables that.
#
# Returns the NonblockingService::Client.
def setup_client(queue = nil)
  socket = Socket.new('localhost', @port)
  transport = SpecTransport.new(FramedTransport.new(socket), queue)
  client = NonblockingService::Client.new(BinaryProtocol.new(transport))
  transport.open
  @clients << [client, transport]
  client
end
| |
# Starts a thread that owns one client and executes commands popped from a
# queue. A command is either a bare symbol or an array of [symbol, *args]:
#
#   :block          - calls client.block and pushes its return value on +result+
#   [:unblock, n]   - calls client.unblock(n)
#   :hello          - calls client.greeting(true) and pushes the reply on +result+
#   [:sleep, secs]  - calls client.sleep (0.5 when no argument is given), then
#                     pushes :slept on +result+
#   :shutdown       - calls client.shutdown
#   :exit           - pushes :done on +result+ and leaves the loop
#
# Popping nil or false also ends the loop. Afterwards the client's transport
# is closed. Errors raised in the thread are re-raised there unless
# @catch_exceptions is set.
#
# Returns the command queue.
def setup_client_thread(result)
  commands = Queue.new
  Thread.new do
    begin
      client = setup_client
      while (command = commands.pop)
        name, *args = command
        case name
        when :block    then result << client.block
        when :unblock  then client.unblock(args.first)
        when :hello    then result << client.greeting(true)
        when :sleep
          client.sleep(args[0] || 0.5)
          result << :slept
        when :shutdown then client.shutdown
        when :exit
          result << :done
          break
        end
      end
      # Close the transport that belongs to this thread's client.
      @clients.each do |known_client, transport|
        next unless known_client == client
        break if transport.close
      end
    rescue => error
      raise error unless @catch_exceptions
    end
  end
  commands
end
| |
# One client, two round trips: the default greeting and the "Aloha!" one.
it "should handle basic message passing" do
  client = setup_client
  english_reply = client.greeting(true)
  english_reply.should == Hello.new
  aloha_reply = client.greeting(false)
  aloha_reply.should == Hello.new(:greeting => 'Aloha!')
end
| |
# Four clients call block at the same time; a fifth client releases them all
# with a single unblock(4).
it "should handle concurrent clients" do
  replies = Queue.new
  flushes = Queue.new
  blocked_clients = 4
  blocked_clients.times do
    Thread.new(Thread.current) do |spec_thread|
      begin
        blocked_client = setup_client(flushes)
        replies.push(blocked_client.block)
      rescue => error
        # Surface failures in the thread that runs the example.
        spec_thread.raise error
      end
    end
  end
  # Each client's transport pushes :flushed on its first flush; wait until
  # all four requests have been flushed before unblocking.
  blocked_clients.times { flushes.pop }
  setup_client.unblock(blocked_clients)
  blocked_clients.times { replies.pop.should be_true }
end
| |
# Keeps seven client threads plus one direct client connected, while the
# server was built with 5 in before(:each), and checks that calls are still
# answered while four of the connections sit inside a blocking call.
it "should handle messages from more than 5 long-lived connections" do
  result = Queue.new
  queues = Array.new(7) do |index|
    commands = setup_client_thread(result)
    Thread.pass if index == 4 # give the server time to accept connections
    commands
  end
  client = setup_client
  # block 4 connections
  queues[0, 4].each { |commands| commands << :block }
  # the remaining three connections must still get answers
  queues[4..6].each { |commands| commands << :hello }
  3.times { result.pop.should == Hello.new }
  client.greeting(true).should == Hello.new
  # release the four blocked connections through one of the free ones
  queues[5] << [:unblock, 4]
  4.times { result.pop.should be_true }
  # a previously blocked connection is usable again
  queues[2] << :hello
  result.pop.should == Hello.new
  client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
  # tell every client thread to finish
  queues.shift << :exit until queues.empty?
  client.greeting(true).should == Hello.new
end
| |
# Calling shutdown on the server must let the server thread finish within
# the two-second join.
it "should shut down when asked" do
  # Connect and exchange one message first, so the server is known to be up.
  setup_client.greeting(false)
  @server.shutdown
  joined = @server_thread.join(2)
  joined.should be_an_instance_of(Thread)
end
| |
# A call that is already in progress when shutdown is requested must still
# complete and deliver its result.
it "should continue processing active messages when shutting down" do
  outcomes = Queue.new
  commands = setup_client_thread(outcomes)
  commands << :sleep
  sleep 0.1 # give the server time to start processing the client's message
  @server.shutdown
  joined = @server_thread.join(2)
  joined.should be_an_instance_of(Thread)
  outcomes.pop.should == :slept
end
| |
# A call that outlives the shutdown allowance must be abandoned: the server
# thread still terminates and the client never reports :slept.
it "should kill active messages when they don't expire while shutting down" do
  result = Queue.new
  client = setup_client_thread(result)
  client << [:sleep, 10]
  sleep 0.1 # start processing the client's message
  # The client thread's call is expected to fail once the server gives up on
  # it. Tolerate that failure *before* asking for the shutdown: if the flag
  # were set only after shutdown returns, a connection dropped during the
  # shutdown call could make the client thread re-raise its error first.
  @catch_exceptions = true
  # NOTE(review): the argument 1 is presumably a timeout in seconds -- confirm
  # against NonblockingServer#shutdown.
  @server.shutdown(1)
  @server_thread.join(3).should_not be_nil
  result.should be_empty
end
| |
# A client asks the server to shut itself down via the handler's shutdown
# call, and the server thread must then finish within the two-second join.
it "should allow shutting down in response to a message" do
  client = setup_client
  reply = client.greeting(true)
  reply.should == Hello.new
  client.shutdown
  joined = @server_thread.join(2)
  joined.should_not be_nil
end
| end |
| end |