// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::Into;

use ::{new_application_error, ApplicationErrorKind};
use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};

use super::TProcessor;

| 26 | /// A `TProcessor` that can demux service calls to multiple underlying |
| 27 | /// Thrift services. |
| 28 | /// |
| 29 | /// Users register service-specific `TProcessor` instances with a |
| 30 | /// `TMultiplexedProcessor`, and then register that processor with a server |
| 31 | /// implementation. Following that, all incoming service calls are automatically |
| 32 | /// routed to the service-specific `TProcessor`. |
| 33 | /// |
| 34 | /// A `TMultiplexedProcessor` can only handle messages sent by a |
| 35 | /// `TMultiplexedOutputProtocol`. |
| 36 | pub struct TMultiplexedProcessor { |
| 37 | processors: HashMap<String, Box<TProcessor>>, |
| 38 | } |
| 39 | |
| 40 | impl TMultiplexedProcessor { |
| 41 | /// Register a service-specific `processor` for the service named |
| 42 | /// `service_name`. |
| 43 | /// |
| 44 | /// Return `true` if this is the first registration for `service_name`. |
| 45 | /// |
| 46 | /// Return `false` if a mapping previously existed (the previous mapping is |
| 47 | /// *not* overwritten). |
| 48 | #[cfg_attr(feature = "cargo-clippy", allow(map_entry))] |
| 49 | pub fn register_processor<S: Into<String>>(&mut self, |
| 50 | service_name: S, |
| 51 | processor: Box<TProcessor>) |
| 52 | -> bool { |
| 53 | let name = service_name.into(); |
| 54 | if self.processors.contains_key(&name) { |
| 55 | false |
| 56 | } else { |
| 57 | self.processors.insert(name, processor); |
| 58 | true |
| 59 | } |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | impl TProcessor for TMultiplexedProcessor { |
| 64 | fn process(&mut self, |
| 65 | i_prot: &mut TInputProtocol, |
| 66 | o_prot: &mut TOutputProtocol) |
| 67 | -> ::Result<()> { |
| 68 | let msg_ident = i_prot.read_message_begin()?; |
| 69 | let sep_index = msg_ident.name |
| 70 | .find(':') |
| 71 | .ok_or_else(|| { |
| 72 | new_application_error(ApplicationErrorKind::Unknown, |
| 73 | "no service separator found in incoming message") |
| 74 | })?; |
| 75 | |
| 76 | let (svc_name, svc_call) = msg_ident.name.split_at(sep_index); |
| 77 | |
| 78 | match self.processors.get_mut(svc_name) { |
| 79 | Some(ref mut processor) => { |
| 80 | let new_msg_ident = TMessageIdentifier::new(svc_call, |
| 81 | msg_ident.message_type, |
| 82 | msg_ident.sequence_number); |
| 83 | let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident); |
| 84 | processor.process(&mut proxy_i_prot, o_prot) |
| 85 | } |
| 86 | None => { |
| 87 | Err(new_application_error(ApplicationErrorKind::Unknown, |
| 88 | format!("no processor found for service {}", svc_name))) |
| 89 | } |
| 90 | } |
| 91 | } |
| 92 | } |