blob: 8331d91efd620201f3b8e8f55fbcdb369f7ab434 [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
Allen George7ddbcc02020-11-08 09:51:19 -050018use log::debug;
19
Allen George8b96bfb2016-11-02 08:01:08 -040020use std::collections::HashMap;
Allen Georgeef7a1892018-12-16 18:01:37 -050021use std::convert::Into;
Allen Georgebc1344d2017-04-28 10:22:03 -040022use std::fmt;
23use std::fmt::{Debug, Formatter};
Allen George0e22c362017-01-30 07:15:00 -050024use std::sync::{Arc, Mutex};
Allen George8b96bfb2016-11-02 08:01:08 -040025
Allen Georgeb0d14132020-03-29 11:48:55 -040026use crate::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
Allen George8b96bfb2016-11-02 08:01:08 -040027
Allen Georgeef7a1892018-12-16 18:01:37 -050028use super::{handle_process_result, TProcessor};
Allen Georgebc1344d2017-04-28 10:22:03 -040029
/// Error text used when a message name has no service separator and no
/// default processor has been registered.
const MISSING_SEPARATOR_AND_NO_DEFAULT: &str =
    "missing service separator and no default processor set";
Danny Browning77d96c12019-08-21 13:41:07 -060032type ThreadSafeProcessor = Box<dyn TProcessor + Send + Sync>;
Allen George8b96bfb2016-11-02 08:01:08 -040033
34/// A `TProcessor` that can demux service calls to multiple underlying
35/// Thrift services.
36///
37/// Users register service-specific `TProcessor` instances with a
38/// `TMultiplexedProcessor`, and then register that processor with a server
39/// implementation. Following that, all incoming service calls are automatically
40/// routed to the service-specific `TProcessor`.
41///
42/// A `TMultiplexedProcessor` can only handle messages sent by a
43/// `TMultiplexedOutputProtocol`.
Allen Georgebc1344d2017-04-28 10:22:03 -040044#[derive(Default)]
Allen George8b96bfb2016-11-02 08:01:08 -040045pub struct TMultiplexedProcessor {
Allen Georgebc1344d2017-04-28 10:22:03 -040046 stored: Mutex<StoredProcessors>,
47}
48
49#[derive(Default)]
50struct StoredProcessors {
51 processors: HashMap<String, Arc<ThreadSafeProcessor>>,
52 default_processor: Option<Arc<ThreadSafeProcessor>>,
Allen George8b96bfb2016-11-02 08:01:08 -040053}
54
55impl TMultiplexedProcessor {
Allen Georgebc1344d2017-04-28 10:22:03 -040056 /// Create a new `TMultiplexedProcessor` with no registered service-specific
57 /// processors.
58 pub fn new() -> TMultiplexedProcessor {
59 TMultiplexedProcessor {
Allen Georgeef7a1892018-12-16 18:01:37 -050060 stored: Mutex::new(StoredProcessors {
61 processors: HashMap::new(),
62 default_processor: None,
63 }),
Allen George8b96bfb2016-11-02 08:01:08 -040064 }
65 }
Allen George8b96bfb2016-11-02 08:01:08 -040066
Allen Georgebc1344d2017-04-28 10:22:03 -040067 /// Register a service-specific `processor` for the service named
68 /// `service_name`. This implementation is also backwards-compatible with
69 /// non-multiplexed clients. Set `as_default` to `true` to allow
70 /// non-namespaced requests to be dispatched to a default processor.
71 ///
72 /// Returns success if a new entry was inserted. Returns an error if:
73 /// * A processor exists for `service_name`
74 /// * You attempt to register a processor as default, and an existing default exists
Allen George7ddbcc02020-11-08 09:51:19 -050075 #[allow(clippy::map_entry)]
Allen Georgebc1344d2017-04-28 10:22:03 -040076 pub fn register<S: Into<String>>(
77 &mut self,
78 service_name: S,
Danny Browning77d96c12019-08-21 13:41:07 -060079 processor: Box<dyn TProcessor + Send + Sync>,
Allen Georgebc1344d2017-04-28 10:22:03 -040080 as_default: bool,
Allen Georgeb0d14132020-03-29 11:48:55 -040081 ) -> crate::Result<()> {
Allen Georgebc1344d2017-04-28 10:22:03 -040082 let mut stored = self.stored.lock().unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040083
Allen Georgebc1344d2017-04-28 10:22:03 -040084 let name = service_name.into();
85 if !stored.processors.contains_key(&name) {
86 let processor = Arc::new(processor);
Allen George8b96bfb2016-11-02 08:01:08 -040087
Allen Georgebc1344d2017-04-28 10:22:03 -040088 if as_default {
89 if stored.default_processor.is_none() {
90 stored.processors.insert(name, processor.clone());
91 stored.default_processor = Some(processor.clone());
92 Ok(())
93 } else {
94 Err("cannot reset default processor".into())
95 }
96 } else {
97 stored.processors.insert(name, processor);
98 Ok(())
99 }
100 } else {
Allen Georgeef7a1892018-12-16 18:01:37 -0500101 Err(format!("cannot overwrite existing processor for service {}", name).into())
Allen Georgebc1344d2017-04-28 10:22:03 -0400102 }
103 }
104
105 fn process_message(
106 &self,
107 msg_ident: &TMessageIdentifier,
Danny Browning77d96c12019-08-21 13:41:07 -0600108 i_prot: &mut dyn TInputProtocol,
109 o_prot: &mut dyn TOutputProtocol,
Allen Georgeb0d14132020-03-29 11:48:55 -0400110 ) -> crate::Result<()> {
Allen Georgebc1344d2017-04-28 10:22:03 -0400111 let (svc_name, svc_call) = split_ident_name(&msg_ident.name);
112 debug!("routing svc_name {:?} svc_call {}", &svc_name, &svc_call);
113
114 let processor: Option<Arc<ThreadSafeProcessor>> = {
115 let stored = self.stored.lock().unwrap();
116 if let Some(name) = svc_name {
117 stored.processors.get(name).cloned()
118 } else {
119 stored.default_processor.clone()
120 }
Allen George0e22c362017-01-30 07:15:00 -0500121 };
122
123 match processor {
124 Some(arc) => {
125 let new_msg_ident = TMessageIdentifier::new(
126 svc_call,
127 msg_ident.message_type,
128 msg_ident.sequence_number,
129 );
Allen George8b96bfb2016-11-02 08:01:08 -0400130 let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
Allen George0e22c362017-01-30 07:15:00 -0500131 (*arc).process(&mut proxy_i_prot, o_prot)
Allen George8b96bfb2016-11-02 08:01:08 -0400132 }
Allen Georgebc1344d2017-04-28 10:22:03 -0400133 None => Err(missing_processor_message(svc_name).into()),
134 }
135 }
136}
137
138impl TProcessor for TMultiplexedProcessor {
Allen George55c3e4c2021-03-01 23:19:52 -0500139 fn process(
140 &self,
141 i_prot: &mut dyn TInputProtocol,
142 o_prot: &mut dyn TOutputProtocol,
143 ) -> crate::Result<()> {
Allen Georgebc1344d2017-04-28 10:22:03 -0400144 let msg_ident = i_prot.read_message_begin()?;
145
146 debug!("process incoming msg id:{:?}", &msg_ident);
147 let res = self.process_message(&msg_ident, i_prot, o_prot);
148
149 handle_process_result(&msg_ident, res, o_prot)
150 }
151}
152
153impl Debug for TMultiplexedProcessor {
Allen George7ddbcc02020-11-08 09:51:19 -0500154 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Allen Georgebc1344d2017-04-28 10:22:03 -0400155 let stored = self.stored.lock().unwrap();
156 write!(
157 f,
158 "TMultiplexedProcess {{ registered_count: {:?} default: {:?} }}",
159 stored.processors.keys().len(),
160 stored.default_processor.is_some()
161 )
162 }
163}
164
/// Split a multiplexed message name into its service name and service call.
///
/// The split happens at the first `:`. The part before it is returned as
/// `Some(service_name)` and the part after it (without the colon) as the
/// service call. If `ident_name` contains no `:`, the service name is `None`
/// and the whole input is returned as the service call.
fn split_ident_name(ident_name: &str) -> (Option<&str>, &str) {
    match ident_name.find(':') {
        // `:` is a single byte, so `pos + 1` is the start of the call name.
        Some(pos) => (Some(&ident_name[..pos]), &ident_name[pos + 1..]),
        None => (None, ident_name),
    }
}
176
177fn missing_processor_message(svc_name: Option<&str>) -> String {
178 match svc_name {
179 Some(name) => format!("no processor found for service {}", name),
180 None => MISSING_SEPARATOR_AND_NO_DEFAULT.to_owned(),
181 }
182}
183
#[cfg(test)]
mod tests {
    use std::convert::Into;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use crate::protocol::{
        TBinaryInputProtocol, TBinaryOutputProtocol, TMessageIdentifier, TMessageType,
    };
    use crate::transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
    use crate::{ApplicationError, ApplicationErrorKind};

