rb: split up benchmark into separate server/client files and distinct interpreters


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669025 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/Rakefile b/lib/rb/Rakefile
index 04aaff7..ba0fe8c 100644
--- a/lib/rb/Rakefile
+++ b/lib/rb/Rakefile
@@ -37,5 +37,5 @@
 
 desc 'Run benchmarking of NonblockingServer'
 task :benchmark do
-  ruby 'benchmark/fairness.rb'
+  ruby 'benchmark/benchmark.rb'
 end
diff --git a/lib/rb/benchmark/fairness.rb b/lib/rb/benchmark/benchmark.rb
similarity index 64%
rename from lib/rb/benchmark/fairness.rb
rename to lib/rb/benchmark/benchmark.rb
index 11a9f04..5e6dd40 100644
--- a/lib/rb/benchmark/fairness.rb
+++ b/lib/rb/benchmark/benchmark.rb
@@ -3,68 +3,47 @@
 require 'thrift'
 require 'thrift/server/nonblockingserver'
 require 'thrift/transport/unixsocket'
-$:.unshift File.dirname(__FILE__) + "/gen-rb"
-require 'BenchmarkService'
-require 'thread'
 require 'stringio'
+
 HOST = 'localhost'
 PORT = 42587
 
-Thread.abort_on_exception = true
-
 ###############
 ## Server
 ###############
 
-module Server
-  include Thrift
+class Server
+  attr_accessor :serverclass
+  attr_accessor :interpreter
+  attr_accessor :host
+  attr_accessor :port
 
-  class BenchmarkHandler
-    # 1-based index into the fibonacci sequence
-    def fibonacci(n)
-      seq = [1, 1]
-      3.upto(n) do
-        seq << seq[-1] + seq[-2]
-      end
-      seq[n-1] # n is 1-based
-    end
+  def initialize(opts) # opts: Hash; :class, :interpreter, :host and :port are all optional
+    @serverclass = opts.fetch(:class, Thrift::NonblockingServer) # its name is passed to server.rb on the command line
+    @interpreter = opts.fetch(:interpreter, "ruby") # command used to launch server.rb
+    @host = opts.fetch(:host, ::HOST) # falls back to the top-level HOST constant
+    @port = opts.fetch(:port, ::PORT) # falls back to the top-level PORT constant
   end
 
-  def self.start_server(serverClass, trans = nil)
-    return if serverClass == Object
-    handler = BenchmarkHandler.new
-    processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
-    transport = trans || ServerSocket.new(HOST, PORT)
-    transportFactory = FramedTransportFactory.new
-    args = [processor, transport, transportFactory, nil, 20]
-    if serverClass == NonblockingServer
-      logger = Logger.new(STDERR)
-      logger.level = Logger::WARN
-      args << logger
-    end
-    server = serverClass.new(*args)
-    @server_thread = Thread.new do
-      server.serve
-    end
-    @server = server
+  def start # spawns server.rb under @interpreter and keeps a read/write pipe to it in @pipe
+    return if @serverclass == Object # Object appears to mean "do not spawn a server" -- TODO confirm with resolve_const callers
+    @pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/server.rb #{@host} #{@port} #{@serverclass.name}", "r+")
   end
 
-  def self.shutdown
-    return if @server.nil?
-    if @server.respond_to? :shutdown
-      @server.shutdown
-    else
-      @server_thread.kill
+  def shutdown
+    return unless @pipe
+    Marshal.dump(:shutdown, @pipe)
+    begin
+      @pipe.read(10) # block until the server shuts down
+    rescue EOFError
     end
-  end
-
-  def self.class
-    @server and @server.class
+    @pipe.close
+    @pipe = nil
   end
 end
 
 class BenchmarkManager
-  def initialize(opts)
+  def initialize(opts, server)
     @socket = opts.fetch(:socket) do
       @host = opts.fetch(:host, 'localhost')
       @port = opts.fetch(:port)
@@ -73,6 +52,8 @@
     @num_processes = opts.fetch(:num_processes, 40)
     @clients_per_process = opts.fetch(:clients_per_process, 10)
     @calls_per_client = opts.fetch(:calls_per_client, 50)
+    @interpreter = opts.fetch(:interpreter, "ruby")
+    @server = server
   end
 
   def run
@@ -91,38 +72,8 @@
   end
 
   def spawn
-    rd, wr = IO.pipe
-    pid = fork do
-      STDIN.close
-      rd.close
-      @clients_per_process.times do
-        if @socket
-          socket = Thrift::UNIXSocket.new(@socket)
-        else
-          socket = Thrift::Socket.new(@host, @port)
-        end
-        transport = Thrift::FramedTransport.new(socket)
-        protocol = Thrift::BinaryProtocol.new(transport)
-        client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
-        begin
-          transport.open
-        rescue
-          Marshal.dump [:connection_failure, Time.now], wr
-        else
-          Marshal.dump [:start, Time.now], wr
-          @calls_per_client.times do
-            Marshal.dump [:call_start, Time.now], wr
-            client.fibonacci(15)
-            Marshal.dump [:call_end, Time.now], wr
-          end
-          transport.close
-          Marshal.dump [:end, Time.now], wr
-        end
-      end
-    end
-    wr.close
-    @pool << rd
-    pid
+    pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}")
+    @pool << pipe
   end
 
   def socket_class
@@ -212,7 +163,9 @@
     fmt = "%.4f seconds"
     puts
     tabulate "%d",
-             [["Server class", "%s"], Server.class],
+             [["Server class", "%s"], @server.serverclass],
+             [["Server interpreter", "%s"], @server.interpreter],
+             [["Client interpreter", "%s"], @interpreter],
              [["Socket class", "%s"], socket_class],
              ["Number of processes", @num_processes],
              ["Clients per process", @clients_per_process],
