blob: ba77d946446dd3e8077eab64ead7d596ee0f6285 [file] [log] [blame]
Jens Geyer8a701962013-03-25 01:28:12 +02001(*
Jens Geyerd5436f52014-10-03 19:50:38 +02002 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
Jens Geyer8a701962013-03-25 01:28:12 +020020unit Thrift.Processor.Multiplex;
21
22
23interface
24
25uses
26 SysUtils,
27 Generics.Collections,
28 Thrift,
29 Thrift.Protocol,
30 Thrift.Protocol.Multiplex;
31
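{ TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
  To do so, you instantiate the processor and then register additional processors with it,
  as shown in the following example (note: the example is written in Java syntax; in this unit
  the corresponding class is TMultiplexedProcessorImpl and the method is RegisterProcessor):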
35
36
37 TMultiplexedProcessor processor = new TMultiplexedProcessor();
38
39 processor.registerProcessor(
40 "Calculator",
41 new Calculator.Processor(new CalculatorHandler()));
42
43 processor.registerProcessor(
44 "WeatherReport",
45 new WeatherReport.Processor(new WeatherReportHandler()));
46
47 TServerTransport t = new TServerSocket(9090);
48 TSimpleServer server = new TSimpleServer(processor, t);
49
50 server.serve();
51}
52
53
54type
IMultiplexedProcessor = interface( IProcessor)
  ['{807F9D19-6CF4-4789-840E-93E87A12EB63}']

  { Registers a service processor under the given service name, so that
    incoming requests can be routed to it by name at request time.

    serviceName : name under which the processor is registered
    processor   : the processor that handles requests for that service
    asDefault   : optional, defaults to FALSE; when TRUE the processor is
                  additionally registered as the default processor }
  procedure RegisterProcessor( const serviceName : String;
                               const processor : IProcessor;
                               const asDefault : Boolean = FALSE);
end;
62
63
TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
strict private type

  { Protocol decorator that keeps a message header handed to its constructor
    and returns exactly that header from ReadMessageBegin. This lets the
    selected service processor work with any underlying protocol and still
    receive a TThriftMessage in the standard format, i.e. without the
    service name prepended to the message name. }
  TStoredMessageProtocol = class( TProtocolDecorator)
  strict private
    FMessageBegin : TThriftMessage;   // header returned by ReadMessageBegin
  public
    constructor Create( const protocol : IProtocol;
                        const aMsgBegin : TThriftMessage);
    function ReadMessageBegin: TThriftMessage; override;
  end;

strict private
  FServiceProcessorMap : TDictionary<String, IProcessor>;  // service name -> processor
  FDefaultProcessor : IProcessor;                          // used for names without separator; may be nil

  { Writes an application exception of class "extype" with text "etxt" to
    oprot as a message of type Exception, reusing name and sequence ID of
    "msg", and flushes the output transport. }
  procedure Error( const oprot : IProtocol;
                   const msg : TThriftMessage;
                   extype : TApplicationExceptionSpecializedClass;
                   const etxt : string);

public
  constructor Create;
  destructor Destroy; override;

  { Registers "processor" under "serviceName" so that requests can be
    dispatched to it by service name. With asDefault = TRUE the processor
    also becomes the default processor; only one default is allowed and
    TApplicationExceptionInternalError is raised on a second attempt. }
  procedure RegisterProcessor( const serviceName : String;
                               const processor : IProcessor;
                               const asDefault : Boolean = FALSE);

  { Processes one incoming message:
      - reads the beginning of the message from iprot
      - extracts the service name from the message name
      - looks up the processor registered for that service name, or falls
        back to the default processor if the name contains no separator
      - dispatches to that processor through a TStoredMessageProtocol, so
        that its ReadMessageBegin returns the message header with the
        service prefix removed

    If the message type is neither Call nor Oneway, if the service is not
    registered, or if there is no separator and no default processor, an
    exception message is written to oprot and the function returns FALSE.
    Otherwise the result of the selected processor is returned. }
  function Process( const iprot, oprot: IProtocol;
                    const events : IProcessorEvents = nil): Boolean;
end;
105
106
107implementation
108
{ Wraps "protocol" and remembers "aMsgBegin" as the message header that
  ReadMessageBegin hands out later on. }
constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create(
  const protocol : IProtocol;
  const aMsgBegin : TThriftMessage);
begin
  // the decorator base class receives the protocol being wrapped
  inherited Create( protocol);

  // keep the header for ReadMessageBegin
  FMessageBegin := aMsgBegin;
end;
114
115
{ Creates the processor with an empty service map and no default processor. }
constructor TMultiplexedProcessorImpl.Create;
begin
  inherited Create;

  // maps service names to the processors registered via RegisterProcessor
  FServiceProcessorMap := TDictionary<string,IProcessor>.Create;
end;
121
122
{ Releases the service map. The inherited destructor is called even if
  freeing the map raises an exception. }
destructor TMultiplexedProcessorImpl.Destroy;
begin
  try
    FreeAndNil( FServiceProcessorMap);
  finally
    inherited;  // in a destructor this calls the inherited Destroy
  end;
end;
131
132
{ Returns the message header stored by the constructor instead of reading
  one from the wrapped protocol. }
function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage;
begin
  // NOTE(review): Reset is inherited and not declared in this unit;
  // presumably it resets per-message protocol state - confirm in Thrift.Protocol
  Reset;

  Result := FMessageBegin;
end;
138
139
{ Registers "processor" under "serviceName" so that Process can dispatch
  requests to it by service name.

  serviceName : name under which the processor is stored in the service map
  processor   : processor handling requests for that service
  asDefault   : when TRUE, the processor also becomes the default processor,
                which Process uses for message names without a separator

  Raises TApplicationExceptionInternalError if asDefault is TRUE although a
  default processor has already been set. In that case nothing is registered.
  Exceptions raised by TDictionary.Add (e.g. for a service name that is
  already present) are passed on to the caller. }
procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean);
begin
  // Validate first, so that a rejected registration does not leave
  // the processor behind in the service map.
  if asDefault and (FDefaultProcessor <> nil)
  then raise TApplicationExceptionInternalError.Create('Only one default service allowed');

  FServiceProcessorMap.Add( serviceName, processor);

  // Set the default only after Add succeeded, map and default stay consistent.
  if asDefault
  then FDefaultProcessor := processor;
end;
150
151
{ Sends an application exception back to the client.

  oprot  : protocol the exception message is written to
  msg    : the message being answered; its name and sequence ID are reused
  extype : exception class to instantiate
  etxt   : text passed to the exception constructor

  The exception instance is only used for serialization and is freed
  before the procedure returns, it is not raised. }
procedure TMultiplexedProcessorImpl.Error( const oprot : IProtocol; const msg : TThriftMessage;
                                           extype : TApplicationExceptionSpecializedClass;
                                           const etxt : string);
var
  replyHeader : TThriftMessage;
  failure : TApplicationException;
begin
  failure := extype.Create(etxt);
  try
    // same name and sequence ID as the request, but message type Exception
    Init( replyHeader, msg.Name, TMessageType.Exception, msg.SeqID);

    oprot.WriteMessageBegin(replyHeader);
    failure.Write(oprot);
    oprot.WriteMessageEnd();

    // push the reply out
    oprot.Transport.Flush();
  finally
    failure.Free;
  end;
end;
171
172
{ Reads one message header from iprot, selects the processor responsible for
  it and lets that processor handle the message.

  iprot  : protocol the request is read from
  oprot  : protocol replies and error messages are written to
  events : passed through unchanged to the selected processor

  Returns FALSE after sending an exception message via Error if
    - the message type is neither Call nor Oneway,
    - the service named in the message is not registered, or
    - the message name has no separator and no default processor is set.
  Otherwise returns whatever the selected processor's Process returns. }
function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol; const events : IProcessorEvents = nil): Boolean;
const
  ERROR_INVALID_MSGTYPE   = 'Message must be "call" or "oneway"';
  ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
  ERROR_UNKNOWN_SERVICE   = 'Service "%s" is not registered with MultiplexedProcessor';
var
  incoming, forwarded : TThriftMessage;
  sepPos : Integer;
  svcName : string;
  target : IProcessor;
  storedProt : IProtocol;
begin
  // Read the message header through the actual underlying protocol
  // (e.g. TBinaryProtocol). The header is consumed here and is handed
  // to the selected processor again at the end of this method.
  incoming := iprot.readMessageBegin();

  // only requests are accepted
  if not (incoming.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin
    Error( oprot, incoming, TApplicationExceptionInvalidMessageType, ERROR_INVALID_MSGTYPE);
    Result := FALSE;
    Exit;
  end;

  sepPos := Pos( TMultiplexedProtocol.SEPARATOR, incoming.Name);
  if sepPos > 0 then begin

    // everything in front of the separator is the service name
    svcName := Copy( incoming.Name, 1, sepPos-1);
    if not FServiceProcessorMap.TryGetValue( svcName, target) then begin
      Error( oprot, incoming, TApplicationExceptionInternalError, Format(ERROR_UNKNOWN_SERVICE,[svcName]));
      Result := FALSE;
      Exit;
    end;

    // the forwarded header carries only the part behind the separator
    Init( forwarded,
          Copy( incoming.Name, sepPos + Length(TMultiplexedProtocol.SEPARATOR), MAXINT),
          incoming.Type_,
          incoming.SeqID);

  end
  else begin

    // no separator: only the default processor can take this message
    if FDefaultProcessor = nil then begin
      Error( oprot, incoming, TApplicationExceptionInvalidProtocol, Format(ERROR_INCOMPATIBLE_PROT,[incoming.Name]));
      Result := FALSE;
      Exit;
    end;

    target := FDefaultProcessor;
    forwarded := incoming;  // header is passed on unchanged

  end;

  // Let the selected processor do the work; the decorator returns the
  // prepared header from its ReadMessageBegin.
  storedProt := TStoredMessageProtocol.Create( iprot, forwarded);
  Result := target.process( storedProt, oprot, events);
end;
230
231
232end.