blob: f2e0f20730d7c17aa7da4a4d3515753e2962cbca [file] [log] [blame]
Jens Geyerf4598682014-05-08 23:18:44 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package main
21
22import (
John Boiles57852792018-01-05 14:37:05 -080023 "context"
Jens Geyerf4598682014-05-08 23:18:44 +020024 "flag"
25 "fmt"
26 "gen/stress"
27 "log"
28 _ "net/http/pprof"
29 "os"
30 "runtime"
31 "runtime/pprof"
32 "sync"
33 "sync/atomic"
34 "thrift"
35 "time"
36)
37
38var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to this file")
39var memprofile = flag.String("memprofile", "", "write memory profile to this file")
40
41var (
42 host = flag.String("host", "localhost", "Host to connect")
43 port = flag.Int64("port", 9091, "Port number to connect")
44 loop = flag.Int("loops", 50000, "The number of remote thrift calls each client makes")
45 runserver = flag.Int("server", 1, "Run the Thrift server in this process")
46 clients = flag.Int("clients", 20, "Number of client threads to create - 0 implies no clients, i.e. server only")
47 callName = flag.String("call", "echoVoid", "Service method to call, one of echoVoid, echoByte, echoI32, echoI64, echoString, echiList, echoSet, echoMap")
48 compact = flag.Bool("compact", false, "Use compact protocol instead of binary.")
49 framed = flag.Bool("framed", false, "Use framed transport instead of buffered.")
50)
51var hostPort string
52
53type callT int64
54
55const (
56 echoVoid callT = iota
57 echoByte
58 echoI32
59 echoI64
60 echoString
61 echiList
62 echoSet
63 echoMap
64)
65
66var callTMap = map[string]callT{
67 "echoVoid": echoVoid,
68 "echoByte": echoByte,
69 "echoI32": echoI32,
70 "echoI64": echoI64,
71 "echoString": echoString,
72 "echiList": echiList,
73 "echoSet": echoSet,
74 "echoMap": echoMap,
75}
76var callType callT
77
78var ready, done sync.WaitGroup
79
80var clicounter int64 = 0
81var counter int64 = 0
82
83func main() {
84 flag.Parse()
85 if *memprofile != "" {
86 runtime.MemProfileRate = 100
87 }
88 var ok bool
89 if callType, ok = callTMap[*callName]; !ok {
90 log.Fatal("Unknown service call", *callName)
91 }
92 if *cpuprofile != "" {
93 f, err := os.Create(*cpuprofile)
94 if err != nil {
95 log.Fatal(err)
96 }
97 pprof.StartCPUProfile(f)
98 defer pprof.StopCPUProfile()
99 }
100 hostPort = fmt.Sprintf("%s:%d", *host, *port)
101 var protocolFactory thrift.TProtocolFactory
102 var transportFactory thrift.TTransportFactory
103
104 if *compact {
105 protocolFactory = thrift.NewTCompactProtocolFactory()
106 } else {
107 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
108 }
109
110 if *framed {
111 transportFactory = thrift.NewTTransportFactory()
112 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
113 } else {
114 transportFactory = thrift.NewTBufferedTransportFactory(8192)
115 }
116
117 if *runserver > 0 {
118 serverTransport, err := thrift.NewTServerSocket(hostPort)
119 if err != nil {
120 log.Fatalf("Unable to create server socket: %s", err)
121 }
122
123 processor := stress.NewServiceProcessor(&handler{})
124 server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
125 if *clients == 0 {
126 server.Serve()
127 } else {
128 go server.Serve()
129 }
130 }
131 //start clients
132 if *clients != 0 {
133 ready.Add(*clients + 1)
134 done.Add(*clients)
135 for i := 0; i < *clients; i++ {
136 go client(protocolFactory)
137 }
138 ready.Done()
139 ready.Wait()
140 //run!
141 startTime := time.Now()
142 //wait for completion
143 done.Wait()
144 endTime := time.Now()
145 duration := endTime.Sub(startTime)
146 log.Printf("%d calls in %v (%f calls per second)\n", clicounter, duration, float64(clicounter)/duration.Seconds())
147 }
148 if *memprofile != "" {
149 f, err := os.Create(*memprofile)
150 if err != nil {
151 log.Fatal(err)
152 }
153 pprof.WriteHeapProfile(f)
154 f.Close()
155 return
156 }
157}
158
159func client(protocolFactory thrift.TProtocolFactory) {
160 trans, err := thrift.NewTSocket(hostPort)
161 if err != nil {
162 log.Fatalf("Unable to create server socket: %s", err)
163 }
164 btrans := thrift.NewTBufferedTransport(trans, 2048)
165 client := stress.NewServiceClientFactory(btrans, protocolFactory)
166 err = trans.Open()
167 if err != nil {
168 log.Fatalf("Unable to open connection: %s", err)
169 }
170 ready.Done()
171 ready.Wait()
172 switch callType {
173 case echoVoid:
174 for i := 0; i < *loop; i++ {
175 client.EchoVoid()
176 atomic.AddInt64(&clicounter, 1)
177 }
178 case echoByte:
179 for i := 0; i < *loop; i++ {
180 client.EchoByte(42)
181 atomic.AddInt64(&clicounter, 1)
182 }
183 case echoI32:
184 for i := 0; i < *loop; i++ {
185 client.EchoI32(4242)
186 atomic.AddInt64(&clicounter, 1)
187 }
188 case echoI64:
189 for i := 0; i < *loop; i++ {
190 client.EchoI64(424242)
191 atomic.AddInt64(&clicounter, 1)
192 }
193 case echoString:
194 for i := 0; i < *loop; i++ {
195 client.EchoString("TestString")
196 atomic.AddInt64(&clicounter, 1)
197 }
198 case echiList:
199 l := []int8{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8}
200 for i := 0; i < *loop; i++ {
201 client.EchoList(l)
202 atomic.AddInt64(&clicounter, 1)
203 }
204 case echoSet:
crekerca714c42016-04-04 19:19:47 +0300205 s := map[int8]struct{}{-10: {}, -9: {}, -8: {}, -7: {}, -6: {}, -5: {}, -4: {}, -3: {}, -2: {}, -1: {}, 0: {}, 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}}
Jens Geyerf4598682014-05-08 23:18:44 +0200206 for i := 0; i < *loop; i++ {
207 client.EchoSet(s)
208 atomic.AddInt64(&clicounter, 1)
209 }
210 case echoMap:
crekerca714c42016-04-04 19:19:47 +0300211 m := map[int8]int8{-10: 10, -9: 9, -8: 8, -7: 7, -6: 6, -5: 5, -4: 4, -3: 3, -2: 2, -1: 1, 0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8}
Jens Geyerf4598682014-05-08 23:18:44 +0200212 for i := 0; i < *loop; i++ {
213 client.EchoMap(m)
214 atomic.AddInt64(&clicounter, 1)
215 }
216 }
217
218 done.Done()
219}
John Boiles57852792018-01-05 14:37:05 -0800220
221type handler struct{}
222
223func (h *handler) EchoVoid(ctx context.Context) (err error) {
224 atomic.AddInt64(&counter, 1)
225 return nil
226}
227func (h *handler) EchoByte(ctx context.Context, arg int8) (r int8, err error) {
228 atomic.AddInt64(&counter, 1)
229 return arg, nil
230}
231func (h *handler) EchoI32(ctx context.Context, arg int32) (r int32, err error) {
232 atomic.AddInt64(&counter, 1)
233 return arg, nil
234}
235func (h *handler) EchoI64(ctx context.Context, arg int64) (r int64, err error) {
236 atomic.AddInt64(&counter, 1)
237 return arg, nil
238}
239func (h *handler) EchoString(ctx context.Context, arg string) (r string, err error) {
240 atomic.AddInt64(&counter, 1)
241 return arg, nil
242}
243func (h *handler) EchoList(ctx context.Context, arg []int8) (r []int8, err error) {
244 atomic.AddInt64(&counter, 1)
245 return arg, nil
246}
247func (h *handler) EchoSet(ctx context.Context, arg map[int8]struct{}) (r map[int8]struct{}, err error) {
248 atomic.AddInt64(&counter, 1)
249 return arg, nil
250}
251func (h *handler) EchoMap(ctx context.Context, arg map[int8]int8) (r map[int8]int8, err error) {
252 atomic.AddInt64(&counter, 1)
253 return arg, nil
254}