THRIFT-5283: add support for Unix Domain Sockets in lib/rs (#2545)
Client: rs
diff --git a/lib/rs/src/server/threaded.rs b/lib/rs/src/server/threaded.rs
index 897235c..ad55b44 100644
--- a/lib/rs/src/server/threaded.rs
+++ b/lib/rs/src/server/threaded.rs
@@ -17,10 +17,15 @@
use log::warn;
-use std::net::{TcpListener, TcpStream, ToSocketAddrs};
+use std::net::{TcpListener, ToSocketAddrs};
use std::sync::Arc;
use threadpool::ThreadPool;
+#[cfg(unix)]
+use std::os::unix::net::UnixListener;
+#[cfg(unix)]
+use std::path::Path;
+
use crate::protocol::{
TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
};
@@ -178,10 +183,8 @@
for stream in listener.incoming() {
match stream {
Ok(s) => {
- let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
- let processor = self.processor.clone();
- self.worker_pool
- .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
+ let channel = TTcpChannel::with_stream(s);
+ self.handle_stream(channel)?;
}
Err(e) => {
warn!("failed to accept remote connection with error {:?}", e);
@@ -195,19 +198,55 @@
}))
}
- fn new_protocols_for_connection(
+ /// Listen for incoming connections on `listen_path`.
+ ///
+ /// `listen_path` should implement `AsRef<Path>` trait.
+ ///
+ /// Return `()` if successful.
+ ///
+ /// Return `Err` when the server cannot bind to `listen_path` or there
+ /// is an unrecoverable error.
+ #[cfg(unix)]
+ pub fn listen_uds<P: AsRef<Path>>(&mut self, listen_path: P) -> crate::Result<()> {
+ let listener = UnixListener::bind(listen_path)?;
+ for stream in listener.incoming() {
+ match stream {
+ Ok(s) => {
+ self.handle_stream(s)?;
+ }
+ Err(e) => {
+ warn!(
+ "failed to accept connection via unix domain socket with error {:?}",
+ e
+ );
+ }
+ }
+ }
+
+ Err(crate::Error::Application(ApplicationError {
+ kind: ApplicationErrorKind::Unknown,
+ message: "aborted listen loop".into(),
+ }))
+ }
+
+ fn handle_stream<S: TIoChannel + Send + 'static>(&mut self, stream: S) -> crate::Result<()> {
+ let (i_prot, o_prot) = self.new_protocols_for_connection(stream)?;
+ let processor = self.processor.clone();
+ self.worker_pool
+ .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
+ Ok(())
+ }
+
+ fn new_protocols_for_connection<S: TIoChannel + Send + 'static>(
&mut self,
- stream: TcpStream,
+ stream: S,
) -> crate::Result<(
Box<dyn TInputProtocol + Send>,
Box<dyn TOutputProtocol + Send>,
)> {
- // create the shared tcp stream
- let channel = TTcpChannel::with_stream(stream);
-
// split it into two - one to be owned by the
// input tran/proto and the other by the output
- let (r_chan, w_chan) = channel.split()?;
+ let (r_chan, w_chan) = stream.split()?;
// input protocol and transport
let r_tran = self.r_trans_factory.create(Box::new(r_chan));