THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/src/protocol/multiplexed.rs b/lib/rs/src/protocol/multiplexed.rs
index a30aca8..db08027 100644
--- a/lib/rs/src/protocol/multiplexed.rs
+++ b/lib/rs/src/protocol/multiplexed.rs
@@ -37,33 +37,37 @@
/// Create and use a `TMultiplexedOutputProtocol`.
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
///
-/// let o_prot = TBinaryOutputProtocol::new(transport, true);
-/// let mut o_prot = TMultiplexedOutputProtocol::new("service_name", Box::new(o_prot));
+/// let protocol = TBinaryOutputProtocol::new(channel, true);
+/// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol);
///
/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
-/// o_prot.write_message_begin(&ident).unwrap();
+/// protocol.write_message_begin(&ident).unwrap();
/// ```
-pub struct TMultiplexedOutputProtocol<'a> {
+#[derive(Debug)]
+pub struct TMultiplexedOutputProtocol<P>
+where
+ P: TOutputProtocol,
+{
service_name: String,
- inner: Box<TOutputProtocol + 'a>,
+ inner: P,
}
-impl<'a> TMultiplexedOutputProtocol<'a> {
+impl<P> TMultiplexedOutputProtocol<P>
+where
+ P: TOutputProtocol,
+{
/// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
/// as originating from a service named `service_name` and sends them over
/// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
/// by `wrapped`, not by this instance.
- pub fn new(service_name: &str, wrapped: Box<TOutputProtocol + 'a>) -> TMultiplexedOutputProtocol<'a> {
+ pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> {
TMultiplexedOutputProtocol {
service_name: service_name.to_owned(),
inner: wrapped,
@@ -72,7 +76,10 @@
}
// FIXME: avoid passthrough methods
-impl<'a> TOutputProtocol for TMultiplexedOutputProtocol<'a> {
+impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P>
+where
+ P: TOutputProtocol,
+{
fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
match identifier.message_type { // FIXME: is there a better way to override identifier here?
TMessageType::Call | TMessageType::OneWay => {
@@ -181,39 +188,50 @@
#[cfg(test)]
mod tests {
- use std::cell::RefCell;
- use std::rc::Rc;
-
- use ::protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
- use ::transport::{TPassThruTransport, TTransport};
- use ::transport::mem::TBufferTransport;
+ use protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
+ use transport::{TBufferChannel, TIoChannel, WriteHalf};
use super::*;
#[test]
fn must_write_message_begin_with_prefixed_service_name() {
- let (trans, mut o_prot) = test_objects();
+ let mut o_prot = test_objects();
let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2);
assert_success!(o_prot.write_message_begin(&ident));
- let expected: [u8; 19] =
- [0x80, 0x01 /* protocol identifier */, 0x00, 0x01 /* message type */, 0x00,
- 0x00, 0x00, 0x07, 0x66, 0x6F, 0x6F /* "foo" */, 0x3A /* ":" */, 0x62, 0x61,
- 0x72 /* "bar" */, 0x00, 0x00, 0x00, 0x02 /* sequence number */];
+ let expected: [u8; 19] = [
+ 0x80,
+ 0x01, /* protocol identifier */
+ 0x00,
+ 0x01, /* message type */
+ 0x00,
+ 0x00,
+ 0x00,
+ 0x07,
+ 0x66,
+ 0x6F,
+ 0x6F, /* "foo" */
+ 0x3A, /* ":" */
+ 0x62,
+ 0x61,
+ 0x72, /* "bar" */
+ 0x00,
+ 0x00,
+ 0x00,
+ 0x02 /* sequence number */,
+ ];
- assert_eq!(&trans.borrow().write_buffer_to_vec(), &expected);
+ assert_eq!(o_prot.inner.transport.write_bytes(), expected);
}
- fn test_objects<'a>() -> (Rc<RefCell<Box<TBufferTransport>>>, TMultiplexedOutputProtocol<'a>) {
- let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity(40, 40))));
-
- let inner: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
- let inner = Rc::new(RefCell::new(inner));
-
- let o_prot = TBinaryOutputProtocol::new(inner.clone(), true);
- let o_prot = TMultiplexedOutputProtocol::new("foo", Box::new(o_prot));
-
- (mem, o_prot)
+ fn test_objects
+ ()
+ -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>
+ {
+ let c = TBufferChannel::with_capacity(40, 40);
+ let (_, w_chan) = c.split().unwrap();
+ let prot = TBinaryOutputProtocol::new(w_chan, true);
+ TMultiplexedOutputProtocol::new("foo", prot)
}
}