Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 1 | (* |
Jens Geyer | d5436f5 | 2014-10-03 19:50:38 +0200 | [diff] [blame] | 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | *) |
| 19 | |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 20 | unit Thrift.Processor.Multiplex; |
| 21 | |
| 22 | |
| 23 | interface |
| 24 | |
| 25 | uses |
| 26 | SysUtils, |
| 27 | Generics.Collections, |
| 28 | Thrift, |
| 29 | Thrift.Protocol, |
| 30 | Thrift.Protocol.Multiplex; |
| 31 | |
| 32 | { TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services. |
| 33 | To do so, you instantiate the processor and then register additional processors with it, |
| 34 | as shown in the following example: |
| 35 | |
| 36 | |
| 37 | TMultiplexedProcessor processor = new TMultiplexedProcessor(); |
| 38 | |
| 39 | processor.registerProcessor( |
| 40 | "Calculator", |
| 41 | new Calculator.Processor(new CalculatorHandler())); |
| 42 | |
| 43 | processor.registerProcessor( |
| 44 | "WeatherReport", |
| 45 | new WeatherReport.Processor(new WeatherReportHandler())); |
| 46 | |
| 47 | TServerTransport t = new TServerSocket(9090); |
| 48 | TSimpleServer server = new TSimpleServer(processor, t); |
| 49 | |
| 50 | server.serve(); |
| 51 | } |
| 52 | |
| 53 | |
| 54 | type |
| 55 | IMultiplexedProcessor = interface( IProcessor) |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 56 | ['{807F9D19-6CF4-4789-840E-93E87A12EB63}'] |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 57 | // Register a service with this TMultiplexedProcessor. This allows us |
| 58 | // to broker requests to individual services by using the service name |
| 59 | // to select them at request time. |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 60 | procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE); |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 61 | end; |
| 62 | |
| 63 | |
| 64 | TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor) |
Jens Geyer | fad7fd3 | 2019-11-09 23:24:52 +0100 | [diff] [blame] | 65 | strict private type |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 66 | // Our goal was to work with any protocol. In order to do that, we needed |
| 67 | // to allow them to call readMessageBegin() and get a TMessage in exactly |
| 68 | // the standard format, without the service name prepended to TMessage.name. |
| 69 | TStoredMessageProtocol = class( TProtocolDecorator) |
Jens Geyer | fad7fd3 | 2019-11-09 23:24:52 +0100 | [diff] [blame] | 70 | strict private |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 71 | FMessageBegin : TThriftMessage; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 72 | public |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 73 | constructor Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage); |
| 74 | function ReadMessageBegin: TThriftMessage; override; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 75 | end; |
| 76 | |
Jens Geyer | fad7fd3 | 2019-11-09 23:24:52 +0100 | [diff] [blame] | 77 | strict private |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 78 | FServiceProcessorMap : TDictionary<String, IProcessor>; |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 79 | FDefaultProcessor : IProcessor; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 80 | |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 81 | procedure Error( const oprot : IProtocol; const msg : TThriftMessage; |
Jens Geyer | e0e3240 | 2016-04-20 21:50:48 +0200 | [diff] [blame] | 82 | extype : TApplicationExceptionSpecializedClass; const etxt : string); |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 83 | |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 84 | public |
| 85 | constructor Create; |
| 86 | destructor Destroy; override; |
| 87 | |
| 88 | // Register a service with this TMultiplexedProcessorImpl. This allows us |
| 89 | // to broker requests to individual services by using the service name |
| 90 | // to select them at request time. |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 91 | procedure RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean = FALSE); |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 92 | |
| 93 | { This implementation of process performs the following steps: |
| 94 | - Read the beginning of the message. |
| 95 | - Extract the service name from the message. |
| 96 | - Using the service name to locate the appropriate processor. |
| 97 | - Dispatch to the processor, with a decorated instance of TProtocol |
| 98 | that allows readMessageBegin() to return the original TMessage. |
| 99 | |
| 100 | An exception is thrown if the message type is not CALL or ONEWAY |
| 101 | or if the service is unknown (or not properly registered). |
| 102 | } |
Jens Geyer | d960e6e | 2013-12-19 22:06:30 +0100 | [diff] [blame] | 103 | function Process( const iprot, oprot: IProtocol; const events : IProcessorEvents = nil): Boolean; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 104 | end; |
| 105 | |
| 106 | |
| 107 | implementation |
| 108 | |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 109 | constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : TThriftMessage); |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 110 | begin |
| 111 | inherited Create( protocol); |
| 112 | FMessageBegin := aMsgBegin; |
| 113 | end; |
| 114 | |
| 115 | |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 116 | constructor TMultiplexedProcessorImpl.Create; |
| 117 | begin |
| 118 | inherited Create; |
| 119 | FServiceProcessorMap := TDictionary<string,IProcessor>.Create; |
| 120 | end; |
| 121 | |
| 122 | |
| 123 | destructor TMultiplexedProcessorImpl.Destroy; |
| 124 | begin |
| 125 | try |
| 126 | FreeAndNil( FServiceProcessorMap); |
| 127 | finally |
| 128 | inherited Destroy; |
| 129 | end; |
| 130 | end; |
| 131 | |
| 132 | |
Jens Geyer | fad7fd3 | 2019-11-09 23:24:52 +0100 | [diff] [blame] | 133 | function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: TThriftMessage; |
| 134 | begin |
Jens Geyer | 41f47af | 2019-11-09 23:24:52 +0100 | [diff] [blame] | 135 | Reset; |
Jens Geyer | fad7fd3 | 2019-11-09 23:24:52 +0100 | [diff] [blame] | 136 | result := FMessageBegin; |
| 137 | end; |
| 138 | |
| 139 | |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 140 | procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor; const asDefault : Boolean); |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 141 | begin |
| 142 | FServiceProcessorMap.Add( serviceName, processor); |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 143 | |
| 144 | if asDefault then begin |
| 145 | if FDefaultProcessor = nil |
| 146 | then FDefaultProcessor := processor |
| 147 | else raise TApplicationExceptionInternalError.Create('Only one default service allowed'); |
| 148 | end; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 149 | end; |
| 150 | |
| 151 | |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 152 | procedure TMultiplexedProcessorImpl.Error( const oprot : IProtocol; const msg : TThriftMessage; |
Jens Geyer | e0e3240 | 2016-04-20 21:50:48 +0200 | [diff] [blame] | 153 | extype : TApplicationExceptionSpecializedClass; |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 154 | const etxt : string); |
| 155 | var appex : TApplicationException; |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 156 | newMsg : TThriftMessage; |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 157 | begin |
Jens Geyer | e0e3240 | 2016-04-20 21:50:48 +0200 | [diff] [blame] | 158 | appex := extype.Create(etxt); |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 159 | try |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 160 | Init( newMsg, msg.Name, TMessageType.Exception, msg.SeqID); |
Jens Geyer | d5436f5 | 2014-10-03 19:50:38 +0200 | [diff] [blame] | 161 | |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 162 | oprot.WriteMessageBegin(newMsg); |
| 163 | appex.Write(oprot); |
| 164 | oprot.WriteMessageEnd(); |
| 165 | oprot.Transport.Flush(); |
| 166 | |
| 167 | finally |
Jens Geyer | d5436f5 | 2014-10-03 19:50:38 +0200 | [diff] [blame] | 168 | appex.Free; |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 169 | end; |
| 170 | end; |
| 171 | |
| 172 | |
Jens Geyer | d960e6e | 2013-12-19 22:06:30 +0100 | [diff] [blame] | 173 | function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol; const events : IProcessorEvents = nil): Boolean; |
Jens Geyer | 17c3ad9 | 2017-09-05 20:31:27 +0200 | [diff] [blame] | 174 | var msg, newMsg : TThriftMessage; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 175 | idx : Integer; |
| 176 | sService : string; |
| 177 | processor : IProcessor; |
| 178 | protocol : IProtocol; |
| 179 | const |
| 180 | ERROR_INVALID_MSGTYPE = 'Message must be "call" or "oneway"'; |
| 181 | ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.'; |
| 182 | ERROR_UNKNOWN_SERVICE = 'Service "%s" is not registered with MultiplexedProcessor'; |
| 183 | begin |
| 184 | // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header. |
| 185 | // This pulls the message "off the wire", which we'll deal with at the end of this method. |
| 186 | msg := iprot.readMessageBegin(); |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 187 | if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin |
| 188 | Error( oprot, msg, |
Jens Geyer | e0e3240 | 2016-04-20 21:50:48 +0200 | [diff] [blame] | 189 | TApplicationExceptionInvalidMessageType, |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 190 | ERROR_INVALID_MSGTYPE); |
Jens Geyer | d5436f5 | 2014-10-03 19:50:38 +0200 | [diff] [blame] | 191 | Exit( FALSE); |
| 192 | end; |
| 193 | |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 194 | // Extract the service name |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 195 | // use FDefaultProcessor as fallback if there is no separator |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 196 | idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name); |
Jens Geyer | 33d30fc | 2018-02-08 18:18:05 +0100 | [diff] [blame] | 197 | if idx > 0 then begin |
| 198 | |
| 199 | // Create a new TMessage, something that can be consumed by any TProtocol |
| 200 | sService := Copy( msg.Name, 1, idx-1); |
| 201 | if not FServiceProcessorMap.TryGetValue( sService, processor) |
| 202 | then begin |
| 203 | Error( oprot, msg, |
| 204 | TApplicationExceptionInternalError, |
| 205 | Format(ERROR_UNKNOWN_SERVICE,[sService])); |
| 206 | Exit( FALSE); |
| 207 | end; |
| 208 | |
| 209 | // Create a new TMessage, removing the service name |
| 210 | Inc( idx, Length(TMultiplexedProtocol.SEPARATOR)); |
| 211 | Init( newMsg, Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID); |
| 212 | |
| 213 | end |
| 214 | else if FDefaultProcessor <> nil then begin |
| 215 | processor := FDefaultProcessor; |
| 216 | newMsg := msg; // no need to change |
| 217 | |
| 218 | end |
| 219 | else begin |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 220 | Error( oprot, msg, |
Jens Geyer | e0e3240 | 2016-04-20 21:50:48 +0200 | [diff] [blame] | 221 | TApplicationExceptionInvalidProtocol, |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 222 | Format(ERROR_INCOMPATIBLE_PROT,[msg.Name])); |
| 223 | Exit( FALSE); |
Jens Geyer | d5436f5 | 2014-10-03 19:50:38 +0200 | [diff] [blame] | 224 | end; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 225 | |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 226 | // Dispatch processing to the stored processor |
| 227 | protocol := TStoredMessageProtocol.Create( iprot, newMsg); |
Jens Geyer | d960e6e | 2013-12-19 22:06:30 +0100 | [diff] [blame] | 228 | result := processor.process( protocol, oprot, events); |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 229 | end; |
| 230 | |
| 231 | |
| 232 | end. |