blob: 15fe6086d6614608db525f563a7c759e98b1d325 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType,
19 TOutputProtocol, TSetIdentifier, TStructIdentifier};
20
21/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift
22/// messages.
23///
24/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services
25/// send messages over a single I/O channel. By prefixing service identifiers
26/// to outgoing messages receivers are able to demux them and route them to the
27/// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor`
28/// to process incoming messages, while other languages must use their
29/// corresponding multiplexed processor implementations.
30///
31/// For example, given a service `TestService` and a service call `test_call`,
32/// this implementation would identify messages as originating from
33/// `TestService:test_call`.
34///
35/// # Examples
36///
37/// Create and use a `TMultiplexedOutputProtocol`.
38///
39/// ```no_run
40/// use std::cell::RefCell;
41/// use std::rc::Rc;
42/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
43/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
44/// use thrift::transport::{TTcpTransport, TTransport};
45///
46/// let mut transport = TTcpTransport::new();
47/// transport.open("localhost:9090").unwrap();
48/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
49///
50/// let o_prot = TBinaryOutputProtocol::new(transport, true);
51/// let mut o_prot = TMultiplexedOutputProtocol::new("service_name", Box::new(o_prot));
52///
53/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
54/// o_prot.write_message_begin(&ident).unwrap();
55/// ```
56pub struct TMultiplexedOutputProtocol {
57 service_name: String,
58 inner: Box<TOutputProtocol>,
59}
60
61impl TMultiplexedOutputProtocol {
62 /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
63 /// as originating from a service named `service_name` and sends them over
64 /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
65 /// by `wrapped`, not by this instance.
66 pub fn new(service_name: &str, wrapped: Box<TOutputProtocol>) -> TMultiplexedOutputProtocol {
67 TMultiplexedOutputProtocol {
68 service_name: service_name.to_owned(),
69 inner: wrapped,
70 }
71 }
72}
73
74// FIXME: avoid passthrough methods
75impl TOutputProtocol for TMultiplexedOutputProtocol {
76 fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
77 match identifier.message_type { // FIXME: is there a better way to override identifier here?
78 TMessageType::Call | TMessageType::OneWay => {
79 let identifier = TMessageIdentifier {
80 name: format!("{}:{}", self.service_name, identifier.name),
81 ..*identifier
82 };
83 self.inner.write_message_begin(&identifier)
84 }
85 _ => self.inner.write_message_begin(identifier),
86 }
87 }
88
89 fn write_message_end(&mut self) -> ::Result<()> {
90 self.inner.write_message_end()
91 }
92
93 fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> {
94 self.inner.write_struct_begin(identifier)
95 }
96
97 fn write_struct_end(&mut self) -> ::Result<()> {
98 self.inner.write_struct_end()
99 }
100
101 fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> {
102 self.inner.write_field_begin(identifier)
103 }
104
105 fn write_field_end(&mut self) -> ::Result<()> {
106 self.inner.write_field_end()
107 }
108
109 fn write_field_stop(&mut self) -> ::Result<()> {
110 self.inner.write_field_stop()
111 }
112
113 fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
114 self.inner.write_bytes(b)
115 }
116
117 fn write_bool(&mut self, b: bool) -> ::Result<()> {
118 self.inner.write_bool(b)
119 }
120
121 fn write_i8(&mut self, i: i8) -> ::Result<()> {
122 self.inner.write_i8(i)
123 }
124
125 fn write_i16(&mut self, i: i16) -> ::Result<()> {
126 self.inner.write_i16(i)
127 }
128
129 fn write_i32(&mut self, i: i32) -> ::Result<()> {
130 self.inner.write_i32(i)
131 }
132
133 fn write_i64(&mut self, i: i64) -> ::Result<()> {
134 self.inner.write_i64(i)
135 }
136
137 fn write_double(&mut self, d: f64) -> ::Result<()> {
138 self.inner.write_double(d)
139 }
140
141 fn write_string(&mut self, s: &str) -> ::Result<()> {
142 self.inner.write_string(s)
143 }
144
145 fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> {
146 self.inner.write_list_begin(identifier)
147 }
148
149 fn write_list_end(&mut self) -> ::Result<()> {
150 self.inner.write_list_end()
151 }
152
153 fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> {
154 self.inner.write_set_begin(identifier)
155 }
156
157 fn write_set_end(&mut self) -> ::Result<()> {
158 self.inner.write_set_end()
159 }
160
161 fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> {
162 self.inner.write_map_begin(identifier)
163 }
164
165 fn write_map_end(&mut self) -> ::Result<()> {
166 self.inner.write_map_end()
167 }
168
169 fn flush(&mut self) -> ::Result<()> {
170 self.inner.flush()
171 }
172
173 // utility
174 //
175
176 fn write_byte(&mut self, b: u8) -> ::Result<()> {
177 self.inner.write_byte(b)
178 }
179}
180
#[cfg(test)]
mod tests {

    use std::cell::RefCell;
    use std::rc::Rc;

    use ::protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
    use ::transport::{TPassThruTransport, TTransport};
    use ::transport::mem::TBufferTransport;

    use super::*;

    /// A `Call` named "bar" sent through a multiplexer for service "foo"
    /// must reach the underlying buffer with the name "foo:bar".
    #[test]
    fn must_write_message_begin_with_prefixed_service_name() {
        let (buffer, mut multiplexed) = test_objects();

        let call = TMessageIdentifier::new("bar", TMessageType::Call, 2);
        assert_success!(multiplexed.write_message_begin(&call));

        let expected: [u8; 19] = [
            0x80, 0x01, // protocol identifier
            0x00, 0x01, // message type
            0x00, 0x00, 0x00, 0x07, // name length: 7 bytes follow
            0x66, 0x6F, 0x6F, // "foo"
            0x3A, // ":"
            0x62, 0x61, 0x72, // "bar"
            0x00, 0x00, 0x00, 0x02, // sequence number
        ];

        assert_eq!(&buffer.borrow().write_buffer_to_vec(), &expected);
    }

    /// Build the fixture: an in-memory buffer transport plus a multiplexed
    /// protocol (service name "foo") that writes into it through a binary
    /// output protocol. The buffer handle is returned so the test can
    /// inspect what was written.
    fn test_objects() -> (Rc<RefCell<Box<TBufferTransport>>>, TMultiplexedOutputProtocol) {
        let buffer = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity(40, 40))));

        // The pass-through transport shares the buffer, so bytes written via
        // the protocol remain observable through `buffer`.
        let channel: Box<TTransport> = Box::new(TPassThruTransport { inner: buffer.clone() });
        let channel = Rc::new(RefCell::new(channel));

        let binary = TBinaryOutputProtocol::new(channel.clone(), true);
        let multiplexed = TMultiplexedOutputProtocol::new("foo", Box::new(binary));

        (buffer, multiplexed)
    }
}
219}