blob: b1243a86fc88ba39cfa29f00ef4520198885c646 [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::convert::Into;
Allen George0e22c362017-01-30 07:15:00 -050020use std::sync::{Arc, Mutex};
Allen George8b96bfb2016-11-02 08:01:08 -040021
Allen George0e22c362017-01-30 07:15:00 -050022use {ApplicationErrorKind, new_application_error};
23use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
Allen George8b96bfb2016-11-02 08:01:08 -040024
25use super::TProcessor;
26
27/// A `TProcessor` that can demux service calls to multiple underlying
28/// Thrift services.
29///
30/// Users register service-specific `TProcessor` instances with a
31/// `TMultiplexedProcessor`, and then register that processor with a server
32/// implementation. Following that, all incoming service calls are automatically
33/// routed to the service-specific `TProcessor`.
34///
35/// A `TMultiplexedProcessor` can only handle messages sent by a
36/// `TMultiplexedOutputProtocol`.
Allen George0e22c362017-01-30 07:15:00 -050037// FIXME: implement Debug
Allen George8b96bfb2016-11-02 08:01:08 -040038pub struct TMultiplexedProcessor {
Allen George0e22c362017-01-30 07:15:00 -050039 processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
Allen George8b96bfb2016-11-02 08:01:08 -040040}
41
42impl TMultiplexedProcessor {
43 /// Register a service-specific `processor` for the service named
44 /// `service_name`.
45 ///
46 /// Return `true` if this is the first registration for `service_name`.
47 ///
48 /// Return `false` if a mapping previously existed (the previous mapping is
49 /// *not* overwritten).
50 #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
Allen George0e22c362017-01-30 07:15:00 -050051 pub fn register_processor<S: Into<String>>(
52 &mut self,
53 service_name: S,
54 processor: Box<TProcessor>,
55 ) -> bool {
56 let mut processors = self.processors.lock().unwrap();
57
Allen George8b96bfb2016-11-02 08:01:08 -040058 let name = service_name.into();
Allen George0e22c362017-01-30 07:15:00 -050059 if processors.contains_key(&name) {
Allen George8b96bfb2016-11-02 08:01:08 -040060 false
61 } else {
Allen George0e22c362017-01-30 07:15:00 -050062 processors.insert(name, Arc::new(processor));
Allen George8b96bfb2016-11-02 08:01:08 -040063 true
64 }
65 }
66}
67
68impl TProcessor for TMultiplexedProcessor {
Allen George0e22c362017-01-30 07:15:00 -050069 fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -040070 let msg_ident = i_prot.read_message_begin()?;
Allen George0e22c362017-01-30 07:15:00 -050071 let sep_index = msg_ident
72 .name
Allen George8b96bfb2016-11-02 08:01:08 -040073 .find(':')
Allen George0e22c362017-01-30 07:15:00 -050074 .ok_or_else(
75 || {
76 new_application_error(
77 ApplicationErrorKind::Unknown,
78 "no service separator found in incoming message",
79 )
80 },
81 )?;
Allen George8b96bfb2016-11-02 08:01:08 -040082
83 let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
84
Allen George0e22c362017-01-30 07:15:00 -050085 let processor: Option<Arc<Box<TProcessor>>> = {
86 let processors = self.processors.lock().unwrap();
87 processors.get(svc_name).cloned()
88 };
89
90 match processor {
91 Some(arc) => {
92 let new_msg_ident = TMessageIdentifier::new(
93 svc_call,
94 msg_ident.message_type,
95 msg_ident.sequence_number,
96 );
Allen George8b96bfb2016-11-02 08:01:08 -040097 let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
Allen George0e22c362017-01-30 07:15:00 -050098 (*arc).process(&mut proxy_i_prot, o_prot)
Allen George8b96bfb2016-11-02 08:01:08 -040099 }
100 None => {
Allen George0e22c362017-01-30 07:15:00 -0500101 Err(
102 new_application_error(
103 ApplicationErrorKind::Unknown,
104 format!("no processor found for service {}", svc_name),
105 ),
106 )
Allen George8b96bfb2016-11-02 08:01:08 -0400107 }
108 }
109 }
110}