blob: ad55b4459f271b4be9324f682b2f68dddc744e28 [file] [log] [blame]
Allen George0e22c362017-01-30 07:15:00 -05001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
Allen George7ddbcc02020-11-08 09:51:19 -050018use log::warn;
19
tokcumf0336412022-03-30 11:39:08 +020020use std::net::{TcpListener, ToSocketAddrs};
Allen George0e22c362017-01-30 07:15:00 -050021use std::sync::Arc;
22use threadpool::ThreadPool;
23
tokcumf0336412022-03-30 11:39:08 +020024#[cfg(unix)]
25use std::os::unix::net::UnixListener;
26#[cfg(unix)]
27use std::path::Path;
28
Allen George55c3e4c2021-03-01 23:19:52 -050029use crate::protocol::{
30 TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
31};
Allen Georgeb0d14132020-03-29 11:48:55 -040032use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
33use crate::{ApplicationError, ApplicationErrorKind};
Allen George0e22c362017-01-30 07:15:00 -050034
35use super::TProcessor;
Allen Georgeb0d14132020-03-29 11:48:55 -040036use crate::TransportErrorKind;
Allen George0e22c362017-01-30 07:15:00 -050037
38/// Fixed-size thread-pool blocking Thrift server.
39///
40/// A `TServer` listens on a given address and submits accepted connections
41/// to an **unbounded** queue. Connections from this queue are serviced by
42/// the first available worker thread from a **fixed-size** thread pool. Each
43/// accepted connection is handled by that worker thread, and communication
44/// over this thread occurs sequentially and synchronously (i.e. calls block).
45/// Accepted connections have an input half and an output half, each of which
46/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
47/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
48/// and `TTransport` may be used.
49///
50/// # Examples
51///
52/// Creating and running a `TServer` using Thrift-compiler-generated
53/// service code.
54///
55/// ```no_run
Allen George0e22c362017-01-30 07:15:00 -050056/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
57/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
58/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
Allen Georgebc1344d2017-04-28 10:22:03 -040059/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
60/// TReadTransportFactory, TWriteTransportFactory};
Allen George0e22c362017-01-30 07:15:00 -050061/// use thrift::server::{TProcessor, TServer};
62///
63/// //
64/// // auto-generated
65/// //
66///
67/// // processor for `SimpleService`
68/// struct SimpleServiceSyncProcessor;
69/// impl SimpleServiceSyncProcessor {
70/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
71/// unimplemented!();
72/// }
73/// }
74///
75/// // `TProcessor` implementation for `SimpleService`
76/// impl TProcessor for SimpleServiceSyncProcessor {
Marcin Pajkowskic6308412019-12-02 11:39:28 +010077/// fn process(&self, i: &mut dyn TInputProtocol, o: &mut dyn TOutputProtocol) -> thrift::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -050078/// unimplemented!();
79/// }
80/// }
81///
82/// // service functions for SimpleService
83/// trait SimpleServiceSyncHandler {
84/// fn service_call(&self) -> thrift::Result<()>;
85/// }
86///
87/// //
88/// // user-code follows
89/// //
90///
91/// // define a handler that will be invoked when `service_call` is received
92/// struct SimpleServiceHandlerImpl;
93/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
94/// fn service_call(&self) -> thrift::Result<()> {
95/// unimplemented!();
96/// }
97/// }
98///
99/// // instantiate the processor
100/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
101///
102/// // instantiate the server
Marcin Pajkowskic6308412019-12-02 11:39:28 +0100103/// let i_tr_fact: Box<dyn TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
104/// let i_pr_fact: Box<dyn TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
105/// let o_tr_fact: Box<dyn TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
106/// let o_pr_fact: Box<dyn TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
Allen George0e22c362017-01-30 07:15:00 -0500107///
108/// let mut server = TServer::new(
109/// i_tr_fact,
110/// i_pr_fact,
111/// o_tr_fact,
112/// o_pr_fact,
113/// processor,
114/// 10
115/// );
116///
117/// // start listening for incoming connections
118/// match server.listen("127.0.0.1:8080") {
119/// Ok(_) => println!("listen completed"),
120/// Err(e) => println!("listen failed with error {:?}", e),
121/// }
122/// ```
123#[derive(Debug)]
124pub struct TServer<PRC, RTF, IPF, WTF, OPF>
125where
126 PRC: TProcessor + Send + Sync + 'static,
127 RTF: TReadTransportFactory + 'static,
128 IPF: TInputProtocolFactory + 'static,
129 WTF: TWriteTransportFactory + 'static,
130 OPF: TOutputProtocolFactory + 'static,
131{
132 r_trans_factory: RTF,
133 i_proto_factory: IPF,
134 w_trans_factory: WTF,
135 o_proto_factory: OPF,
136 processor: Arc<PRC>,
137 worker_pool: ThreadPool,
138}
139
140impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
Allen Georgeef7a1892018-12-16 18:01:37 -0500141where
142 PRC: TProcessor + Send + Sync + 'static,
143 RTF: TReadTransportFactory + 'static,
144 IPF: TInputProtocolFactory + 'static,
145 WTF: TWriteTransportFactory + 'static,
146 OPF: TOutputProtocolFactory + 'static,
147{
Allen George0e22c362017-01-30 07:15:00 -0500148 /// Create a `TServer`.
149 ///
150 /// Each accepted connection has an input and output half, each of which
151 /// requires a `TTransport` and `TProtocol`. `TServer` uses
152 /// `read_transport_factory` and `input_protocol_factory` to create
153 /// implementations for the input, and `write_transport_factory` and
154 /// `output_protocol_factory` to create implementations for the output.
155 pub fn new(
156 read_transport_factory: RTF,
157 input_protocol_factory: IPF,
158 write_transport_factory: WTF,
159 output_protocol_factory: OPF,
160 processor: PRC,
161 num_workers: usize,
162 ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
163 TServer {
164 r_trans_factory: read_transport_factory,
165 i_proto_factory: input_protocol_factory,
166 w_trans_factory: write_transport_factory,
167 o_proto_factory: output_protocol_factory,
168 processor: Arc::new(processor),
Allen Georgeef7a1892018-12-16 18:01:37 -0500169 worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
Allen George0e22c362017-01-30 07:15:00 -0500170 }
171 }
172
173 /// Listen for incoming connections on `listen_address`.
174 ///
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +0100175 /// `listen_address` should implement `ToSocketAddrs` trait.
Allen George0e22c362017-01-30 07:15:00 -0500176 ///
177 /// Return `()` if successful.
178 ///
179 /// Return `Err` when the server cannot bind to `listen_address` or there
180 /// is an unrecoverable error.
Allen Georgeb0d14132020-03-29 11:48:55 -0400181 pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500182 let listener = TcpListener::bind(listen_address)?;
183 for stream in listener.incoming() {
184 match stream {
185 Ok(s) => {
tokcumf0336412022-03-30 11:39:08 +0200186 let channel = TTcpChannel::with_stream(s);
187 self.handle_stream(channel)?;
Allen George0e22c362017-01-30 07:15:00 -0500188 }
189 Err(e) => {
190 warn!("failed to accept remote connection with error {:?}", e);
191 }
192 }
193 }
194
Allen Georgeb0d14132020-03-29 11:48:55 -0400195 Err(crate::Error::Application(ApplicationError {
Allen Georgeef7a1892018-12-16 18:01:37 -0500196 kind: ApplicationErrorKind::Unknown,
197 message: "aborted listen loop".into(),
198 }))
Allen George0e22c362017-01-30 07:15:00 -0500199 }
200
tokcumf0336412022-03-30 11:39:08 +0200201 /// Listen for incoming connections on `listen_path`.
202 ///
203 /// `listen_path` should implement `AsRef<Path>` trait.
204 ///
205 /// Return `()` if successful.
206 ///
207 /// Return `Err` when the server cannot bind to `listen_path` or there
208 /// is an unrecoverable error.
209 #[cfg(unix)]
210 pub fn listen_uds<P: AsRef<Path>>(&mut self, listen_path: P) -> crate::Result<()> {
211 let listener = UnixListener::bind(listen_path)?;
212 for stream in listener.incoming() {
213 match stream {
214 Ok(s) => {
215 self.handle_stream(s)?;
216 }
217 Err(e) => {
218 warn!(
219 "failed to accept connection via unix domain socket with error {:?}",
220 e
221 );
222 }
223 }
224 }
225
226 Err(crate::Error::Application(ApplicationError {
227 kind: ApplicationErrorKind::Unknown,
228 message: "aborted listen loop".into(),
229 }))
230 }
231
232 fn handle_stream<S: TIoChannel + Send + 'static>(&mut self, stream: S) -> crate::Result<()> {
233 let (i_prot, o_prot) = self.new_protocols_for_connection(stream)?;
234 let processor = self.processor.clone();
235 self.worker_pool
236 .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
237 Ok(())
238 }
239
240 fn new_protocols_for_connection<S: TIoChannel + Send + 'static>(
Allen George0e22c362017-01-30 07:15:00 -0500241 &mut self,
tokcumf0336412022-03-30 11:39:08 +0200242 stream: S,
Allen George55c3e4c2021-03-01 23:19:52 -0500243 ) -> crate::Result<(
244 Box<dyn TInputProtocol + Send>,
245 Box<dyn TOutputProtocol + Send>,
246 )> {
Allen George0e22c362017-01-30 07:15:00 -0500247 // split it into two - one to be owned by the
248 // input tran/proto and the other by the output
tokcumf0336412022-03-30 11:39:08 +0200249 let (r_chan, w_chan) = stream.split()?;
Allen George0e22c362017-01-30 07:15:00 -0500250
251 // input protocol and transport
252 let r_tran = self.r_trans_factory.create(Box::new(r_chan));
253 let i_prot = self.i_proto_factory.create(r_tran);
254
255 // output protocol and transport
256 let w_tran = self.w_trans_factory.create(Box::new(w_chan));
257 let o_prot = self.o_proto_factory.create(w_tran);
258
259 Ok((i_prot, o_prot))
260 }
261}
262
263fn handle_incoming_connection<PRC>(
264 processor: Arc<PRC>,
Danny Browning77d96c12019-08-21 13:41:07 -0600265 i_prot: Box<dyn TInputProtocol>,
266 o_prot: Box<dyn TOutputProtocol>,
Allen George0e22c362017-01-30 07:15:00 -0500267) where
268 PRC: TProcessor,
269{
270 let mut i_prot = i_prot;
271 let mut o_prot = o_prot;
272 loop {
Sam De Roeck436bce32019-11-12 17:44:06 +0100273 match processor.process(&mut *i_prot, &mut *o_prot) {
Allen George55c3e4c2021-03-01 23:19:52 -0500274 Ok(()) => {}
Sam De Roeck436bce32019-11-12 17:44:06 +0100275 Err(err) => {
276 match err {
Allen George55c3e4c2021-03-01 23:19:52 -0500277 crate::Error::Transport(ref transport_err)
278 if transport_err.kind == TransportErrorKind::EndOfFile => {}
Sam De Roeck436bce32019-11-12 17:44:06 +0100279 other => warn!("processor completed with error: {:?}", other),
280 }
281 break;
282 }
Allen George0e22c362017-01-30 07:15:00 -0500283 }
284 }
285}