Implemented header protocol for Ruby client library
diff --git a/lib/rb/benchmark/benchmark.rb b/lib/rb/benchmark/benchmark.rb
index 4a520a5..0111ee8 100644
--- a/lib/rb/benchmark/benchmark.rb
+++ b/lib/rb/benchmark/benchmark.rb
@@ -35,19 +35,21 @@
attr_accessor :interpreter
attr_accessor :host
attr_accessor :port
+ attr_accessor :protocol_type
def initialize(opts)
@serverclass = opts.fetch(:class, Thrift::NonblockingServer)
@interpreter = opts.fetch(:interpreter, "ruby")
@host = opts.fetch(:host, ::HOST)
@port = opts.fetch(:port, ::PORT)
+ @protocol_type = opts.fetch(:protocol_type, 'binary')
@tls = opts.fetch(:tls, false)
end
def start
return if @serverclass == Object
args = (File.basename(@interpreter) == "jruby" ? "-J-server" : "")
- @pipe = IO.popen("#{@interpreter} #{args} #{File.dirname(__FILE__)}/server.rb #{"-tls" if @tls} #{@host} #{@port} #{@serverclass.name}", "r+")
+ @pipe = IO.popen("#{@interpreter} #{args} #{File.dirname(__FILE__)}/server.rb #{"-tls" if @tls} #{@host} #{@port} #{@serverclass.name} #{@protocol_type}", "r+")
Marshal.load(@pipe) # wait until the server has started
sleep 0.4 # give the server time to actually start spawning sockets
end
@@ -77,6 +79,7 @@
@interpreter = opts.fetch(:interpreter, "ruby")
@server = server
@log_exceptions = opts.fetch(:log_exceptions, false)
+ @protocol_type = opts.fetch(:protocol_type, 'binary')
@tls = opts.fetch(:tls, false)
end
@@ -96,7 +99,7 @@
end
def spawn
- pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{"-log-exceptions" if @log_exceptions} #{"-tls" if @tls} #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}")
+ pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{"-log-exceptions" if @log_exceptions} #{"-tls" if @tls} #{@host} #{@port} #{@clients_per_process} #{@calls_per_client} #{@protocol_type}")
@pool << pipe
end
@@ -202,6 +205,7 @@
[["Server class", "%s"], @server.serverclass == Object ? "" : @server.serverclass],
[["Server interpreter", "%s"], @server.interpreter],
[["Client interpreter", "%s"], @interpreter],
+ [["Protocol type", "%s"], @protocol_type],
[["Socket class", "%s"], socket_class],
["Number of processes", @num_processes],
["Clients per process", @clients_per_process],
@@ -255,12 +259,14 @@
end
puts "Starting server..."
+protocol_type = ENV['THRIFT_PROTOCOL'] || 'binary'
args = {}
args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
args[:host] = ENV['THRIFT_HOST'] || HOST
args[:port] = (ENV['THRIFT_PORT'] || PORT).to_i
args[:tls] = ENV['THRIFT_TLS'] == 'true'
+args[:protocol_type] = protocol_type
server = Server.new(args)
server.start
@@ -273,6 +279,7 @@
args[:calls_per_client] = (ENV['THRIFT_NUM_CALLS'] || 50).to_i
args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby"
args[:log_exceptions] = !!ENV['THRIFT_LOG_EXCEPTIONS']
+args[:protocol_type] = protocol_type
BenchmarkManager.new(args, server).run
server.shutdown
diff --git a/lib/rb/benchmark/client.rb b/lib/rb/benchmark/client.rb
index 693bf60..304e6d8 100644
--- a/lib/rb/benchmark/client.rb
+++ b/lib/rb/benchmark/client.rb
@@ -25,13 +25,36 @@
require 'benchmark_service'
class Client
- def initialize(host, port, clients_per_process, calls_per_client, log_exceptions, tls)
+ def initialize(host, port, clients_per_process, calls_per_client, log_exceptions, tls, protocol_type)
@host = host
@port = port
@clients_per_process = clients_per_process
@calls_per_client = calls_per_client
@log_exceptions = log_exceptions
@tls = tls
+ @protocol_type = protocol_type || 'binary'
+ end
+
+ def create_protocol(socket)
+ case @protocol_type
+ when 'binary'
+ transport = Thrift::FramedTransport.new(socket)
+ Thrift::BinaryProtocol.new(transport)
+ when 'compact'
+ transport = Thrift::FramedTransport.new(socket)
+ Thrift::CompactProtocol.new(transport)
+ when 'header'
+ Thrift::HeaderProtocol.new(socket)
+ when 'header-compact'
+ Thrift::HeaderProtocol.new(socket, nil, Thrift::HeaderSubprotocolID::COMPACT)
+ when 'header-zlib'
+ protocol = Thrift::HeaderProtocol.new(socket)
+ protocol.add_transform(Thrift::HeaderTransformID::ZLIB)
+ protocol
+ else
+ transport = Thrift::FramedTransport.new(socket)
+ Thrift::BinaryProtocol.new(transport)
+ end
end
def run
@@ -53,8 +76,8 @@
else
Thrift::Socket.new(@host, @port)
end
- transport = Thrift::FramedTransport.new(socket)
- protocol = Thrift::BinaryProtocol.new(transport)
+ protocol = create_protocol(socket)
+ transport = protocol.trans
client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
begin
start = Time.now
@@ -89,6 +112,6 @@
log_exceptions = true if ARGV[0] == '-log-exceptions' and ARGV.shift
tls = true if ARGV[0] == '-tls' and ARGV.shift
-host, port, clients_per_process, calls_per_client = ARGV
+host, port, clients_per_process, calls_per_client, protocol_type = ARGV
-Client.new(host, port.to_i, clients_per_process.to_i, calls_per_client.to_i, log_exceptions, tls).run
+Client.new(host, port.to_i, clients_per_process.to_i, calls_per_client.to_i, log_exceptions, tls, protocol_type).run
diff --git a/lib/rb/benchmark/server.rb b/lib/rb/benchmark/server.rb
index 153eb0f..6df3fa9 100644
--- a/lib/rb/benchmark/server.rb
+++ b/lib/rb/benchmark/server.rb
@@ -38,7 +38,25 @@
end
end
- def self.start_server(host, port, serverClass, tls)
+ def self.create_factories(protocol_type)
+ case protocol_type
+ when 'binary'
+ [FramedTransportFactory.new, BinaryProtocolFactory.new]
+ when 'compact'
+ [FramedTransportFactory.new, CompactProtocolFactory.new]
+ when 'header'
+ [HeaderTransportFactory.new, HeaderProtocolFactory.new]
+ when 'header-compact'
+ [HeaderTransportFactory.new, HeaderProtocolFactory.new(nil, HeaderSubprotocolID::COMPACT)]
+ when 'header-zlib'
+ # Note: Server doesn't add transforms - it mirrors client's transforms
+ [HeaderTransportFactory.new, HeaderProtocolFactory.new]
+ else
+ [FramedTransportFactory.new, BinaryProtocolFactory.new]
+ end
+ end
+
+ def self.start_server(host, port, serverClass, tls, protocol_type = nil)
handler = BenchmarkHandler.new
processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
transport = if tls
@@ -58,8 +76,8 @@
else
ServerSocket.new(host, port)
end
- transport_factory = FramedTransportFactory.new
- args = [processor, transport, transport_factory, nil, 20]
+ transport_factory, protocol_factory = create_factories(protocol_type || 'binary')
+ args = [processor, transport, transport_factory, protocol_factory, 20]
if serverClass == NonblockingServer
logger = Logger.new(STDERR)
logger.level = Logger::WARN
@@ -88,9 +106,9 @@
tls = true if ARGV[0] == '-tls' and ARGV.shift
-host, port, serverklass = ARGV
+host, port, serverklass, protocol_type = ARGV
-Server.start_server(host, port.to_i, resolve_const(serverklass), tls)
+Server.start_server(host, port.to_i, resolve_const(serverklass), tls, protocol_type)
# let our host know that the interpreter has started
# ideally we'd wait until the server was serving, but we don't have a hook for that