blob: 3f9bc78e4a56209b7d2e8953cc7bd03756cf73a7 [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
Allen Georgeef7a1892018-12-16 18:01:37 -050019use std::convert::Into;
Allen Georgebc1344d2017-04-28 10:22:03 -040020use std::fmt;
21use std::fmt::{Debug, Formatter};
Allen George0e22c362017-01-30 07:15:00 -050022use std::sync::{Arc, Mutex};
Allen George8b96bfb2016-11-02 08:01:08 -040023
Allen George0e22c362017-01-30 07:15:00 -050024use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
Allen George8b96bfb2016-11-02 08:01:08 -040025
Allen Georgeef7a1892018-12-16 18:01:37 -050026use super::{handle_process_result, TProcessor};
Allen Georgebc1344d2017-04-28 10:22:03 -040027
Allen Georgeef7a1892018-12-16 18:01:37 -050028const MISSING_SEPARATOR_AND_NO_DEFAULT: &'static str =
29 "missing service separator and no default processor set";
Danny Browning77d96c12019-08-21 13:41:07 -060030type ThreadSafeProcessor = Box<dyn TProcessor + Send + Sync>;
Allen George8b96bfb2016-11-02 08:01:08 -040031
32/// A `TProcessor` that can demux service calls to multiple underlying
33/// Thrift services.
34///
35/// Users register service-specific `TProcessor` instances with a
36/// `TMultiplexedProcessor`, and then register that processor with a server
37/// implementation. Following that, all incoming service calls are automatically
38/// routed to the service-specific `TProcessor`.
39///
40/// A `TMultiplexedProcessor` can only handle messages sent by a
41/// `TMultiplexedOutputProtocol`.
Allen Georgebc1344d2017-04-28 10:22:03 -040042#[derive(Default)]
Allen George8b96bfb2016-11-02 08:01:08 -040043pub struct TMultiplexedProcessor {
Allen Georgebc1344d2017-04-28 10:22:03 -040044 stored: Mutex<StoredProcessors>,
45}
46
47#[derive(Default)]
48struct StoredProcessors {
49 processors: HashMap<String, Arc<ThreadSafeProcessor>>,
50 default_processor: Option<Arc<ThreadSafeProcessor>>,
Allen George8b96bfb2016-11-02 08:01:08 -040051}
52
53impl TMultiplexedProcessor {
Allen Georgebc1344d2017-04-28 10:22:03 -040054 /// Create a new `TMultiplexedProcessor` with no registered service-specific
55 /// processors.
56 pub fn new() -> TMultiplexedProcessor {
57 TMultiplexedProcessor {
Allen Georgeef7a1892018-12-16 18:01:37 -050058 stored: Mutex::new(StoredProcessors {
59 processors: HashMap::new(),
60 default_processor: None,
61 }),
Allen George8b96bfb2016-11-02 08:01:08 -040062 }
63 }
Allen George8b96bfb2016-11-02 08:01:08 -040064
Allen Georgebc1344d2017-04-28 10:22:03 -040065 /// Register a service-specific `processor` for the service named
66 /// `service_name`. This implementation is also backwards-compatible with
67 /// non-multiplexed clients. Set `as_default` to `true` to allow
68 /// non-namespaced requests to be dispatched to a default processor.
69 ///
70 /// Returns success if a new entry was inserted. Returns an error if:
71 /// * A processor exists for `service_name`
72 /// * You attempt to register a processor as default, and an existing default exists
73 #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
74 pub fn register<S: Into<String>>(
75 &mut self,
76 service_name: S,
Danny Browning77d96c12019-08-21 13:41:07 -060077 processor: Box<dyn TProcessor + Send + Sync>,
Allen Georgebc1344d2017-04-28 10:22:03 -040078 as_default: bool,
79 ) -> ::Result<()> {
80 let mut stored = self.stored.lock().unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040081
Allen Georgebc1344d2017-04-28 10:22:03 -040082 let name = service_name.into();
83 if !stored.processors.contains_key(&name) {
84 let processor = Arc::new(processor);
Allen George8b96bfb2016-11-02 08:01:08 -040085
Allen Georgebc1344d2017-04-28 10:22:03 -040086 if as_default {
87 if stored.default_processor.is_none() {
88 stored.processors.insert(name, processor.clone());
89 stored.default_processor = Some(processor.clone());
90 Ok(())
91 } else {
92 Err("cannot reset default processor".into())
93 }
94 } else {
95 stored.processors.insert(name, processor);
96 Ok(())
97 }
98 } else {
Allen Georgeef7a1892018-12-16 18:01:37 -050099 Err(format!("cannot overwrite existing processor for service {}", name).into())
Allen Georgebc1344d2017-04-28 10:22:03 -0400100 }
101 }
102
103 fn process_message(
104 &self,
105 msg_ident: &TMessageIdentifier,
Danny Browning77d96c12019-08-21 13:41:07 -0600106 i_prot: &mut dyn TInputProtocol,
107 o_prot: &mut dyn TOutputProtocol,
Allen Georgebc1344d2017-04-28 10:22:03 -0400108 ) -> ::Result<()> {
109 let (svc_name, svc_call) = split_ident_name(&msg_ident.name);
110 debug!("routing svc_name {:?} svc_call {}", &svc_name, &svc_call);
111
112 let processor: Option<Arc<ThreadSafeProcessor>> = {
113 let stored = self.stored.lock().unwrap();
114 if let Some(name) = svc_name {
115 stored.processors.get(name).cloned()
116 } else {
117 stored.default_processor.clone()
118 }
Allen George0e22c362017-01-30 07:15:00 -0500119 };
120
121 match processor {
122 Some(arc) => {
123 let new_msg_ident = TMessageIdentifier::new(
124 svc_call,
125 msg_ident.message_type,
126 msg_ident.sequence_number,
127 );
Allen George8b96bfb2016-11-02 08:01:08 -0400128 let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
Allen George0e22c362017-01-30 07:15:00 -0500129 (*arc).process(&mut proxy_i_prot, o_prot)
Allen George8b96bfb2016-11-02 08:01:08 -0400130 }
Allen Georgebc1344d2017-04-28 10:22:03 -0400131 None => Err(missing_processor_message(svc_name).into()),
132 }
133 }
134}
135
136impl TProcessor for TMultiplexedProcessor {
Danny Browning77d96c12019-08-21 13:41:07 -0600137 fn process(&self, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> ::Result<()> {
Allen Georgebc1344d2017-04-28 10:22:03 -0400138 let msg_ident = i_prot.read_message_begin()?;
139
140 debug!("process incoming msg id:{:?}", &msg_ident);
141 let res = self.process_message(&msg_ident, i_prot, o_prot);
142
143 handle_process_result(&msg_ident, res, o_prot)
144 }
145}
146
147impl Debug for TMultiplexedProcessor {
148 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
149 let stored = self.stored.lock().unwrap();
150 write!(
151 f,
152 "TMultiplexedProcess {{ registered_count: {:?} default: {:?} }}",
153 stored.processors.keys().len(),
154 stored.default_processor.is_some()
155 )
156 }
157}
158
159fn split_ident_name(ident_name: &str) -> (Option<&str>, &str) {
160 ident_name
161 .find(':')
Allen Georgeef7a1892018-12-16 18:01:37 -0500162 .map(|pos| {
163 let (svc_name, svc_call) = ident_name.split_at(pos);
164 let (_, svc_call) = svc_call.split_at(1); // remove colon from service call name
165 (Some(svc_name), svc_call)
166 })
Allen Georgebc1344d2017-04-28 10:22:03 -0400167 .or_else(|| Some((None, ident_name)))
168 .unwrap()
169}
170
171fn missing_processor_message(svc_name: Option<&str>) -> String {
172 match svc_name {
173 Some(name) => format!("no processor found for service {}", name),
174 None => MISSING_SEPARATOR_AND_NO_DEFAULT.to_owned(),
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use std::convert::Into;
Allen Georgebc1344d2017-04-28 10:22:03 -0400181 use std::sync::atomic::{AtomicBool, Ordering};
Allen Georgeef7a1892018-12-16 18:01:37 -0500182 use std::sync::Arc;
Allen Georgebc1344d2017-04-28 10:22:03 -0400183
Allen Georgebc1344d2017-04-28 10:22:03 -0400184 use protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TMessageIdentifier, TMessageType};
185 use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
Allen Georgeef7a1892018-12-16 18:01:37 -0500186 use {ApplicationError, ApplicationErrorKind};
Allen Georgebc1344d2017-04-28 10:22:03 -0400187
188 use super::*;
189
190 #[test]
191 fn should_split_name_into_proper_separator_and_service_call() {
192 let ident_name = "foo:bar_call";
193 let (serv, call) = split_ident_name(&ident_name);
194 assert_eq!(serv, Some("foo"));
195 assert_eq!(call, "bar_call");
196 }
197
198 #[test]
199 fn should_return_full_ident_if_no_separator_exists() {
200 let ident_name = "bar_call";
201 let (serv, call) = split_ident_name(&ident_name);
202 assert_eq!(serv, None);
203 assert_eq!(call, "bar_call");
204 }
205
206 #[test]
207 fn should_write_error_if_no_separator_found_and_no_default_processor_exists() {
208 let (mut i, mut o) = build_objects();
209
210 let sent_ident = TMessageIdentifier::new("foo", TMessageType::Call, 10);
211 o.write_message_begin(&sent_ident).unwrap();
212 o.flush().unwrap();
213 o.transport.copy_write_buffer_to_read_buffer();
214 o.transport.empty_write_buffer();
215
216 let p = TMultiplexedProcessor::new();
217 p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
218
Allen Georgeef7a1892018-12-16 18:01:37 -0500219 i.transport.set_readable_bytes(&o.transport.write_bytes());
Allen Georgebc1344d2017-04-28 10:22:03 -0400220 let rcvd_ident = i.read_message_begin().unwrap();
221 let expected_ident = TMessageIdentifier::new("foo", TMessageType::Exception, 10);
222 assert_eq!(rcvd_ident, expected_ident);
223 let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
224 let expected_err = ApplicationError::new(
225 ApplicationErrorKind::Unknown,
226 MISSING_SEPARATOR_AND_NO_DEFAULT,
227 );
228 assert_eq!(rcvd_err, expected_err);
229 }
230
231 #[test]
232 fn should_write_error_if_separator_exists_and_no_processor_found() {
233 let (mut i, mut o) = build_objects();
234
235 let sent_ident = TMessageIdentifier::new("missing:call", TMessageType::Call, 10);
236 o.write_message_begin(&sent_ident).unwrap();
237 o.flush().unwrap();
238 o.transport.copy_write_buffer_to_read_buffer();
239 o.transport.empty_write_buffer();
240
241 let p = TMultiplexedProcessor::new();
242 p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
243
Allen Georgeef7a1892018-12-16 18:01:37 -0500244 i.transport.set_readable_bytes(&o.transport.write_bytes());
Allen Georgebc1344d2017-04-28 10:22:03 -0400245 let rcvd_ident = i.read_message_begin().unwrap();
246 let expected_ident = TMessageIdentifier::new("missing:call", TMessageType::Exception, 10);
247 assert_eq!(rcvd_ident, expected_ident);
248 let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
249 let expected_err = ApplicationError::new(
250 ApplicationErrorKind::Unknown,
251 missing_processor_message(Some("missing")),
252 );
253 assert_eq!(rcvd_err, expected_err);
254 }
255
256 #[derive(Default)]
257 struct Service {
258 pub invoked: Arc<AtomicBool>,
259 }
260
261 impl TProcessor for Service {
Danny Browning77d96c12019-08-21 13:41:07 -0600262 fn process(&self, _: &mut dyn TInputProtocol, _: &mut dyn TOutputProtocol) -> ::Result<()> {
Allen Georgeef7a1892018-12-16 18:01:37 -0500263 let res = self
264 .invoked
Allen Georgebc1344d2017-04-28 10:22:03 -0400265 .compare_and_swap(false, true, Ordering::Relaxed);
266 if res {
267 Ok(())
268 } else {
269 Err("failed swap".into())
Allen George8b96bfb2016-11-02 08:01:08 -0400270 }
271 }
272 }
Allen Georgebc1344d2017-04-28 10:22:03 -0400273
274 #[test]
275 fn should_route_call_to_correct_processor() {
276 let (mut i, mut o) = build_objects();
277
278 // build the services
Allen Georgeef7a1892018-12-16 18:01:37 -0500279 let svc_1 = Service {
280 invoked: Arc::new(AtomicBool::new(false)),
281 };
Allen Georgebc1344d2017-04-28 10:22:03 -0400282 let atm_1 = svc_1.invoked.clone();
Allen Georgeef7a1892018-12-16 18:01:37 -0500283 let svc_2 = Service {
284 invoked: Arc::new(AtomicBool::new(false)),
285 };
Allen Georgebc1344d2017-04-28 10:22:03 -0400286 let atm_2 = svc_2.invoked.clone();
287
288 // register them
289 let mut p = TMultiplexedProcessor::new();
290 p.register("service_1", Box::new(svc_1), false).unwrap();
291 p.register("service_2", Box::new(svc_2), false).unwrap();
292
293 // make the service call
294 let sent_ident = TMessageIdentifier::new("service_1:call", TMessageType::Call, 10);
295 o.write_message_begin(&sent_ident).unwrap();
296 o.flush().unwrap();
297 o.transport.copy_write_buffer_to_read_buffer();
298 o.transport.empty_write_buffer();
299
300 p.process(&mut i, &mut o).unwrap();
301
302 // service 1 should have been invoked, not service 2
303 assert_eq!(atm_1.load(Ordering::Relaxed), true);
304 assert_eq!(atm_2.load(Ordering::Relaxed), false);
305 }
306
307 #[test]
308 fn should_route_call_to_correct_processor_if_no_separator_exists_and_default_processor_set() {
309 let (mut i, mut o) = build_objects();
310
311 // build the services
Allen Georgeef7a1892018-12-16 18:01:37 -0500312 let svc_1 = Service {
313 invoked: Arc::new(AtomicBool::new(false)),
314 };
Allen Georgebc1344d2017-04-28 10:22:03 -0400315 let atm_1 = svc_1.invoked.clone();
Allen Georgeef7a1892018-12-16 18:01:37 -0500316 let svc_2 = Service {
317 invoked: Arc::new(AtomicBool::new(false)),
318 };
Allen Georgebc1344d2017-04-28 10:22:03 -0400319 let atm_2 = svc_2.invoked.clone();
320
321 // register them
322 let mut p = TMultiplexedProcessor::new();
323 p.register("service_1", Box::new(svc_1), false).unwrap();
324 p.register("service_2", Box::new(svc_2), true).unwrap(); // second processor is default
325
326 // make the service call (it's an old client, so we have to be backwards compatible)
327 let sent_ident = TMessageIdentifier::new("old_call", TMessageType::Call, 10);
328 o.write_message_begin(&sent_ident).unwrap();
329 o.flush().unwrap();
330 o.transport.copy_write_buffer_to_read_buffer();
331 o.transport.empty_write_buffer();
332
333 p.process(&mut i, &mut o).unwrap();
334
335 // service 2 should have been invoked, not service 1
336 assert_eq!(atm_1.load(Ordering::Relaxed), false);
337 assert_eq!(atm_2.load(Ordering::Relaxed), true);
338 }
339
Allen Georgeef7a1892018-12-16 18:01:37 -0500340 fn build_objects() -> (
341 TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
342 TBinaryOutputProtocol<WriteHalf<TBufferChannel>>,
343 ) {
Allen Georgebc1344d2017-04-28 10:22:03 -0400344 let c = TBufferChannel::with_capacity(128, 128);
345 let (r_c, w_c) = c.split().unwrap();
Allen Georgeef7a1892018-12-16 18:01:37 -0500346 (
347 TBinaryInputProtocol::new(r_c, true),
348 TBinaryOutputProtocol::new(w_c, true),
349 )
Allen Georgebc1344d2017-04-28 10:22:03 -0400350 }
Allen George8b96bfb2016-11-02 08:01:08 -0400351}