blob: 67543f33000b48b6d983fc43b0a53bc909562537 [file] [log] [blame]
Kevin Clarkca8a1b32008-06-18 01:17:06 +00001require 'rubygems'
2$:.unshift File.dirname(__FILE__) + '/../lib'
3require 'thrift'
4require 'thrift/server/nonblockingserver'
Kevin Clark2ddd8ed2008-06-18 01:18:35 +00005require 'thrift/transport/unixsocket'
Kevin Clarkca8a1b32008-06-18 01:17:06 +00006$:.unshift File.dirname(__FILE__) + "/gen-rb"
7require 'BenchmarkService'
8require 'thread'
9require 'stringio'
10HOST = 'localhost'
11PORT = 42587
12
13Thread.abort_on_exception = true
14
15###############
16## Server
17###############
18
19module Server
20 include Thrift
21
22 class BenchmarkHandler
23 # 1-based index into the fibonacci sequence
24 def fibonacci(n)
25 seq = [1, 1]
26 3.upto(n) do
27 seq << seq[-1] + seq[-2]
28 end
29 seq[n-1] # n is 1-based
30 end
31 end
32
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000033 def self.start_server(serverClass, trans = nil)
Kevin Clarkca8a1b32008-06-18 01:17:06 +000034 handler = BenchmarkHandler.new
35 processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000036 transport = trans || ServerSocket.new(HOST, PORT)
Kevin Clarkca8a1b32008-06-18 01:17:06 +000037 transportFactory = FramedTransportFactory.new
38 args = [processor, transport, transportFactory, nil, 20]
39 if serverClass == NonblockingServer
40 logger = Logger.new(STDERR)
41 logger.level = Logger::WARN
42 args << logger
43 end
44 server = serverClass.new(*args)
45 @server_thread = Thread.new do
46 server.serve
47 end
48 @server = server
49 end
50
51 def self.shutdown
52 if @server.respond_to? :shutdown
53 @server.shutdown
54 else
55 @server_thread.kill
56 end
57 end
58
59 def self.class
60 @server and @server.class
61 end
62end
63
Kevin Clarkca8a1b32008-06-18 01:17:06 +000064class BenchmarkManager
65 def initialize(opts)
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000066 @socket = opts.fetch(:socket) do
67 @host = opts.fetch(:host, 'localhost')
68 @port = opts.fetch(:port)
69 nil
70 end
Kevin Clarkca8a1b32008-06-18 01:17:06 +000071 @num_processes = opts.fetch(:num_processes, 40)
72 @clients_per_process = opts.fetch(:clients_per_process, 10)
73 @calls_per_client = opts.fetch(:calls_per_client, 50)
74 end
75
76 def run
77 @pool = []
78 @benchmark_start = Time.now
79 puts "Spawning benchmark processes..."
80 @num_processes.times do
81 spawn
82 sleep 0.05 # space out spawns
83 end
84 collect_output
85 @benchmark_end = Time.now # we know the procs are done here
86 translate_output
87 analyze_output
88 report_output
89 end
90
91 def spawn
92 rd, wr = IO.pipe
93 pid = fork do
94 STDIN.close
95 rd.close
96 @clients_per_process.times do
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000097 if @socket
98 socket = Thrift::UNIXSocket.new(@socket)
99 else
100 socket = Thrift::Socket.new(@host, @port)
101 end
102 transport = Thrift::FramedTransport.new(socket)
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000103 protocol = Thrift::BinaryProtocol.new(transport)
104 client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
105 begin
106 transport.open
107 rescue
108 Marshal.dump [:connection_failure, Time.now], wr
109 else
110 Marshal.dump [:start, Time.now], wr
111 @calls_per_client.times do
112 Marshal.dump [:call_start, Time.now], wr
113 client.fibonacci(15)
114 Marshal.dump [:call_end, Time.now], wr
115 end
116 transport.close
117 Marshal.dump [:end, Time.now], wr
118 end
119 end
120 end
121 wr.close
122 @pool << rd
123 pid
124 end
125
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000126 def socket_class
127 if @socket
128 Thrift::UNIXSocket
129 else
130 Thrift::Socket
131 end
132 end
133
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000134 def collect_output
135 puts "Collecting output..."
136 # read from @pool until all sockets are closed
137 @buffers = Hash.new { |h,k| h[k] = '' }
138 until @pool.empty?
139 rd, = select(@pool)
140 next if rd.nil?
141 rd.each do |fd|
142 begin
143 @buffers[fd] << fd.read_nonblock(4096)
144 rescue EOFError
145 @pool.delete fd
146 end
147 end
148 end
149 end
150
151 def translate_output
152 puts "Translating output..."
153 @output = []
154 @buffers.each do |fd, buffer|
155 strio = StringIO.new(buffer)
156 logs = []
157 begin
158 loop do
159 logs << Marshal.load(strio)
160 end
161 rescue EOFError
162 @output << logs
163 end
164 end
165 end
166
167 def analyze_output
168 puts "Analyzing output..."
169 call_times = []
170 client_times = []
171 connection_failures = []
172 longest_call = 0
173 longest_client = 0
174 @output.each do |logs|
175 cur_call, cur_client = nil
176 logs.each do |tok, time|
177 case tok
178 when :start
179 cur_client = time
180 when :call_start
181 cur_call = time
182 when :call_end
183 delta = time - cur_call
184 call_times << delta
185 longest_call = delta unless longest_call > delta
186 cur_call = nil
187 when :end
188 delta = time - cur_client
189 client_times << delta
190 longest_client = delta unless longest_client > delta
191 cur_client = nil
192 when :connection_failure
193 connection_failures << time
194 end
195 end
196 end
197 @report = {}
198 @report[:total_calls] = call_times.inject(0.0) { |a,t| a += t }
199 @report[:avg_calls] = @report[:total_calls] / call_times.size
200 @report[:total_clients] = client_times.inject(0.0) { |a,t| a += t }
201 @report[:avg_clients] = @report[:total_clients] / client_times.size
202 @report[:connection_failures] = connection_failures.size
203 @report[:longest_call] = longest_call
204 @report[:longest_client] = longest_client
205 @report[:total_benchmark_time] = @benchmark_end - @benchmark_start
206 @report[:fastthread] = $".include?('fastthread.bundle')
207 end
208
209 def report_output
210 fmt = "%.4f seconds"
211 puts
212 tabulate "%d",
213 [["Server class", "%s"], Server.class],
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000214 [["Socket class", "%s"], socket_class],
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000215 ["Number of processes", @num_processes],
216 ["Clients per process", @clients_per_process],
217 ["Calls per client", @calls_per_client],
218 [["Using fastthread", "%s"], @report[:fastthread] ? "yes" : "no"]
219 puts
220 tabulate fmt,
221 [["Connection failures", "%d"], @report[:connection_failures]],
222 ["Average time per call", @report[:avg_calls]],
223 ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
224 ["Total time for all calls", @report[:total_calls]],
225 ["Real time for benchmarking", @report[:total_benchmark_time]],
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000226 ["Shortest call time", @report[:longest_call]],
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000227 ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
228 end
229
230 def tabulate(fmt, *labels_and_values)
231 labels = labels_and_values.map { |(l,)| Array === l ? l.first : l }
232 label_width = labels.inject(0) { |w,l| l.size > w ? l.size : w }
233 labels_and_values.each do |(l,v)|
234 f = fmt
235 l, f = l if Array === l
236 puts "%-#{label_width+1}s #{f}" % [l+":", v]
237 end
238 end
239end
240
241def resolve_const(const)
242 const and const.split('::').inject(Object) { |k,c| k.const_get(c) }
243end
244
245puts "Starting server..."
246serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000247servertrans = nil
248if ENV['THRIFT_SOCKET']
249 servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET'])
250end
251Server.start_server(serverklass, servertrans)
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000252
253sleep 0.2 # give the server time to start
254
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000255args = { :num_processes => 40, :clients_per_process => 5 }
256if ENV['THRIFT_SOCKET']
257 args[:socket] = ENV['THRIFT_SOCKET']
258else
259 args.merge!(:host => HOST, :port => PORT)
260end
261BenchmarkManager.new(args).run
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000262
263Server.shutdown