blob: b567aefd353323c8a1726ee34a236cac8f9dd27e [file] [log] [blame]
Jake Farrell7ae13e12011-10-18 14:35:26 +00001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 {$SCOPEDENUMS ON}
21
22unit Thrift.Transport;
23
24interface
25
26uses
27 Classes,
28 SysUtils,
Jens Geyer6a7463a2013-03-07 20:40:59 +010029 Math,
Jake Farrell7ae13e12011-10-18 14:35:26 +000030 Sockets,
31 Generics.Collections,
32 Thrift.Collections,
33 Thrift.Utils,
34 Thrift.Stream,
35 ActiveX,
36 msxml;
37
38type
39 ITransport = interface
40 ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
41 function GetIsOpen: Boolean;
42 property IsOpen: Boolean read GetIsOpen;
43 function Peek: Boolean;
44 procedure Open;
45 procedure Close;
46 function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
47 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
48 procedure Write( const buf: TBytes); overload;
49 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
50 procedure Flush;
51 end;
52
53 TTransportImpl = class( TInterfacedObject, ITransport)
54 protected
55 function GetIsOpen: Boolean; virtual; abstract;
56 property IsOpen: Boolean read GetIsOpen;
Roger Meier3bef8c22012-10-06 06:58:00 +000057 function Peek: Boolean; virtual;
Jake Farrell7ae13e12011-10-18 14:35:26 +000058 procedure Open(); virtual; abstract;
59 procedure Close(); virtual; abstract;
60 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
61 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
62 procedure Write( const buf: TBytes); overload; virtual;
63 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
64 procedure Flush; virtual;
65 end;
66
67 TTransportException = class( Exception )
68 public
69 type
70 TExceptionType = (
71 Unknown,
72 NotOpen,
73 AlreadyOpen,
74 TimedOut,
75 EndOfFile
76 );
77 private
78 FType : TExceptionType;
79 public
80 constructor Create( AType: TExceptionType); overload;
81 constructor Create( const msg: string); overload;
82 constructor Create( AType: TExceptionType; const msg: string); overload;
83 property Type_: TExceptionType read FType;
84 end;
85
86 IHTTPClient = interface( ITransport )
87 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
88 procedure SetConnectionTimeout(const Value: Integer);
89 function GetConnectionTimeout: Integer;
90 procedure SetReadTimeout(const Value: Integer);
91 function GetReadTimeout: Integer;
92 function GetCustomHeaders: IThriftDictionary<string,string>;
93 procedure SendRequest;
94 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
95 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
96 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
97 end;
98
99 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
100 private
101 FUri : string;
102 FInputStream : IThriftStream;
103 FOutputStream : IThriftStream;
104 FConnectionTimeout : Integer;
105 FReadTimeout : Integer;
106 FCustomHeaders : IThriftDictionary<string,string>;
107
108 function CreateRequest: IXMLHTTPRequest;
109 protected
110 function GetIsOpen: Boolean; override;
111 procedure Open(); override;
112 procedure Close(); override;
113 function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
114 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
115 procedure Flush; override;
116
117 procedure SetConnectionTimeout(const Value: Integer);
118 function GetConnectionTimeout: Integer;
119 procedure SetReadTimeout(const Value: Integer);
120 function GetReadTimeout: Integer;
121 function GetCustomHeaders: IThriftDictionary<string,string>;
122 procedure SendRequest;
123 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
124 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
125 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
126 public
127 constructor Create( const AUri: string);
128 destructor Destroy; override;
129 end;
130
131 IServerTransport = interface
Jens Geyer01640402013-09-25 21:12:21 +0200132 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
Jake Farrell7ae13e12011-10-18 14:35:26 +0000133 procedure Listen;
134 procedure Close;
Jens Geyer01640402013-09-25 21:12:21 +0200135 function Accept( const fnAccepting: TProc): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000136 end;
137
138 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
139 protected
Jake Farrell7ae13e12011-10-18 14:35:26 +0000140 procedure Listen; virtual; abstract;
141 procedure Close; virtual; abstract;
Jens Geyer01640402013-09-25 21:12:21 +0200142 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000143 end;
144
145 ITransportFactory = interface
146 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
Roger Meier333bbf32012-01-08 21:51:08 +0000147 function GetTransport( const ATrans: ITransport): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000148 end;
149
150 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
Roger Meier333bbf32012-01-08 21:51:08 +0000151 function GetTransport( const ATrans: ITransport): ITransport; virtual;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000152 end;
153
154 TTcpSocketStreamImpl = class( TThriftStreamImpl )
155 private
156 FTcpClient : TCustomIpClient;
157 protected
158 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
159 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
160 procedure Open; override;
161 procedure Close; override;
162 procedure Flush; override;
163
164 function IsOpen: Boolean; override;
165 function ToArray: TBytes; override;
166 public
Roger Meier333bbf32012-01-08 21:51:08 +0000167 constructor Create( const ATcpClient: TCustomIpClient);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000168 end;
169
170 IStreamTransport = interface( ITransport )
171 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
172 function GetInputStream: IThriftStream;
173 function GetOutputStream: IThriftStream;
174 property InputStream : IThriftStream read GetInputStream;
175 property OutputStream : IThriftStream read GetOutputStream;
176 end;
177
178 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
179 protected
180 FInputStream : IThriftStream;
181 FOutputStream : IThriftStream;
182 protected
183 function GetIsOpen: Boolean; override;
184
185 function GetInputStream: IThriftStream;
186 function GetOutputStream: IThriftStream;
187 public
188 property InputStream : IThriftStream read GetInputStream;
189 property OutputStream : IThriftStream read GetOutputStream;
190
191 procedure Open; override;
192 procedure Close; override;
193 procedure Flush; override;
194 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
195 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
Roger Meier333bbf32012-01-08 21:51:08 +0000196 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000197 destructor Destroy; override;
198 end;
199
200 TBufferedStreamImpl = class( TThriftStreamImpl)
201 private
202 FStream : IThriftStream;
203 FBufSize : Integer;
Jens Geyer6a7463a2013-03-07 20:40:59 +0100204 FReadBuffer : TMemoryStream;
205 FWriteBuffer : TMemoryStream;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000206 protected
207 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
208 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
209 procedure Open; override;
210 procedure Close; override;
211 procedure Flush; override;
212 function IsOpen: Boolean; override;
213 function ToArray: TBytes; override;
214 public
Roger Meier333bbf32012-01-08 21:51:08 +0000215 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000216 destructor Destroy; override;
217 end;
218
219 TServerSocketImpl = class( TServerTransportImpl)
220 private
221 FServer : TTcpServer;
222 FPort : Integer;
223 FClientTimeout : Integer;
224 FUseBufferedSocket : Boolean;
225 FOwnsServer : Boolean;
226 protected
Jens Geyer01640402013-09-25 21:12:21 +0200227 function Accept( const fnAccepting: TProc) : ITransport; override;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000228 public
Roger Meier333bbf32012-01-08 21:51:08 +0000229 constructor Create( const AServer: TTcpServer ); overload;
230 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000231 constructor Create( APort: Integer); overload;
232 constructor Create( APort: Integer; AClientTimeout: Integer); overload;
233 constructor Create( APort: Integer; AClientTimeout: Integer;
234 AUseBufferedSockets: Boolean); overload;
235 destructor Destroy; override;
236 procedure Listen; override;
237 procedure Close; override;
238 end;
239
240 TBufferedTransportImpl = class( TTransportImpl )
241 private
242 FInputBuffer : IThriftStream;
243 FOutputBuffer : IThriftStream;
244 FTransport : IStreamTransport;
245 FBufSize : Integer;
246
247 procedure InitBuffers;
248 function GetUnderlyingTransport: ITransport;
249 protected
250 function GetIsOpen: Boolean; override;
251 procedure Flush; override;
252 public
253 procedure Open(); override;
254 procedure Close(); override;
255 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
256 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
Roger Meier333bbf32012-01-08 21:51:08 +0000257 constructor Create( const ATransport : IStreamTransport ); overload;
258 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000259 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
260 property IsOpen: Boolean read GetIsOpen;
261 end;
262
263 TSocketImpl = class(TStreamTransportImpl)
264 private
265 FClient : TCustomIpClient;
266 FOwnsClient : Boolean;
267 FHost : string;
268 FPort : Integer;
269 FTimeout : Integer;
270
271 procedure InitSocket;
272 protected
273 function GetIsOpen: Boolean; override;
274 public
275 procedure Open; override;
Roger Meier333bbf32012-01-08 21:51:08 +0000276 constructor Create( const AClient : TCustomIpClient); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000277 constructor Create( const AHost: string; APort: Integer); overload;
278 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer); overload;
279 destructor Destroy; override;
280 procedure Close; override;
281 property TcpClient: TCustomIpClient read FClient;
282 property Host : string read FHost;
283 property Port: Integer read FPort;
284 end;
285
286 TFramedTransportImpl = class( TTransportImpl)
287 private const
288 FHeaderSize : Integer = 4;
289 private class var
290 FHeader_Dummy : array of Byte;
291 protected
292 FTransport : ITransport;
293 FWriteBuffer : TMemoryStream;
294 FReadBuffer : TMemoryStream;
295
296 procedure InitWriteBuffer;
297 procedure ReadFrame;
298 public
299 type
300 TFactory = class( TTransportFactoryImpl )
301 public
Roger Meier333bbf32012-01-08 21:51:08 +0000302 function GetTransport( const ATrans: ITransport): ITransport; override;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000303 end;
304
305{$IF CompilerVersion >= 21.0}
306 class constructor Create;
307{$IFEND}
308 constructor Create; overload;
Roger Meier333bbf32012-01-08 21:51:08 +0000309 constructor Create( const ATrans: ITransport); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000310 destructor Destroy; override;
311
312 procedure Open(); override;
313 function GetIsOpen: Boolean; override;
314
315 procedure Close(); override;
316 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
317 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
318 procedure Flush; override;
319 end;
320
321{$IF CompilerVersion < 21.0}
322procedure TFramedTransportImpl_Initialize;
323{$IFEND}
324
325implementation
326
327{ TTransportImpl }
328
329procedure TTransportImpl.Flush;
330begin
331
332end;
333
334function TTransportImpl.Peek: Boolean;
335begin
336 Result := IsOpen;
337end;
338
339function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
340var
341 got : Integer;
342 ret : Integer;
343begin
344 got := 0;
345 while ( got < len) do
346 begin
347 ret := Read( buf, off + got, len - got);
348 if ( ret <= 0 ) then
349 begin
350 raise TTransportException.Create( 'Cannot read, Remote side has closed' );
351 end;
352 got := got + ret;
353 end;
354 Result := got;
355end;
356
357procedure TTransportImpl.Write( const buf: TBytes);
358begin
359 Self.Write( buf, 0, Length(buf) );
360end;
361
362{ THTTPClientImpl }
363
364procedure THTTPClientImpl.Close;
365begin
366 FInputStream := nil;
367 FOutputStream := nil;
368end;
369
370constructor THTTPClientImpl.Create(const AUri: string);
371begin
372 inherited Create;
373 FUri := AUri;
374 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
375 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
376end;
377
378function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
379var
380 pair : TPair<string,string>;
381begin
382{$IF CompilerVersion >= 21.0}
383 Result := CoXMLHTTP.Create;
384{$ELSE}
385 Result := CoXMLHTTPRequest.Create;
386{$IFEND}
387
388 Result.open('POST', FUri, False, '', '');
389 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
390 Result.setRequestHeader( 'Accept', 'application/x-thrift');
391 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
392
393 for pair in FCustomHeaders do
394 begin
395 Result.setRequestHeader( pair.Key, pair.Value );
396 end;
397end;
398
399destructor THTTPClientImpl.Destroy;
400begin
401 Close;
402 inherited;
403end;
404
405procedure THTTPClientImpl.Flush;
406begin
407 try
408 SendRequest;
409 finally
410 FOutputStream := nil;
411 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
412 end;
413end;
414
415function THTTPClientImpl.GetConnectionTimeout: Integer;
416begin
417 Result := FConnectionTimeout;
418end;
419
420function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
421begin
422 Result := FCustomHeaders;
423end;
424
425function THTTPClientImpl.GetIsOpen: Boolean;
426begin
427 Result := True;
428end;
429
430function THTTPClientImpl.GetReadTimeout: Integer;
431begin
432 Result := FReadTimeout;
433end;
434
435procedure THTTPClientImpl.Open;
436begin
437
438end;
439
440function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
441begin
442 if FInputStream = nil then
443 begin
444 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
445 'No request has been sent');
446 end;
447 try
448 Result := FInputStream.Read( buf, off, len )
449 except
450 on E: Exception do
451 begin
452 raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
453 E.Message);
454 end;
455 end;
456end;
457
458procedure THTTPClientImpl.SendRequest;
459var
460 xmlhttp : IXMLHTTPRequest;
461 ms : TMemoryStream;
462 a : TBytes;
463 len : Integer;
464begin
465 xmlhttp := CreateRequest;
466
467 ms := TMemoryStream.Create;
468 try
469 a := FOutputStream.ToArray;
470 len := Length(a);
471 if len > 0 then
472 begin
473 ms.WriteBuffer( Pointer(@a[0])^, len);
474 end;
475 ms.Position := 0;
476 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
477 FInputStream := nil;
478 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
479 finally
480 ms.Free;
481 end;
482end;
483
484procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
485begin
486 FConnectionTimeout := Value;
487end;
488
489procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
490begin
491 FReadTimeout := Value
492end;
493
494procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
495begin
496 FOutputStream.Write( buf, off, len);
497end;
498
499{ TTransportException }
500
501constructor TTransportException.Create(AType: TExceptionType);
502begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200503 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000504 Create( AType, '' )
505end;
506
507constructor TTransportException.Create(AType: TExceptionType;
508 const msg: string);
509begin
510 inherited Create(msg);
511 FType := AType;
512end;
513
514constructor TTransportException.Create(const msg: string);
515begin
516 inherited Create(msg);
517end;
518
Jake Farrell7ae13e12011-10-18 14:35:26 +0000519{ TTransportFactoryImpl }
520
Roger Meier333bbf32012-01-08 21:51:08 +0000521function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000522begin
523 Result := ATrans;
524end;
525
526{ TServerSocket }
527
Roger Meier333bbf32012-01-08 21:51:08 +0000528constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000529begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200530 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000531 FServer := AServer;
532 FClientTimeout := AClientTimeout;
533end;
534
Roger Meier333bbf32012-01-08 21:51:08 +0000535constructor TServerSocketImpl.Create( const AServer: TTcpServer);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000536begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200537 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000538 Create( AServer, 0 );
539end;
540
541constructor TServerSocketImpl.Create(APort: Integer);
542begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200543 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000544 Create( APort, 0 );
545end;
546
Jens Geyer01640402013-09-25 21:12:21 +0200547function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000548var
Jens Geyer01640402013-09-25 21:12:21 +0200549 client : TCustomIpClient;
550 trans : IStreamTransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000551begin
552 if FServer = nil then
553 begin
554 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
555 'No underlying server socket.');
556 end;
557
558 try
Jens Geyer01640402013-09-25 21:12:21 +0200559 client := TCustomIpClient.Create(nil);
560
561 if Assigned(fnAccepting)
562 then fnAccepting();
563
564 if ( not FServer.Accept( client)) then
Jake Farrell7ae13e12011-10-18 14:35:26 +0000565 begin
Jens Geyer01640402013-09-25 21:12:21 +0200566 client.Free;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000567 Result := nil;
568 Exit;
569 end;
570
Jens Geyer01640402013-09-25 21:12:21 +0200571 if client = nil then
Jake Farrell7ae13e12011-10-18 14:35:26 +0000572 begin
573 Result := nil;
574 Exit;
575 end;
576
Jens Geyer01640402013-09-25 21:12:21 +0200577 trans := TSocketImpl.Create( client);
578 if FUseBufferedSocket
579 then result := TBufferedTransportImpl.Create( trans)
580 else result := trans;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000581
582 except
583 on E: Exception do
584 begin
585 raise TTransportException.Create( E.ToString );
586 end;
587 end;
588end;
589
590procedure TServerSocketImpl.Close;
591begin
592 if FServer <> nil then
593 begin
594 try
595 FServer.Active := False;
596 except
597 on E: Exception do
598 begin
599 raise TTransportException.Create('Error on closing socket : ' + E.Message);
600 end;
601 end;
602 end;
603end;
604
605constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer;
606 AUseBufferedSockets: Boolean);
607begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200608 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000609 FPort := APort;
610 FClientTimeout := AClientTimeout;
611 FUseBufferedSocket := AUseBufferedSockets;
612 FOwnsServer := True;
613 FServer := TTcpServer.Create( nil );
614 FServer.BlockMode := bmBlocking;
615{$IF CompilerVersion >= 21.0}
616 FServer.LocalPort := AnsiString( IntToStr( FPort));
617{$ELSE}
618 FServer.LocalPort := IntToStr( FPort);
619{$IFEND}
620end;
621
622destructor TServerSocketImpl.Destroy;
623begin
624 if FOwnsServer then
625 begin
626 FServer.Free;
627 end;
628 inherited;
629end;
630
631procedure TServerSocketImpl.Listen;
632begin
633 if FServer <> nil then
634 begin
635 try
636 FServer.Active := True;
637 except
638 on E: Exception do
639 begin
640 raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
641 end;
642 end;
643 end;
644end;
645
646constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer);
647begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200648 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000649 Create( APort, AClientTimeout, False );
650end;
651
652{ TSocket }
653
Roger Meier333bbf32012-01-08 21:51:08 +0000654constructor TSocketImpl.Create( const AClient : TCustomIpClient);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000655var
656 stream : IThriftStream;
657begin
658 FClient := AClient;
659 stream := TTcpSocketStreamImpl.Create( FClient);
Jens Geyer718f6ee2013-09-06 21:02:34 +0200660 inherited Create( stream, stream);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000661end;
662
663constructor TSocketImpl.Create(const AHost: string; APort: Integer);
664begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200665 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000666 Create( AHost, APort, 0);
667end;
668
669procedure TSocketImpl.Close;
670begin
671 inherited Close;
Jens Geyer718f6ee2013-09-06 21:02:34 +0200672 if FClient <> nil
673 then FreeAndNil( FClient);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000674end;
675
676constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
677begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200678 inherited Create(nil,nil);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000679 FHost := AHost;
680 FPort := APort;
681 FTimeout := ATimeout;
682 InitSocket;
683end;
684
685destructor TSocketImpl.Destroy;
686begin
687 if FOwnsClient then
688 begin
689 FClient.Free;
690 end;
691 inherited;
692end;
693
694function TSocketImpl.GetIsOpen: Boolean;
695begin
696 Result := False;
697 if FClient <> nil then
698 begin
699 Result := FClient.Connected;
700 end;
701end;
702
703procedure TSocketImpl.InitSocket;
704var
705 stream : IThriftStream;
706begin
707 if FClient <> nil then
708 begin
709 if FOwnsClient then
710 begin
711 FClient.Free;
712 FClient := nil;
713 end;
714 end;
715 FClient := TTcpClient.Create( nil );
716 FOwnsClient := True;
717
718 stream := TTcpSocketStreamImpl.Create( FClient);
719 FInputStream := stream;
720 FOutputStream := stream;
721
722end;
723
724procedure TSocketImpl.Open;
725begin
726 if IsOpen then
727 begin
728 raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
729 'Socket already connected');
730 end;
731
732 if FHost = '' then
733 begin
734 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
735 'Cannot open null host');
736 end;
737
738 if Port <= 0 then
739 begin
740 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
741 'Cannot open without port');
742 end;
743
744 if FClient = nil then
745 begin
746 InitSocket;
747 end;
748
749 FClient.RemoteHost := TSocketHost( Host);
750 FClient.RemotePort := TSocketPort( IntToStr( Port));
751 FClient.Connect;
752
753 FInputStream := TTcpSocketStreamImpl.Create( FClient);
754 FOutputStream := FInputStream;
755end;
756
757{ TBufferedStream }
758
759procedure TBufferedStreamImpl.Close;
760begin
761 Flush;
762 FStream := nil;
Jens Geyer6a7463a2013-03-07 20:40:59 +0100763
764 FReadBuffer.Free;
765 FReadBuffer := nil;
766
767 FWriteBuffer.Free;
768 FWriteBuffer := nil;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000769end;
770
Roger Meier333bbf32012-01-08 21:51:08 +0000771constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000772begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200773 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000774 FStream := AStream;
775 FBufSize := ABufSize;
Jens Geyer6a7463a2013-03-07 20:40:59 +0100776 FReadBuffer := TMemoryStream.Create;
777 FWriteBuffer := TMemoryStream.Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000778end;
779
780destructor TBufferedStreamImpl.Destroy;
781begin
782 Close;
783 inherited;
784end;
785
786procedure TBufferedStreamImpl.Flush;
787var
788 buf : TBytes;
789 len : Integer;
790begin
791 if IsOpen then
792 begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100793 len := FWriteBuffer.Size;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000794 if len > 0 then
795 begin
796 SetLength( buf, len );
Jens Geyer6a7463a2013-03-07 20:40:59 +0100797 FWriteBuffer.Position := 0;
798 FWriteBuffer.Read( Pointer(@buf[0])^, len );
Jake Farrell7ae13e12011-10-18 14:35:26 +0000799 FStream.Write( buf, 0, len );
800 end;
Jens Geyer6a7463a2013-03-07 20:40:59 +0100801 FWriteBuffer.Clear;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000802 end;
803end;
804
805function TBufferedStreamImpl.IsOpen: Boolean;
806begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100807 Result := (FWriteBuffer <> nil)
808 and (FReadBuffer <> nil)
809 and (FStream <> nil);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000810end;
811
812procedure TBufferedStreamImpl.Open;
813begin
814
815end;
816
817function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
818var
819 nRead : Integer;
820 tempbuf : TBytes;
821begin
822 inherited;
823 Result := 0;
Jens Geyer6a7463a2013-03-07 20:40:59 +0100824 if IsOpen then
Jake Farrell7ae13e12011-10-18 14:35:26 +0000825 begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100826 while count > 0 do begin
827
828 if FReadBuffer.Position >= FReadBuffer.Size then
Jake Farrell7ae13e12011-10-18 14:35:26 +0000829 begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100830 FReadBuffer.Clear;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000831 SetLength( tempbuf, FBufSize);
832 nRead := FStream.Read( tempbuf, 0, FBufSize );
Jens Geyer6a7463a2013-03-07 20:40:59 +0100833 if nRead = 0 then Break; // avoid infinite loop
834
835 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
836 FReadBuffer.Position := 0;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000837 end;
838
Jens Geyer6a7463a2013-03-07 20:40:59 +0100839 if FReadBuffer.Position < FReadBuffer.Size then
Jake Farrell7ae13e12011-10-18 14:35:26 +0000840 begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100841 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
842 Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));
843 Dec( count, nRead);
844 Inc( offset, nRead);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000845 end;
846 end;
847 end;
848end;
849
850function TBufferedStreamImpl.ToArray: TBytes;
851var
852 len : Integer;
853begin
854 len := 0;
855
856 if IsOpen then
857 begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100858 len := FReadBuffer.Size;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000859 end;
860
861 SetLength( Result, len);
862
863 if len > 0 then
864 begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100865 FReadBuffer.Position := 0;
866 FReadBuffer.Read( Pointer(@Result[0])^, len );
Jake Farrell7ae13e12011-10-18 14:35:26 +0000867 end;
868end;
869
870procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
871begin
872 inherited;
873 if count > 0 then
874 begin
875 if IsOpen then
876 begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100877 FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
878 if FWriteBuffer.Size > FBufSize then
Jake Farrell7ae13e12011-10-18 14:35:26 +0000879 begin
880 Flush;
881 end;
882 end;
883 end;
884end;
885
886{ TStreamTransportImpl }
887
888procedure TStreamTransportImpl.Close;
889begin
890 if FInputStream <> FOutputStream then
891 begin
892 if FInputStream <> nil then
893 begin
894 FInputStream := nil;
895 end;
896 if FOutputStream <> nil then
897 begin
898 FOutputStream := nil;
899 end;
900 end else
901 begin
902 FInputStream := nil;
903 FOutputStream := nil;
904 end;
905end;
906
Roger Meier333bbf32012-01-08 21:51:08 +0000907constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000908begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200909 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000910 FInputStream := AInputStream;
911 FOutputStream := AOutputStream;
912end;
913
914destructor TStreamTransportImpl.Destroy;
915begin
916 FInputStream := nil;
917 FOutputStream := nil;
918 inherited;
919end;
920
921procedure TStreamTransportImpl.Flush;
922begin
923 if FOutputStream = nil then
924 begin
925 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );
926 end;
927
928 FOutputStream.Flush;
929end;
930
931function TStreamTransportImpl.GetInputStream: IThriftStream;
932begin
933 Result := FInputStream;
934end;
935
936function TStreamTransportImpl.GetIsOpen: Boolean;
937begin
938 Result := True;
939end;
940
941function TStreamTransportImpl.GetOutputStream: IThriftStream;
942begin
943 Result := FInputStream;
944end;
945
946procedure TStreamTransportImpl.Open;
947begin
948
949end;
950
951function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
952begin
953 if FInputStream = nil then
954 begin
955 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );
956 end;
957 Result := FInputStream.Read( buf, off, len );
958end;
959
960procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
961begin
962 if FOutputStream = nil then
963 begin
Jake Farrelld09362c2011-10-26 02:25:07 +0000964 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );
Jake Farrell7ae13e12011-10-18 14:35:26 +0000965 end;
966
967 FOutputStream.Write( buf, off, len );
968end;
969
970{ TBufferedTransportImpl }
971
Roger Meier333bbf32012-01-08 21:51:08 +0000972constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000973begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200974 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000975 Create( ATransport, 1024 );
976end;
977
978procedure TBufferedTransportImpl.Close;
979begin
980 FTransport.Close;
981end;
982
Roger Meier333bbf32012-01-08 21:51:08 +0000983constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000984 ABufSize: Integer);
985begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200986 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000987 FTransport := ATransport;
988 FBufSize := ABufSize;
989 InitBuffers;
990end;
991
992procedure TBufferedTransportImpl.Flush;
993begin
994 if FOutputBuffer <> nil then
995 begin
996 FOutputBuffer.Flush;
997 end;
998end;
999
1000function TBufferedTransportImpl.GetIsOpen: Boolean;
1001begin
1002 Result := FTransport.IsOpen;
1003end;
1004
1005function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
1006begin
1007 Result := FTransport;
1008end;
1009
1010procedure TBufferedTransportImpl.InitBuffers;
1011begin
1012 if FTransport.InputStream <> nil then
1013 begin
1014 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
1015 end;
1016 if FTransport.OutputStream <> nil then
1017 begin
1018 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
1019 end;
1020end;
1021
1022procedure TBufferedTransportImpl.Open;
1023begin
1024 FTransport.Open
1025end;
1026
1027function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1028begin
1029 Result := 0;
1030 if FInputBuffer <> nil then
1031 begin
1032 Result := FInputBuffer.Read( buf, off, len );
1033 end;
1034end;
1035
1036procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1037begin
1038 if FOutputBuffer <> nil then
1039 begin
1040 FOutputBuffer.Write( buf, off, len );
1041 end;
1042end;
1043
1044{ TFramedTransportImpl }
1045
1046{$IF CompilerVersion < 21.0}
1047procedure TFramedTransportImpl_Initialize;
1048begin
1049 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1050 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1051 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1052end;
1053{$ELSE}
1054class constructor TFramedTransportImpl.Create;
1055begin
1056 SetLength( FHeader_Dummy, FHeaderSize);
1057 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1058end;
1059{$IFEND}
1060
1061constructor TFramedTransportImpl.Create;
1062begin
Jens Geyer718f6ee2013-09-06 21:02:34 +02001063 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +00001064 InitWriteBuffer;
1065end;
1066
1067procedure TFramedTransportImpl.Close;
1068begin
1069 FTransport.Close;
1070end;
1071
Roger Meier333bbf32012-01-08 21:51:08 +00001072constructor TFramedTransportImpl.Create( const ATrans: ITransport);
Jake Farrell7ae13e12011-10-18 14:35:26 +00001073begin
Jens Geyer718f6ee2013-09-06 21:02:34 +02001074 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +00001075 InitWriteBuffer;
1076 FTransport := ATrans;
1077end;
1078
1079destructor TFramedTransportImpl.Destroy;
1080begin
1081 FWriteBuffer.Free;
1082 FReadBuffer.Free;
1083 inherited;
1084end;
1085
1086procedure TFramedTransportImpl.Flush;
1087var
1088 buf : TBytes;
1089 len : Integer;
1090 data_len : Integer;
1091
1092begin
1093 len := FWriteBuffer.Size;
1094 SetLength( buf, len);
1095 if len > 0 then
1096 begin
1097 System.Move( FWriteBuffer.Memory^, buf[0], len );
1098 end;
1099
1100 data_len := len - FHeaderSize;
1101 if (data_len < 0) then
1102 begin
1103 raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );
1104 end;
1105
1106 InitWriteBuffer;
1107
1108 buf[0] := Byte($FF and (data_len shr 24));
1109 buf[1] := Byte($FF and (data_len shr 16));
1110 buf[2] := Byte($FF and (data_len shr 8));
1111 buf[3] := Byte($FF and data_len);
1112
1113 FTransport.Write( buf, 0, len );
1114 FTransport.Flush;
1115end;
1116
1117function TFramedTransportImpl.GetIsOpen: Boolean;
1118begin
1119 Result := FTransport.IsOpen;
1120end;
1121
1122type
1123 TAccessMemoryStream = class(TMemoryStream)
1124 end;
1125
1126procedure TFramedTransportImpl.InitWriteBuffer;
1127begin
1128 FWriteBuffer.Free;
1129 FWriteBuffer := TMemoryStream.Create;
1130 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1131 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1132end;
1133
1134procedure TFramedTransportImpl.Open;
1135begin
1136 FTransport.Open;
1137end;
1138
1139function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1140var
1141 got : Integer;
1142begin
1143 if FReadBuffer <> nil then
1144 begin
Jake Farrell9c6773a2012-03-22 02:40:45 +00001145 if len > 0
1146 then got := FReadBuffer.Read( Pointer(@buf[off])^, len )
1147 else got := 0;
Jake Farrell7ae13e12011-10-18 14:35:26 +00001148 if got > 0 then
1149 begin
1150 Result := got;
1151 Exit;
1152 end;
1153 end;
1154
1155 ReadFrame;
Jake Farrell9c6773a2012-03-22 02:40:45 +00001156 if len > 0
1157 then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)
1158 else Result := 0;
Jake Farrell7ae13e12011-10-18 14:35:26 +00001159end;
1160
1161procedure TFramedTransportImpl.ReadFrame;
1162var
1163 i32rd : TBytes;
1164 size : Integer;
1165 buff : TBytes;
1166begin
1167 SetLength( i32rd, FHeaderSize );
1168 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1169 size :=
1170 ((i32rd[0] and $FF) shl 24) or
1171 ((i32rd[1] and $FF) shl 16) or
1172 ((i32rd[2] and $FF) shl 8) or
1173 (i32rd[3] and $FF);
1174 SetLength( buff, size );
1175 FTransport.ReadAll( buff, 0, size );
1176 FReadBuffer.Free;
1177 FReadBuffer := TMemoryStream.Create;
1178 FReadBuffer.Write( Pointer(@buff[0])^, size );
1179 FReadBuffer.Position := 0;
1180end;
1181
1182procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1183begin
Jake Farrell9c6773a2012-03-22 02:40:45 +00001184 if len > 0
1185 then FWriteBuffer.Write( Pointer(@buf[off])^, len );
Jake Farrell7ae13e12011-10-18 14:35:26 +00001186end;
1187
1188{ TFramedTransport.TFactory }
1189
Roger Meier333bbf32012-01-08 21:51:08 +00001190function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +00001191begin
1192 Result := TFramedTransportImpl.Create( ATrans );
1193end;
1194
1195{ TTcpSocketStreamImpl }
1196
1197procedure TTcpSocketStreamImpl.Close;
1198begin
1199 FTcpClient.Close;
1200end;
1201
Roger Meier333bbf32012-01-08 21:51:08 +00001202constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient);
Jake Farrell7ae13e12011-10-18 14:35:26 +00001203begin
Jens Geyer718f6ee2013-09-06 21:02:34 +02001204 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +00001205 FTcpClient := ATcpClient;
1206end;
1207
1208procedure TTcpSocketStreamImpl.Flush;
1209begin
1210
1211end;
1212
1213function TTcpSocketStreamImpl.IsOpen: Boolean;
1214begin
1215 Result := FTcpClient.Active;
1216end;
1217
1218procedure TTcpSocketStreamImpl.Open;
1219begin
1220 FTcpClient.Open;
1221end;
1222
1223function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset,
1224 count: Integer): Integer;
1225begin
1226 inherited;
1227 Result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count);
1228end;
1229
1230function TTcpSocketStreamImpl.ToArray: TBytes;
1231var
1232 len : Integer;
1233begin
1234 len := 0;
1235 if IsOpen then
1236 begin
1237 len := FTcpClient.BytesReceived;
1238 end;
1239
1240 SetLength( Result, len );
1241
1242 if len > 0 then
1243 begin
1244 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1245 end;
1246end;
1247
1248procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
1249begin
1250 inherited;
1251 FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
1252end;
1253
1254{$IF CompilerVersion < 21.0}
1255initialization
1256begin
1257 TFramedTransportImpl_Initialize;
1258end;
1259{$IFEND}
1260
1261
1262end.