blob: b771d437960bc9c736767af4036a72e5894d7bdc [file] [log] [blame]
Jens Geyer8a701962013-03-25 01:28:12 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20unit Thrift.Processor.Multiplex;
21
22
23interface
24
25uses
26 SysUtils,
27 Generics.Collections,
28 Thrift,
29 Thrift.Protocol,
30 Thrift.Protocol.Multiplex;
31
{ TMultiplexedProcessorImpl (exposed through IMultiplexedProcessor) is a processor that allows
  a single server to provide multiple services. To do so, you instantiate the processor and
  then register additional processors with it via RegisterProcessor. The following example
  illustrates the idea in Java-style syntax:
35
36
37 TMultiplexedProcessor processor = new TMultiplexedProcessor();
38
39 processor.registerProcessor(
40 "Calculator",
41 new Calculator.Processor(new CalculatorHandler()));
42
43 processor.registerProcessor(
44 "WeatherReport",
45 new WeatherReport.Processor(new WeatherReportHandler()));
46
47 TServerTransport t = new TServerSocket(9090);
48 TSimpleServer server = new TSimpleServer(processor, t);
49
50 server.serve();
51}
52
53
54type
55 IMultiplexedProcessor = interface( IProcessor)
56 ['{810FF32D-22A2-4D58-B129-B0590703ECEC}']
57 // Register a service with this TMultiplexedProcessor. This allows us
58 // to broker requests to individual services by using the service name
59 // to select them at request time.
60 procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
61 end;
62
63
64 TMultiplexedProcessorImpl = class( TInterfacedObject, IMultiplexedProcessor, IProcessor)
65 private type
66 // Our goal was to work with any protocol. In order to do that, we needed
67 // to allow them to call readMessageBegin() and get a TMessage in exactly
68 // the standard format, without the service name prepended to TMessage.name.
69 TStoredMessageProtocol = class( TProtocolDecorator)
70 private
71 FMessageBegin : IMessage;
72 public
73 constructor Create( const protocol : IProtocol; const aMsgBegin : IMessage);
74 function ReadMessageBegin: IMessage; override;
75 end;
76
77 private
78 FServiceProcessorMap : TDictionary<String, IProcessor>;
79
80 public
81 constructor Create;
82 destructor Destroy; override;
83
84 // Register a service with this TMultiplexedProcessorImpl. This allows us
85 // to broker requests to individual services by using the service name
86 // to select them at request time.
87 procedure RegisterProcessor( const serviceName : String; const processor : IProcessor);
88
89 { This implementation of process performs the following steps:
90 - Read the beginning of the message.
91 - Extract the service name from the message.
92 - Using the service name to locate the appropriate processor.
93 - Dispatch to the processor, with a decorated instance of TProtocol
94 that allows readMessageBegin() to return the original TMessage.
95
96 An exception is thrown if the message type is not CALL or ONEWAY
97 or if the service is unknown (or not properly registered).
98 }
99 function Process(const iprot, oprot : IProtocol) : Boolean;
100 end;
101
102
103implementation
104
// Decorates "protocol" and stores "aMsgBegin" so that it can later be
// handed out by ReadMessageBegin.
constructor TMultiplexedProcessorImpl.TStoredMessageProtocol.Create(
  const protocol : IProtocol;
  const aMsgBegin : IMessage);
begin
  inherited Create( protocol);

  // keep the prepared message header for ReadMessageBegin
  Self.FMessageBegin := aMsgBegin;
end;
110
111
// Returns the message header passed to the constructor; nothing is read
// from the decorated protocol here.
function TMultiplexedProcessorImpl.TStoredMessageProtocol.ReadMessageBegin: IMessage;
begin
  Exit( FMessageBegin);
end;
116
117
// Creates the processor with an empty service-name -> processor map.
constructor TMultiplexedProcessorImpl.Create;
begin
  inherited;
  FServiceProcessorMap := TDictionary<String, IProcessor>.Create;
end;
123
124
// Releases the service map. The inherited destructor is invoked from the
// finally section, so it runs even if freeing the map raises.
destructor TMultiplexedProcessorImpl.Destroy;
begin
  try
    FreeAndNil( FServiceProcessorMap);
  finally
    inherited;
  end;
end;
133
134
// Stores "processor" in the service map under "serviceName".
// The entry is inserted with TDictionary.Add, not AddOrSetValue.
procedure TMultiplexedProcessorImpl.RegisterProcessor(
  const serviceName : String;
  const processor : IProcessor);
begin
  Self.FServiceProcessorMap.Add( serviceName, processor);
end;
139
140
// Routes one incoming request to the processor registered for its service.
//
// iprot : protocol the request is read from
// oprot : protocol passed through unchanged to the selected processor
//
// Returns whatever the selected processor's Process call returns.
//
// Raises TApplicationException with
//  - InvalidMessageType  if the message is neither Call nor Oneway,
//  - InvalidProtocol     if the message name contains no service separator,
//  - InternalError       if no processor is registered for the service name.
function TMultiplexedProcessorImpl.Process(const iprot, oprot : IProtocol) : Boolean;
const
  ERROR_INVALID_MSGTYPE   = 'Message must be "call" or "oneway"';
  ERROR_INCOMPATIBLE_PROT = 'No service name found in "%s". Client is expected to use TMultiplexProtocol.';
  ERROR_UNKNOWN_SERVICE   = 'Service "%s" is not registered with MultiplexedProcessor';
var
  incoming    : IMessage;    // header exactly as read from iprot
  stripped    : IMessage;    // same header, service prefix removed from the name
  sepPos      : Integer;     // 1-based position of the separator in the name
  methodStart : Integer;     // 1-based position of the first char after the separator
  svcName     : string;
  target      : IProcessor;
  decorated   : IProtocol;
begin
  // Read the message header through the real underlying protocol.
  // This consumes the header from the input; the selected processor gets
  // it back later through the decorator created below.
  incoming := iprot.readMessageBegin();
  if not (incoming.Type_ in [TMessageType.Call, TMessageType.Oneway]) then begin
    raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidMessageType,
                                        ERROR_INVALID_MSGTYPE);
  end;

  // The message name must contain the separator, with the service name in front of it
  sepPos := Pos( TMultiplexedProtocol.SEPARATOR, incoming.Name);
  if sepPos <= 0 then begin
    raise TApplicationException.Create( TApplicationException.TExceptionType.InvalidProtocol,
                                        Format( ERROR_INCOMPATIBLE_PROT, [incoming.Name]));
  end;

  // Everything before the separator is the service name; find its processor
  svcName := Copy( incoming.Name, 1, sepPos - 1);
  if not FServiceProcessorMap.TryGetValue( svcName, target) then begin
    raise TApplicationException.Create( TApplicationException.TExceptionType.InternalError,
                                        Format( ERROR_UNKNOWN_SERVICE, [svcName]));
  end;

  // Build a message without the service prefix: same type and sequence ID,
  // name reduced to the part following the separator
  methodStart := sepPos + Length( TMultiplexedProtocol.SEPARATOR);
  stripped := TMessageImpl.Create( Copy( incoming.Name, methodStart, MAXINT),
                                   incoming.Type_,
                                   incoming.SeqID);

  // Hand the request over to the registered processor. The decorator is
  // first assigned to an interface-typed local and only then passed on.
  decorated := TStoredMessageProtocol.Create( iprot, stripped);
  Result := target.process( decorated, oprot);
end;
179
180
181end.
182