    use super::*;

    #[test]
    fn should_split_name_into_proper_separator_and_service_call() {
        let ident_name = "foo:bar_call";
        let (serv, call) = split_ident_name(ident_name);
        assert_eq!(serv, Some("foo"));
        assert_eq!(call, "bar_call");
    }

    #[test]
    fn should_return_full_ident_if_no_separator_exists() {
        let ident_name = "bar_call";
        let (serv, call) = split_ident_name(ident_name);
        assert_eq!(serv, None);
        assert_eq!(call, "bar_call");
    }

    #[test]
    fn should_write_error_if_no_separator_found_and_no_default_processor_exists() {
        let (mut i, mut o) = build_objects();
        stage_call(&mut o, "foo");

        let p = TMultiplexedProcessor::new();
        p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out

        i.transport.set_readable_bytes(&o.transport.write_bytes());
        let rcvd_ident = i.read_message_begin().unwrap();
        let expected_ident = TMessageIdentifier::new("foo", TMessageType::Exception, 10);
        assert_eq!(rcvd_ident, expected_ident);
        let rcvd_err = crate::Error::read_application_error_from_in_protocol(&mut i).unwrap();
        let expected_err = ApplicationError::new(
            ApplicationErrorKind::Unknown,
            MISSING_SEPARATOR_AND_NO_DEFAULT,
        );
        assert_eq!(rcvd_err, expected_err);
    }

