Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 1 | (* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one
|
| 3 | * or more contributor license agreements. See the NOTICE file
|
| 4 | * distributed with this work for additional information
|
| 5 | * regarding copyright ownership. The ASF licenses this file
|
| 6 | * to you under the Apache License, Version 2.0 (the
|
| 7 | * "License"); you may not use this file except in compliance
|
| 8 | * with the License. You may obtain a copy of the License at
|
| 9 | *
|
| 10 | * http://www.apache.org/licenses/LICENSE-2.0
|
| 11 | *
|
| 12 | * Unless required by applicable law or agreed to in writing,
|
| 13 | * software distributed under the License is distributed on an
|
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
| 15 | * KIND, either express or implied. See the License for the
|
| 16 | * specific language governing permissions and limitations
|
| 17 | * under the License.
|
| 18 | *)
|
| 19 |
|
| 20 | unit Thrift.Processor.Multiplex; |
| 21 | |
| 22 | |
| 23 | interface |
| 24 | |
| 25 | uses |
| 26 | SysUtils, |
| 27 | Generics.Collections, |
| 28 | Thrift, |
| 29 | Thrift.Protocol, |
| 30 | Thrift.Protocol.Multiplex; |
| 31 | |
| 32 | { TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services. |
| 33 | To do so, you instantiate the processor and then register additional processors with it, |
| 34 | as shown in the following example: |
| 35 | |
| 36 | |
| 37 | TMultiplexedProcessor processor = new TMultiplexedProcessor(); |
| 38 | |
| 39 | processor.registerProcessor( |
| 40 | "Calculator", |
| 41 | new Calculator.Processor(new CalculatorHandler())); |
| 42 | |
| 43 | processor.registerProcessor( |
| 44 | "WeatherReport", |
| 45 | new WeatherReport.Processor(new WeatherReportHandler())); |
| 46 | |
| 47 | TServerTransport t = new TServerSocket(9090); |
| 48 | TSimpleServer server = new TSimpleServer(processor, t); |
| 49 | |
| 50 | server.serve(); |
| 51 | } |
| 52 | |
| 53 | |
| 54 | type |
| 55 | IMultiplexedProcessor = interface( IProcessor) |
| 56 | ['{810FF32D-22A2-4D58-B129-B0590703ECEC}']
|
| 57 | // Register a service with this TMultiplexedProcessor. This allows us |
| 58 | // to broker requests to individual services by using the service name |
| 59 | // to select them at request time. |
| 60 | procedure RegisterProcessor( const serviceName : String; const processor : IProcessor); |
| 61 | end; |
| 62 | |
| 63 | |
| 64 | TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor) |
| 65 | private type |
| 66 | // Our goal was to work with any protocol. In order to do that, we needed |
| 67 | // to allow them to call readMessageBegin() and get a TMessage in exactly |
| 68 | // the standard format, without the service name prepended to TMessage.name. |
| 69 | TStoredMessageProtocol = class( TProtocolDecorator) |
| 70 | private |
| 71 | FMessageBegin : IMessage; |
| 72 | public |
| 73 | constructor Create( const protocol : IProtocol; const aMsgBegin : IMessage); |
| 74 | function ReadMessageBegin: IMessage; override; |
| 75 | end; |
| 76 | |
| 77 | private |
| 78 | FServiceProcessorMap : TDictionary<String, IProcessor>; |
| 79 | |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 80 | procedure Error( const oprot : IProtocol; const msg : IMessage; |
| 81 | extype : TApplicationException.TExceptionType; const etxt : string); |
| 82 | |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 83 | public |
| 84 | constructor Create; |
| 85 | destructor Destroy; override; |
| 86 | |
| 87 | // Register a service with this TMultiplexedProcessorImpl. This allows us |
| 88 | // to broker requests to individual services by using the service name |
| 89 | // to select them at request time. |
| 90 | procedure RegisterProcessor( const serviceName : String; const processor : IProcessor); |
| 91 | |
| 92 | { This implementation of process performs the following steps: |
| 93 | - Read the beginning of the message. |
| 94 | - Extract the service name from the message. |
| 95 | - Using the service name to locate the appropriate processor. |
| 96 | - Dispatch to the processor, with a decorated instance of TProtocol |
| 97 | that allows readMessageBegin() to return the original TMessage. |
| 98 | |
| 99 | An exception is thrown if the message type is not CALL or ONEWAY |
| 100 | or if the service is unknown (or not properly registered). |
| 101 | } |
Jens Geyer | d960e6e | 2013-12-19 22:06:30 +0100 | [diff] [blame^] | 102 | function Process( const iprot, oprot: IProtocol; const events : IProcessorEvents = nil): Boolean; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 103 | end; |
| 104 | |
| 105 | |
| 106 | implementation |
| 107 | |
| 108 | constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create( const protocol : IProtocol; const aMsgBegin : IMessage); |
| 109 | begin |
| 110 | inherited Create( protocol); |
| 111 | FMessageBegin := aMsgBegin; |
| 112 | end; |
| 113 | |
| 114 | |
| 115 | function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: IMessage; |
| 116 | begin |
| 117 | result := FMessageBegin; |
| 118 | end; |
| 119 | |
| 120 | |
| 121 | constructor TMultiplexedProcessorImpl.Create; |
| 122 | begin |
| 123 | inherited Create; |
| 124 | FServiceProcessorMap := TDictionary<string,IProcessor>.Create; |
| 125 | end; |
| 126 | |
| 127 | |
| 128 | destructor TMultiplexedProcessorImpl.Destroy; |
| 129 | begin |
| 130 | try |
| 131 | FreeAndNil( FServiceProcessorMap); |
| 132 | finally |
| 133 | inherited Destroy; |
| 134 | end; |
| 135 | end; |
| 136 | |
| 137 | |
| 138 | procedure TMultiplexedProcessorImpl.RegisterProcessor( const serviceName : String; const processor : IProcessor); |
| 139 | begin |
| 140 | FServiceProcessorMap.Add( serviceName, processor); |
| 141 | end; |
| 142 | |
| 143 | |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 144 | procedure TMultiplexedProcessorImpl.Error( const oprot : IProtocol; const msg : IMessage; |
| 145 | extype : TApplicationException.TExceptionType; |
| 146 | const etxt : string); |
| 147 | var appex : TApplicationException; |
| 148 | newMsg : IMessage; |
| 149 | begin |
| 150 | appex := TApplicationException.Create( extype, etxt); |
| 151 | try |
| 152 | newMsg := TMessageImpl.Create( msg.Name, TMessageType.Exception, msg.SeqID); |
| 153 |
|
| 154 | oprot.WriteMessageBegin(newMsg); |
| 155 | appex.Write(oprot); |
| 156 | oprot.WriteMessageEnd(); |
| 157 | oprot.Transport.Flush(); |
| 158 | |
| 159 | finally |
| 160 | appex.Free;
|
| 161 | end; |
| 162 | end; |
| 163 | |
| 164 | |
Jens Geyer | d960e6e | 2013-12-19 22:06:30 +0100 | [diff] [blame^] | 165 | function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol; const events : IProcessorEvents = nil): Boolean; |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 166 | var msg, newMsg : IMessage; |
| 167 | idx : Integer; |
| 168 | sService : string; |
| 169 | processor : IProcessor; |
| 170 | protocol : IProtocol; |
| 171 | const |
| 172 | ERROR_INVALID_MSGTYPE = 'Message must be "call" or "oneway"'; |
| 173 | ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.'; |
| 174 | ERROR_UNKNOWN_SERVICE = 'Service "%s" is not registered with MultiplexedProcessor'; |
| 175 | begin |
| 176 | // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the message header. |
| 177 | // This pulls the message "off the wire", which we'll deal with at the end of this method. |
| 178 | msg := iprot.readMessageBegin(); |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 179 | if not (msg.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin |
| 180 | Error( oprot, msg, |
| 181 | TApplicationException.TExceptionType.InvalidMessageType, |
| 182 | ERROR_INVALID_MSGTYPE); |
| 183 | Exit( FALSE);
|
| 184 | end;
|
| 185 |
|
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 186 | // Extract the service name |
| 187 | idx := Pos( TMultiplexedProtocol.SEPARATOR, msg.Name); |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 188 | if idx < 1 then begin |
| 189 | Error( oprot, msg, |
| 190 | TApplicationException.TExceptionType.InvalidProtocol, |
| 191 | Format(ERROR_INCOMPATIBLE_PROT,[msg.Name])); |
| 192 | Exit( FALSE); |
| 193 | end;
|
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 194 | |
| 195 | // Create a new TMessage, something that can be consumed by any TProtocol |
| 196 | sService := Copy( msg.Name, 1, idx-1); |
| 197 | if not FServiceProcessorMap.TryGetValue( sService, processor) |
Jens Geyer | 2d2b3b2 | 2013-05-13 22:03:08 +0200 | [diff] [blame] | 198 | then begin |
| 199 | Error( oprot, msg, |
| 200 | TApplicationException.TExceptionType.InternalError, |
| 201 | Format(ERROR_UNKNOWN_SERVICE,[sService])); |
| 202 | Exit( FALSE); |
| 203 | end;
|
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 204 | |
| 205 | // Create a new TMessage, removing the service name |
| 206 | Inc( idx, Length(TMultiplexedProtocol.SEPARATOR)); |
| 207 | newMsg := TMessageImpl.Create( Copy( msg.Name, idx, MAXINT), msg.Type_, msg.SeqID); |
| 208 | |
| 209 | // Dispatch processing to the stored processor |
| 210 | protocol := TStoredMessageProtocol.Create( iprot, newMsg); |
Jens Geyer | d960e6e | 2013-12-19 22:06:30 +0100 | [diff] [blame^] | 211 | result := processor.process( protocol, oprot, events); |
Jens Geyer | 8a70196 | 2013-03-25 01:28:12 +0200 | [diff] [blame] | 212 | end; |
| 213 | |
| 214 | |
| 215 | end. |
| 216 | |