blob: 02f1cec6d3a38faf41c36df52c80057b2335cf5e [file] [log] [blame]
Kevin Clarkca8a1b32008-06-18 01:17:06 +00001require 'rubygems'
2$:.unshift File.dirname(__FILE__) + '/../lib'
3require 'thrift'
4require 'thrift/server/nonblockingserver'
5$:.unshift File.dirname(__FILE__) + "/gen-rb"
6require 'BenchmarkService'
7require 'thread'
8require 'stringio'
# Address the benchmark server listens on and every client connects to.
# Frozen so the shared constant cannot be mutated by accident.
HOST = 'localhost'.freeze
PORT = 42587

# Fail fast: an unhandled exception in any thread (e.g. the server thread)
# aborts the whole process instead of being silently swallowed.
Thread.abort_on_exception = true
13
14###############
15## Server
16###############
17
# In-process Thrift server that hosts the benchmark service.
#
# The running server instance and the thread it runs on are kept in module-level
# instance variables (@server, @server_thread), so only one server is tracked
# at a time.
module Server
  include Thrift

  # Service handler: implements the `fibonacci` call used as the benchmark workload.
  class BenchmarkHandler
    # Returns the n-th Fibonacci number; n is a 1-based index into the
    # sequence (fibonacci(1) == fibonacci(2) == 1).
    #
    # Raises ArgumentError unless n is a positive Integer. The earlier
    # array-based version silently returned 1 or nil for n < 1 because of
    # negative array indexing.
    def fibonacci(n)
      unless n.is_a?(Integer) && n >= 1
        raise ArgumentError, "n must be a positive integer (got #{n.inspect})"
      end

      # Track only the last two values instead of materialising the sequence.
      previous, current = 0, 1
      (n - 1).times { previous, current = current, previous + current }
      current
    end
  end

  # Builds a server of the given class around a BenchmarkHandler and starts
  # serving on a background thread.
  #
  # serverClass - server class to instantiate; it is called with
  #               (processor, server transport, transport factory, nil, 20),
  #               plus a WARN-level STDERR logger when it is NonblockingServer.
  #               NOTE(review): the trailing `nil, 20` are presumably the
  #               protocol factory and worker count -- confirm against the
  #               constructor of the server class in use.
  #
  # Returns the server instance.
  def self.start_server(serverClass)
    handler = BenchmarkHandler.new
    processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
    transport = ServerSocket.new(HOST, PORT)
    transport_factory = FramedTransportFactory.new
    args = [processor, transport, transport_factory, nil, 20]
    if serverClass == NonblockingServer
      logger = Logger.new(STDERR)
      logger.level = Logger::WARN
      args << logger
    end
    server = serverClass.new(*args)
    @server_thread = Thread.new do
      server.serve
    end
    @server = server
  end

  # Stops the server started by start_server. Uses the server's own
  # `shutdown` when it has one, otherwise kills the serving thread.
  # Does nothing (returns nil) when no server was ever started.
  def self.shutdown
    return unless @server

    if @server.respond_to? :shutdown
      @server.shutdown
    else
      @server_thread.kill
    end
  end

  # Returns the class of the running server, or nil when none was started.
  #
  # NOTE: this deliberately overrides Module#class on Server, so
  # `Server.class` no longer returns Module. It is kept because the report
  # code calls `Server.class`.
  def self.class
    @server and @server.class
  end
end
62
# Helper for creating a benchmark-service client inside this process.
module Client
  include Thrift

  # Builds a framed, binary-protocol BenchmarkService client for HOST:PORT and
  # runs the given block on a new thread, passing it the client and the
  # transport.
  #
  # The transport is not opened here (the `transport.open` call is
  # intentionally left out), so the block is expected to open it itself.
  #
  # Returns the Thread running the block.
  def self.start_client(&block)
    socket = Socket.new(HOST, PORT)
    framed = FramedTransport.new(socket)
    binary = BinaryProtocol.new(framed)
    service = ThriftBenchmark::BenchmarkService::Client.new(binary)
    Thread.new { block.call(service, framed) }
  end