    #[test]
    fn should_write_error_if_separator_exists_and_no_processor_found() {
        let (mut i, mut o) = build_objects();
        stage_call(&mut o, "missing:call");

        let p = TMultiplexedProcessor::new();
        p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out

        i.transport.set_readable_bytes(&o.transport.write_bytes());
        let rcvd_ident = i.read_message_begin().unwrap();
        let expected_ident = TMessageIdentifier::new("missing:call", TMessageType::Exception, 10);
        assert_eq!(rcvd_ident, expected_ident);
        let rcvd_err = crate::Error::read_application_error_from_in_protocol(&mut i).unwrap();
        let expected_err = ApplicationError::new(
            ApplicationErrorKind::Unknown,
            missing_processor_message(Some("missing")),
        );
        assert_eq!(rcvd_err, expected_err);
    }

    /// Test processor that records whether it has been invoked.
    #[derive(Default)]
    struct Service {
        pub invoked: Arc<AtomicBool>,
    }

    impl TProcessor for Service {
        fn process(
            &self,
            _: &mut dyn TInputProtocol,
            _: &mut dyn TOutputProtocol,
        ) -> crate::Result<()> {
            // The first invocation flips the flag and succeeds; any later
            // invocation finds it already set and reports a failed swap.
            let res = self
                .invoked
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
            if res.is_ok() {
                Ok(())
            } else {
                Err("failed swap".into())
            }
        }
    }

    #[test]
    fn should_route_call_to_correct_processor() {
        let (mut i, mut o) = build_objects();

        // build the services
        let svc_1 = Service {
            invoked: Arc::new(AtomicBool::new(false)),
        };
        let atm_1 = svc_1.invoked.clone();
        let svc_2 = Service {
            invoked: Arc::new(AtomicBool::new(false)),
        };
        let atm_2 = svc_2.invoked.clone();

        // register them
        let mut p = TMultiplexedProcessor::new();
        p.register("service_1", Box::new(svc_1), false).unwrap();
        p.register("service_2", Box::new(svc_2), false).unwrap();

        // make the service call
        stage_call(&mut o, "service_1:call");

        p.process(&mut i, &mut o).unwrap();

        // service 1 should have been invoked, not service 2
        assert!(atm_1.load(Ordering::Relaxed));
        assert!(!atm_2.load(Ordering::Relaxed));
    }

    #[test]
    fn should_route_call_to_correct_processor_if_no_separator_exists_and_default_processor_set() {
        let (mut i, mut o) = build_objects();

        // build the services
        let svc_1 = Service {
            invoked: Arc::new(AtomicBool::new(false)),
        };
        let atm_1 = svc_1.invoked.clone();
        let svc_2 = Service {
            invoked: Arc::new(AtomicBool::new(false)),
        };
        let atm_2 = svc_2.invoked.clone();

        // register them
        let mut p = TMultiplexedProcessor::new();
        p.register("service_1", Box::new(svc_1), false).unwrap();
        p.register("service_2", Box::new(svc_2), true).unwrap(); // second processor is default

        // make the service call (it's an old client, so we have to be backwards compatible)
        stage_call(&mut o, "old_call");

        p.process(&mut i, &mut o).unwrap();

        // service 2 should have been invoked, not service 1
        assert!(!atm_1.load(Ordering::Relaxed));
        assert!(atm_2.load(Ordering::Relaxed));
    }

    /// Write a `Call` message header named `name` (sequence number 10) through
    /// `o`, then move the written bytes into the channel's read buffer and
    /// clear the write buffer so a processor can read the call.
    fn stage_call(o: &mut TBinaryOutputProtocol<WriteHalf<TBufferChannel>>, name: &str) {
        let sent_ident = TMessageIdentifier::new(name, TMessageType::Call, 10);
        o.write_message_begin(&sent_ident).unwrap();
        o.flush().unwrap();
        o.transport.copy_write_buffer_to_read_buffer();
        o.transport.empty_write_buffer();
    }

    /// Build a binary input/output protocol pair over the two halves of one
    /// in-memory buffer channel.
    fn build_objects() -> (
        TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
        TBinaryOutputProtocol<WriteHalf<TBufferChannel>>,
    ) {
        let c = TBufferChannel::with_capacity(128, 128);
        let (r_c, w_c) = c.split().unwrap();
        (
            TBinaryInputProtocol::new(r_c, true),
            TBinaryOutputProtocol::new(w_c, true),
        )
    }
}