blob: 89ed9778e52e6de32a46e5dfe71bf1034348fc86 [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cell::RefCell;
19use std::net::{TcpListener, TcpStream};
20use std::rc::Rc;
21
22use ::{ApplicationError, ApplicationErrorKind};
23use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
24use ::transport::{TTcpTransport, TTransport, TTransportFactory};
25
26use super::TProcessor;
27
28/// Single-threaded blocking Thrift socket server.
29///
30/// A `TSimpleServer` listens on a given address and services accepted
31/// connections *synchronously* and *sequentially* - i.e. in a blocking manner,
32/// one at a time - on the main thread. Each accepted connection has an input
33/// half and an output half, each of which uses a `TTransport` and `TProtocol`
34/// to translate messages to and from byes. Any combination of `TProtocol` and
35/// `TTransport` may be used.
36///
37/// # Examples
38///
39/// Creating and running a `TSimpleServer` using Thrift-compiler-generated
40/// service code.
41///
42/// ```no_run
43/// use thrift;
44/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
45/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
46/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
47/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
48/// use thrift::server::{TProcessor, TSimpleServer};
49///
50/// //
51/// // auto-generated
52/// //
53///
54/// // processor for `SimpleService`
55/// struct SimpleServiceSyncProcessor;
56/// impl SimpleServiceSyncProcessor {
57/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
58/// unimplemented!();
59/// }
60/// }
61///
62/// // `TProcessor` implementation for `SimpleService`
63/// impl TProcessor for SimpleServiceSyncProcessor {
64/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
65/// unimplemented!();
66/// }
67/// }
68///
69/// // service functions for SimpleService
70/// trait SimpleServiceSyncHandler {
71/// fn service_call(&mut self) -> thrift::Result<()>;
72/// }
73///
74/// //
75/// // user-code follows
76/// //
77///
78/// // define a handler that will be invoked when `service_call` is received
79/// struct SimpleServiceHandlerImpl;
80/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
81/// fn service_call(&mut self) -> thrift::Result<()> {
82/// unimplemented!();
83/// }
84/// }
85///
86/// // instantiate the processor
87/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
88///
89/// // instantiate the server
90/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
91/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
92/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
93/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
94///
95/// let mut server = TSimpleServer::new(
96/// i_tr_fact,
97/// i_pr_fact,
98/// o_tr_fact,
99/// o_pr_fact,
100/// processor
101/// );
102///
103/// // start listening for incoming connections
104/// match server.listen("127.0.0.1:8080") {
105/// Ok(_) => println!("listen completed"),
106/// Err(e) => println!("listen failed with error {:?}", e),
107/// }
108/// ```
109pub struct TSimpleServer<PR: TProcessor> {
110 i_trans_factory: Box<TTransportFactory>,
111 i_proto_factory: Box<TInputProtocolFactory>,
112 o_trans_factory: Box<TTransportFactory>,
113 o_proto_factory: Box<TOutputProtocolFactory>,
114 processor: PR,
115}
116
117impl<PR: TProcessor> TSimpleServer<PR> {
118 /// Create a `TSimpleServer`.
119 ///
120 /// Each accepted connection has an input and output half, each of which
121 /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses
122 /// `input_transport_factory` and `input_protocol_factory` to create
123 /// implementations for the input, and `output_transport_factory` and
124 /// `output_protocol_factory` to create implementations for the output.
125 pub fn new(input_transport_factory: Box<TTransportFactory>,
126 input_protocol_factory: Box<TInputProtocolFactory>,
127 output_transport_factory: Box<TTransportFactory>,
128 output_protocol_factory: Box<TOutputProtocolFactory>,
129 processor: PR)
130 -> TSimpleServer<PR> {
131 TSimpleServer {
132 i_trans_factory: input_transport_factory,
133 i_proto_factory: input_protocol_factory,
134 o_trans_factory: output_transport_factory,
135 o_proto_factory: output_protocol_factory,
136 processor: processor,
137 }
138 }
139
140 /// Listen for incoming connections on `listen_address`.
141 ///
142 /// `listen_address` should be in the form `host:port`,
143 /// for example: `127.0.0.1:8080`.
144 ///
145 /// Return `()` if successful.
146 ///
147 /// Return `Err` when the server cannot bind to `listen_address` or there
148 /// is an unrecoverable error.
149 pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
150 let listener = TcpListener::bind(listen_address)?;
151 for stream in listener.incoming() {
152 match stream {
153 Ok(s) => self.handle_incoming_connection(s),
154 Err(e) => warn!("failed to accept remote connection with error {:?}", e),
155 }
156 }
157
158 Err(::Error::Application(ApplicationError {
159 kind: ApplicationErrorKind::Unknown,
160 message: "aborted listen loop".into(),
161 }))
162 }
163
164 fn handle_incoming_connection(&mut self, stream: TcpStream) {
165 // create the shared tcp stream
166 let stream = TTcpTransport::with_stream(stream);
167 let stream: Box<TTransport> = Box::new(stream);
168 let stream = Rc::new(RefCell::new(stream));
169
170 // input protocol and transport
171 let i_tran = self.i_trans_factory.create(stream.clone());
172 let i_tran = Rc::new(RefCell::new(i_tran));
173 let mut i_prot = self.i_proto_factory.create(i_tran);
174
175 // output protocol and transport
176 let o_tran = self.o_trans_factory.create(stream.clone());
177 let o_tran = Rc::new(RefCell::new(o_tran));
178 let mut o_prot = self.o_proto_factory.create(o_tran);
179
180 // process loop
181 loop {
182 let r = self.processor.process(&mut *i_prot, &mut *o_prot);
183 if let Err(e) = r {
184 warn!("processor failed with error: {:?}", e);
185 break; // FIXME: close here
186 }
187 }
188 }
189}