THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/test/rs/src/bin/test_client.rs b/test/rs/src/bin/test_client.rs
index a2ea832..aad78a0 100644
--- a/test/rs/src/bin/test_client.rs
+++ b/test/rs/src/bin/test_client.rs
@@ -22,14 +22,14 @@
extern crate thrift_test; // huh. I have to do this to use my lib
use ordered_float::OrderedFloat;
-use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
-use std::rc::Rc;
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol,
TCompactOutputProtocol, TInputProtocol, TOutputProtocol};
-use thrift::transport::{TBufferedTransport, TFramedTransport, TTcpTransport, TTransport};
+use thrift::transport::{ReadHalf, TBufferedReadTransport, TBufferedWriteTransport,
+ TFramedReadTransport, TFramedWriteTransport, TIoChannel, TReadTransport,
+ TTcpChannel, TWriteTransport, WriteHalf};
use thrift_test::*;
fn main() {
@@ -58,7 +58,8 @@
(@arg transport: --transport +takes_value "Thrift transport implementation to use (\"buffered\", \"framed\")")
(@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
(@arg testloops: -n --testloops +takes_value "Number of times to run tests")
- ).get_matches();
+ )
+ .get_matches();
let host = matches.value_of("host").unwrap_or("127.0.0.1");
let port = value_t!(matches, "port", u16).unwrap_or(9090);
@@ -66,32 +67,39 @@
let transport = matches.value_of("transport").unwrap_or("buffered");
let protocol = matches.value_of("protocol").unwrap_or("binary");
- let t = open_tcp_transport(host, port)?;
+ let (i_chan, o_chan) = tcp_channel(host, port)?;
- let t: Box<TTransport> = match transport {
- "buffered" => Box::new(TBufferedTransport::new(t)),
- "framed" => Box::new(TFramedTransport::new(t)),
+ let (i_tran, o_tran) = match transport {
+ "buffered" => {
+ (Box::new(TBufferedReadTransport::new(i_chan)) as Box<TReadTransport>,
+ Box::new(TBufferedWriteTransport::new(o_chan)) as Box<TWriteTransport>)
+ }
+ "framed" => {
+ (Box::new(TFramedReadTransport::new(i_chan)) as Box<TReadTransport>,
+ Box::new(TFramedWriteTransport::new(o_chan)) as Box<TWriteTransport>)
+ }
unmatched => return Err(format!("unsupported transport {}", unmatched).into()),
};
- let t = Rc::new(RefCell::new(t));
let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol {
"binary" => {
- (Box::new(TBinaryInputProtocol::new(t.clone(), true)),
- Box::new(TBinaryOutputProtocol::new(t.clone(), true)))
+ (Box::new(TBinaryInputProtocol::new(i_tran, true)),
+ Box::new(TBinaryOutputProtocol::new(o_tran, true)))
}
"compact" => {
- (Box::new(TCompactInputProtocol::new(t.clone())),
- Box::new(TCompactOutputProtocol::new(t.clone())))
+ (Box::new(TCompactInputProtocol::new(i_tran)),
+ Box::new(TCompactOutputProtocol::new(o_tran)))
}
unmatched => return Err(format!("unsupported protocol {}", unmatched).into()),
};
- println!("connecting to {}:{} with {}+{} stack",
- host,
- port,
- protocol,
- transport);
+ println!(
+ "connecting to {}:{} with {}+{} stack",
+ host,
+ port,
+ protocol,
+ transport
+ );
let mut client = ThriftTestSyncClient::new(i_prot, o_prot);
@@ -102,16 +110,19 @@
Ok(())
}
-// FIXME: expose "open" through the client interface so I don't have to early open the transport
-fn open_tcp_transport(host: &str, port: u16) -> thrift::Result<Rc<RefCell<Box<TTransport>>>> {
- let mut t = TTcpTransport::new();
- match t.open(&format!("{}:{}", host, port)) {
- Ok(()) => Ok(Rc::new(RefCell::new(Box::new(t) as Box<TTransport>))),
- Err(e) => Err(e),
- }
+// FIXME: expose "open" through the client interface so I don't have to early
+// open the channel
+fn tcp_channel(
+ host: &str,
+ port: u16,
+) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> {
+ let mut c = TTcpChannel::new();
+ c.open(&format!("{}:{}", host, port))?;
+ c.split()
}
-fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Error> {
+fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<TOutputProtocol>>,)
+ -> Result<(), thrift::Error> {
println!("testVoid");
client.test_void()?;
@@ -131,12 +142,15 @@
verify_expected_result(client.test_i32(1159348374), 1159348374)?;
println!("testi64");
- // try!(verify_expected_result(client.test_i64(-8651829879438294565), -8651829879438294565));
+ // try!(verify_expected_result(client.test_i64(-8651829879438294565),
+ // -8651829879438294565));
verify_expected_result(client.test_i64(i64::min_value()), i64::min_value())?;
println!("testDouble");
- verify_expected_result(client.test_double(OrderedFloat::from(42.42)),
- OrderedFloat::from(42.42))?;
+ verify_expected_result(
+ client.test_double(OrderedFloat::from(42.42)),
+ OrderedFloat::from(42.42),
+ )?;
println!("testTypedef");
{
@@ -175,10 +189,14 @@
}
// Xtruct again, with optional values
- // FIXME: apparently the erlang thrift server does not like opt-in-req-out parameters that are undefined. Joy.
+ // FIXME: apparently the erlang thrift server does not like opt-in-req-out
+ // parameters that are undefined. Joy.
// {
- // let x_snd = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: None, i32_thing: None, i64_thing: Some(12938492818) };
- // let x_cmp = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: Some(0), i32_thing: Some(0), i64_thing: Some(12938492818) }; // the C++ server is responding correctly
+ // let x_snd = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: None,
+ // i32_thing: None, i64_thing: Some(12938492818) };
+ // let x_cmp = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: Some(0),
+ // i32_thing: Some(0), i64_thing: Some(12938492818) };
+ // (the C++ server is responding correctly)
// try!(verify_expected_result(client.test_struct(x_snd), x_cmp));
// }
//
@@ -188,22 +206,26 @@
{
let x_snd = Xtruct2 {
byte_thing: Some(32),
- struct_thing: Some(Xtruct {
- string_thing: Some("foo".to_owned()),
- byte_thing: Some(1),
- i32_thing: Some(324382098),
- i64_thing: Some(12938492818),
- }),
+ struct_thing: Some(
+ Xtruct {
+ string_thing: Some("foo".to_owned()),
+ byte_thing: Some(1),
+ i32_thing: Some(324382098),
+ i64_thing: Some(12938492818),
+ },
+ ),
i32_thing: Some(293481098),
};
let x_cmp = Xtruct2 {
byte_thing: Some(32),
- struct_thing: Some(Xtruct {
- string_thing: Some("foo".to_owned()),
- byte_thing: Some(1),
- i32_thing: Some(324382098),
- i64_thing: Some(12938492818),
- }),
+ struct_thing: Some(
+ Xtruct {
+ string_thing: Some("foo".to_owned()),
+ byte_thing: Some(1),
+ i32_thing: Some(324382098),
+ i64_thing: Some(12938492818),
+ },
+ ),
i32_thing: Some(293481098),
};
verify_expected_result(client.test_nest(x_snd), x_cmp)?;
@@ -270,7 +292,8 @@
}
// nested map
- // expect : {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2 => 2, 3 => 3, 4 => 4, }, }
+ // expect : {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, },
+ // 4 => {1 => 1, 2 => 2, 3 => 3, 4 => 4, }, }
println!("testMapMap");
{
let mut m_cmp_nested_0: BTreeMap<i32, i32> = BTreeMap::new();
@@ -302,13 +325,10 @@
i64_thing: Some(-19234123981),
};
- verify_expected_result(client.test_multi(1,
- -123948,
- -19234123981,
- m_snd,
- Numberz::EIGHT,
- 81),
- s_cmp)?;
+ verify_expected_result(
+ client.test_multi(1, -123948, -19234123981, m_snd, Numberz::EIGHT, 81),
+ s_cmp,
+ )?;
}
// Insanity
@@ -324,24 +344,30 @@
arg_map_usermap.insert(Numberz::EIGHT, 19);
let mut arg_vec_xtructs: Vec<Xtruct> = Vec::new();
- arg_vec_xtructs.push(Xtruct {
- string_thing: Some("foo".to_owned()),
- byte_thing: Some(8),
- i32_thing: Some(29),
- i64_thing: Some(92384),
- });
- arg_vec_xtructs.push(Xtruct {
- string_thing: Some("bar".to_owned()),
- byte_thing: Some(28),
- i32_thing: Some(2),
- i64_thing: Some(-1281),
- });
- arg_vec_xtructs.push(Xtruct {
- string_thing: Some("baz".to_owned()),
- byte_thing: Some(0),
- i32_thing: Some(3948539),
- i64_thing: Some(-12938492),
- });
+ arg_vec_xtructs.push(
+ Xtruct {
+ string_thing: Some("foo".to_owned()),
+ byte_thing: Some(8),
+ i32_thing: Some(29),
+ i64_thing: Some(92384),
+ },
+ );
+ arg_vec_xtructs.push(
+ Xtruct {
+ string_thing: Some("bar".to_owned()),
+ byte_thing: Some(28),
+ i32_thing: Some(2),
+ i64_thing: Some(-1281),
+ },
+ );
+ arg_vec_xtructs.push(
+ Xtruct {
+ string_thing: Some("baz".to_owned()),
+ byte_thing: Some(0),
+ i32_thing: Some(3948539),
+ i64_thing: Some(-12938492),
+ },
+ );
let mut s_cmp_nested_1: BTreeMap<Numberz, Insanity> = BTreeMap::new();
let insanity = Insanity {
@@ -372,7 +398,7 @@
Err(thrift::Error::User(ref e)) => {
match e.downcast_ref::<Xception>() {
Some(x) => Ok(x),
- None => Err(thrift::Error::User("did not get expected Xception struct".into())),
+ None => Err(thrift::Error::User("did not get expected Xception struct".into()),),
}
}
_ => Err(thrift::Error::User("did not get exception".into())),
@@ -414,7 +440,7 @@
Err(thrift::Error::User(ref e)) => {
match e.downcast_ref::<Xception>() {
Some(x) => Ok(x),
- None => Err(thrift::Error::User("did not get expected Xception struct".into())),
+ None => Err(thrift::Error::User("did not get expected Xception struct".into()),),
}
}
_ => Err(thrift::Error::User("did not get exception".into())),
@@ -435,7 +461,7 @@
Err(thrift::Error::User(ref e)) => {
match e.downcast_ref::<Xception2>() {
Some(x) => Ok(x),
- None => Err(thrift::Error::User("did not get expected Xception struct".into())),
+ None => Err(thrift::Error::User("did not get expected Xception struct".into()),),
}
}
_ => Err(thrift::Error::User("did not get exception".into())),
@@ -443,12 +469,17 @@
let x_cmp = Xception2 {
error_code: Some(2002),
- struct_thing: Some(Xtruct {
- string_thing: Some("This is an Xception2".to_owned()),
- byte_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */
- i32_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */
- i64_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */
- }),
+ struct_thing: Some(
+ Xtruct {
+ string_thing: Some("This is an Xception2".to_owned()),
+ // since this is an OPT_IN_REQ_OUT field the sender sets a default
+ byte_thing: Some(0),
+ // since this is an OPT_IN_REQ_OUT field the sender sets a default
+ i32_thing: Some(0),
+ // since this is an OPT_IN_REQ_OUT field the sender sets a default
+ i64_thing: Some(0),
+ },
+ ),
};
verify_expected_result(Ok(x), &x_cmp)?;
@@ -458,17 +489,18 @@
{
let r = client.test_multi_exception("haha".to_owned(), "RETURNED".to_owned());
let x = match r {
- Err(e) => {
- Err(thrift::Error::User(format!("received an unexpected exception {:?}", e).into()))
- }
+ Err(e) => Err(thrift::Error::User(format!("received an unexpected exception {:?}", e).into(),),),
_ => r,
}?;
let x_cmp = Xtruct {
string_thing: Some("RETURNED".to_owned()),
- byte_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default
- i32_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default
- i64_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default
+ // since this is an OPT_IN_REQ_OUT field the sender sets a default
+ byte_thing: Some(0),
+ // since this is an OPT_IN_REQ_OUT field the sender sets a default
+ i32_thing: Some(0),
+ // since this is an OPT_IN_REQ_OUT field the sender sets a default
+ i64_thing: Some(0),
};
verify_expected_result(Ok(x), x_cmp)?;
@@ -479,20 +511,22 @@
client.test_oneway(1)?;
}
- // final test to verify that the connection is still writable after the one-way call
+ // final test to verify that the connection is still writable after the one-way
+ // call
client.test_void()
}
-fn verify_expected_result<T: Debug + PartialEq + Sized>(actual: Result<T, thrift::Error>,
- expected: T)
- -> Result<(), thrift::Error> {
+#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
+fn verify_expected_result<T: Debug + PartialEq + Sized>(
+ actual: Result<T, thrift::Error>,
+ expected: T,
+) -> Result<(), thrift::Error> {
match actual {
Ok(v) => {
if v == expected {
Ok(())
} else {
- Err(thrift::Error::User(format!("expected {:?} but got {:?}", &expected, &v)
- .into()))
+ Err(thrift::Error::User(format!("expected {:?} but got {:?}", &expected, &v).into()),)
}
}
Err(e) => Err(e),