blob: a7f6d0474f802c07c22da7d8b31e1451aa533eab [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
Allen Georgebc1344d2017-04-28 10:22:03 -040019use std::fmt;
20use std::fmt::{Debug, Formatter};
Allen George8b96bfb2016-11-02 08:01:08 -040021use std::convert::Into;
Allen George0e22c362017-01-30 07:15:00 -050022use std::sync::{Arc, Mutex};
Allen George8b96bfb2016-11-02 08:01:08 -040023
Allen George0e22c362017-01-30 07:15:00 -050024use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
Allen George8b96bfb2016-11-02 08:01:08 -040025
Allen Georgebc1344d2017-04-28 10:22:03 -040026use super::{TProcessor, handle_process_result};
27
/// Error text used when an incoming message name carries no service
/// separator and no default processor is available to handle it.
const MISSING_SEPARATOR_AND_NO_DEFAULT: &'static str = "missing service separator and no default processor set";
/// A boxed `TProcessor` trait object that is also `Send + Sync`.
type ThreadSafeProcessor = Box<TProcessor + Send + Sync>;
Allen George8b96bfb2016-11-02 08:01:08 -040030
/// A `TProcessor` that can demux service calls to multiple underlying
/// Thrift services.
///
/// Users register service-specific `TProcessor` instances with a
/// `TMultiplexedProcessor`, and then register that processor with a server
/// implementation. Following that, all incoming service calls are automatically
/// routed to the service-specific `TProcessor`.
///
/// Message names are expected in the `service:call` form produced by a
/// `TMultiplexedOutputProtocol`. Names without a `:` separator are only
/// handled when a default processor has been registered.
#[derive(Default)]
pub struct TMultiplexedProcessor {
    // Registered processors plus the optional default, guarded by a mutex so
    // that lookups can happen through `&self`.
    stored: Mutex<StoredProcessors>,
}
45
/// The set of processors known to a `TMultiplexedProcessor`.
#[derive(Default)]
struct StoredProcessors {
    // Service name -> processor registered under that name.
    processors: HashMap<String, Arc<ThreadSafeProcessor>>,
    // Processor used for message names that have no service separator, if any.
    default_processor: Option<Arc<ThreadSafeProcessor>>,
}
51
52impl TMultiplexedProcessor {
Allen Georgebc1344d2017-04-28 10:22:03 -040053 /// Create a new `TMultiplexedProcessor` with no registered service-specific
54 /// processors.
55 pub fn new() -> TMultiplexedProcessor {
56 TMultiplexedProcessor {
57 stored: Mutex::new(
58 StoredProcessors {
59 processors: HashMap::new(),
60 default_processor: None,
61 },
62 ),
Allen George8b96bfb2016-11-02 08:01:08 -040063 }
64 }
Allen George8b96bfb2016-11-02 08:01:08 -040065
Allen Georgebc1344d2017-04-28 10:22:03 -040066 /// Register a service-specific `processor` for the service named
67 /// `service_name`. This implementation is also backwards-compatible with
68 /// non-multiplexed clients. Set `as_default` to `true` to allow
69 /// non-namespaced requests to be dispatched to a default processor.
70 ///
71 /// Returns success if a new entry was inserted. Returns an error if:
72 /// * A processor exists for `service_name`
73 /// * You attempt to register a processor as default, and an existing default exists
74 #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
75 pub fn register<S: Into<String>>(
76 &mut self,
77 service_name: S,
78 processor: Box<TProcessor + Send + Sync>,
79 as_default: bool,
80 ) -> ::Result<()> {
81 let mut stored = self.stored.lock().unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040082
Allen Georgebc1344d2017-04-28 10:22:03 -040083 let name = service_name.into();
84 if !stored.processors.contains_key(&name) {
85 let processor = Arc::new(processor);
Allen George8b96bfb2016-11-02 08:01:08 -040086
Allen Georgebc1344d2017-04-28 10:22:03 -040087 if as_default {
88 if stored.default_processor.is_none() {
89 stored.processors.insert(name, processor.clone());
90 stored.default_processor = Some(processor.clone());
91 Ok(())
92 } else {
93 Err("cannot reset default processor".into())
94 }
95 } else {
96 stored.processors.insert(name, processor);
97 Ok(())
98 }
99 } else {
100 Err(format!("cannot overwrite existing processor for service {}", name).into(),)
101 }
102 }
103
104 fn process_message(
105 &self,
106 msg_ident: &TMessageIdentifier,
107 i_prot: &mut TInputProtocol,
108 o_prot: &mut TOutputProtocol,
109 ) -> ::Result<()> {
110 let (svc_name, svc_call) = split_ident_name(&msg_ident.name);
111 debug!("routing svc_name {:?} svc_call {}", &svc_name, &svc_call);
112
113 let processor: Option<Arc<ThreadSafeProcessor>> = {
114 let stored = self.stored.lock().unwrap();
115 if let Some(name) = svc_name {
116 stored.processors.get(name).cloned()
117 } else {
118 stored.default_processor.clone()
119 }
Allen George0e22c362017-01-30 07:15:00 -0500120 };
121
122 match processor {
123 Some(arc) => {
124 let new_msg_ident = TMessageIdentifier::new(
125 svc_call,
126 msg_ident.message_type,
127 msg_ident.sequence_number,
128 );
Allen George8b96bfb2016-11-02 08:01:08 -0400129 let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
Allen George0e22c362017-01-30 07:15:00 -0500130 (*arc).process(&mut proxy_i_prot, o_prot)
Allen George8b96bfb2016-11-02 08:01:08 -0400131 }
Allen Georgebc1344d2017-04-28 10:22:03 -0400132 None => Err(missing_processor_message(svc_name).into()),
133 }
134 }
135}
136
137impl TProcessor for TMultiplexedProcessor {
138 fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
139 let msg_ident = i_prot.read_message_begin()?;
140
141 debug!("process incoming msg id:{:?}", &msg_ident);
142 let res = self.process_message(&msg_ident, i_prot, o_prot);
143
144 handle_process_result(&msg_ident, res, o_prot)
145 }
146}
147
148impl Debug for TMultiplexedProcessor {
149 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
150 let stored = self.stored.lock().unwrap();
151 write!(
152 f,
153 "TMultiplexedProcess {{ registered_count: {:?} default: {:?} }}",
154 stored.processors.keys().len(),
155 stored.default_processor.is_some()
156 )
157 }
158}
159
/// Split a message name of the form `service:call` at its first `:`.
///
/// Returns `(Some(service), call)` when a separator is present, with the
/// colon itself dropped. Returns `(None, ident_name)` when there is no
/// separator. Only the first `:` is treated as the separator; any later
/// ones stay in the call name.
fn split_ident_name(ident_name: &str) -> (Option<&str>, &str) {
    match ident_name.find(':') {
        // `:` is a single byte, so `pos + 1` is always a valid char boundary.
        Some(pos) => (Some(&ident_name[..pos]), &ident_name[pos + 1..]),
        None => (None, ident_name),
    }
}
173
174fn missing_processor_message(svc_name: Option<&str>) -> String {
175 match svc_name {
176 Some(name) => format!("no processor found for service {}", name),
177 None => MISSING_SEPARATOR_AND_NO_DEFAULT.to_owned(),
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use std::convert::Into;
184 use std::sync::Arc;
185 use std::sync::atomic::{AtomicBool, Ordering};
186
187 use {ApplicationError, ApplicationErrorKind};
188 use protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TMessageIdentifier, TMessageType};
189 use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
190
191 use super::*;
192
// A `service:call` name yields the service prefix and the bare call name.
#[test]
fn should_split_name_into_proper_separator_and_service_call() {
    let ident_name = "foo:bar_call";
    let (service, call_name) = split_ident_name(&ident_name);

    assert_eq!(service, Some("foo"));
    assert_eq!(call_name, "bar_call");
}
200
// Without a separator there is no service, and the name is returned whole.
#[test]
fn should_return_full_ident_if_no_separator_exists() {
    let ident_name = "bar_call";
    let (service, call_name) = split_ident_name(&ident_name);

    assert_eq!(service, None);
    assert_eq!(call_name, "bar_call");
}
208
// A call without a service prefix, sent to a multiplexer that has no default
// processor, must be answered with an application exception.
#[test]
fn should_write_error_if_no_separator_found_and_no_default_processor_exists() {
    let (mut i, mut o) = build_objects();

    // Queue the request, then move it from the write side to the read side.
    let request_ident = TMessageIdentifier::new("foo", TMessageType::Call, 10);
    o.write_message_begin(&request_ident).unwrap();
    o.flush().unwrap();
    o.transport.copy_write_buffer_to_read_buffer();
    o.transport.empty_write_buffer();

    // Processing itself succeeds; the failure is written to the output.
    let multiplexer = TMultiplexedProcessor::new();
    multiplexer.process(&mut i, &mut o).unwrap();

    // Feed the reply back in and decode it.
    let reply_bytes = o.transport.write_bytes();
    i.transport.set_readable_bytes(&reply_bytes);

    let rcvd_ident = i.read_message_begin().unwrap();
    let expected_ident = TMessageIdentifier::new("foo", TMessageType::Exception, 10);
    assert_eq!(rcvd_ident, expected_ident);

    let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
    let expected_err = ApplicationError::new(
        ApplicationErrorKind::Unknown,
        MISSING_SEPARATOR_AND_NO_DEFAULT,
    );
    assert_eq!(rcvd_err, expected_err);
}
234
// A call addressed to a service that was never registered must be answered
// with an application exception naming that service.
#[test]
fn should_write_error_if_separator_exists_and_no_processor_found() {
    let (mut i, mut o) = build_objects();

    // Queue the request, then move it from the write side to the read side.
    let request_ident = TMessageIdentifier::new("missing:call", TMessageType::Call, 10);
    o.write_message_begin(&request_ident).unwrap();
    o.flush().unwrap();
    o.transport.copy_write_buffer_to_read_buffer();
    o.transport.empty_write_buffer();

    // Processing itself succeeds; the failure is written to the output.
    let multiplexer = TMultiplexedProcessor::new();
    multiplexer.process(&mut i, &mut o).unwrap();

    // Feed the reply back in and decode it.
    let reply_bytes = o.transport.write_bytes();
    i.transport.set_readable_bytes(&reply_bytes);

    let rcvd_ident = i.read_message_begin().unwrap();
    let expected_ident = TMessageIdentifier::new("missing:call", TMessageType::Exception, 10);
    assert_eq!(rcvd_ident, expected_ident);

    let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
    let expected_err = ApplicationError::new(
        ApplicationErrorKind::Unknown,
        missing_processor_message(Some("missing")),
    );
    assert_eq!(rcvd_err, expected_err);
}
260
// Test double for a service-specific processor.
#[derive(Default)]
struct Service {
    // Shared flag that `process` flips to `true`; tests keep a clone of the
    // `Arc` to check whether this service was invoked.
    pub invoked: Arc<AtomicBool>,
}
265
266 impl TProcessor for Service {
267 fn process(&self, _: &mut TInputProtocol, _: &mut TOutputProtocol) -> ::Result<()> {
268 let res = self.invoked
269 .compare_and_swap(false, true, Ordering::Relaxed);
270 if res {
271 Ok(())
272 } else {
273 Err("failed swap".into())
Allen George8b96bfb2016-11-02 08:01:08 -0400274 }
275 }
276 }
Allen Georgebc1344d2017-04-28 10:22:03 -0400277
// A call prefixed with a service name must reach only the processor
// registered under that name.
#[test]
fn should_route_call_to_correct_processor() {
    let (mut i, mut o) = build_objects();

    // Each service shares its "invoked" flag with the test.
    let flag_1 = Arc::new(AtomicBool::new(false));
    let flag_2 = Arc::new(AtomicBool::new(false));
    let svc_1 = Service { invoked: flag_1.clone() };
    let svc_2 = Service { invoked: flag_2.clone() };

    // Register both, neither as default.
    let mut multiplexer = TMultiplexedProcessor::new();
    multiplexer
        .register("service_1", Box::new(svc_1), false)
        .unwrap();
    multiplexer
        .register("service_2", Box::new(svc_2), false)
        .unwrap();

    // Queue a call addressed to the first service.
    let request_ident = TMessageIdentifier::new("service_1:call", TMessageType::Call, 10);
    o.write_message_begin(&request_ident).unwrap();
    o.flush().unwrap();
    o.transport.copy_write_buffer_to_read_buffer();
    o.transport.empty_write_buffer();

    multiplexer.process(&mut i, &mut o).unwrap();

    // Only service 1 may have run.
    assert!(flag_1.load(Ordering::Relaxed));
    assert!(!flag_2.load(Ordering::Relaxed));
}
306
// A call without a service prefix (as sent by a non-multiplexed client) must
// reach the processor that was registered as default.
#[test]
fn should_route_call_to_correct_processor_if_no_separator_exists_and_default_processor_set() {
    let (mut i, mut o) = build_objects();

    // Each service shares its "invoked" flag with the test.
    let flag_1 = Arc::new(AtomicBool::new(false));
    let flag_2 = Arc::new(AtomicBool::new(false));
    let svc_1 = Service { invoked: flag_1.clone() };
    let svc_2 = Service { invoked: flag_2.clone() };

    // Register both; the second one doubles as the default processor.
    let mut multiplexer = TMultiplexedProcessor::new();
    multiplexer
        .register("service_1", Box::new(svc_1), false)
        .unwrap();
    multiplexer
        .register("service_2", Box::new(svc_2), true)
        .unwrap();

    // Queue a call whose name has no service prefix.
    let request_ident = TMessageIdentifier::new("old_call", TMessageType::Call, 10);
    o.write_message_begin(&request_ident).unwrap();
    o.flush().unwrap();
    o.transport.copy_write_buffer_to_read_buffer();
    o.transport.empty_write_buffer();

    multiplexer.process(&mut i, &mut o).unwrap();

    // Only service 2 (the default) may have run.
    assert!(!flag_1.load(Ordering::Relaxed));
    assert!(flag_2.load(Ordering::Relaxed));
}
335
336 fn build_objects()
337 -> (TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
338 TBinaryOutputProtocol<WriteHalf<TBufferChannel>>)
339 {
340 let c = TBufferChannel::with_capacity(128, 128);
341 let (r_c, w_c) = c.split().unwrap();
342 (TBinaryInputProtocol::new(r_c, true), TBinaryOutputProtocol::new(w_c, true))
343 }
Allen George8b96bfb2016-11-02 08:01:08 -0400344}