blob: 4cd80ba8e38691cb508002705b3518f46f56c8e1 [file] [log] [blame]
Jens Geyer8a701962013-03-25 01:28:12 +02001(*
Jens Geyerd5436f52014-10-03 19:50:38 +02002 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
Jens Geyer8a701962013-03-25 01:28:12 +020020unit Thrift.Processor.Multiplex;
21
22
23interface
24
25uses
26 SysUtils,
27 Generics.Collections,
28 Thrift,
29 Thrift.Protocol,
30 Thrift.Protocol.Multiplex;
31
32{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
33 To do so, you instantiate the processor and then register additional processors with it,
34 as shown in the following example:
35
36
37 TMultiplexedProcessor processor = new TMultiplexedProcessor();
38
39 processor.registerProcessor(
40 "Calculator",
41 new Calculator.Processor(new CalculatorHandler()));
42
43 processor.registerProcessor(
44 "WeatherReport",
45 new WeatherReport.Processor(new WeatherReportHandler()));
46
47 TServerTransport t = new TServerSocket(9090);
48 TSimpleServer server = new TSimpleServer(processor, t);
49
50 server.serve();
51}
52
53
54type
55 IMultiplexedProcessor = interface( IProcessor)
Jens Geyerd5436f52014-10-03 19:50:38 +020056 ['{810FF32D-22A2-4D58-B129-B0590703ECEC}']
Jens Geyer8a701962013-03-25 01:28:12 +020057 // Register a service with this TMultiplexedProcessor. This allows us
58 // to broker requests to individual services by using the service name
59 // to select them at request time.
60 procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
61 end;
62
63
64 TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
65 private type
66 // Our goal was to work with any protocol. In order to do that, we needed
67 // to allow them to call readMessageBegin() and get a TMessage in exactly
68 // the standard format, without the service name prepended to TMessage.name.
69 TStoredMessageProtocol = class( TProtocolDecorator)
70 private
Jens Geyer17c3ad92017-09-05 20:31:27 +020071 FMessageBegin : TThriftMessage;
Jens Geyer8a701962013-03-25 01:28:12 +020072 public
Jens Geyer17c3ad92017-09-05 20:31:27 +020073 constructor Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage);
74 function ReadMessageBegin: TThriftMessage; override;
Jens Geyer8a701962013-03-25 01:28:12 +020075 end;
76
77 private
78 FServiceProcessorMap : TDictionary<String, IProcessor>;
79
Jens Geyer17c3ad92017-09-05 20:31:27 +020080 procedure Error( const oprot : IProtocol; const msg : TThriftMessage;
Jens Geyere0e32402016-04-20 21:50:48 +020081 extype : TApplicationExceptionSpecializedClass; const etxt : string);
Jens Geyer2d2b3b22013-05-13 22:03:08 +020082
Jens Geyer8a701962013-03-25 01:28:12 +020083 public
84 constructor Create;
85 destructor Destroy; override;
86
87 // Register a service with this TMultiplexedProcessorImpl. This allows us
88 // to broker requests to individual services by using the service name
89 // to select them at request time.
90 procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
91
92 { This implementation of process performs the following steps:
93 - Read the beginning of the message.
94 - Extract the service name from the message.
95 - Using the service name to locate the appropriate processor.
96 - Dispatch to the processor, with a decorated instance of TProtocol
97 that allows readMessageBegin() to return the original TMessage.
98
99 An exception is thrown if the message type is not CALL or ONEWAY
100 or if the service is unknown (or not properly registered).
101 }
Jens Geyerd960e6e2013-12-19 22:06:30 +0100102 function Process( const iprot, oprot: IProtocol; const events : IProcessorEvents = nil): Boolean;
Jens Geyer8a701962013-03-25 01:28:12 +0200103 end;
104
105
106implementation
107
Jens Geyer17c3ad92017-09-05 20:31:27 +0200108constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage);
Jens Geyer8a701962013-03-25 01:28:12 +0200109begin
110 inherited Create( protocol);
111 FMessageBegin := aMsgBegin;
112end;
113
114
Jens Geyer17c3ad92017-09-05 20:31:27 +0200115function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage;
Jens Geyer8a701962013-03-25 01:28:12 +0200116begin
117 result := FMessageBegin;
118end;
119
120
121constructor TMultiplexedProcessorImpl.Create;
122begin
123 inherited Create;
124 FServiceProcessorMap := TDictionary<string,IProcessor>.Create;
125end;
126
127
128destructor TMultiplexedProcessorImpl.Destroy;
129begin
130 try
131 FreeAndNil( FServiceProcessorMap);
132 finally
133 inherited Destroy;
134 end;
135end;
136
137
138procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor);
139begin
140 FServiceProcessorMap.Add( serviceName, processor);
141end;
142
143
Jens Geyer17c3ad92017-09-05 20:31:27 +0200144procedure TMultiplexedProcessorImpl.Error( const oprot : IProtocol; const msg : TThriftMessage;
Jens Geyere0e32402016-04-20 21:50:48 +0200145 extype : TApplicationExceptionSpecializedClass;
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200146 const etxt : string);
147var appex : TApplicationException;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200148 newMsg : TThriftMessage;
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200149begin
Jens Geyere0e32402016-04-20 21:50:48 +0200150 appex := extype.Create(etxt);
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200151 try
Jens Geyer17c3ad92017-09-05 20:31:27 +0200152 Init( newMsg, msg.Name, TMessageType.Exception, msg.SeqID);
Jens Geyerd5436f52014-10-03 19:50:38 +0200153
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200154 oprot.WriteMessageBegin(newMsg);
155 appex.Write(oprot);
156 oprot.WriteMessageEnd();
157 oprot.Transport.Flush();
158
159 finally
Jens Geyerd5436f52014-10-03 19:50:38 +0200160 appex.Free;
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200161 end;
162end;
163
164
Jens Geyerd960e6e2013-12-19 22:06:30 +0100165function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol; const events : IProcessorEvents = nil): Boolean;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200166var msg, newMsg : TThriftMessage;
Jens Geyer8a701962013-03-25 01:28:12 +0200167 idx : Integer;
168 sService : string;
169 processor : IProcessor;
170 protocol : IProtocol;
171const
172 ERROR_INVALID_MSGTYPE = 'Message must be "call" or "oneway"';
173 ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
174 ERROR_UNKNOWN_SERVICE = 'Service "%s" is not registered with MultiplexedProcessor';
175begin
176 // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header.
177 // This pulls the message "off the wire", which we'll deal with at the end of this method.
178 msg := iprot.readMessageBegin();
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200179 if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin
180 Error( oprot, msg,
Jens Geyere0e32402016-04-20 21:50:48 +0200181 TApplicationExceptionInvalidMessageType,
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200182 ERROR_INVALID_MSGTYPE);
Jens Geyerd5436f52014-10-03 19:50:38 +0200183 Exit( FALSE);
184 end;
185
Jens Geyer8a701962013-03-25 01:28:12 +0200186 // Extract the service name
187 idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name);
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200188 if idx < 1 then begin
189 Error( oprot, msg,
Jens Geyere0e32402016-04-20 21:50:48 +0200190 TApplicationExceptionInvalidProtocol,
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200191 Format(ERROR_INCOMPATIBLE_PROT,[msg.Name]));
192 Exit( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +0200193 end;
Jens Geyer8a701962013-03-25 01:28:12 +0200194
195 // Create a new TMessage, something that can be consumed by any TProtocol
196 sService := Copy( msg.Name, 1, idx-1);
197 if not FServiceProcessorMap.TryGetValue( sService, processor)
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200198 then begin
199 Error( oprot, msg,
Jens Geyere0e32402016-04-20 21:50:48 +0200200 TApplicationExceptionInternalError,
Jens Geyer2d2b3b22013-05-13 22:03:08 +0200201 Format(ERROR_UNKNOWN_SERVICE,[sService]));
202 Exit( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +0200203 end;
Jens Geyer8a701962013-03-25 01:28:12 +0200204
205 // Create a new TMessage, removing the service name
206 Inc( idx, Length(TMultiplexedProtocol.SEPARATOR));
Jens Geyer17c3ad92017-09-05 20:31:27 +0200207 Init( newMsg, Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID);
Jens Geyer8a701962013-03-25 01:28:12 +0200208
209 // Dispatch processing to the stored processor
210 protocol := TStoredMessageProtocol.Create( iprot, newMsg);
Jens Geyerd960e6e2013-12-19 22:06:30 +0100211 result := processor.process( protocol, oprot, events);
Jens Geyer8a701962013-03-25 01:28:12 +0200212end;
213
214
215end.
216