blob: db08027f2bd997fb139671b363246c83e95ddc76 [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType,
19 TOutputProtocol, TSetIdentifier, TStructIdentifier};
20
21/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift
22/// messages.
23///
24/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services
25/// send messages over a single I/O channel. By prefixing service identifiers
26/// to outgoing messages receivers are able to demux them and route them to the
27/// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor`
28/// to process incoming messages, while other languages must use their
29/// corresponding multiplexed processor implementations.
30///
31/// For example, given a service `TestService` and a service call `test_call`,
32/// this implementation would identify messages as originating from
33/// `TestService:test_call`.
34///
35/// # Examples
36///
37/// Create and use a `TMultiplexedOutputProtocol`.
38///
39/// ```no_run
Allen George8b96bfb2016-11-02 08:01:08 -040040/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
41/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
Allen George0e22c362017-01-30 07:15:00 -050042/// use thrift::transport::TTcpChannel;
Allen George8b96bfb2016-11-02 08:01:08 -040043///
Allen George0e22c362017-01-30 07:15:00 -050044/// let mut channel = TTcpChannel::new();
45/// channel.open("localhost:9090").unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040046///
Allen George0e22c362017-01-30 07:15:00 -050047/// let protocol = TBinaryOutputProtocol::new(channel, true);
48/// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol);
Allen George8b96bfb2016-11-02 08:01:08 -040049///
50/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
Allen George0e22c362017-01-30 07:15:00 -050051/// protocol.write_message_begin(&ident).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040052/// ```
Allen George0e22c362017-01-30 07:15:00 -050053#[derive(Debug)]
54pub struct TMultiplexedOutputProtocol<P>
55where
56 P: TOutputProtocol,
57{
Allen George8b96bfb2016-11-02 08:01:08 -040058 service_name: String,
Allen George0e22c362017-01-30 07:15:00 -050059 inner: P,
Allen George8b96bfb2016-11-02 08:01:08 -040060}
61
Allen George0e22c362017-01-30 07:15:00 -050062impl<P> TMultiplexedOutputProtocol<P>
63where
64 P: TOutputProtocol,
65{
Allen George8b96bfb2016-11-02 08:01:08 -040066 /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
67 /// as originating from a service named `service_name` and sends them over
68 /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
69 /// by `wrapped`, not by this instance.
Allen George0e22c362017-01-30 07:15:00 -050070 pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> {
Allen George8b96bfb2016-11-02 08:01:08 -040071 TMultiplexedOutputProtocol {
72 service_name: service_name.to_owned(),
73 inner: wrapped,
74 }
75 }
76}
77
78// FIXME: avoid passthrough methods
Allen George0e22c362017-01-30 07:15:00 -050079impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P>
80where
81 P: TOutputProtocol,
82{
Allen George8b96bfb2016-11-02 08:01:08 -040083 fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
84 match identifier.message_type { // FIXME: is there a better way to override identifier here?
85 TMessageType::Call | TMessageType::OneWay => {
86 let identifier = TMessageIdentifier {
87 name: format!("{}:{}", self.service_name, identifier.name),
88 ..*identifier
89 };
90 self.inner.write_message_begin(&identifier)
91 }
92 _ => self.inner.write_message_begin(identifier),
93 }
94 }
95
96 fn write_message_end(&mut self) -> ::Result<()> {
97 self.inner.write_message_end()
98 }
99
100 fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> {
101 self.inner.write_struct_begin(identifier)
102 }
103
104 fn write_struct_end(&mut self) -> ::Result<()> {
105 self.inner.write_struct_end()
106 }
107
108 fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> {
109 self.inner.write_field_begin(identifier)
110 }
111
112 fn write_field_end(&mut self) -> ::Result<()> {
113 self.inner.write_field_end()
114 }
115
116 fn write_field_stop(&mut self) -> ::Result<()> {
117 self.inner.write_field_stop()
118 }
119
120 fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
121 self.inner.write_bytes(b)
122 }
123
124 fn write_bool(&mut self, b: bool) -> ::Result<()> {
125 self.inner.write_bool(b)
126 }
127
128 fn write_i8(&mut self, i: i8) -> ::Result<()> {
129 self.inner.write_i8(i)
130 }
131
132 fn write_i16(&mut self, i: i16) -> ::Result<()> {
133 self.inner.write_i16(i)
134 }
135
136 fn write_i32(&mut self, i: i32) -> ::Result<()> {
137 self.inner.write_i32(i)
138 }
139
140 fn write_i64(&mut self, i: i64) -> ::Result<()> {
141 self.inner.write_i64(i)
142 }
143
144 fn write_double(&mut self, d: f64) -> ::Result<()> {
145 self.inner.write_double(d)
146 }
147
148 fn write_string(&mut self, s: &str) -> ::Result<()> {
149 self.inner.write_string(s)
150 }
151
152 fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> {
153 self.inner.write_list_begin(identifier)
154 }
155
156 fn write_list_end(&mut self) -> ::Result<()> {
157 self.inner.write_list_end()
158 }
159
160 fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> {
161 self.inner.write_set_begin(identifier)
162 }
163
164 fn write_set_end(&mut self) -> ::Result<()> {
165 self.inner.write_set_end()
166 }
167
168 fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> {
169 self.inner.write_map_begin(identifier)
170 }
171
172 fn write_map_end(&mut self) -> ::Result<()> {
173 self.inner.write_map_end()
174 }
175
176 fn flush(&mut self) -> ::Result<()> {
177 self.inner.flush()
178 }
179
180 // utility
181 //
182
183 fn write_byte(&mut self, b: u8) -> ::Result<()> {
184 self.inner.write_byte(b)
185 }
186}
187
188#[cfg(test)]
189mod tests {
190
Allen George0e22c362017-01-30 07:15:00 -0500191 use protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
192 use transport::{TBufferChannel, TIoChannel, WriteHalf};
Allen George8b96bfb2016-11-02 08:01:08 -0400193
194 use super::*;
195
196 #[test]
197 fn must_write_message_begin_with_prefixed_service_name() {
Allen George0e22c362017-01-30 07:15:00 -0500198 let mut o_prot = test_objects();
Allen George8b96bfb2016-11-02 08:01:08 -0400199
200 let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2);
201 assert_success!(o_prot.write_message_begin(&ident));
202
Allen George0e22c362017-01-30 07:15:00 -0500203 let expected: [u8; 19] = [
204 0x80,
205 0x01, /* protocol identifier */
206 0x00,
207 0x01, /* message type */
208 0x00,
209 0x00,
210 0x00,
211 0x07,
212 0x66,
213 0x6F,
214 0x6F, /* "foo" */
215 0x3A, /* ":" */
216 0x62,
217 0x61,
218 0x72, /* "bar" */
219 0x00,
220 0x00,
221 0x00,
222 0x02 /* sequence number */,
223 ];
Allen George8b96bfb2016-11-02 08:01:08 -0400224
Allen George0e22c362017-01-30 07:15:00 -0500225 assert_eq!(o_prot.inner.transport.write_bytes(), expected);
Allen George8b96bfb2016-11-02 08:01:08 -0400226 }
227
Allen George0e22c362017-01-30 07:15:00 -0500228 fn test_objects
229 ()
230 -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>
231 {
232 let c = TBufferChannel::with_capacity(40, 40);
233 let (_, w_chan) = c.split().unwrap();
234 let prot = TBinaryOutputProtocol::new(w_chan, true);
235 TMultiplexedOutputProtocol::new("foo", prot)
Allen George8b96bfb2016-11-02 08:01:08 -0400236 }
237}