// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

Marcin Pajkowski98ce2c82019-11-05 00:20:15 +010018use std::net::{TcpListener, TcpStream, ToSocketAddrs};
Allen George0e22c362017-01-30 07:15:00 -050019use std::sync::Arc;
20use threadpool::ThreadPool;
21
Allen Georgeb0d14132020-03-29 11:48:55 -040022use crate::protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
23use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
24use crate::{ApplicationError, ApplicationErrorKind};
Allen George0e22c362017-01-30 07:15:00 -050025
26use super::TProcessor;
Allen Georgeb0d14132020-03-29 11:48:55 -040027use crate::TransportErrorKind;
Allen George0e22c362017-01-30 07:15:00 -050028
29/// Fixed-size thread-pool blocking Thrift server.
30///
31/// A `TServer` listens on a given address and submits accepted connections
32/// to an **unbounded** queue. Connections from this queue are serviced by
33/// the first available worker thread from a **fixed-size** thread pool. Each
34/// accepted connection is handled by that worker thread, and communication
35/// over this thread occurs sequentially and synchronously (i.e. calls block).
36/// Accepted connections have an input half and an output half, each of which
37/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
38/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
39/// and `TTransport` may be used.
40///
41/// # Examples
42///
43/// Creating and running a `TServer` using Thrift-compiler-generated
44/// service code.
45///
46/// ```no_run
Allen George0e22c362017-01-30 07:15:00 -050047/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
48/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
49/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
Allen Georgebc1344d2017-04-28 10:22:03 -040050/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
51/// TReadTransportFactory, TWriteTransportFactory};
Allen George0e22c362017-01-30 07:15:00 -050052/// use thrift::server::{TProcessor, TServer};
53///
54/// //
55/// // auto-generated
56/// //
57///
58/// // processor for `SimpleService`
59/// struct SimpleServiceSyncProcessor;
60/// impl SimpleServiceSyncProcessor {
61/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
62/// unimplemented!();
63/// }
64/// }
65///
66/// // `TProcessor` implementation for `SimpleService`
67/// impl TProcessor for SimpleServiceSyncProcessor {
Marcin Pajkowskic6308412019-12-02 11:39:28 +010068/// fn process(&self, i: &mut dyn TInputProtocol, o: &mut dyn TOutputProtocol) -> thrift::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -050069/// unimplemented!();
70/// }
71/// }
72///
73/// // service functions for SimpleService
74/// trait SimpleServiceSyncHandler {
75/// fn service_call(&self) -> thrift::Result<()>;
76/// }
77///
78/// //
79/// // user-code follows
80/// //
81///
82/// // define a handler that will be invoked when `service_call` is received
83/// struct SimpleServiceHandlerImpl;
84/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
85/// fn service_call(&self) -> thrift::Result<()> {
86/// unimplemented!();
87/// }
88/// }
89///
90/// // instantiate the processor
91/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
92///
93/// // instantiate the server
Marcin Pajkowskic6308412019-12-02 11:39:28 +010094/// let i_tr_fact: Box<dyn TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
95/// let i_pr_fact: Box<dyn TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
96/// let o_tr_fact: Box<dyn TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
97/// let o_pr_fact: Box<dyn TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
Allen George0e22c362017-01-30 07:15:00 -050098///
99/// let mut server = TServer::new(
100/// i_tr_fact,
101/// i_pr_fact,
102/// o_tr_fact,
103/// o_pr_fact,
104/// processor,
105/// 10
106/// );
107///
108/// // start listening for incoming connections
109/// match server.listen("127.0.0.1:8080") {
110/// Ok(_) => println!("listen completed"),
111/// Err(e) => println!("listen failed with error {:?}", e),
112/// }
113/// ```
114#[derive(Debug)]
115pub struct TServer<PRC, RTF, IPF, WTF, OPF>
116where
117 PRC: TProcessor + Send + Sync + 'static,
118 RTF: TReadTransportFactory + 'static,
119 IPF: TInputProtocolFactory + 'static,
120 WTF: TWriteTransportFactory + 'static,
121 OPF: TOutputProtocolFactory + 'static,
122{
123 r_trans_factory: RTF,
124 i_proto_factory: IPF,
125 w_trans_factory: WTF,
126 o_proto_factory: OPF,
127 processor: Arc<PRC>,
128 worker_pool: ThreadPool,
129}
130
131impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
Allen Georgeef7a1892018-12-16 18:01:37 -0500132where
133 PRC: TProcessor + Send + Sync + 'static,
134 RTF: TReadTransportFactory + 'static,
135 IPF: TInputProtocolFactory + 'static,
136 WTF: TWriteTransportFactory + 'static,
137 OPF: TOutputProtocolFactory + 'static,
138{
Allen George0e22c362017-01-30 07:15:00 -0500139 /// Create a `TServer`.
140 ///
141 /// Each accepted connection has an input and output half, each of which
142 /// requires a `TTransport` and `TProtocol`. `TServer` uses
143 /// `read_transport_factory` and `input_protocol_factory` to create
144 /// implementations for the input, and `write_transport_factory` and
145 /// `output_protocol_factory` to create implementations for the output.
146 pub fn new(
147 read_transport_factory: RTF,
148 input_protocol_factory: IPF,
149 write_transport_factory: WTF,
150 output_protocol_factory: OPF,
151 processor: PRC,
152 num_workers: usize,
153 ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
154 TServer {
155 r_trans_factory: read_transport_factory,
156 i_proto_factory: input_protocol_factory,
157 w_trans_factory: write_transport_factory,
158 o_proto_factory: output_protocol_factory,
159 processor: Arc::new(processor),
Allen Georgeef7a1892018-12-16 18:01:37 -0500160 worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
Allen George0e22c362017-01-30 07:15:00 -0500161 }
162 }
163
164 /// Listen for incoming connections on `listen_address`.
165 ///
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +0100166 /// `listen_address` should implement `ToSocketAddrs` trait.
Allen George0e22c362017-01-30 07:15:00 -0500167 ///
168 /// Return `()` if successful.
169 ///
170 /// Return `Err` when the server cannot bind to `listen_address` or there
171 /// is an unrecoverable error.
Allen Georgeb0d14132020-03-29 11:48:55 -0400172 pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500173 let listener = TcpListener::bind(listen_address)?;
174 for stream in listener.incoming() {
175 match stream {
176 Ok(s) => {
177 let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
178 let processor = self.processor.clone();
179 self.worker_pool
Allen Georgeef7a1892018-12-16 18:01:37 -0500180 .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
Allen George0e22c362017-01-30 07:15:00 -0500181 }
182 Err(e) => {
183 warn!("failed to accept remote connection with error {:?}", e);
184 }
185 }
186 }
187
Allen Georgeb0d14132020-03-29 11:48:55 -0400188 Err(crate::Error::Application(ApplicationError {
Allen Georgeef7a1892018-12-16 18:01:37 -0500189 kind: ApplicationErrorKind::Unknown,
190 message: "aborted listen loop".into(),
191 }))
Allen George0e22c362017-01-30 07:15:00 -0500192 }
193
Allen George0e22c362017-01-30 07:15:00 -0500194 fn new_protocols_for_connection(
195 &mut self,
196 stream: TcpStream,
Allen Georgeb0d14132020-03-29 11:48:55 -0400197 ) -> crate::Result<(Box<dyn TInputProtocol + Send>, Box<dyn TOutputProtocol + Send>)> {
Allen George0e22c362017-01-30 07:15:00 -0500198 // create the shared tcp stream
199 let channel = TTcpChannel::with_stream(stream);
200
201 // split it into two - one to be owned by the
202 // input tran/proto and the other by the output
203 let (r_chan, w_chan) = channel.split()?;
204
205 // input protocol and transport
206 let r_tran = self.r_trans_factory.create(Box::new(r_chan));
207 let i_prot = self.i_proto_factory.create(r_tran);
208
209 // output protocol and transport
210 let w_tran = self.w_trans_factory.create(Box::new(w_chan));
211 let o_prot = self.o_proto_factory.create(w_tran);
212
213 Ok((i_prot, o_prot))
214 }
215}
216
217fn handle_incoming_connection<PRC>(
218 processor: Arc<PRC>,
Danny Browning77d96c12019-08-21 13:41:07 -0600219 i_prot: Box<dyn TInputProtocol>,
220 o_prot: Box<dyn TOutputProtocol>,
Allen George0e22c362017-01-30 07:15:00 -0500221) where
222 PRC: TProcessor,
223{
224 let mut i_prot = i_prot;
225 let mut o_prot = o_prot;
226 loop {
Sam De Roeck436bce32019-11-12 17:44:06 +0100227 match processor.process(&mut *i_prot, &mut *o_prot) {
228 Ok(()) => {},
229 Err(err) => {
230 match err {
Allen Georgeb0d14132020-03-29 11:48:55 -0400231 crate::Error::Transport(ref transport_err) if transport_err.kind == TransportErrorKind::EndOfFile => {},
Sam De Roeck436bce32019-11-12 17:44:06 +0100232 other => warn!("processor completed with error: {:?}", other),
233 }
234 break;
235 }
Allen George0e22c362017-01-30 07:15:00 -0500236 }
237 }
238}