blob: 897235c2b787ee41535eef352d7d9b97337ee9bf [file] [log] [blame]
Allen George0e22c362017-01-30 07:15:00 -05001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
Allen George7ddbcc02020-11-08 09:51:19 -050018use log::warn;
19
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +010020use std::net::{TcpListener, TcpStream, ToSocketAddrs};
Allen George0e22c362017-01-30 07:15:00 -050021use std::sync::Arc;
22use threadpool::ThreadPool;
23
Allen George55c3e4c2021-03-01 23:19:52 -050024use crate::protocol::{
25 TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
26};
Allen Georgeb0d14132020-03-29 11:48:55 -040027use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
28use crate::{ApplicationError, ApplicationErrorKind};
Allen George0e22c362017-01-30 07:15:00 -050029
30use super::TProcessor;
Allen Georgeb0d14132020-03-29 11:48:55 -040031use crate::TransportErrorKind;
Allen George0e22c362017-01-30 07:15:00 -050032
33/// Fixed-size thread-pool blocking Thrift server.
34///
35/// A `TServer` listens on a given address and submits accepted connections
36/// to an **unbounded** queue. Connections from this queue are serviced by
37/// the first available worker thread from a **fixed-size** thread pool. Each
38/// accepted connection is handled by that worker thread, and communication
39/// over this thread occurs sequentially and synchronously (i.e. calls block).
40/// Accepted connections have an input half and an output half, each of which
41/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
42/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
43/// and `TTransport` may be used.
44///
45/// # Examples
46///
47/// Creating and running a `TServer` using Thrift-compiler-generated
48/// service code.
49///
50/// ```no_run
Allen George0e22c362017-01-30 07:15:00 -050051/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
52/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
53/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
Allen Georgebc1344d2017-04-28 10:22:03 -040054/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
55/// TReadTransportFactory, TWriteTransportFactory};
Allen George0e22c362017-01-30 07:15:00 -050056/// use thrift::server::{TProcessor, TServer};
57///
58/// //
59/// // auto-generated
60/// //
61///
62/// // processor for `SimpleService`
63/// struct SimpleServiceSyncProcessor;
64/// impl SimpleServiceSyncProcessor {
65/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
66/// unimplemented!();
67/// }
68/// }
69///
70/// // `TProcessor` implementation for `SimpleService`
71/// impl TProcessor for SimpleServiceSyncProcessor {
Marcin Pajkowskic6308412019-12-02 11:39:28 +010072/// fn process(&self, i: &mut dyn TInputProtocol, o: &mut dyn TOutputProtocol) -> thrift::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -050073/// unimplemented!();
74/// }
75/// }
76///
77/// // service functions for SimpleService
78/// trait SimpleServiceSyncHandler {
79/// fn service_call(&self) -> thrift::Result<()>;
80/// }
81///
82/// //
83/// // user-code follows
84/// //
85///
86/// // define a handler that will be invoked when `service_call` is received
87/// struct SimpleServiceHandlerImpl;
88/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
89/// fn service_call(&self) -> thrift::Result<()> {
90/// unimplemented!();
91/// }
92/// }
93///
94/// // instantiate the processor
95/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
96///
97/// // instantiate the server
Marcin Pajkowskic6308412019-12-02 11:39:28 +010098/// let i_tr_fact: Box<dyn TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
99/// let i_pr_fact: Box<dyn TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
100/// let o_tr_fact: Box<dyn TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
101/// let o_pr_fact: Box<dyn TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
Allen George0e22c362017-01-30 07:15:00 -0500102///
103/// let mut server = TServer::new(
104/// i_tr_fact,
105/// i_pr_fact,
106/// o_tr_fact,
107/// o_pr_fact,
108/// processor,
109/// 10
110/// );
111///
112/// // start listening for incoming connections
113/// match server.listen("127.0.0.1:8080") {
114/// Ok(_) => println!("listen completed"),
115/// Err(e) => println!("listen failed with error {:?}", e),
116/// }
117/// ```
118#[derive(Debug)]
119pub struct TServer<PRC, RTF, IPF, WTF, OPF>
120where
121 PRC: TProcessor + Send + Sync + 'static,
122 RTF: TReadTransportFactory + 'static,
123 IPF: TInputProtocolFactory + 'static,
124 WTF: TWriteTransportFactory + 'static,
125 OPF: TOutputProtocolFactory + 'static,
126{
127 r_trans_factory: RTF,
128 i_proto_factory: IPF,
129 w_trans_factory: WTF,
130 o_proto_factory: OPF,
131 processor: Arc<PRC>,
132 worker_pool: ThreadPool,
133}
134
135impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
Allen Georgeef7a1892018-12-16 18:01:37 -0500136where
137 PRC: TProcessor + Send + Sync + 'static,
138 RTF: TReadTransportFactory + 'static,
139 IPF: TInputProtocolFactory + 'static,
140 WTF: TWriteTransportFactory + 'static,
141 OPF: TOutputProtocolFactory + 'static,
142{
Allen George0e22c362017-01-30 07:15:00 -0500143 /// Create a `TServer`.
144 ///
145 /// Each accepted connection has an input and output half, each of which
146 /// requires a `TTransport` and `TProtocol`. `TServer` uses
147 /// `read_transport_factory` and `input_protocol_factory` to create
148 /// implementations for the input, and `write_transport_factory` and
149 /// `output_protocol_factory` to create implementations for the output.
150 pub fn new(
151 read_transport_factory: RTF,
152 input_protocol_factory: IPF,
153 write_transport_factory: WTF,
154 output_protocol_factory: OPF,
155 processor: PRC,
156 num_workers: usize,
157 ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
158 TServer {
159 r_trans_factory: read_transport_factory,
160 i_proto_factory: input_protocol_factory,
161 w_trans_factory: write_transport_factory,
162 o_proto_factory: output_protocol_factory,
163 processor: Arc::new(processor),
Allen Georgeef7a1892018-12-16 18:01:37 -0500164 worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
Allen George0e22c362017-01-30 07:15:00 -0500165 }
166 }
167
168 /// Listen for incoming connections on `listen_address`.
169 ///
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +0100170 /// `listen_address` should implement `ToSocketAddrs` trait.
Allen George0e22c362017-01-30 07:15:00 -0500171 ///
172 /// Return `()` if successful.
173 ///
174 /// Return `Err` when the server cannot bind to `listen_address` or there
175 /// is an unrecoverable error.
Allen Georgeb0d14132020-03-29 11:48:55 -0400176 pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500177 let listener = TcpListener::bind(listen_address)?;
178 for stream in listener.incoming() {
179 match stream {
180 Ok(s) => {
181 let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
182 let processor = self.processor.clone();
183 self.worker_pool
Allen Georgeef7a1892018-12-16 18:01:37 -0500184 .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
Allen George0e22c362017-01-30 07:15:00 -0500185 }
186 Err(e) => {
187 warn!("failed to accept remote connection with error {:?}", e);
188 }
189 }
190 }
191
Allen Georgeb0d14132020-03-29 11:48:55 -0400192 Err(crate::Error::Application(ApplicationError {
Allen Georgeef7a1892018-12-16 18:01:37 -0500193 kind: ApplicationErrorKind::Unknown,
194 message: "aborted listen loop".into(),
195 }))
Allen George0e22c362017-01-30 07:15:00 -0500196 }
197
Allen George0e22c362017-01-30 07:15:00 -0500198 fn new_protocols_for_connection(
199 &mut self,
200 stream: TcpStream,
Allen George55c3e4c2021-03-01 23:19:52 -0500201 ) -> crate::Result<(
202 Box<dyn TInputProtocol + Send>,
203 Box<dyn TOutputProtocol + Send>,
204 )> {
Allen George0e22c362017-01-30 07:15:00 -0500205 // create the shared tcp stream
206 let channel = TTcpChannel::with_stream(stream);
207
208 // split it into two - one to be owned by the
209 // input tran/proto and the other by the output
210 let (r_chan, w_chan) = channel.split()?;
211
212 // input protocol and transport
213 let r_tran = self.r_trans_factory.create(Box::new(r_chan));
214 let i_prot = self.i_proto_factory.create(r_tran);
215
216 // output protocol and transport
217 let w_tran = self.w_trans_factory.create(Box::new(w_chan));
218 let o_prot = self.o_proto_factory.create(w_tran);
219
220 Ok((i_prot, o_prot))
221 }
222}
223
224fn handle_incoming_connection<PRC>(
225 processor: Arc<PRC>,
Danny Browning77d96c12019-08-21 13:41:07 -0600226 i_prot: Box<dyn TInputProtocol>,
227 o_prot: Box<dyn TOutputProtocol>,
Allen George0e22c362017-01-30 07:15:00 -0500228) where
229 PRC: TProcessor,
230{
231 let mut i_prot = i_prot;
232 let mut o_prot = o_prot;
233 loop {
Sam De Roeck436bce32019-11-12 17:44:06 +0100234 match processor.process(&mut *i_prot, &mut *o_prot) {
Allen George55c3e4c2021-03-01 23:19:52 -0500235 Ok(()) => {}
Sam De Roeck436bce32019-11-12 17:44:06 +0100236 Err(err) => {
237 match err {
Allen George55c3e4c2021-03-01 23:19:52 -0500238 crate::Error::Transport(ref transport_err)
239 if transport_err.kind == TransportErrorKind::EndOfFile => {}
Sam De Roeck436bce32019-11-12 17:44:06 +0100240 other => warn!("processor completed with error: {:?}", other),
241 }
242 break;
243 }
Allen George0e22c362017-01-30 07:15:00 -0500244 }
245 }
246}