blob: 6035802516070272b34bdbfab107b50189f5b86c [file] [log] [blame]
Jens Geyer0e87c462013-06-18 22:25:07 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
22import (
23 "log"
Jens Geyer91cfb992014-05-17 01:07:28 +020024 "runtime/debug"
Paul2df9c202016-09-24 22:47:58 +030025 "sync"
Zachary Wassermanc1794352017-06-29 17:15:01 -070026 "sync/atomic"
Jens Geyer0e87c462013-06-18 22:25:07 +020027)
28
James E. King, III847fae92017-03-22 15:17:30 -040029/*
30 * This is not a typical TSimpleServer as it is not blocked after accept a socket.
31 * It is more like a TThreadedServer that can handle different connections in different goroutines.
32 * This will work if golang user implements a conn-pool like thing in client side.
33 */
Jens Geyer0e87c462013-06-18 22:25:07 +020034type TSimpleServer struct {
Zachary Wassermanc1794352017-06-29 17:15:01 -070035 closed int32
36 wg sync.WaitGroup
37 mu sync.Mutex
Jens Geyer0e87c462013-06-18 22:25:07 +020038
39 processorFactory TProcessorFactory
40 serverTransport TServerTransport
41 inputTransportFactory TTransportFactory
42 outputTransportFactory TTransportFactory
43 inputProtocolFactory TProtocolFactory
44 outputProtocolFactory TProtocolFactory
45}
46
47func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer {
48 return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport)
49}
50
51func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
52 return NewTSimpleServerFactory4(NewTProcessorFactory(processor),
53 serverTransport,
54 transportFactory,
55 protocolFactory,
56 )
57}
58
59func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
60 return NewTSimpleServerFactory6(NewTProcessorFactory(processor),
61 serverTransport,
62 inputTransportFactory,
63 outputTransportFactory,
64 inputProtocolFactory,
65 outputProtocolFactory,
66 )
67}
68
69func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer {
70 return NewTSimpleServerFactory6(processorFactory,
71 serverTransport,
72 NewTTransportFactory(),
73 NewTTransportFactory(),
74 NewTBinaryProtocolFactoryDefault(),
75 NewTBinaryProtocolFactoryDefault(),
76 )
77}
78
79func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
80 return NewTSimpleServerFactory6(processorFactory,
81 serverTransport,
82 transportFactory,
83 transportFactory,
84 protocolFactory,
85 protocolFactory,
86 )
87}
88
89func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
Jens Geyerc975bbc2014-03-06 21:11:46 +010090 return &TSimpleServer{
91 processorFactory: processorFactory,
Jens Geyer0e87c462013-06-18 22:25:07 +020092 serverTransport: serverTransport,
93 inputTransportFactory: inputTransportFactory,
94 outputTransportFactory: outputTransportFactory,
95 inputProtocolFactory: inputProtocolFactory,
96 outputProtocolFactory: outputProtocolFactory,
97 }
98}
99
100func (p *TSimpleServer) ProcessorFactory() TProcessorFactory {
101 return p.processorFactory
102}
103
104func (p *TSimpleServer) ServerTransport() TServerTransport {
105 return p.serverTransport
106}
107
108func (p *TSimpleServer) InputTransportFactory() TTransportFactory {
109 return p.inputTransportFactory
110}
111
112func (p *TSimpleServer) OutputTransportFactory() TTransportFactory {
113 return p.outputTransportFactory
114}
115
116func (p *TSimpleServer) InputProtocolFactory() TProtocolFactory {
117 return p.inputProtocolFactory
118}
119
120func (p *TSimpleServer) OutputProtocolFactory() TProtocolFactory {
121 return p.outputProtocolFactory
122}
123
Jens Geyerf4598682014-05-08 23:18:44 +0200124func (p *TSimpleServer) Listen() error {
125 return p.serverTransport.Listen()
126}
Jens Geyerc975bbc2014-03-06 21:11:46 +0100127
Matthew Pound8a83b042018-03-29 14:03:50 -0700128func (p *TSimpleServer) innerAccept() (int32, error) {
129 client, err := p.serverTransport.Accept()
130 p.mu.Lock()
131 defer p.mu.Unlock()
132 closed := atomic.LoadInt32(&p.closed)
133 if closed != 0 {
134 return closed, nil
135 }
136 if err != nil {
137 return 0, err
138 }
139 if client != nil {
140 p.wg.Add(1)
141 go func() {
142 defer p.wg.Done()
143 if err := p.processRequests(client); err != nil {
144 log.Println("error processing request:", err)
145 }
146 }()
147 }
148 return 0, nil
149}
150
Jens Geyerf4598682014-05-08 23:18:44 +0200151func (p *TSimpleServer) AcceptLoop() error {
Jens Geyerc975bbc2014-03-06 21:11:46 +0100152 for {
Matthew Pound8a83b042018-03-29 14:03:50 -0700153 closed, err := p.innerAccept()
Jens Geyer0e87c462013-06-18 22:25:07 +0200154 if err != nil {
Jens Geyer57cd4212014-12-08 21:25:00 +0100155 return err
Jens Geyer0e87c462013-06-18 22:25:07 +0200156 }
Matthew Pound8a83b042018-03-29 14:03:50 -0700157 if closed != 0 {
158 return nil
Jens Geyer0e87c462013-06-18 22:25:07 +0200159 }
160 }
Jens Geyerf4598682014-05-08 23:18:44 +0200161}
162
163func (p *TSimpleServer) Serve() error {
164 err := p.Listen()
165 if err != nil {
166 return err
167 }
168 p.AcceptLoop()
Jens Geyer0e87c462013-06-18 22:25:07 +0200169 return nil
170}
171
172func (p *TSimpleServer) Stop() error {
Zachary Wassermanc1794352017-06-29 17:15:01 -0700173 p.mu.Lock()
174 defer p.mu.Unlock()
175 if atomic.LoadInt32(&p.closed) != 0 {
176 return nil
ZhiyuYin47f9b9d2016-06-16 17:28:42 +0800177 }
Zachary Wassermanc1794352017-06-29 17:15:01 -0700178 atomic.StoreInt32(&p.closed, 1)
179 p.serverTransport.Interrupt()
180 p.wg.Wait()
Jens Geyer0e87c462013-06-18 22:25:07 +0200181 return nil
182}
183
Jens Geyer91cfb992014-05-17 01:07:28 +0200184func (p *TSimpleServer) processRequests(client TTransport) error {
Jens Geyer0e87c462013-06-18 22:25:07 +0200185 processor := p.processorFactory.GetProcessor(client)
D. Can Celasun8da0e722017-06-02 14:33:32 +0200186 inputTransport, err := p.inputTransportFactory.GetTransport(client)
187 if err != nil {
188 return err
189 }
190 outputTransport, err := p.outputTransportFactory.GetTransport(client)
191 if err != nil {
192 return err
193 }
Jens Geyer0e87c462013-06-18 22:25:07 +0200194 inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
195 outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
Jens Geyer91cfb992014-05-17 01:07:28 +0200196 defer func() {
197 if e := recover(); e != nil {
198 log.Printf("panic in processor: %s: %s", e, debug.Stack())
199 }
200 }()
libinbina5768962017-05-18 14:18:28 +0800201
Jens Geyer0e87c462013-06-18 22:25:07 +0200202 if inputTransport != nil {
203 defer inputTransport.Close()
204 }
205 if outputTransport != nil {
206 defer outputTransport.Close()
207 }
208 for {
Zachary Wassermanc1794352017-06-29 17:15:01 -0700209 if atomic.LoadInt32(&p.closed) != 0 {
libinbina5768962017-05-18 14:18:28 +0800210 return nil
libinbina5768962017-05-18 14:18:28 +0800211 }
212
taozlec0d384a2017-07-17 18:40:42 +0200213 ok, err := processor.Process(defaultCtx, inputProtocol, outputProtocol)
Jens Geyerf4598682014-05-08 23:18:44 +0200214 if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
Jens Geyer0e87c462013-06-18 22:25:07 +0200215 return nil
216 } else if err != nil {
217 return err
218 }
zhangxin54f49f82016-09-19 12:17:20 +0800219 if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
220 continue
221 }
libinbina5768962017-05-18 14:18:28 +0800222 if !ok {
Jens Geyer0e87c462013-06-18 22:25:07 +0200223 break
224 }
225 }
226 return nil
227}