@@ -245,21 +198,16 @@
 end
 
 puts "Starting server..."
-serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
-servertrans = nil
-if ENV['THRIFT_SOCKET']
-  servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET'])
-end
-Server.start_server(serverklass, servertrans)
+args = {}
+args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
+args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
+server = Server.new(args)
+server.start
 
 sleep 0.2 # give the server time to start
 
-args = { :num_processes => 40, :clients_per_process => 5 }
-if ENV['THRIFT_SOCKET']
-  args[:socket] = ENV['THRIFT_SOCKET']
-else
-  args.merge!(:host => HOST, :port => PORT)
-end
-BenchmarkManager.new(args).run
+args = { :num_processes => 40, :clients_per_process => 5, :host => HOST, :port => PORT }
+args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
+BenchmarkManager.new(args, server).run
 
-Server.shutdown
+server.shutdown
diff --git a/lib/rb/benchmark/client.rb b/lib/rb/benchmark/client.rb
new file mode 100644
index 0000000..e8d3351
--- /dev/null
+++ b/lib/rb/benchmark/client.rb
@@ -0,0 +1,41 @@
+$:.unshift File.dirname(__FILE__) + '/../lib'
+require 'thrift'
+require 'thrift/server/nonblockingserver'
+$:.unshift File.dirname(__FILE__) + "/gen-rb"
+require 'BenchmarkService'
+
+class Client # benchmark client: opens connections one after another and writes timing events to STDOUT
+  def initialize(host, port, clients_per_process, calls_per_client)
+    @host = host
+    @port = port
+    @clients_per_process = clients_per_process # how many connections #run opens, sequentially
+    @calls_per_client = calls_per_client # how many fibonacci calls are made on each connection
+  end
+
+  def run # every event is a Marshal-dumped [symbol, Time] pair written to STDOUT
+    @clients_per_process.times do
+      socket = Thrift::Socket.new(@host, @port)
+      transport = Thrift::FramedTransport.new(socket)
+      protocol = Thrift::BinaryProtocol.new(transport)
+      client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
+      begin
+        transport.open
+      rescue # a failed open is reported as an event and the loop moves on to the next connection
+        Marshal.dump [:connection_failure, Time.now], STDOUT
+      else
+        Marshal.dump [:start, Time.now], STDOUT
+        @calls_per_client.times do
+          Marshal.dump [:call_start, Time.now], STDOUT
+          client.fibonacci(15) # return value is discarded; only the timestamps around the call are reported
+          Marshal.dump [:call_end, Time.now], STDOUT
+        end
+        transport.close
+        Marshal.dump [:end, Time.now], STDOUT
+      end
+    end
+  end
+end
+
+host, port, clients_per_process, calls_per_client = ARGV # four positional arguments, all arriving as strings
+
+Client.new(host, port.to_i, clients_per_process.to_i, calls_per_client.to_i).run # to_i silently yields 0 for missing or non-numeric input
diff --git a/lib/rb/benchmark/server.rb b/lib/rb/benchmark/server.rb
new file mode 100644
index 0000000..400c90d
--- /dev/null
+++ b/lib/rb/benchmark/server.rb
@@ -0,0 +1,59 @@
+$:.unshift File.dirname(__FILE__) + '/../lib'
+require 'thrift'
+require 'thrift/server/nonblockingserver'
+$:.unshift File.dirname(__FILE__) + "/gen-rb"
+require 'BenchmarkService'
+
+module Server # runs the benchmark Thrift service inside this process, on a background thread
+  include Thrift # lets ServerSocket, FramedTransportFactory and NonblockingServer be named unqualified below
+
+  class BenchmarkHandler
+    # Returns the n-th Fibonacci number, 1-based (fibonacci(1) == fibonacci(2) == 1)
+    def fibonacci(n)
+      seq = [1, 1]
+      3.upto(n) do
+        seq << seq[-1] + seq[-2]
+      end
+      seq[n-1] # n is 1-based
+    end
+  end
+
+  def self.start_server(host, port, serverClass) # builds a serverClass instance and runs its #serve in a new thread
+    handler = BenchmarkHandler.new
+    processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
+    transport = ServerSocket.new(host, port)
+    transportFactory = FramedTransportFactory.new
+    args = [processor, transport, transportFactory, nil, 20] # positional constructor args; what nil and 20 mean depends on serverClass -- TODO confirm
+    if serverClass == NonblockingServer # only NonblockingServer is handed the extra logger argument
+      logger = Logger.new(STDERR)
+      logger.level = Logger::WARN
+      args << logger
+    end
+    server = serverClass.new(*args)
+    @server_thread = Thread.new do
+      server.serve
+    end
+    @server = server # kept so self.shutdown can reach it; also the return value
+  end
+
+  def self.shutdown # stops what start_server started; does nothing if no server was started
+    return if @server.nil?
+    if @server.respond_to? :shutdown
+      @server.shutdown
+    else
+      @server_thread.kill # server has no shutdown method, so kill the thread running #serve
+    end
+  end
+end
+
+def resolve_const(const) # turns a "A::B::C" string into that constant; a nil argument gives nil back
+  const and const.split('::').inject(Object) { |k,c| k.const_get(c) } # walks down from Object; an empty string therefore yields Object
+end
+
+host, port, serverklass = ARGV # three positional arguments, all arriving as strings
+
+Server.start_server(host, port.to_i, resolve_const(serverklass))
+
+Marshal.load(STDIN) # blocks until one marshalled object arrives on STDIN; its value is ignored
+
+Server.shutdown