Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
Allen George | 7ddbcc0 | 2020-11-08 09:51:19 -0500 | [diff] [blame] | 18 | use log::warn; |
| 19 | |
tokcum | f033641 | 2022-03-30 11:39:08 +0200 | [diff] [blame] | 20 | use std::net::{TcpListener, ToSocketAddrs}; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 21 | use std::sync::Arc; |
| 22 | use threadpool::ThreadPool; |
| 23 | |
tokcum | f033641 | 2022-03-30 11:39:08 +0200 | [diff] [blame] | 24 | #[cfg(unix)] |
| 25 | use std::os::unix::net::UnixListener; |
| 26 | #[cfg(unix)] |
| 27 | use std::path::Path; |
| 28 | |
Allen George | 55c3e4c | 2021-03-01 23:19:52 -0500 | [diff] [blame] | 29 | use crate::protocol::{ |
| 30 | TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory, |
| 31 | }; |
Allen George | b0d1413 | 2020-03-29 11:48:55 -0400 | [diff] [blame] | 32 | use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory}; |
| 33 | use crate::{ApplicationError, ApplicationErrorKind}; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 34 | |
| 35 | use super::TProcessor; |
Allen George | b0d1413 | 2020-03-29 11:48:55 -0400 | [diff] [blame] | 36 | use crate::TransportErrorKind; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 37 | |
| 38 | /// Fixed-size thread-pool blocking Thrift server. |
| 39 | /// |
| 40 | /// A `TServer` listens on a given address and submits accepted connections |
| 41 | /// to an **unbounded** queue. Connections from this queue are serviced by |
| 42 | /// the first available worker thread from a **fixed-size** thread pool. Each |
| 43 | /// accepted connection is handled by that worker thread, and communication |
| 44 | /// over this thread occurs sequentially and synchronously (i.e. calls block). |
| 45 | /// Accepted connections have an input half and an output half, each of which |
| 46 | /// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate |
| 47 | /// messages to and from bytes. Any combination of `TInputProtocol`, `TOutputProtocol` |
| 48 | /// and `TTransport` may be used. |
| 49 | /// |
| 50 | /// # Examples |
| 51 | /// |
| 52 | /// Creating and running a `TServer` using Thrift-compiler-generated |
| 53 | /// service code. |
| 54 | /// |
| 55 | /// ```no_run |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 56 | /// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; |
| 57 | /// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory}; |
| 58 | /// use thrift::protocol::{TInputProtocol, TOutputProtocol}; |
Allen George | bc1344d | 2017-04-28 10:22:03 -0400 | [diff] [blame] | 59 | /// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, |
| 60 | /// TReadTransportFactory, TWriteTransportFactory}; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 61 | /// use thrift::server::{TProcessor, TServer}; |
| 62 | /// |
| 63 | /// // |
| 64 | /// // auto-generated |
| 65 | /// // |
| 66 | /// |
| 67 | /// // processor for `SimpleService` |
| 68 | /// struct SimpleServiceSyncProcessor; |
| 69 | /// impl SimpleServiceSyncProcessor { |
| 70 | /// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor { |
| 71 | /// unimplemented!(); |
| 72 | /// } |
| 73 | /// } |
| 74 | /// |
| 75 | /// // `TProcessor` implementation for `SimpleService` |
| 76 | /// impl TProcessor for SimpleServiceSyncProcessor { |
Marcin Pajkowski | c630841 | 2019-12-02 11:39:28 +0100 | [diff] [blame] | 77 | /// fn process(&self, i: &mut dyn TInputProtocol, o: &mut dyn TOutputProtocol) -> thrift::Result<()> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 78 | /// unimplemented!(); |
| 79 | /// } |
| 80 | /// } |
| 81 | /// |
| 82 | /// // service functions for SimpleService |
| 83 | /// trait SimpleServiceSyncHandler { |
| 84 | /// fn service_call(&self) -> thrift::Result<()>; |
| 85 | /// } |
| 86 | /// |
| 87 | /// // |
| 88 | /// // user-code follows |
| 89 | /// // |
| 90 | /// |
| 91 | /// // define a handler that will be invoked when `service_call` is received |
| 92 | /// struct SimpleServiceHandlerImpl; |
| 93 | /// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl { |
| 94 | /// fn service_call(&self) -> thrift::Result<()> { |
| 95 | /// unimplemented!(); |
| 96 | /// } |
| 97 | /// } |
| 98 | /// |
| 99 | /// // instantiate the processor |
| 100 | /// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {}); |
| 101 | /// |
| 102 | /// // instantiate the server |
Marcin Pajkowski | c630841 | 2019-12-02 11:39:28 +0100 | [diff] [blame] | 103 | /// let i_tr_fact: Box<dyn TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new()); |
| 104 | /// let i_pr_fact: Box<dyn TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new()); |
| 105 | /// let o_tr_fact: Box<dyn TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new()); |
| 106 | /// let o_pr_fact: Box<dyn TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new()); |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 107 | /// |
| 108 | /// let mut server = TServer::new( |
| 109 | /// i_tr_fact, |
| 110 | /// i_pr_fact, |
| 111 | /// o_tr_fact, |
| 112 | /// o_pr_fact, |
| 113 | /// processor, |
| 114 | /// 10 |
| 115 | /// ); |
| 116 | /// |
| 117 | /// // start listening for incoming connections |
| 118 | /// match server.listen("127.0.0.1:8080") { |
| 119 | /// Ok(_) => println!("listen completed"), |
| 120 | /// Err(e) => println!("listen failed with error {:?}", e), |
| 121 | /// } |
| 122 | /// ``` |
#[derive(Debug)]
pub struct TServer<PRC, RTF, IPF, WTF, OPF>
where
    PRC: TProcessor + Send + Sync + 'static,
    RTF: TReadTransportFactory + 'static,
    IPF: TInputProtocolFactory + 'static,
    WTF: TWriteTransportFactory + 'static,
    OPF: TOutputProtocolFactory + 'static,
{
    /// Creates the read transport wrapping the read half of each accepted connection.
    r_trans_factory: RTF,
    /// Creates the input protocol layered on top of that read transport.
    i_proto_factory: IPF,
    /// Creates the write transport wrapping the write half of each accepted connection.
    w_trans_factory: WTF,
    /// Creates the output protocol layered on top of that write transport.
    o_proto_factory: OPF,
    /// Processor shared (via `Arc`) with every connection-handling job.
    processor: Arc<PRC>,
    /// Thread pool on which accepted connections are serviced.
    worker_pool: ThreadPool,
}
| 139 | |
| 140 | impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF> |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 141 | where |
| 142 | PRC: TProcessor + Send + Sync + 'static, |
| 143 | RTF: TReadTransportFactory + 'static, |
| 144 | IPF: TInputProtocolFactory + 'static, |
| 145 | WTF: TWriteTransportFactory + 'static, |
| 146 | OPF: TOutputProtocolFactory + 'static, |
| 147 | { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 148 | /// Create a `TServer`. |
| 149 | /// |
| 150 | /// Each accepted connection has an input and output half, each of which |
| 151 | /// requires a `TTransport` and `TProtocol`. `TServer` uses |
| 152 | /// `read_transport_factory` and `input_protocol_factory` to create |
| 153 | /// implementations for the input, and `write_transport_factory` and |
| 154 | /// `output_protocol_factory` to create implementations for the output. |
| 155 | pub fn new( |
| 156 | read_transport_factory: RTF, |
| 157 | input_protocol_factory: IPF, |
| 158 | write_transport_factory: WTF, |
| 159 | output_protocol_factory: OPF, |
| 160 | processor: PRC, |
| 161 | num_workers: usize, |
| 162 | ) -> TServer<PRC, RTF, IPF, WTF, OPF> { |
| 163 | TServer { |
| 164 | r_trans_factory: read_transport_factory, |
| 165 | i_proto_factory: input_protocol_factory, |
| 166 | w_trans_factory: write_transport_factory, |
| 167 | o_proto_factory: output_protocol_factory, |
| 168 | processor: Arc::new(processor), |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 169 | worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers), |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 170 | } |
| 171 | } |
| 172 | |
| 173 | /// Listen for incoming connections on `listen_address`. |
| 174 | /// |
Marcin Pajkowski | 98ce2c8 | 2019-11-05 00:20:15 +0100 | [diff] [blame] | 175 | /// `listen_address` should implement `ToSocketAddrs` trait. |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 176 | /// |
| 177 | /// Return `()` if successful. |
| 178 | /// |
| 179 | /// Return `Err` when the server cannot bind to `listen_address` or there |
| 180 | /// is an unrecoverable error. |
Allen George | b0d1413 | 2020-03-29 11:48:55 -0400 | [diff] [blame] | 181 | pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 182 | let listener = TcpListener::bind(listen_address)?; |
| 183 | for stream in listener.incoming() { |
| 184 | match stream { |
| 185 | Ok(s) => { |
tokcum | f033641 | 2022-03-30 11:39:08 +0200 | [diff] [blame] | 186 | let channel = TTcpChannel::with_stream(s); |
| 187 | self.handle_stream(channel)?; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 188 | } |
| 189 | Err(e) => { |
| 190 | warn!("failed to accept remote connection with error {:?}", e); |
| 191 | } |
| 192 | } |
| 193 | } |
| 194 | |
Allen George | b0d1413 | 2020-03-29 11:48:55 -0400 | [diff] [blame] | 195 | Err(crate::Error::Application(ApplicationError { |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 196 | kind: ApplicationErrorKind::Unknown, |
| 197 | message: "aborted listen loop".into(), |
| 198 | })) |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 199 | } |
| 200 | |
tokcum | f033641 | 2022-03-30 11:39:08 +0200 | [diff] [blame] | 201 | /// Listen for incoming connections on `listen_path`. |
| 202 | /// |
| 203 | /// `listen_path` should implement `AsRef<Path>` trait. |
| 204 | /// |
| 205 | /// Return `()` if successful. |
| 206 | /// |
| 207 | /// Return `Err` when the server cannot bind to `listen_path` or there |
| 208 | /// is an unrecoverable error. |
| 209 | #[cfg(unix)] |
| 210 | pub fn listen_uds<P: AsRef<Path>>(&mut self, listen_path: P) -> crate::Result<()> { |
| 211 | let listener = UnixListener::bind(listen_path)?; |
| 212 | for stream in listener.incoming() { |
| 213 | match stream { |
| 214 | Ok(s) => { |
| 215 | self.handle_stream(s)?; |
| 216 | } |
| 217 | Err(e) => { |
| 218 | warn!( |
| 219 | "failed to accept connection via unix domain socket with error {:?}", |
| 220 | e |
| 221 | ); |
| 222 | } |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | Err(crate::Error::Application(ApplicationError { |
| 227 | kind: ApplicationErrorKind::Unknown, |
| 228 | message: "aborted listen loop".into(), |
| 229 | })) |
| 230 | } |
| 231 | |
| 232 | fn handle_stream<S: TIoChannel + Send + 'static>(&mut self, stream: S) -> crate::Result<()> { |
| 233 | let (i_prot, o_prot) = self.new_protocols_for_connection(stream)?; |
| 234 | let processor = self.processor.clone(); |
| 235 | self.worker_pool |
| 236 | .execute(move || handle_incoming_connection(processor, i_prot, o_prot)); |
| 237 | Ok(()) |
| 238 | } |
| 239 | |
| 240 | fn new_protocols_for_connection<S: TIoChannel + Send + 'static>( |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 241 | &mut self, |
tokcum | f033641 | 2022-03-30 11:39:08 +0200 | [diff] [blame] | 242 | stream: S, |
Allen George | 55c3e4c | 2021-03-01 23:19:52 -0500 | [diff] [blame] | 243 | ) -> crate::Result<( |
| 244 | Box<dyn TInputProtocol + Send>, |
| 245 | Box<dyn TOutputProtocol + Send>, |
| 246 | )> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 247 | // split it into two - one to be owned by the |
| 248 | // input tran/proto and the other by the output |
tokcum | f033641 | 2022-03-30 11:39:08 +0200 | [diff] [blame] | 249 | let (r_chan, w_chan) = stream.split()?; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 250 | |
| 251 | // input protocol and transport |
| 252 | let r_tran = self.r_trans_factory.create(Box::new(r_chan)); |
| 253 | let i_prot = self.i_proto_factory.create(r_tran); |
| 254 | |
| 255 | // output protocol and transport |
| 256 | let w_tran = self.w_trans_factory.create(Box::new(w_chan)); |
| 257 | let o_prot = self.o_proto_factory.create(w_tran); |
| 258 | |
| 259 | Ok((i_prot, o_prot)) |
| 260 | } |
| 261 | } |
| 262 | |
| 263 | fn handle_incoming_connection<PRC>( |
| 264 | processor: Arc<PRC>, |
Danny Browning | 77d96c1 | 2019-08-21 13:41:07 -0600 | [diff] [blame] | 265 | i_prot: Box<dyn TInputProtocol>, |
| 266 | o_prot: Box<dyn TOutputProtocol>, |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 267 | ) where |
| 268 | PRC: TProcessor, |
| 269 | { |
| 270 | let mut i_prot = i_prot; |
| 271 | let mut o_prot = o_prot; |
| 272 | loop { |
Sam De Roeck | 436bce3 | 2019-11-12 17:44:06 +0100 | [diff] [blame] | 273 | match processor.process(&mut *i_prot, &mut *o_prot) { |
Allen George | 55c3e4c | 2021-03-01 23:19:52 -0500 | [diff] [blame] | 274 | Ok(()) => {} |
Sam De Roeck | 436bce3 | 2019-11-12 17:44:06 +0100 | [diff] [blame] | 275 | Err(err) => { |
| 276 | match err { |
Allen George | 55c3e4c | 2021-03-01 23:19:52 -0500 | [diff] [blame] | 277 | crate::Error::Transport(ref transport_err) |
| 278 | if transport_err.kind == TransportErrorKind::EndOfFile => {} |
Sam De Roeck | 436bce3 | 2019-11-12 17:44:06 +0100 | [diff] [blame] | 279 | other => warn!("processor completed with error: {:?}", other), |
| 280 | } |
| 281 | break; |
| 282 | } |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 283 | } |
| 284 | } |
| 285 | } |