// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::sync::Arc;
use threadpool::ThreadPool;

use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
use {ApplicationError, ApplicationErrorKind, TransportErrorKind};

use super::TProcessor;

28/// Fixed-size thread-pool blocking Thrift server.
29///
30/// A `TServer` listens on a given address and submits accepted connections
31/// to an **unbounded** queue. Connections from this queue are serviced by
32/// the first available worker thread from a **fixed-size** thread pool. Each
33/// accepted connection is handled by that worker thread, and communication
34/// over this thread occurs sequentially and synchronously (i.e. calls block).
35/// Accepted connections have an input half and an output half, each of which
36/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
37/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
38/// and `TTransport` may be used.
39///
40/// # Examples
41///
42/// Creating and running a `TServer` using Thrift-compiler-generated
43/// service code.
44///
45/// ```no_run
Allen George0e22c362017-01-30 07:15:00 -050046/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
47/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
48/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
Allen Georgebc1344d2017-04-28 10:22:03 -040049/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
50/// TReadTransportFactory, TWriteTransportFactory};
Allen George0e22c362017-01-30 07:15:00 -050051/// use thrift::server::{TProcessor, TServer};
52///
53/// //
54/// // auto-generated
55/// //
56///
57/// // processor for `SimpleService`
58/// struct SimpleServiceSyncProcessor;
59/// impl SimpleServiceSyncProcessor {
60/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
61/// unimplemented!();
62/// }
63/// }
64///
65/// // `TProcessor` implementation for `SimpleService`
66/// impl TProcessor for SimpleServiceSyncProcessor {
67/// fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
68/// unimplemented!();
69/// }
70/// }
71///
72/// // service functions for SimpleService
73/// trait SimpleServiceSyncHandler {
74/// fn service_call(&self) -> thrift::Result<()>;
75/// }
76///
77/// //
78/// // user-code follows
79/// //
80///
81/// // define a handler that will be invoked when `service_call` is received
82/// struct SimpleServiceHandlerImpl;
83/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
84/// fn service_call(&self) -> thrift::Result<()> {
85/// unimplemented!();
86/// }
87/// }
88///
89/// // instantiate the processor
90/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
91///
92/// // instantiate the server
93/// let i_tr_fact: Box<TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
94/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
95/// let o_tr_fact: Box<TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
96/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
97///
98/// let mut server = TServer::new(
99/// i_tr_fact,
100/// i_pr_fact,
101/// o_tr_fact,
102/// o_pr_fact,
103/// processor,
104/// 10
105/// );
106///
107/// // start listening for incoming connections
108/// match server.listen("127.0.0.1:8080") {
109/// Ok(_) => println!("listen completed"),
110/// Err(e) => println!("listen failed with error {:?}", e),
111/// }
112/// ```
113#[derive(Debug)]
114pub struct TServer<PRC, RTF, IPF, WTF, OPF>
115where
116 PRC: TProcessor + Send + Sync + 'static,
117 RTF: TReadTransportFactory + 'static,
118 IPF: TInputProtocolFactory + 'static,
119 WTF: TWriteTransportFactory + 'static,
120 OPF: TOutputProtocolFactory + 'static,
121{
122 r_trans_factory: RTF,
123 i_proto_factory: IPF,
124 w_trans_factory: WTF,
125 o_proto_factory: OPF,
126 processor: Arc<PRC>,
127 worker_pool: ThreadPool,
128}
129
130impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
Allen Georgeef7a1892018-12-16 18:01:37 -0500131where
132 PRC: TProcessor + Send + Sync + 'static,
133 RTF: TReadTransportFactory + 'static,
134 IPF: TInputProtocolFactory + 'static,
135 WTF: TWriteTransportFactory + 'static,
136 OPF: TOutputProtocolFactory + 'static,
137{
Allen George0e22c362017-01-30 07:15:00 -0500138 /// Create a `TServer`.
139 ///
140 /// Each accepted connection has an input and output half, each of which
141 /// requires a `TTransport` and `TProtocol`. `TServer` uses
142 /// `read_transport_factory` and `input_protocol_factory` to create
143 /// implementations for the input, and `write_transport_factory` and
144 /// `output_protocol_factory` to create implementations for the output.
145 pub fn new(
146 read_transport_factory: RTF,
147 input_protocol_factory: IPF,
148 write_transport_factory: WTF,
149 output_protocol_factory: OPF,
150 processor: PRC,
151 num_workers: usize,
152 ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
153 TServer {
154 r_trans_factory: read_transport_factory,
155 i_proto_factory: input_protocol_factory,
156 w_trans_factory: write_transport_factory,
157 o_proto_factory: output_protocol_factory,
158 processor: Arc::new(processor),
Allen Georgeef7a1892018-12-16 18:01:37 -0500159 worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
Allen George0e22c362017-01-30 07:15:00 -0500160 }
161 }
162
163 /// Listen for incoming connections on `listen_address`.
164 ///
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +0100165 /// `listen_address` should implement `ToSocketAddrs` trait.
Allen George0e22c362017-01-30 07:15:00 -0500166 ///
167 /// Return `()` if successful.
168 ///
169 /// Return `Err` when the server cannot bind to `listen_address` or there
170 /// is an unrecoverable error.
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +0100171 pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> ::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500172 let listener = TcpListener::bind(listen_address)?;
173 for stream in listener.incoming() {
174 match stream {
175 Ok(s) => {
176 let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
177 let processor = self.processor.clone();
178 self.worker_pool
Allen Georgeef7a1892018-12-16 18:01:37 -0500179 .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
Allen George0e22c362017-01-30 07:15:00 -0500180 }
181 Err(e) => {
182 warn!("failed to accept remote connection with error {:?}", e);
183 }
184 }
185 }
186
Allen Georgeef7a1892018-12-16 18:01:37 -0500187 Err(::Error::Application(ApplicationError {
188 kind: ApplicationErrorKind::Unknown,
189 message: "aborted listen loop".into(),
190 }))
Allen George0e22c362017-01-30 07:15:00 -0500191 }
192
Allen George0e22c362017-01-30 07:15:00 -0500193 fn new_protocols_for_connection(
194 &mut self,
195 stream: TcpStream,
Danny Browning77d96c12019-08-21 13:41:07 -0600196 ) -> ::Result<(Box<dyn TInputProtocol + Send>, Box<dyn TOutputProtocol + Send>)> {
Allen George0e22c362017-01-30 07:15:00 -0500197 // create the shared tcp stream
198 let channel = TTcpChannel::with_stream(stream);
199
200 // split it into two - one to be owned by the
201 // input tran/proto and the other by the output
202 let (r_chan, w_chan) = channel.split()?;
203
204 // input protocol and transport
205 let r_tran = self.r_trans_factory.create(Box::new(r_chan));
206 let i_prot = self.i_proto_factory.create(r_tran);
207
208 // output protocol and transport
209 let w_tran = self.w_trans_factory.create(Box::new(w_chan));
210 let o_prot = self.o_proto_factory.create(w_tran);
211
212 Ok((i_prot, o_prot))
213 }
214}
215
216fn handle_incoming_connection<PRC>(
217 processor: Arc<PRC>,
Danny Browning77d96c12019-08-21 13:41:07 -0600218 i_prot: Box<dyn TInputProtocol>,
219 o_prot: Box<dyn TOutputProtocol>,
Allen George0e22c362017-01-30 07:15:00 -0500220) where
221 PRC: TProcessor,
222{
223 let mut i_prot = i_prot;
224 let mut o_prot = o_prot;
225 loop {
226 let r = processor.process(&mut *i_prot, &mut *o_prot);
227 if let Err(e) = r {
228 warn!("processor completed with error: {:?}", e);
229 break;
230 }
231 }
232}