blob: 11a9f049cc206a2cf1c899b488fea2d0cefc1400 [file] [log] [blame]
Kevin Clarkca8a1b32008-06-18 01:17:06 +00001require 'rubygems'
2$:.unshift File.dirname(__FILE__) + '/../lib'
3require 'thrift'
4require 'thrift/server/nonblockingserver'
Kevin Clark2ddd8ed2008-06-18 01:18:35 +00005require 'thrift/transport/unixsocket'
Kevin Clarkca8a1b32008-06-18 01:17:06 +00006$:.unshift File.dirname(__FILE__) + "/gen-rb"
7require 'BenchmarkService'
8require 'thread'
9require 'stringio'
10HOST = 'localhost'
11PORT = 42587
12
13Thread.abort_on_exception = true
14
15###############
16## Server
17###############
18
19module Server
20 include Thrift
21
22 class BenchmarkHandler
23 # 1-based index into the fibonacci sequence
24 def fibonacci(n)
25 seq = [1, 1]
26 3.upto(n) do
27 seq << seq[-1] + seq[-2]
28 end
29 seq[n-1] # n is 1-based
30 end
31 end
32
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000033 def self.start_server(serverClass, trans = nil)
Kevin Clark2ddd9d72008-06-18 01:18:47 +000034 return if serverClass == Object
Kevin Clarkca8a1b32008-06-18 01:17:06 +000035 handler = BenchmarkHandler.new
36 processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000037 transport = trans || ServerSocket.new(HOST, PORT)
Kevin Clarkca8a1b32008-06-18 01:17:06 +000038 transportFactory = FramedTransportFactory.new
39 args = [processor, transport, transportFactory, nil, 20]
40 if serverClass == NonblockingServer
41 logger = Logger.new(STDERR)
42 logger.level = Logger::WARN
43 args << logger
44 end
45 server = serverClass.new(*args)
46 @server_thread = Thread.new do
47 server.serve
48 end
49 @server = server
50 end
51
52 def self.shutdown
Kevin Clark2ddd9d72008-06-18 01:18:47 +000053 return if @server.nil?
Kevin Clarkca8a1b32008-06-18 01:17:06 +000054 if @server.respond_to? :shutdown
55 @server.shutdown
56 else
57 @server_thread.kill
58 end
59 end
60
61 def self.class
62 @server and @server.class
63 end
64end
65
Kevin Clarkca8a1b32008-06-18 01:17:06 +000066class BenchmarkManager
67 def initialize(opts)
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000068 @socket = opts.fetch(:socket) do
69 @host = opts.fetch(:host, 'localhost')
70 @port = opts.fetch(:port)
71 nil
72 end
Kevin Clarkca8a1b32008-06-18 01:17:06 +000073 @num_processes = opts.fetch(:num_processes, 40)
74 @clients_per_process = opts.fetch(:clients_per_process, 10)
75 @calls_per_client = opts.fetch(:calls_per_client, 50)
76 end
77
78 def run
79 @pool = []
80 @benchmark_start = Time.now
81 puts "Spawning benchmark processes..."
82 @num_processes.times do
83 spawn
84 sleep 0.05 # space out spawns
85 end
86 collect_output
87 @benchmark_end = Time.now # we know the procs are done here
88 translate_output
89 analyze_output
90 report_output
91 end
92
93 def spawn
94 rd, wr = IO.pipe
95 pid = fork do
96 STDIN.close
97 rd.close
98 @clients_per_process.times do
Kevin Clark2ddd8ed2008-06-18 01:18:35 +000099 if @socket
100 socket = Thrift::UNIXSocket.new(@socket)
101 else
102 socket = Thrift::Socket.new(@host, @port)
103 end
104 transport = Thrift::FramedTransport.new(socket)
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000105 protocol = Thrift::BinaryProtocol.new(transport)
106 client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
107 begin
108 transport.open
109 rescue
110 Marshal.dump [:connection_failure, Time.now], wr
111 else
112 Marshal.dump [:start, Time.now], wr
113 @calls_per_client.times do
114 Marshal.dump [:call_start, Time.now], wr
115 client.fibonacci(15)
116 Marshal.dump [:call_end, Time.now], wr
117 end
118 transport.close
119 Marshal.dump [:end, Time.now], wr
120 end
121 end
122 end
123 wr.close
124 @pool << rd
125 pid
126 end
127
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000128 def socket_class
129 if @socket
130 Thrift::UNIXSocket
131 else
132 Thrift::Socket
133 end
134 end
135
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000136 def collect_output
137 puts "Collecting output..."
138 # read from @pool until all sockets are closed
139 @buffers = Hash.new { |h,k| h[k] = '' }
140 until @pool.empty?
141 rd, = select(@pool)
142 next if rd.nil?
143 rd.each do |fd|
144 begin
Kevin Clarkfb5c0eb2008-06-18 01:19:04 +0000145 @buffers[fd] << fd.readpartial(4096)
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000146 rescue EOFError
147 @pool.delete fd
148 end
149 end
150 end
151 end
152
153 def translate_output
154 puts "Translating output..."
155 @output = []
156 @buffers.each do |fd, buffer|
157 strio = StringIO.new(buffer)
158 logs = []
159 begin
160 loop do
161 logs << Marshal.load(strio)
162 end
163 rescue EOFError
164 @output << logs
165 end
166 end
167 end
168
169 def analyze_output
170 puts "Analyzing output..."
171 call_times = []
172 client_times = []
173 connection_failures = []
174 longest_call = 0
175 longest_client = 0
176 @output.each do |logs|
177 cur_call, cur_client = nil
178 logs.each do |tok, time|
179 case tok
180 when :start
181 cur_client = time
182 when :call_start
183 cur_call = time
184 when :call_end
185 delta = time - cur_call
186 call_times << delta
187 longest_call = delta unless longest_call > delta
188 cur_call = nil
189 when :end
190 delta = time - cur_client
191 client_times << delta
192 longest_client = delta unless longest_client > delta
193 cur_client = nil
194 when :connection_failure
195 connection_failures << time
196 end
197 end
198 end
199 @report = {}
200 @report[:total_calls] = call_times.inject(0.0) { |a,t| a += t }
201 @report[:avg_calls] = @report[:total_calls] / call_times.size
202 @report[:total_clients] = client_times.inject(0.0) { |a,t| a += t }
203 @report[:avg_clients] = @report[:total_clients] / client_times.size
204 @report[:connection_failures] = connection_failures.size
205 @report[:longest_call] = longest_call
206 @report[:longest_client] = longest_client
207 @report[:total_benchmark_time] = @benchmark_end - @benchmark_start
208 @report[:fastthread] = $".include?('fastthread.bundle')
209 end
210
211 def report_output
212 fmt = "%.4f seconds"
213 puts
214 tabulate "%d",
215 [["Server class", "%s"], Server.class],
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000216 [["Socket class", "%s"], socket_class],
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000217 ["Number of processes", @num_processes],
218 ["Clients per process", @clients_per_process],
219 ["Calls per client", @calls_per_client],
220 [["Using fastthread", "%s"], @report[:fastthread] ? "yes" : "no"]
221 puts
222 tabulate fmt,
223 [["Connection failures", "%d"], @report[:connection_failures]],
224 ["Average time per call", @report[:avg_calls]],
225 ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
226 ["Total time for all calls", @report[:total_calls]],
227 ["Real time for benchmarking", @report[:total_benchmark_time]],
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000228 ["Shortest call time", @report[:longest_call]],
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000229 ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
230 end
231
232 def tabulate(fmt, *labels_and_values)
233 labels = labels_and_values.map { |(l,)| Array === l ? l.first : l }
234 label_width = labels.inject(0) { |w,l| l.size > w ? l.size : w }
235 labels_and_values.each do |(l,v)|
236 f = fmt
237 l, f = l if Array === l
238 puts "%-#{label_width+1}s #{f}" % [l+":", v]
239 end
240 end
241end
242
243def resolve_const(const)
244 const and const.split('::').inject(Object) { |k,c| k.const_get(c) }
245end
246
247puts "Starting server..."
248serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000249servertrans = nil
250if ENV['THRIFT_SOCKET']
251 servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET'])
252end
253Server.start_server(serverklass, servertrans)
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000254
255sleep 0.2 # give the server time to start
256
Kevin Clark2ddd8ed2008-06-18 01:18:35 +0000257args = { :num_processes => 40, :clients_per_process => 5 }
258if ENV['THRIFT_SOCKET']
259 args[:socket] = ENV['THRIFT_SOCKET']
260else
261 args.merge!(:host => HOST, :port => PORT)
262end
263BenchmarkManager.new(args).run
Kevin Clarkca8a1b32008-06-18 01:17:06 +0000264
265Server.shutdown