Reformat rust code with rustfmt 1.0
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
index 3d8ccb2..70b381a 100644
--- a/lib/rs/src/server/mod.rs
+++ b/lib/rs/src/server/mod.rs
@@ -17,8 +17,8 @@
//! Types used to implement a Thrift server.
-use {ApplicationError, ApplicationErrorKind};
use protocol::{TInputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
+use {ApplicationError, ApplicationErrorKind};
mod multiplexed;
mod threaded;
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index a7f6d04..e433794 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -16,16 +16,17 @@
// under the License.
use std::collections::HashMap;
+use std::convert::Into;
use std::fmt;
use std::fmt::{Debug, Formatter};
-use std::convert::Into;
use std::sync::{Arc, Mutex};
use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
-use super::{TProcessor, handle_process_result};
+use super::{handle_process_result, TProcessor};
-const MISSING_SEPARATOR_AND_NO_DEFAULT: &'static str = "missing service separator and no default processor set";
+const MISSING_SEPARATOR_AND_NO_DEFAULT: &'static str =
+ "missing service separator and no default processor set";
type ThreadSafeProcessor = Box<TProcessor + Send + Sync>;
/// A `TProcessor` that can demux service calls to multiple underlying
@@ -54,12 +55,10 @@
/// processors.
pub fn new() -> TMultiplexedProcessor {
TMultiplexedProcessor {
- stored: Mutex::new(
- StoredProcessors {
- processors: HashMap::new(),
- default_processor: None,
- },
- ),
+ stored: Mutex::new(StoredProcessors {
+ processors: HashMap::new(),
+ default_processor: None,
+ }),
}
}
@@ -97,7 +96,7 @@
Ok(())
}
} else {
- Err(format!("cannot overwrite existing processor for service {}", name).into(),)
+ Err(format!("cannot overwrite existing processor for service {}", name).into())
}
}
@@ -160,13 +159,11 @@
fn split_ident_name(ident_name: &str) -> (Option<&str>, &str) {
ident_name
.find(':')
- .map(
- |pos| {
- let (svc_name, svc_call) = ident_name.split_at(pos);
- let (_, svc_call) = svc_call.split_at(1); // remove colon from service call name
- (Some(svc_name), svc_call)
- },
- )
+ .map(|pos| {
+ let (svc_name, svc_call) = ident_name.split_at(pos);
+ let (_, svc_call) = svc_call.split_at(1); // remove colon from service call name
+ (Some(svc_name), svc_call)
+ })
.or_else(|| Some((None, ident_name)))
.unwrap()
}
@@ -181,12 +178,12 @@
#[cfg(test)]
mod tests {
use std::convert::Into;
- use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
+ use std::sync::Arc;
- use {ApplicationError, ApplicationErrorKind};
use protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TMessageIdentifier, TMessageType};
use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
+ use {ApplicationError, ApplicationErrorKind};
use super::*;
@@ -219,8 +216,7 @@
let p = TMultiplexedProcessor::new();
p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
- i.transport
- .set_readable_bytes(&o.transport.write_bytes());
+ i.transport.set_readable_bytes(&o.transport.write_bytes());
let rcvd_ident = i.read_message_begin().unwrap();
let expected_ident = TMessageIdentifier::new("foo", TMessageType::Exception, 10);
assert_eq!(rcvd_ident, expected_ident);
@@ -245,8 +241,7 @@
let p = TMultiplexedProcessor::new();
p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
- i.transport
- .set_readable_bytes(&o.transport.write_bytes());
+ i.transport.set_readable_bytes(&o.transport.write_bytes());
let rcvd_ident = i.read_message_begin().unwrap();
let expected_ident = TMessageIdentifier::new("missing:call", TMessageType::Exception, 10);
assert_eq!(rcvd_ident, expected_ident);
@@ -265,7 +260,8 @@
impl TProcessor for Service {
fn process(&self, _: &mut TInputProtocol, _: &mut TOutputProtocol) -> ::Result<()> {
- let res = self.invoked
+ let res = self
+ .invoked
.compare_and_swap(false, true, Ordering::Relaxed);
if res {
Ok(())
@@ -280,9 +276,13 @@
let (mut i, mut o) = build_objects();
// build the services
- let svc_1 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let svc_1 = Service {
+ invoked: Arc::new(AtomicBool::new(false)),
+ };
let atm_1 = svc_1.invoked.clone();
- let svc_2 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let svc_2 = Service {
+ invoked: Arc::new(AtomicBool::new(false)),
+ };
let atm_2 = svc_2.invoked.clone();
// register them
@@ -309,9 +309,13 @@
let (mut i, mut o) = build_objects();
// build the services
- let svc_1 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let svc_1 = Service {
+ invoked: Arc::new(AtomicBool::new(false)),
+ };
let atm_1 = svc_1.invoked.clone();
- let svc_2 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let svc_2 = Service {
+ invoked: Arc::new(AtomicBool::new(false)),
+ };
let atm_2 = svc_2.invoked.clone();
// register them
@@ -333,12 +337,15 @@
assert_eq!(atm_2.load(Ordering::Relaxed), true);
}
- fn build_objects()
- -> (TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
- TBinaryOutputProtocol<WriteHalf<TBufferChannel>>)
- {
+ fn build_objects() -> (
+ TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
+ TBinaryOutputProtocol<WriteHalf<TBufferChannel>>,
+ ) {
let c = TBufferChannel::with_capacity(128, 128);
let (r_c, w_c) = c.split().unwrap();
- (TBinaryInputProtocol::new(r_c, true), TBinaryOutputProtocol::new(w_c, true))
+ (
+ TBinaryInputProtocol::new(r_c, true),
+ TBinaryOutputProtocol::new(w_c, true),
+ )
}
}
diff --git a/lib/rs/src/server/threaded.rs b/lib/rs/src/server/threaded.rs
index 515b20d..8139a4e 100644
--- a/lib/rs/src/server/threaded.rs
+++ b/lib/rs/src/server/threaded.rs
@@ -19,9 +19,9 @@
use std::sync::Arc;
use threadpool::ThreadPool;
-use {ApplicationError, ApplicationErrorKind};
use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
+use {ApplicationError, ApplicationErrorKind};
use super::TProcessor;
@@ -129,11 +129,13 @@
}
impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
- where PRC: TProcessor + Send + Sync + 'static,
- RTF: TReadTransportFactory + 'static,
- IPF: TInputProtocolFactory + 'static,
- WTF: TWriteTransportFactory + 'static,
- OPF: TOutputProtocolFactory + 'static {
+where
+ PRC: TProcessor + Send + Sync + 'static,
+ RTF: TReadTransportFactory + 'static,
+ IPF: TInputProtocolFactory + 'static,
+ WTF: TWriteTransportFactory + 'static,
+ OPF: TOutputProtocolFactory + 'static,
+{
/// Create a `TServer`.
///
/// Each accepted connection has an input and output half, each of which
@@ -155,10 +157,7 @@
w_trans_factory: write_transport_factory,
o_proto_factory: output_protocol_factory,
processor: Arc::new(processor),
- worker_pool: ThreadPool::with_name(
- "Thrift service processor".to_owned(),
- num_workers,
- ),
+ worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
}
}
@@ -179,7 +178,7 @@
let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
let processor = self.processor.clone();
self.worker_pool
- .execute(move || handle_incoming_connection(processor, i_prot, o_prot),);
+ .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
}
Err(e) => {
warn!("failed to accept remote connection with error {:?}", e);
@@ -187,17 +186,12 @@
}
}
- Err(
- ::Error::Application(
- ApplicationError {
- kind: ApplicationErrorKind::Unknown,
- message: "aborted listen loop".into(),
- },
- ),
- )
+ Err(::Error::Application(ApplicationError {
+ kind: ApplicationErrorKind::Unknown,
+ message: "aborted listen loop".into(),
+ }))
}
-
fn new_protocols_for_connection(
&mut self,
stream: TcpStream,