rb: split up benchmark into separate server/client files and distinct interpreters


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669025 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/benchmark/benchmark.rb b/lib/rb/benchmark/benchmark.rb
new file mode 100644
index 0000000..5e6dd40
--- /dev/null
+++ b/lib/rb/benchmark/benchmark.rb
@@ -0,0 +1,213 @@
+require 'rubygems'
+$:.unshift File.dirname(__FILE__) + '/../lib'
+require 'thrift'
+require 'thrift/server/nonblockingserver'
+require 'thrift/transport/unixsocket'
+require 'stringio'
+
+HOST = 'localhost'
+PORT = 42587
+
+###############
+## Server
+###############
+
+class Server
+  attr_accessor :serverclass
+  attr_accessor :interpreter
+  attr_accessor :host
+  attr_accessor :port
+
+  def initialize(opts)
+    @serverclass = opts.fetch(:class, Thrift::NonblockingServer)
+    @interpreter = opts.fetch(:interpreter, "ruby")
+    @host = opts.fetch(:host, ::HOST)
+    @port = opts.fetch(:port, ::PORT)
+  end
+
+  def start
+    return if @class == Object
+    @pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/server.rb #{@host} #{@port} #{@serverclass.name}", "r+")
+  end
+
+  def shutdown
+    return unless @pipe
+    Marshal.dump(:shutdown, @pipe)
+    begin
+      @pipe.read(10) # block until the server shuts down
+    rescue EOFError
+    end
+    @pipe.close
+    @pipe = nil
+  end
+end
+
+class BenchmarkManager
+  def initialize(opts, server)
+    @socket = opts.fetch(:socket) do
+      @host = opts.fetch(:host, 'localhost')
+      @port = opts.fetch(:port)
+      nil
+    end
+    @num_processes = opts.fetch(:num_processes, 40)
+    @clients_per_process = opts.fetch(:clients_per_process, 10)
+    @calls_per_client = opts.fetch(:calls_per_client, 50)
+    @interpreter = opts.fetch(:interpreter, "ruby")
+    @server = server
+  end
+
+  def run
+    @pool = []
+    @benchmark_start = Time.now
+    puts "Spawning benchmark processes..."
+    @num_processes.times do
+      spawn
+      sleep 0.05 # space out spawns
+    end
+    collect_output
+    @benchmark_end = Time.now # we know the procs are done here
+    translate_output
+    analyze_output
+    report_output
+  end
+
+  def spawn
+    pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}")
+    @pool << pipe
+  end
+
+  def socket_class
+    if @socket
+      Thrift::UNIXSocket
+    else
+      Thrift::Socket
+    end
+  end
+
+  def collect_output
+    puts "Collecting output..."
+    # read from @pool until all sockets are closed
+    @buffers = Hash.new { |h,k| h[k] = '' }
+    until @pool.empty?
+      rd, = select(@pool)
+      next if rd.nil?
+      rd.each do |fd|
+        begin
+          @buffers[fd] << fd.readpartial(4096)
+        rescue EOFError
+          @pool.delete fd
+        end
+      end
+    end
+  end
+
+  def translate_output
+    puts "Translating output..."
+    @output = []
+    @buffers.each do |fd, buffer|
+      strio = StringIO.new(buffer)
+      logs = []
+      begin
+        loop do
+          logs << Marshal.load(strio)
+        end
+      rescue EOFError
+        @output << logs
+      end
+    end
+  end
+
+  def analyze_output
+    puts "Analyzing output..."
+    call_times = []
+    client_times = []
+    connection_failures = []
+    longest_call = 0
+    longest_client = 0
+    @output.each do |logs|
+      cur_call, cur_client = nil
+      logs.each do |tok, time|
+        case tok
+        when :start
+          cur_client = time
+        when :call_start
+          cur_call = time
+        when :call_end
+          delta = time - cur_call
+          call_times << delta
+          longest_call = delta unless longest_call > delta
+          cur_call = nil
+        when :end
+          delta = time - cur_client
+          client_times << delta
+          longest_client = delta unless longest_client > delta
+          cur_client = nil
+        when :connection_failure
+          connection_failures << time
+        end
+      end
+    end
+    @report = {}
+    @report[:total_calls] = call_times.inject(0.0) { |a,t| a += t }
+    @report[:avg_calls] = @report[:total_calls] / call_times.size
+    @report[:total_clients] = client_times.inject(0.0) { |a,t| a += t }
+    @report[:avg_clients] = @report[:total_clients] / client_times.size
+    @report[:connection_failures] = connection_failures.size
+    @report[:longest_call] = longest_call
+    @report[:longest_client] = longest_client
+    @report[:total_benchmark_time] = @benchmark_end - @benchmark_start
+    @report[:fastthread] = $".include?('fastthread.bundle')
+  end
+
+  def report_output
+    fmt = "%.4f seconds"
+    puts
+    tabulate "%d",
+             [["Server class", "%s"], @server.serverclass],
+             [["Server interpreter", "%s"], @server.interpreter],
+             [["Client interpreter", "%s"], @interpreter],
+             [["Socket class", "%s"], socket_class],
+             ["Number of processes", @num_processes],
+             ["Clients per process", @clients_per_process],
+             ["Calls per client", @calls_per_client],
+             [["Using fastthread", "%s"], @report[:fastthread] ? "yes" : "no"]
+    puts
+    tabulate fmt,
+             [["Connection failures", "%d"], @report[:connection_failures]],
+             ["Average time per call", @report[:avg_calls]],
+             ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
+             ["Total time for all calls", @report[:total_calls]],
+             ["Real time for benchmarking", @report[:total_benchmark_time]],
+             ["Shortest call time", @report[:longest_call]],
+             ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
+  end
+
+  def tabulate(fmt, *labels_and_values)
+    labels = labels_and_values.map { |(l,)| Array === l ? l.first : l }
+    label_width = labels.inject(0) { |w,l| l.size > w ? l.size : w }
+    labels_and_values.each do |(l,v)|
+      f = fmt
+      l, f = l if Array === l
+      puts "%-#{label_width+1}s #{f}" % [l+":", v]
+    end
+  end
+end
+
+def resolve_const(const)
+  const and const.split('::').inject(Object) { |k,c| k.const_get(c) }
+end
+
+puts "Starting server..."
+args = {}
+args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
+args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
+server = Server.new(args)
+server.start
+
+sleep 0.2 # give the server time to start
+
+args = { :num_processes => 40, :clients_per_process => 5, :host => HOST, :port => PORT }
+args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
+BenchmarkManager.new(args, server).run
+
+server.shutdown