blob: 697b7e6bced5198f10cbd6f1bf24bf51e9826db3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

Allen Georgeef7a1892018-12-16 18:01:37 -050018use super::{
19 TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType,
20 TOutputProtocol, TSetIdentifier, TStructIdentifier,
21};
Allen George8b96bfb2016-11-02 08:01:08 -040022
23/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift
24/// messages.
25///
26/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services
27/// send messages over a single I/O channel. By prefixing service identifiers
28/// to outgoing messages receivers are able to demux them and route them to the
29/// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor`
30/// to process incoming messages, while other languages must use their
31/// corresponding multiplexed processor implementations.
32///
33/// For example, given a service `TestService` and a service call `test_call`,
34/// this implementation would identify messages as originating from
35/// `TestService:test_call`.
36///
37/// # Examples
38///
39/// Create and use a `TMultiplexedOutputProtocol`.
40///
41/// ```no_run
Allen George8b96bfb2016-11-02 08:01:08 -040042/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
43/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
Allen George0e22c362017-01-30 07:15:00 -050044/// use thrift::transport::TTcpChannel;
Allen George8b96bfb2016-11-02 08:01:08 -040045///
Allen George0e22c362017-01-30 07:15:00 -050046/// let mut channel = TTcpChannel::new();
47/// channel.open("localhost:9090").unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040048///
Allen George0e22c362017-01-30 07:15:00 -050049/// let protocol = TBinaryOutputProtocol::new(channel, true);
50/// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol);
Allen George8b96bfb2016-11-02 08:01:08 -040051///
52/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
Allen George0e22c362017-01-30 07:15:00 -050053/// protocol.write_message_begin(&ident).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040054/// ```
Allen George0e22c362017-01-30 07:15:00 -050055#[derive(Debug)]
56pub struct TMultiplexedOutputProtocol<P>
57where
58 P: TOutputProtocol,
59{
Allen George8b96bfb2016-11-02 08:01:08 -040060 service_name: String,
Allen George0e22c362017-01-30 07:15:00 -050061 inner: P,
Allen George8b96bfb2016-11-02 08:01:08 -040062}
63
Allen George0e22c362017-01-30 07:15:00 -050064impl<P> TMultiplexedOutputProtocol<P>
65where
66 P: TOutputProtocol,
67{
Allen George8b96bfb2016-11-02 08:01:08 -040068 /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
69 /// as originating from a service named `service_name` and sends them over
70 /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
71 /// by `wrapped`, not by this instance.
Allen George0e22c362017-01-30 07:15:00 -050072 pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> {
Allen George8b96bfb2016-11-02 08:01:08 -040073 TMultiplexedOutputProtocol {
74 service_name: service_name.to_owned(),
75 inner: wrapped,
76 }
77 }
78}
79
80// FIXME: avoid passthrough methods
Allen George0e22c362017-01-30 07:15:00 -050081impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P>
82where
83 P: TOutputProtocol,
84{
Allen Georgeb0d14132020-03-29 11:48:55 -040085 fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> crate::Result<()> {
Allen Georgeef7a1892018-12-16 18:01:37 -050086 match identifier.message_type {
87 // FIXME: is there a better way to override identifier here?
Allen George8b96bfb2016-11-02 08:01:08 -040088 TMessageType::Call | TMessageType::OneWay => {
89 let identifier = TMessageIdentifier {
90 name: format!("{}:{}", self.service_name, identifier.name),
91 ..*identifier
92 };
93 self.inner.write_message_begin(&identifier)
94 }
95 _ => self.inner.write_message_begin(identifier),
96 }
97 }
98
Allen Georgeb0d14132020-03-29 11:48:55 -040099 fn write_message_end(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400100 self.inner.write_message_end()
101 }
102
Allen Georgeb0d14132020-03-29 11:48:55 -0400103 fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400104 self.inner.write_struct_begin(identifier)
105 }
106
Allen Georgeb0d14132020-03-29 11:48:55 -0400107 fn write_struct_end(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400108 self.inner.write_struct_end()
109 }
110
Allen Georgeb0d14132020-03-29 11:48:55 -0400111 fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400112 self.inner.write_field_begin(identifier)
113 }
114
Allen Georgeb0d14132020-03-29 11:48:55 -0400115 fn write_field_end(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400116 self.inner.write_field_end()
117 }
118
Allen Georgeb0d14132020-03-29 11:48:55 -0400119 fn write_field_stop(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400120 self.inner.write_field_stop()
121 }
122
Allen Georgeb0d14132020-03-29 11:48:55 -0400123 fn write_bytes(&mut self, b: &[u8]) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400124 self.inner.write_bytes(b)
125 }
126
Allen Georgeb0d14132020-03-29 11:48:55 -0400127 fn write_bool(&mut self, b: bool) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400128 self.inner.write_bool(b)
129 }
130
Allen Georgeb0d14132020-03-29 11:48:55 -0400131 fn write_i8(&mut self, i: i8) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400132 self.inner.write_i8(i)
133 }
134
Allen Georgeb0d14132020-03-29 11:48:55 -0400135 fn write_i16(&mut self, i: i16) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400136 self.inner.write_i16(i)
137 }
138
Allen Georgeb0d14132020-03-29 11:48:55 -0400139 fn write_i32(&mut self, i: i32) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400140 self.inner.write_i32(i)
141 }
142
Allen Georgeb0d14132020-03-29 11:48:55 -0400143 fn write_i64(&mut self, i: i64) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400144 self.inner.write_i64(i)
145 }
146
Allen Georgeb0d14132020-03-29 11:48:55 -0400147 fn write_double(&mut self, d: f64) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400148 self.inner.write_double(d)
149 }
150
Allen Georgeb0d14132020-03-29 11:48:55 -0400151 fn write_string(&mut self, s: &str) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400152 self.inner.write_string(s)
153 }
154
Allen Georgeb0d14132020-03-29 11:48:55 -0400155 fn write_list_begin(&mut self, identifier: &TListIdentifier) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400156 self.inner.write_list_begin(identifier)
157 }
158
Allen Georgeb0d14132020-03-29 11:48:55 -0400159 fn write_list_end(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400160 self.inner.write_list_end()
161 }
162
Allen Georgeb0d14132020-03-29 11:48:55 -0400163 fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400164 self.inner.write_set_begin(identifier)
165 }
166
Allen Georgeb0d14132020-03-29 11:48:55 -0400167 fn write_set_end(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400168 self.inner.write_set_end()
169 }
170
Allen Georgeb0d14132020-03-29 11:48:55 -0400171 fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400172 self.inner.write_map_begin(identifier)
173 }
174
Allen Georgeb0d14132020-03-29 11:48:55 -0400175 fn write_map_end(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400176 self.inner.write_map_end()
177 }
178
Allen Georgeb0d14132020-03-29 11:48:55 -0400179 fn flush(&mut self) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400180 self.inner.flush()
181 }
182
183 // utility
184 //
185
Allen Georgeb0d14132020-03-29 11:48:55 -0400186 fn write_byte(&mut self, b: u8) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -0400187 self.inner.write_byte(b)
188 }
189}
190
#[cfg(test)]
mod tests {

