THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/test/src/bin/kitchen_sink_client.rs b/lib/rs/test/src/bin/kitchen_sink_client.rs
index 27171be..9738298 100644
--- a/lib/rs/test/src/bin/kitchen_sink_client.rs
+++ b/lib/rs/test/src/bin/kitchen_sink_client.rs
@@ -21,13 +21,11 @@
extern crate kitchen_sink;
extern crate thrift;
-use std::cell::RefCell;
-use std::rc::Rc;
-
use kitchen_sink::base_two::{TNapkinServiceSyncClient, TRamenServiceSyncClient};
use kitchen_sink::midlayer::{MealServiceSyncClient, TMealServiceSyncClient};
use kitchen_sink::ultimate::{FullMealServiceSyncClient, TFullMealServiceSyncClient};
-use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel,
+ TTcpChannel, WriteHalf};
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol,
TCompactOutputProtocol, TInputProtocol, TOutputProtocol};
@@ -50,24 +48,25 @@
(@arg port: --port +takes_value "Port on which the Thrift test server is listening")
(@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
(@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")")
- ).get_matches();
+ )
+ .get_matches();
let host = matches.value_of("host").unwrap_or("127.0.0.1");
let port = value_t!(matches, "port", u16).unwrap_or(9090);
let protocol = matches.value_of("protocol").unwrap_or("compact");
let service = matches.value_of("service").unwrap_or("part");
- let t = open_tcp_transport(host, port)?;
- let t = Rc::new(RefCell::new(Box::new(TFramedTransport::new(t)) as Box<TTransport>));
+ let (i_chan, o_chan) = tcp_channel(host, port)?;
+ let (i_tran, o_tran) = (TFramedReadTransport::new(i_chan), TFramedWriteTransport::new(o_chan));
let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol {
"binary" => {
- (Box::new(TBinaryInputProtocol::new(t.clone(), true)),
- Box::new(TBinaryOutputProtocol::new(t.clone(), true)))
+ (Box::new(TBinaryInputProtocol::new(i_tran, true)),
+ Box::new(TBinaryOutputProtocol::new(o_tran, true)))
}
"compact" => {
- (Box::new(TCompactInputProtocol::new(t.clone())),
- Box::new(TCompactOutputProtocol::new(t.clone())))
+ (Box::new(TCompactInputProtocol::new(i_tran)),
+ Box::new(TCompactOutputProtocol::new(o_tran)))
}
unmatched => return Err(format!("unsupported protocol {}", unmatched).into()),
};
@@ -75,28 +74,31 @@
run_client(service, i_prot, o_prot)
}
-fn run_client(service: &str,
- i_prot: Box<TInputProtocol>,
- o_prot: Box<TOutputProtocol>)
- -> thrift::Result<()> {
+fn run_client(
+ service: &str,
+ i_prot: Box<TInputProtocol>,
+ o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
match service {
"full" => run_full_meal_service(i_prot, o_prot),
"part" => run_meal_service(i_prot, o_prot),
- _ => Err(thrift::Error::from(format!("unknown service type {}", service))),
+ _ => Err(thrift::Error::from(format!("unknown service type {}", service)),),
}
}
-fn open_tcp_transport(host: &str, port: u16) -> thrift::Result<Rc<RefCell<Box<TTransport>>>> {
- let mut t = TTcpTransport::new();
- match t.open(&format!("{}:{}", host, port)) {
- Ok(()) => Ok(Rc::new(RefCell::new(Box::new(t) as Box<TTransport>))),
- Err(e) => Err(e),
- }
+fn tcp_channel(
+ host: &str,
+ port: u16,
+) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> {
+ let mut c = TTcpChannel::new();
+ c.open(&format!("{}:{}", host, port))?;
+ c.split()
}
-fn run_meal_service(i_prot: Box<TInputProtocol>,
- o_prot: Box<TOutputProtocol>)
- -> thrift::Result<()> {
+fn run_meal_service(
+ i_prot: Box<TInputProtocol>,
+ o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
let mut client = MealServiceSyncClient::new(i_prot, o_prot);
// client.full_meal(); // <-- IMPORTANT: if you uncomment this, compilation *should* fail
@@ -110,9 +112,10 @@
Ok(())
}
-fn run_full_meal_service(i_prot: Box<TInputProtocol>,
- o_prot: Box<TOutputProtocol>)
- -> thrift::Result<()> {
+fn run_full_meal_service(
+ i_prot: Box<TInputProtocol>,
+ o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
let mut client = FullMealServiceSyncClient::new(i_prot, o_prot);
execute_call("full", "ramen", || client.ramen(100))?;
@@ -124,17 +127,20 @@
}
fn execute_call<F, R>(service_type: &str, call_name: &str, mut f: F) -> thrift::Result<()>
- where F: FnMut() -> thrift::Result<R>
+where
+ F: FnMut() -> thrift::Result<R>,
{
let res = f();
match res {
Ok(_) => println!("{}: completed {} call", service_type, call_name),
Err(ref e) => {
- println!("{}: failed {} call with error {:?}",
- service_type,
- call_name,
- e)
+ println!(
+ "{}: failed {} call with error {:?}",
+ service_type,
+ call_name,
+ e
+ )
}
}
diff --git a/lib/rs/test/src/bin/kitchen_sink_server.rs b/lib/rs/test/src/bin/kitchen_sink_server.rs
index 4ce4fa3..19112cd 100644
--- a/lib/rs/test/src/bin/kitchen_sink_server.rs
+++ b/lib/rs/test/src/bin/kitchen_sink_server.rs
@@ -22,7 +22,7 @@
extern crate thrift;
use kitchen_sink::base_one::Noodle;
-use kitchen_sink::base_two::{Napkin, Ramen, NapkinServiceSyncHandler, RamenServiceSyncHandler};
+use kitchen_sink::base_two::{Napkin, NapkinServiceSyncHandler, Ramen, RamenServiceSyncHandler};
use kitchen_sink::midlayer::{Dessert, Meal, MealServiceSyncHandler, MealServiceSyncProcessor};
use kitchen_sink::ultimate::{Drink, FullMeal, FullMealAndDrinks,
FullMealAndDrinksServiceSyncProcessor, FullMealServiceSyncHandler};
@@ -30,8 +30,9 @@
use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory,
TCompactInputProtocolFactory, TCompactOutputProtocolFactory,
TInputProtocolFactory, TOutputProtocolFactory};
-use thrift::transport::{TFramedTransportFactory, TTransportFactory};
-use thrift::server::TSimpleServer;
+use thrift::transport::{TFramedReadTransportFactory, TFramedWriteTransportFactory,
+ TReadTransportFactory, TWriteTransportFactory};
+use thrift::server::TServer;
fn main() {
match run() {
@@ -52,7 +53,8 @@
(@arg port: --port +takes_value "port on which the test server listens")
(@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
(@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")")
- ).get_matches();
+ )
+ .get_matches();
let port = value_t!(matches, "port", u16).unwrap_or(9090);
let protocol = matches.value_of("protocol").unwrap_or("compact");
@@ -61,9 +63,8 @@
println!("binding to {}", listen_address);
- let (i_transport_factory, o_transport_factory): (Box<TTransportFactory>,
- Box<TTransportFactory>) =
- (Box::new(TFramedTransportFactory {}), Box::new(TFramedTransportFactory {}));
+ let r_transport_factory = TFramedReadTransportFactory::new();
+ let w_transport_factory = TFramedWriteTransportFactory::new();
let (i_protocol_factory, o_protocol_factory): (Box<TInputProtocolFactory>,
Box<TOutputProtocolFactory>) =
@@ -93,51 +94,75 @@
// Since what I'm doing is uncommon I'm just going to duplicate the code
match &*service {
"part" => {
- run_meal_server(&listen_address,
- i_transport_factory,
- i_protocol_factory,
- o_transport_factory,
- o_protocol_factory)
+ run_meal_server(
+ &listen_address,
+ r_transport_factory,
+ i_protocol_factory,
+ w_transport_factory,
+ o_protocol_factory,
+ )
}
"full" => {
- run_full_meal_server(&listen_address,
- i_transport_factory,
- i_protocol_factory,
- o_transport_factory,
- o_protocol_factory)
+ run_full_meal_server(
+ &listen_address,
+ r_transport_factory,
+ i_protocol_factory,
+ w_transport_factory,
+ o_protocol_factory,
+ )
}
unknown => Err(format!("unsupported service type {}", unknown).into()),
}
}
-fn run_meal_server(listen_address: &str,
- i_transport_factory: Box<TTransportFactory>,
- i_protocol_factory: Box<TInputProtocolFactory>,
- o_transport_factory: Box<TTransportFactory>,
- o_protocol_factory: Box<TOutputProtocolFactory>)
- -> thrift::Result<()> {
+fn run_meal_server<RTF, IPF, WTF, OPF>(
+ listen_address: &str,
+ r_transport_factory: RTF,
+ i_protocol_factory: IPF,
+ w_transport_factory: WTF,
+ o_protocol_factory: OPF,
+) -> thrift::Result<()>
+where
+ RTF: TReadTransportFactory + 'static,
+ IPF: TInputProtocolFactory + 'static,
+ WTF: TWriteTransportFactory + 'static,
+ OPF: TOutputProtocolFactory + 'static,
+{
let processor = MealServiceSyncProcessor::new(PartHandler {});
- let mut server = TSimpleServer::new(i_transport_factory,
- i_protocol_factory,
- o_transport_factory,
- o_protocol_factory,
- processor);
+ let mut server = TServer::new(
+ r_transport_factory,
+ i_protocol_factory,
+ w_transport_factory,
+ o_protocol_factory,
+ processor,
+ 1,
+ );
server.listen(listen_address)
}
-fn run_full_meal_server(listen_address: &str,
- i_transport_factory: Box<TTransportFactory>,
- i_protocol_factory: Box<TInputProtocolFactory>,
- o_transport_factory: Box<TTransportFactory>,
- o_protocol_factory: Box<TOutputProtocolFactory>)
- -> thrift::Result<()> {
+fn run_full_meal_server<RTF, IPF, WTF, OPF>(
+ listen_address: &str,
+ r_transport_factory: RTF,
+ i_protocol_factory: IPF,
+ w_transport_factory: WTF,
+ o_protocol_factory: OPF,
+) -> thrift::Result<()>
+where
+ RTF: TReadTransportFactory + 'static,
+ IPF: TInputProtocolFactory + 'static,
+ WTF: TWriteTransportFactory + 'static,
+ OPF: TOutputProtocolFactory + 'static,
+{
let processor = FullMealAndDrinksServiceSyncProcessor::new(FullHandler {});
- let mut server = TSimpleServer::new(i_transport_factory,
- i_protocol_factory,
- o_transport_factory,
- o_protocol_factory,
- processor);
+ let mut server = TServer::new(
+ r_transport_factory,
+ i_protocol_factory,
+ w_transport_factory,
+ o_protocol_factory,
+ processor,
+ 1,
+ );
server.listen(listen_address)
}
@@ -145,21 +170,21 @@
struct PartHandler;
impl MealServiceSyncHandler for PartHandler {
- fn handle_meal(&mut self) -> thrift::Result<Meal> {
+ fn handle_meal(&self) -> thrift::Result<Meal> {
println!("part: handling meal call");
Ok(meal())
}
}
impl RamenServiceSyncHandler for PartHandler {
- fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> {
+ fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
println!("part: handling ramen call");
Ok(ramen())
}
}
impl NapkinServiceSyncHandler for PartHandler {
- fn handle_napkin(&mut self) -> thrift::Result<Napkin> {
+ fn handle_napkin(&self) -> thrift::Result<Napkin> {
println!("part: handling napkin call");
Ok(napkin())
}
@@ -171,34 +196,34 @@
struct FullHandler;
impl FullMealAndDrinksServiceSyncHandler for FullHandler {
- fn handle_full_meal_and_drinks(&mut self) -> thrift::Result<FullMealAndDrinks> {
+ fn handle_full_meal_and_drinks(&self) -> thrift::Result<FullMealAndDrinks> {
Ok(FullMealAndDrinks::new(full_meal(), Drink::WHISKEY))
}
}
impl FullMealServiceSyncHandler for FullHandler {
- fn handle_full_meal(&mut self) -> thrift::Result<FullMeal> {
+ fn handle_full_meal(&self) -> thrift::Result<FullMeal> {
println!("full: handling full meal call");
Ok(full_meal())
}
}
impl MealServiceSyncHandler for FullHandler {
- fn handle_meal(&mut self) -> thrift::Result<Meal> {
+ fn handle_meal(&self) -> thrift::Result<Meal> {
println!("full: handling meal call");
Ok(meal())
}
}
impl RamenServiceSyncHandler for FullHandler {
- fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> {
+ fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
println!("full: handling ramen call");
Ok(ramen())
}
}
impl NapkinServiceSyncHandler for FullHandler {
- fn handle_napkin(&mut self) -> thrift::Result<Napkin> {
+ fn handle_napkin(&self) -> thrift::Result<Napkin> {
println!("full: handling napkin call");
Ok(napkin())
}
diff --git a/lib/rs/test/src/lib.rs b/lib/rs/test/src/lib.rs
index 8a7ccd0..53f4873 100644
--- a/lib/rs/test/src/lib.rs
+++ b/lib/rs/test/src/lib.rs
@@ -48,6 +48,9 @@
#[test]
fn must_be_able_to_use_defaults() {
- let _ = midlayer::Meal { noodle: Some(base_one::Noodle::default()), ..Default::default() };
+ let _ = midlayer::Meal {
+ noodle: Some(base_one::Noodle::default()),
+ ..Default::default()
+ };
}
}