Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | use std::cell::RefCell; |
| 19 | use std::net::{TcpListener, TcpStream}; |
| 20 | use std::rc::Rc; |
| 21 | |
| 22 | use ::{ApplicationError, ApplicationErrorKind}; |
| 23 | use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; |
| 24 | use ::transport::{TTcpTransport, TTransport, TTransportFactory}; |
| 25 | |
| 26 | use super::TProcessor; |
| 27 | |
| 28 | /// Single-threaded blocking Thrift socket server. |
| 29 | /// |
| 30 | /// A `TSimpleServer` listens on a given address and services accepted |
| 31 | /// connections *synchronously* and *sequentially* - i.e. in a blocking manner, |
| 32 | /// one at a time - on the main thread. Each accepted connection has an input |
| 33 | /// half and an output half, each of which uses a `TTransport` and `TProtocol` |
| 34 | /// to translate messages to and from byes. Any combination of `TProtocol` and |
| 35 | /// `TTransport` may be used. |
| 36 | /// |
| 37 | /// # Examples |
| 38 | /// |
| 39 | /// Creating and running a `TSimpleServer` using Thrift-compiler-generated |
| 40 | /// service code. |
| 41 | /// |
| 42 | /// ```no_run |
| 43 | /// use thrift; |
| 44 | /// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; |
| 45 | /// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory}; |
| 46 | /// use thrift::protocol::{TInputProtocol, TOutputProtocol}; |
| 47 | /// use thrift::transport::{TBufferedTransportFactory, TTransportFactory}; |
| 48 | /// use thrift::server::{TProcessor, TSimpleServer}; |
| 49 | /// |
| 50 | /// // |
| 51 | /// // auto-generated |
| 52 | /// // |
| 53 | /// |
| 54 | /// // processor for `SimpleService` |
| 55 | /// struct SimpleServiceSyncProcessor; |
| 56 | /// impl SimpleServiceSyncProcessor { |
| 57 | /// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor { |
| 58 | /// unimplemented!(); |
| 59 | /// } |
| 60 | /// } |
| 61 | /// |
| 62 | /// // `TProcessor` implementation for `SimpleService` |
| 63 | /// impl TProcessor for SimpleServiceSyncProcessor { |
| 64 | /// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> { |
| 65 | /// unimplemented!(); |
| 66 | /// } |
| 67 | /// } |
| 68 | /// |
| 69 | /// // service functions for SimpleService |
| 70 | /// trait SimpleServiceSyncHandler { |
| 71 | /// fn service_call(&mut self) -> thrift::Result<()>; |
| 72 | /// } |
| 73 | /// |
| 74 | /// // |
| 75 | /// // user-code follows |
| 76 | /// // |
| 77 | /// |
| 78 | /// // define a handler that will be invoked when `service_call` is received |
| 79 | /// struct SimpleServiceHandlerImpl; |
| 80 | /// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl { |
| 81 | /// fn service_call(&mut self) -> thrift::Result<()> { |
| 82 | /// unimplemented!(); |
| 83 | /// } |
| 84 | /// } |
| 85 | /// |
| 86 | /// // instantiate the processor |
| 87 | /// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {}); |
| 88 | /// |
| 89 | /// // instantiate the server |
| 90 | /// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new()); |
| 91 | /// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new()); |
| 92 | /// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new()); |
| 93 | /// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new()); |
| 94 | /// |
| 95 | /// let mut server = TSimpleServer::new( |
| 96 | /// i_tr_fact, |
| 97 | /// i_pr_fact, |
| 98 | /// o_tr_fact, |
| 99 | /// o_pr_fact, |
| 100 | /// processor |
| 101 | /// ); |
| 102 | /// |
| 103 | /// // start listening for incoming connections |
| 104 | /// match server.listen("127.0.0.1:8080") { |
| 105 | /// Ok(_) => println!("listen completed"), |
| 106 | /// Err(e) => println!("listen failed with error {:?}", e), |
| 107 | /// } |
| 108 | /// ``` |
| 109 | pub struct TSimpleServer<PR: TProcessor> { |
| 110 | i_trans_factory: Box<TTransportFactory>, |
| 111 | i_proto_factory: Box<TInputProtocolFactory>, |
| 112 | o_trans_factory: Box<TTransportFactory>, |
| 113 | o_proto_factory: Box<TOutputProtocolFactory>, |
| 114 | processor: PR, |
| 115 | } |
| 116 | |
| 117 | impl<PR: TProcessor> TSimpleServer<PR> { |
| 118 | /// Create a `TSimpleServer`. |
| 119 | /// |
| 120 | /// Each accepted connection has an input and output half, each of which |
| 121 | /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses |
| 122 | /// `input_transport_factory` and `input_protocol_factory` to create |
| 123 | /// implementations for the input, and `output_transport_factory` and |
| 124 | /// `output_protocol_factory` to create implementations for the output. |
| 125 | pub fn new(input_transport_factory: Box<TTransportFactory>, |
| 126 | input_protocol_factory: Box<TInputProtocolFactory>, |
| 127 | output_transport_factory: Box<TTransportFactory>, |
| 128 | output_protocol_factory: Box<TOutputProtocolFactory>, |
| 129 | processor: PR) |
| 130 | -> TSimpleServer<PR> { |
| 131 | TSimpleServer { |
| 132 | i_trans_factory: input_transport_factory, |
| 133 | i_proto_factory: input_protocol_factory, |
| 134 | o_trans_factory: output_transport_factory, |
| 135 | o_proto_factory: output_protocol_factory, |
| 136 | processor: processor, |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | /// Listen for incoming connections on `listen_address`. |
| 141 | /// |
| 142 | /// `listen_address` should be in the form `host:port`, |
| 143 | /// for example: `127.0.0.1:8080`. |
| 144 | /// |
| 145 | /// Return `()` if successful. |
| 146 | /// |
| 147 | /// Return `Err` when the server cannot bind to `listen_address` or there |
| 148 | /// is an unrecoverable error. |
| 149 | pub fn listen(&mut self, listen_address: &str) -> ::Result<()> { |
| 150 | let listener = TcpListener::bind(listen_address)?; |
| 151 | for stream in listener.incoming() { |
| 152 | match stream { |
| 153 | Ok(s) => self.handle_incoming_connection(s), |
| 154 | Err(e) => warn!("failed to accept remote connection with error {:?}", e), |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | Err(::Error::Application(ApplicationError { |
| 159 | kind: ApplicationErrorKind::Unknown, |
| 160 | message: "aborted listen loop".into(), |
| 161 | })) |
| 162 | } |
| 163 | |
| 164 | fn handle_incoming_connection(&mut self, stream: TcpStream) { |
| 165 | // create the shared tcp stream |
| 166 | let stream = TTcpTransport::with_stream(stream); |
| 167 | let stream: Box<TTransport> = Box::new(stream); |
| 168 | let stream = Rc::new(RefCell::new(stream)); |
| 169 | |
| 170 | // input protocol and transport |
| 171 | let i_tran = self.i_trans_factory.create(stream.clone()); |
| 172 | let i_tran = Rc::new(RefCell::new(i_tran)); |
| 173 | let mut i_prot = self.i_proto_factory.create(i_tran); |
| 174 | |
| 175 | // output protocol and transport |
| 176 | let o_tran = self.o_trans_factory.create(stream.clone()); |
| 177 | let o_tran = Rc::new(RefCell::new(o_tran)); |
| 178 | let mut o_prot = self.o_proto_factory.create(o_tran); |
| 179 | |
| 180 | // process loop |
| 181 | loop { |
| 182 | let r = self.processor.process(&mut *i_prot, &mut *o_prot); |
| 183 | if let Err(e) = r { |
| 184 | warn!("processor failed with error: {:?}", e); |
| 185 | break; // FIXME: close here |
| 186 | } |
| 187 | } |
| 188 | } |
| 189 | } |