end
76
77class BenchmarkManager
78 def initialize(opts)
79 @host = opts.fetch(:host, 'localhost')
80 @port = opts.fetch(:port)
81 @num_processes = opts.fetch(:num_processes, 40)
82 @clients_per_process = opts.fetch(:clients_per_process, 10)
83 @calls_per_client = opts.fetch(:calls_per_client, 50)
84 end
85
86 def run
87 @pool = []
88 @benchmark_start = Time.now
89 puts "Spawning benchmark processes..."
90 @num_processes.times do
91 spawn
92 sleep 0.05 # space out spawns
93 end
94 collect_output
95 @benchmark_end = Time.now # we know the procs are done here
96 translate_output
97 analyze_output
98 report_output
99 end
100
101 def spawn
102 rd, wr = IO.pipe
103 pid = fork do
104 STDIN.close
105 rd.close
106 @clients_per_process.times do
107 transport = Thrift::FramedTransport.new(Thrift::Socket.new(@host, @port))
108 protocol = Thrift::BinaryProtocol.new(transport)
109 client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
110 begin
111 transport.open
112 rescue
113 Marshal.dump [:connection_failure, Time.now], wr
114 else
115 Marshal.dump [:start, Time.now], wr
116 @calls_per_client.times do
117 Marshal.dump [:call_start, Time.now], wr
118 client.fibonacci(15)
119 Marshal.dump [:call_end, Time.now], wr
120 end
121 transport.close
122 Marshal.dump [:end, Time.now], wr
123 end
124 end
125 end
126 wr.close
127 @pool << rd
128 pid
129 end
130
131 def collect_output
132 puts "Collecting output..."
133 # read from @pool until all sockets are closed
134 @buffers = Hash.new { |h,k| h[k] = '' }
135 until @pool.empty?
136 rd, = select(@pool)
137 next if rd.nil?
138 rd.each do |fd|
139 begin
140 @buffers[fd] << fd.read_nonblock(4096)
141 rescue EOFError
142 @pool.delete fd
143 end
144 end
145 end
146 end
147
148 def translate_output
149 puts "Translating output..."
150 @output = []
151 @buffers.each do |fd, buffer|
152 strio = StringIO.new(buffer)
153 logs = []
154 begin
155 loop do
156 logs << Marshal.load(strio)
157 end
158 rescue EOFError
159 @output << logs
160 end
161 end
162 end
163
164 def analyze_output
165 puts "Analyzing output..."
166 call_times = []
167 client_times = []
168 connection_failures = []
169 longest_call = 0
170 longest_client = 0
171 @output.each do |logs|
172 cur_call, cur_client = nil
173 logs.each do |tok, time|
174 case tok
175 when :start
176 cur_client = time
177 when :call_start
178 cur_call = time
179 when :call_end
180 delta = time - cur_call
181 call_times << delta
182 longest_call = delta unless longest_call > delta
183 cur_call = nil
184 when :end
185 delta = time - cur_client
186 client_times << delta
187 longest_client = delta unless longest_client > delta
188 cur_client = nil
189 when :connection_failure
190 connection_failures << time
191 end
192 end
193 end
194 @report = {}
195 @report[:total_calls] = call_times.inject(0.0) { |a,t| a += t }
196 @report[:avg_calls] = @report[:total_calls] / call_times.size
197 @report[:total_clients] = client_times.inject(0.0) { |a,t| a += t }
198 @report[:avg_clients] = @report[:total_clients] / client_times.size
199 @report[:connection_failures] = connection_failures.size
200 @report[:longest_call] = longest_call
201 @report[:longest_client] = longest_client
202 @report[:total_benchmark_time] = @benchmark_end - @benchmark_start
203 @report[:fastthread] = $".include?('fastthread.bundle')
204 end
205
206 def report_output
207 fmt = "%.4f seconds"
208 puts
209 tabulate "%d",
210 [["Server class", "%s"], Server.class],
211 ["Number of processes", @num_processes],
212 ["Clients per process", @clients_per_process],
213 ["Calls per client", @calls_per_client],
214 [["Using fastthread", "%s"], @report[:fastthread] ? "yes" : "no"]
215 puts
216 tabulate fmt,
217 [["Connection failures", "%d"], @report[:connection_failures]],
218 ["Average time per call", @report[:avg_calls]],
219 ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
220 ["Total time for all calls", @report[:total_calls]],
221 ["Real time for benchmarking", @report[:total_benchmark_time]],
222 ["Longest call time", @report[:longest_call]],
223 ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
224 end
225
226 def tabulate(fmt, *labels_and_values)
227 labels = labels_and_values.map { |(l,)| Array === l ? l.first : l }
228 label_width = labels.inject(0) { |w,l| l.size > w ? l.size : w }
229 labels_and_values.each do |(l,v)|
230 f = fmt
231 l, f = l if Array === l
232 puts "%-#{label_width+1}s #{f}" % [l+":", v]
233 end
234 end
235end
236
# Resolves a constant path such as "Thrift::SimpleServer" to the constant
# itself, starting the lookup at Object.
#
# const - String constant path; a leading "::" and surrounding whitespace
#         are accepted.
#
# Returns the resolved constant, or nil when const is nil, false, empty or
# blank, so callers can fall back with `||` (an empty string used to resolve
# to Object, which defeated that fallback).
# Raises NameError when a segment of the path does not exist.
def resolve_const(const)
  return nil unless const

  path = const.to_s.strip.sub(/\A::/, '')
  return nil if path.empty?

  path.split('::').inject(Object) { |scope, name| scope.const_get(name) }
end
240
# Script entry point: start the server (class taken from THRIFT_SERVER, or
# Thrift::NonblockingServer by default), run the benchmark against it, and
# shut the server down afterwards.
puts "Starting server..."
serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
Server.start_server(serverklass)

begin
  sleep 0.2 # give the server time to start

  BenchmarkManager.new(:host => HOST, :port => PORT, :num_processes => 40, :clients_per_process => 5).run
ensure
  # Always stop the server, even when the benchmark raises or is interrupted.
  Server.shutdown
end