THRIFT-4186 Add travis build for Rust
Client: rs
Patch: Allen George <allen.george@gmail.com>
This closes #1260
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
index 21c392c..3d8ccb2 100644
--- a/lib/rs/src/server/mod.rs
+++ b/lib/rs/src/server/mod.rs
@@ -17,7 +17,8 @@
//! Types used to implement a Thrift server.
-use protocol::{TInputProtocol, TOutputProtocol};
+use {ApplicationError, ApplicationErrorKind};
+use protocol::{TInputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
mod multiplexed;
mod threaded;
@@ -93,3 +94,31 @@
/// Returns `()` if the handler was executed; `Err` otherwise.
fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
}
+
+/// Convenience function used in generated `TProcessor` implementations to
+/// return an `ApplicationError` if thrift message processing failed.
+pub fn handle_process_result(
+ msg_ident: &TMessageIdentifier,
+ res: ::Result<()>,
+ o_prot: &mut TOutputProtocol,
+) -> ::Result<()> {
+ if let Err(e) = res {
+ let e = match e {
+ ::Error::Application(a) => a,
+ _ => ApplicationError::new(ApplicationErrorKind::Unknown, format!("{:?}", e)),
+ };
+
+ let ident = TMessageIdentifier::new(
+ msg_ident.name.clone(),
+ TMessageType::Exception,
+ msg_ident.sequence_number,
+ );
+
+ o_prot.write_message_begin(&ident)?;
+ ::Error::write_application_error_to_out_protocol(&e, o_prot)?;
+ o_prot.write_message_end()?;
+ o_prot.flush()
+ } else {
+ Ok(())
+ }
+}
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index b1243a8..a7f6d04 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -16,13 +16,17 @@
// under the License.
use std::collections::HashMap;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
use std::convert::Into;
use std::sync::{Arc, Mutex};
-use {ApplicationErrorKind, new_application_error};
use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
-use super::TProcessor;
+use super::{TProcessor, handle_process_result};
+
+const MISSING_SEPARATOR_AND_NO_DEFAULT: &'static str = "missing service separator and no default processor set";
+type ThreadSafeProcessor = Box<TProcessor + Send + Sync>;
/// A `TProcessor` that can demux service calls to multiple underlying
/// Thrift services.
@@ -34,57 +38,85 @@
///
/// A `TMultiplexedProcessor` can only handle messages sent by a
/// `TMultiplexedOutputProtocol`.
-// FIXME: implement Debug
+#[derive(Default)]
pub struct TMultiplexedProcessor {
- processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
+ stored: Mutex<StoredProcessors>,
+}
+
+#[derive(Default)]
+struct StoredProcessors {
+ processors: HashMap<String, Arc<ThreadSafeProcessor>>,
+ default_processor: Option<Arc<ThreadSafeProcessor>>,
}
impl TMultiplexedProcessor {
- /// Register a service-specific `processor` for the service named
- /// `service_name`.
- ///
- /// Return `true` if this is the first registration for `service_name`.
- ///
- /// Return `false` if a mapping previously existed (the previous mapping is
- /// *not* overwritten).
- #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
- pub fn register_processor<S: Into<String>>(
- &mut self,
- service_name: S,
- processor: Box<TProcessor>,
- ) -> bool {
- let mut processors = self.processors.lock().unwrap();
-
- let name = service_name.into();
- if processors.contains_key(&name) {
- false
- } else {
- processors.insert(name, Arc::new(processor));
- true
+ /// Create a new `TMultiplexedProcessor` with no registered service-specific
+ /// processors.
+ pub fn new() -> TMultiplexedProcessor {
+ TMultiplexedProcessor {
+ stored: Mutex::new(
+ StoredProcessors {
+ processors: HashMap::new(),
+ default_processor: None,
+ },
+ ),
}
}
-}
-impl TProcessor for TMultiplexedProcessor {
- fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
- let msg_ident = i_prot.read_message_begin()?;
- let sep_index = msg_ident
- .name
- .find(':')
- .ok_or_else(
- || {
- new_application_error(
- ApplicationErrorKind::Unknown,
- "no service separator found in incoming message",
- )
- },
- )?;
+ /// Register a service-specific `processor` for the service named
+ /// `service_name`. This implementation is also backwards-compatible with
+ /// non-multiplexed clients. Set `as_default` to `true` to allow
+ /// non-namespaced requests to be dispatched to a default processor.
+ ///
+ /// Returns success if a new entry was inserted. Returns an error if:
+ /// * A processor exists for `service_name`
+ /// * You attempt to register a processor as default, and an existing default exists
+ #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
+ pub fn register<S: Into<String>>(
+ &mut self,
+ service_name: S,
+ processor: Box<TProcessor + Send + Sync>,
+ as_default: bool,
+ ) -> ::Result<()> {
+ let mut stored = self.stored.lock().unwrap();
- let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
+ let name = service_name.into();
+ if !stored.processors.contains_key(&name) {
+ let processor = Arc::new(processor);
- let processor: Option<Arc<Box<TProcessor>>> = {
- let processors = self.processors.lock().unwrap();
- processors.get(svc_name).cloned()
+ if as_default {
+ if stored.default_processor.is_none() {
+ stored.processors.insert(name, processor.clone());
+ stored.default_processor = Some(processor.clone());
+ Ok(())
+ } else {
+ Err("cannot reset default processor".into())
+ }
+ } else {
+ stored.processors.insert(name, processor);
+ Ok(())
+ }
+ } else {
+ Err(format!("cannot overwrite existing processor for service {}", name).into(),)
+ }
+ }
+
+ fn process_message(
+ &self,
+ msg_ident: &TMessageIdentifier,
+ i_prot: &mut TInputProtocol,
+ o_prot: &mut TOutputProtocol,
+ ) -> ::Result<()> {
+ let (svc_name, svc_call) = split_ident_name(&msg_ident.name);
+ debug!("routing svc_name {:?} svc_call {}", &svc_name, &svc_call);
+
+ let processor: Option<Arc<ThreadSafeProcessor>> = {
+ let stored = self.stored.lock().unwrap();
+ if let Some(name) = svc_name {
+ stored.processors.get(name).cloned()
+ } else {
+ stored.default_processor.clone()
+ }
};
match processor {
@@ -97,14 +129,216 @@
let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
(*arc).process(&mut proxy_i_prot, o_prot)
}
- None => {
- Err(
- new_application_error(
- ApplicationErrorKind::Unknown,
- format!("no processor found for service {}", svc_name),
- ),
- )
+ None => Err(missing_processor_message(svc_name).into()),
+ }
+ }
+}
+
+impl TProcessor for TMultiplexedProcessor {
+ fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
+ let msg_ident = i_prot.read_message_begin()?;
+
+ debug!("process incoming msg id:{:?}", &msg_ident);
+ let res = self.process_message(&msg_ident, i_prot, o_prot);
+
+ handle_process_result(&msg_ident, res, o_prot)
+ }
+}
+
+impl Debug for TMultiplexedProcessor {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ let stored = self.stored.lock().unwrap();
+ write!(
+ f,
+ "TMultiplexedProcess {{ registered_count: {:?} default: {:?} }}",
+ stored.processors.keys().len(),
+ stored.default_processor.is_some()
+ )
+ }
+}
+
+fn split_ident_name(ident_name: &str) -> (Option<&str>, &str) {
+ ident_name
+ .find(':')
+ .map(
+ |pos| {
+ let (svc_name, svc_call) = ident_name.split_at(pos);
+ let (_, svc_call) = svc_call.split_at(1); // remove colon from service call name
+ (Some(svc_name), svc_call)
+ },
+ )
+ .or_else(|| Some((None, ident_name)))
+ .unwrap()
+}
+
+fn missing_processor_message(svc_name: Option<&str>) -> String {
+ match svc_name {
+ Some(name) => format!("no processor found for service {}", name),
+ None => MISSING_SEPARATOR_AND_NO_DEFAULT.to_owned(),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::convert::Into;
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ use {ApplicationError, ApplicationErrorKind};
+ use protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TMessageIdentifier, TMessageType};
+ use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
+
+ use super::*;
+
+ #[test]
+ fn should_split_name_into_proper_separator_and_service_call() {
+ let ident_name = "foo:bar_call";
+ let (serv, call) = split_ident_name(&ident_name);
+ assert_eq!(serv, Some("foo"));
+ assert_eq!(call, "bar_call");
+ }
+
+ #[test]
+ fn should_return_full_ident_if_no_separator_exists() {
+ let ident_name = "bar_call";
+ let (serv, call) = split_ident_name(&ident_name);
+ assert_eq!(serv, None);
+ assert_eq!(call, "bar_call");
+ }
+
+ #[test]
+ fn should_write_error_if_no_separator_found_and_no_default_processor_exists() {
+ let (mut i, mut o) = build_objects();
+
+ let sent_ident = TMessageIdentifier::new("foo", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ let p = TMultiplexedProcessor::new();
+ p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
+
+ i.transport
+ .set_readable_bytes(&o.transport.write_bytes());
+ let rcvd_ident = i.read_message_begin().unwrap();
+ let expected_ident = TMessageIdentifier::new("foo", TMessageType::Exception, 10);
+ assert_eq!(rcvd_ident, expected_ident);
+ let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
+ let expected_err = ApplicationError::new(
+ ApplicationErrorKind::Unknown,
+ MISSING_SEPARATOR_AND_NO_DEFAULT,
+ );
+ assert_eq!(rcvd_err, expected_err);
+ }
+
+ #[test]
+ fn should_write_error_if_separator_exists_and_no_processor_found() {
+ let (mut i, mut o) = build_objects();
+
+ let sent_ident = TMessageIdentifier::new("missing:call", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ let p = TMultiplexedProcessor::new();
+ p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
+
+ i.transport
+ .set_readable_bytes(&o.transport.write_bytes());
+ let rcvd_ident = i.read_message_begin().unwrap();
+ let expected_ident = TMessageIdentifier::new("missing:call", TMessageType::Exception, 10);
+ assert_eq!(rcvd_ident, expected_ident);
+ let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
+ let expected_err = ApplicationError::new(
+ ApplicationErrorKind::Unknown,
+ missing_processor_message(Some("missing")),
+ );
+ assert_eq!(rcvd_err, expected_err);
+ }
+
+ #[derive(Default)]
+ struct Service {
+ pub invoked: Arc<AtomicBool>,
+ }
+
+ impl TProcessor for Service {
+ fn process(&self, _: &mut TInputProtocol, _: &mut TOutputProtocol) -> ::Result<()> {
+ let res = self.invoked
+ .compare_and_swap(false, true, Ordering::Relaxed);
+ if res {
+ Ok(())
+ } else {
+ Err("failed swap".into())
}
}
}
+
+ #[test]
+ fn should_route_call_to_correct_processor() {
+ let (mut i, mut o) = build_objects();
+
+ // build the services
+ let svc_1 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_1 = svc_1.invoked.clone();
+ let svc_2 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_2 = svc_2.invoked.clone();
+
+ // register them
+ let mut p = TMultiplexedProcessor::new();
+ p.register("service_1", Box::new(svc_1), false).unwrap();
+ p.register("service_2", Box::new(svc_2), false).unwrap();
+
+ // make the service call
+ let sent_ident = TMessageIdentifier::new("service_1:call", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ p.process(&mut i, &mut o).unwrap();
+
+ // service 1 should have been invoked, not service 2
+ assert_eq!(atm_1.load(Ordering::Relaxed), true);
+ assert_eq!(atm_2.load(Ordering::Relaxed), false);
+ }
+
+ #[test]
+ fn should_route_call_to_correct_processor_if_no_separator_exists_and_default_processor_set() {
+ let (mut i, mut o) = build_objects();
+
+ // build the services
+ let svc_1 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_1 = svc_1.invoked.clone();
+ let svc_2 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_2 = svc_2.invoked.clone();
+
+ // register them
+ let mut p = TMultiplexedProcessor::new();
+ p.register("service_1", Box::new(svc_1), false).unwrap();
+ p.register("service_2", Box::new(svc_2), true).unwrap(); // second processor is default
+
+ // make the service call (it's an old client, so we have to be backwards compatible)
+ let sent_ident = TMessageIdentifier::new("old_call", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ p.process(&mut i, &mut o).unwrap();
+
+ // service 2 should have been invoked, not service 1
+ assert_eq!(atm_1.load(Ordering::Relaxed), false);
+ assert_eq!(atm_2.load(Ordering::Relaxed), true);
+ }
+
+ fn build_objects()
+ -> (TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
+ TBinaryOutputProtocol<WriteHalf<TBufferChannel>>)
+ {
+ let c = TBufferChannel::with_capacity(128, 128);
+ let (r_c, w_c) = c.split().unwrap();
+ (TBinaryInputProtocol::new(r_c, true), TBinaryOutputProtocol::new(w_c, true))
+ }
}
diff --git a/lib/rs/src/server/threaded.rs b/lib/rs/src/server/threaded.rs
index a486c5a..66680b1 100644
--- a/lib/rs/src/server/threaded.rs
+++ b/lib/rs/src/server/threaded.rs
@@ -47,7 +47,8 @@
/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
-/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, TReadTransportFactory, TWriteTransportFactory};
+/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
+/// TReadTransportFactory, TWriteTransportFactory};
/// use thrift::server::{TProcessor, TServer};
///
/// //