    use crate::protocol::{
        TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol,
    };
    use crate::transport::{TBufferChannel, TIoChannel, WriteHalf};

    use super::*;

    /// A `Call` message named "bar" written through a multiplexed protocol for
    /// service "foo" must reach the wrapped binary protocol as "foo:bar".
    #[test]
    fn must_write_message_begin_with_prefixed_service_name() {
        let mut o_prot = test_objects();

        let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2);
        assert_success!(o_prot.write_message_begin(&ident));

        #[rustfmt::skip]
        let expected: [u8; 19] = [
            0x80,
            0x01, /* protocol identifier */
            0x00,
            0x01, /* message type */
            0x00,
            0x00,
            0x00,
            0x07,
            0x66,
            0x6F,
            0x6F, /* "foo" */
            0x3A, /* ":" */
            0x62,
            0x61,
            0x72, /* "bar" */
            0x00,
            0x00,
            0x00,
            0x02, /* sequence number */
        ];

        assert_eq!(o_prot.inner.transport.write_bytes(), expected);
    }

    /// Build a multiplexed protocol for service "foo" that wraps a binary
    /// protocol writing into the write half of a `TBufferChannel`.
    fn test_objects() -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>
    {
        let c = TBufferChannel::with_capacity(40, 40);
        let (_, w_chan) = c.split().unwrap();
        let prot = TBinaryOutputProtocol::new(w_chan, true);
        TMultiplexedOutputProtocol::new("foo", prot)
    }
}