blob: 3ff0a3969c7b33e5e6c1cc0889e433d276e099be [file] [log] [blame]
Jens Geyerf4598682014-05-08 23:18:44 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package main
21
22import (
John Boiles57852792018-01-05 14:37:05 -080023 "context"
Jens Geyerf4598682014-05-08 23:18:44 +020024 "flag"
25 "fmt"
Jens Geyerf4598682014-05-08 23:18:44 +020026 "log"
27 _ "net/http/pprof"
28 "os"
29 "runtime"
30 "runtime/pprof"
31 "sync"
32 "sync/atomic"
Jens Geyerf4598682014-05-08 23:18:44 +020033 "time"
Yuxuan 'fishy' Wangb71f11e2021-03-22 15:01:00 -070034
35 "github.com/apache/thrift/lib/go/thrift"
36 "github.com/apache/thrift/test/go/src/gen/stress"
Jens Geyerf4598682014-05-08 23:18:44 +020037)
38
39var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to this file")
40var memprofile = flag.String("memprofile", "", "write memory profile to this file")
41
42var (
43 host = flag.String("host", "localhost", "Host to connect")
44 port = flag.Int64("port", 9091, "Port number to connect")
45 loop = flag.Int("loops", 50000, "The number of remote thrift calls each client makes")
46 runserver = flag.Int("server", 1, "Run the Thrift server in this process")
47 clients = flag.Int("clients", 20, "Number of client threads to create - 0 implies no clients, i.e. server only")
48 callName = flag.String("call", "echoVoid", "Service method to call, one of echoVoid, echoByte, echoI32, echoI64, echoString, echiList, echoSet, echoMap")
49 compact = flag.Bool("compact", false, "Use compact protocol instead of binary.")
50 framed = flag.Bool("framed", false, "Use framed transport instead of buffered.")
51)
52var hostPort string
53
54type callT int64
55
56const (
57 echoVoid callT = iota
58 echoByte
59 echoI32
60 echoI64
61 echoString
62 echiList
63 echoSet
64 echoMap
65)
66
67var callTMap = map[string]callT{
68 "echoVoid": echoVoid,
69 "echoByte": echoByte,
70 "echoI32": echoI32,
71 "echoI64": echoI64,
72 "echoString": echoString,
73 "echiList": echiList,
74 "echoSet": echoSet,
75 "echoMap": echoMap,
76}
77var callType callT
78
79var ready, done sync.WaitGroup
80
81var clicounter int64 = 0
82var counter int64 = 0
83
84func main() {
85 flag.Parse()
86 if *memprofile != "" {
87 runtime.MemProfileRate = 100
88 }
89 var ok bool
90 if callType, ok = callTMap[*callName]; !ok {
91 log.Fatal("Unknown service call", *callName)
92 }
93 if *cpuprofile != "" {
94 f, err := os.Create(*cpuprofile)
95 if err != nil {
96 log.Fatal(err)
97 }
98 pprof.StartCPUProfile(f)
99 defer pprof.StopCPUProfile()
100 }
101 hostPort = fmt.Sprintf("%s:%d", *host, *port)
102 var protocolFactory thrift.TProtocolFactory
103 var transportFactory thrift.TTransportFactory
104
105 if *compact {
106 protocolFactory = thrift.NewTCompactProtocolFactory()
107 } else {
108 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
109 }
110
111 if *framed {
112 transportFactory = thrift.NewTTransportFactory()
113 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
114 } else {
115 transportFactory = thrift.NewTBufferedTransportFactory(8192)
116 }
117
118 if *runserver > 0 {
119 serverTransport, err := thrift.NewTServerSocket(hostPort)
120 if err != nil {
121 log.Fatalf("Unable to create server socket: %s", err)
122 }
123
124 processor := stress.NewServiceProcessor(&handler{})
125 server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
126 if *clients == 0 {
127 server.Serve()
128 } else {
129 go server.Serve()
130 }
131 }
132 //start clients
133 if *clients != 0 {
134 ready.Add(*clients + 1)
135 done.Add(*clients)
136 for i := 0; i < *clients; i++ {
137 go client(protocolFactory)
138 }
139 ready.Done()
140 ready.Wait()
141 //run!
142 startTime := time.Now()
143 //wait for completion
144 done.Wait()
145 endTime := time.Now()
146 duration := endTime.Sub(startTime)
147 log.Printf("%d calls in %v (%f calls per second)\n", clicounter, duration, float64(clicounter)/duration.Seconds())
148 }
149 if *memprofile != "" {
150 f, err := os.Create(*memprofile)
151 if err != nil {
152 log.Fatal(err)
153 }
154 pprof.WriteHeapProfile(f)
155 f.Close()
156 return
157 }
158}
159
160func client(protocolFactory thrift.TProtocolFactory) {
161 trans, err := thrift.NewTSocket(hostPort)
162 if err != nil {
163 log.Fatalf("Unable to create server socket: %s", err)
164 }
165 btrans := thrift.NewTBufferedTransport(trans, 2048)
166 client := stress.NewServiceClientFactory(btrans, protocolFactory)
167 err = trans.Open()
168 if err != nil {
169 log.Fatalf("Unable to open connection: %s", err)
170 }
171 ready.Done()
172 ready.Wait()
173 switch callType {
174 case echoVoid:
175 for i := 0; i < *loop; i++ {
176 client.EchoVoid()
177 atomic.AddInt64(&clicounter, 1)
178 }
179 case echoByte:
180 for i := 0; i < *loop; i++ {
181 client.EchoByte(42)
182 atomic.AddInt64(&clicounter, 1)
183 }
184 case echoI32:
185 for i := 0; i < *loop; i++ {
186 client.EchoI32(4242)
187 atomic.AddInt64(&clicounter, 1)
188 }
189 case echoI64:
190 for i := 0; i < *loop; i++ {
191 client.EchoI64(424242)
192 atomic.AddInt64(&clicounter, 1)
193 }
194 case echoString:
195 for i := 0; i < *loop; i++ {
196 client.EchoString("TestString")
197 atomic.AddInt64(&clicounter, 1)
198 }
199 case echiList:
200 l := []int8{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8}
201 for i := 0; i < *loop; i++ {
202 client.EchoList(l)
203 atomic.AddInt64(&clicounter, 1)
204 }
205 case echoSet:
crekerca714c42016-04-04 19:19:47 +0300206 s := map[int8]struct{}{-10: {}, -9: {}, -8: {}, -7: {}, -6: {}, -5: {}, -4: {}, -3: {}, -2: {}, -1: {}, 0: {}, 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}}
Jens Geyerf4598682014-05-08 23:18:44 +0200207 for i := 0; i < *loop; i++ {
208 client.EchoSet(s)
209 atomic.AddInt64(&clicounter, 1)
210 }
211 case echoMap:
crekerca714c42016-04-04 19:19:47 +0300212 m := map[int8]int8{-10: 10, -9: 9, -8: 8, -7: 7, -6: 6, -5: 5, -4: 4, -3: 3, -2: 2, -1: 1, 0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8}
Jens Geyerf4598682014-05-08 23:18:44 +0200213 for i := 0; i < *loop; i++ {
214 client.EchoMap(m)
215 atomic.AddInt64(&clicounter, 1)
216 }
217 }
218
219 done.Done()
220}
John Boiles57852792018-01-05 14:37:05 -0800221
222type handler struct{}
223
224func (h *handler) EchoVoid(ctx context.Context) (err error) {
225 atomic.AddInt64(&counter, 1)
226 return nil
227}
228func (h *handler) EchoByte(ctx context.Context, arg int8) (r int8, err error) {
229 atomic.AddInt64(&counter, 1)
230 return arg, nil
231}
232func (h *handler) EchoI32(ctx context.Context, arg int32) (r int32, err error) {
233 atomic.AddInt64(&counter, 1)
234 return arg, nil
235}
236func (h *handler) EchoI64(ctx context.Context, arg int64) (r int64, err error) {
237 atomic.AddInt64(&counter, 1)
238 return arg, nil
239}
240func (h *handler) EchoString(ctx context.Context, arg string) (r string, err error) {
241 atomic.AddInt64(&counter, 1)
242 return arg, nil
243}
244func (h *handler) EchoList(ctx context.Context, arg []int8) (r []int8, err error) {
245 atomic.AddInt64(&counter, 1)
246 return arg, nil
247}
248func (h *handler) EchoSet(ctx context.Context, arg map[int8]struct{}) (r map[int8]struct{}, err error) {
249 atomic.AddInt64(&counter, 1)
250 return arg, nil
251}
252func (h *handler) EchoMap(ctx context.Context, arg map[int8]int8) (r map[int8]int8, err error) {
253 atomic.AddInt64(&counter, 1)
254 return arg, nil
255}