// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
| 17 | |
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::Into;
use std::fmt;
use std::sync::{Arc, Mutex};

use {ApplicationErrorKind, new_application_error};
use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};

use super::TProcessor;
| 26 | |
| 27 | /// A `TProcessor` that can demux service calls to multiple underlying |
| 28 | /// Thrift services. |
| 29 | /// |
| 30 | /// Users register service-specific `TProcessor` instances with a |
| 31 | /// `TMultiplexedProcessor`, and then register that processor with a server |
| 32 | /// implementation. Following that, all incoming service calls are automatically |
| 33 | /// routed to the service-specific `TProcessor`. |
| 34 | /// |
| 35 | /// A `TMultiplexedProcessor` can only handle messages sent by a |
| 36 | /// `TMultiplexedOutputProtocol`. |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 37 | // FIXME: implement Debug |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 38 | pub struct TMultiplexedProcessor { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 39 | processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>, |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 40 | } |
| 41 | |
| 42 | impl TMultiplexedProcessor { |
| 43 | /// Register a service-specific `processor` for the service named |
| 44 | /// `service_name`. |
| 45 | /// |
| 46 | /// Return `true` if this is the first registration for `service_name`. |
| 47 | /// |
| 48 | /// Return `false` if a mapping previously existed (the previous mapping is |
| 49 | /// *not* overwritten). |
| 50 | #[cfg_attr(feature = "cargo-clippy", allow(map_entry))] |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 51 | pub fn register_processor<S: Into<String>>( |
| 52 | &mut self, |
| 53 | service_name: S, |
| 54 | processor: Box<TProcessor>, |
| 55 | ) -> bool { |
| 56 | let mut processors = self.processors.lock().unwrap(); |
| 57 | |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 58 | let name = service_name.into(); |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 59 | if processors.contains_key(&name) { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 60 | false |
| 61 | } else { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 62 | processors.insert(name, Arc::new(processor)); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 63 | true |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | impl TProcessor for TMultiplexedProcessor { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 69 | fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 70 | let msg_ident = i_prot.read_message_begin()?; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 71 | let sep_index = msg_ident |
| 72 | .name |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 73 | .find(':') |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 74 | .ok_or_else( |
| 75 | || { |
| 76 | new_application_error( |
| 77 | ApplicationErrorKind::Unknown, |
| 78 | "no service separator found in incoming message", |
| 79 | ) |
| 80 | }, |
| 81 | )?; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 82 | |
| 83 | let (svc_name, svc_call) = msg_ident.name.split_at(sep_index); |
| 84 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 85 | let processor: Option<Arc<Box<TProcessor>>> = { |
| 86 | let processors = self.processors.lock().unwrap(); |
| 87 | processors.get(svc_name).cloned() |
| 88 | }; |
| 89 | |
| 90 | match processor { |
| 91 | Some(arc) => { |
| 92 | let new_msg_ident = TMessageIdentifier::new( |
| 93 | svc_call, |
| 94 | msg_ident.message_type, |
| 95 | msg_ident.sequence_number, |
| 96 | ); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 97 | let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident); |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 98 | (*arc).process(&mut proxy_i_prot, o_prot) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 99 | } |
| 100 | None => { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 101 | Err( |
| 102 | new_application_error( |
| 103 | ApplicationErrorKind::Unknown, |
| 104 | format!("no processor found for service {}", svc_name), |
| 105 | ), |
| 106 | ) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 107 | } |
| 108 | } |
| 109 | } |
| 110 | } |