blob: 9fd52a8cbf62e06538500fc253701a4d45d10c6e [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 {$SCOPEDENUMS ON}
21
Jens Geyer23d67462015-12-19 11:44:57 +010022{$IF CompilerVersion < 28.0}
23 {$DEFINE OLD_SOCKETS} // TODO: add socket support for CompilerVersion >= 28.0
24{$IFEND}
25
26
Jens Geyerd5436f52014-10-03 19:50:38 +020027unit Thrift.Transport;
28
29interface
30
31uses
32 Classes,
33 SysUtils,
34 Math,
Jens Geyer23d67462015-12-19 11:44:57 +010035 WinSock,
36 {$IFDEF OLD_SOCKETS}
37 Sockets,
38 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020039 Generics.Collections,
40 Thrift.Collections,
41 Thrift.Utils,
42 Thrift.Stream,
43 ActiveX,
44 msxml;
45
46type
47 ITransport = interface
48 ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
49 function GetIsOpen: Boolean;
50 property IsOpen: Boolean read GetIsOpen;
51 function Peek: Boolean;
52 procedure Open;
53 procedure Close;
54 function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
55 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
56 procedure Write( const buf: TBytes); overload;
57 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
58 procedure Flush;
59 end;
60
61 TTransportImpl = class( TInterfacedObject, ITransport)
62 protected
63 function GetIsOpen: Boolean; virtual; abstract;
64 property IsOpen: Boolean read GetIsOpen;
65 function Peek: Boolean; virtual;
66 procedure Open(); virtual; abstract;
67 procedure Close(); virtual; abstract;
68 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
69 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
70 procedure Write( const buf: TBytes); overload; virtual;
71 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
72 procedure Flush; virtual;
73 end;
74
75 TTransportException = class( Exception )
76 public
77 type
78 TExceptionType = (
79 Unknown,
80 NotOpen,
81 AlreadyOpen,
82 TimedOut,
83 EndOfFile
84 );
85 private
86 FType : TExceptionType;
87 public
88 constructor Create( AType: TExceptionType); overload;
89 constructor Create( const msg: string); overload;
90 constructor Create( AType: TExceptionType; const msg: string); overload;
91 property Type_: TExceptionType read FType;
92 end;
93
94 IHTTPClient = interface( ITransport )
95 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
96 procedure SetConnectionTimeout(const Value: Integer);
97 function GetConnectionTimeout: Integer;
98 procedure SetReadTimeout(const Value: Integer);
99 function GetReadTimeout: Integer;
100 function GetCustomHeaders: IThriftDictionary<string,string>;
101 procedure SendRequest;
102 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
103 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
104 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
105 end;
106
107 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
108 private
109 FUri : string;
110 FInputStream : IThriftStream;
111 FOutputStream : IThriftStream;
112 FConnectionTimeout : Integer;
113 FReadTimeout : Integer;
114 FCustomHeaders : IThriftDictionary<string,string>;
115
116 function CreateRequest: IXMLHTTPRequest;
117 protected
118 function GetIsOpen: Boolean; override;
119 procedure Open(); override;
120 procedure Close(); override;
121 function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
122 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
123 procedure Flush; override;
124
125 procedure SetConnectionTimeout(const Value: Integer);
126 function GetConnectionTimeout: Integer;
127 procedure SetReadTimeout(const Value: Integer);
128 function GetReadTimeout: Integer;
129 function GetCustomHeaders: IThriftDictionary<string,string>;
130 procedure SendRequest;
131 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
132 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
133 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
134 public
135 constructor Create( const AUri: string);
136 destructor Destroy; override;
137 end;
138
139 IServerTransport = interface
140 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
141 procedure Listen;
142 procedure Close;
143 function Accept( const fnAccepting: TProc): ITransport;
144 end;
145
146 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
147 protected
148 procedure Listen; virtual; abstract;
149 procedure Close; virtual; abstract;
150 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
151 end;
152
153 ITransportFactory = interface
154 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
155 function GetTransport( const ATrans: ITransport): ITransport;
156 end;
157
158 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
159 function GetTransport( const ATrans: ITransport): ITransport; virtual;
160 end;
161
Jens Geyer23d67462015-12-19 11:44:57 +0100162 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200163 TTcpSocketStreamImpl = class( TThriftStreamImpl )
164 private type
165 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
166 private
167 FTcpClient : TCustomIpClient;
168 FTimeout : Integer;
169 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
170 TimeOut: Integer; var wsaError : Integer): Integer;
171 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200172 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +0200173 protected
174 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
175 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
176 procedure Open; override;
177 procedure Close; override;
178 procedure Flush; override;
179
180 function IsOpen: Boolean; override;
181 function ToArray: TBytes; override;
182 public
183 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
184 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100185 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200186
187 IStreamTransport = interface( ITransport )
188 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
189 function GetInputStream: IThriftStream;
190 function GetOutputStream: IThriftStream;
191 property InputStream : IThriftStream read GetInputStream;
192 property OutputStream : IThriftStream read GetOutputStream;
193 end;
194
195 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
196 protected
197 FInputStream : IThriftStream;
198 FOutputStream : IThriftStream;
199 protected
200 function GetIsOpen: Boolean; override;
201
202 function GetInputStream: IThriftStream;
203 function GetOutputStream: IThriftStream;
204 public
205 property InputStream : IThriftStream read GetInputStream;
206 property OutputStream : IThriftStream read GetOutputStream;
207
208 procedure Open; override;
209 procedure Close; override;
210 procedure Flush; override;
211 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
212 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
213 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
214 destructor Destroy; override;
215 end;
216
217 TBufferedStreamImpl = class( TThriftStreamImpl)
218 private
219 FStream : IThriftStream;
220 FBufSize : Integer;
221 FReadBuffer : TMemoryStream;
222 FWriteBuffer : TMemoryStream;
223 protected
224 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
225 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
226 procedure Open; override;
227 procedure Close; override;
228 procedure Flush; override;
229 function IsOpen: Boolean; override;
230 function ToArray: TBytes; override;
231 public
232 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
233 destructor Destroy; override;
234 end;
235
Jens Geyer23d67462015-12-19 11:44:57 +0100236 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200237 TServerSocketImpl = class( TServerTransportImpl)
238 private
239 FServer : TTcpServer;
240 FPort : Integer;
241 FClientTimeout : Integer;
242 FUseBufferedSocket : Boolean;
243 FOwnsServer : Boolean;
244 protected
245 function Accept( const fnAccepting: TProc) : ITransport; override;
246 public
247 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
248 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
249 destructor Destroy; override;
250 procedure Listen; override;
251 procedure Close; override;
252 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100253 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200254
255 TBufferedTransportImpl = class( TTransportImpl )
256 private
257 FInputBuffer : IThriftStream;
258 FOutputBuffer : IThriftStream;
259 FTransport : IStreamTransport;
260 FBufSize : Integer;
261
262 procedure InitBuffers;
263 function GetUnderlyingTransport: ITransport;
264 protected
265 function GetIsOpen: Boolean; override;
266 procedure Flush; override;
267 public
268 procedure Open(); override;
269 procedure Close(); override;
270 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
271 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
272 constructor Create( const ATransport : IStreamTransport ); overload;
273 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
274 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
275 property IsOpen: Boolean read GetIsOpen;
276 end;
277
Jens Geyer23d67462015-12-19 11:44:57 +0100278 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200279 TSocketImpl = class(TStreamTransportImpl)
280 private
281 FClient : TCustomIpClient;
282 FOwnsClient : Boolean;
283 FHost : string;
284 FPort : Integer;
285 FTimeout : Integer;
286
287 procedure InitSocket;
288 protected
289 function GetIsOpen: Boolean; override;
290 public
291 procedure Open; override;
292 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
293 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
294 destructor Destroy; override;
295 procedure Close; override;
296 property TcpClient: TCustomIpClient read FClient;
297 property Host : string read FHost;
298 property Port: Integer read FPort;
299 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100300 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200301
302 TFramedTransportImpl = class( TTransportImpl)
303 private const
304 FHeaderSize : Integer = 4;
305 private class var
306 FHeader_Dummy : array of Byte;
307 protected
308 FTransport : ITransport;
309 FWriteBuffer : TMemoryStream;
310 FReadBuffer : TMemoryStream;
311
312 procedure InitWriteBuffer;
313 procedure ReadFrame;
314 public
315 type
316 TFactory = class( TTransportFactoryImpl )
317 public
318 function GetTransport( const ATrans: ITransport): ITransport; override;
319 end;
320
321{$IF CompilerVersion >= 21.0}
322 class constructor Create;
323{$IFEND}
324 constructor Create; overload;
325 constructor Create( const ATrans: ITransport); overload;
326 destructor Destroy; override;
327
328 procedure Open(); override;
329 function GetIsOpen: Boolean; override;
330
331 procedure Close(); override;
332 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
333 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
334 procedure Flush; override;
335 end;
336
337{$IF CompilerVersion < 21.0}
338procedure TFramedTransportImpl_Initialize;
339{$IFEND}
340
341const
342 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
343
344
345implementation
346
347{ TTransportImpl }
348
349procedure TTransportImpl.Flush;
350begin
351
352end;
353
354function TTransportImpl.Peek: Boolean;
355begin
356 Result := IsOpen;
357end;
358
359function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
360var
361 got : Integer;
362 ret : Integer;
363begin
364 got := 0;
365 while ( got < len) do
366 begin
367 ret := Read( buf, off + got, len - got);
368 if ( ret <= 0 ) then
369 begin
370 raise TTransportException.Create( 'Cannot read, Remote side has closed' );
371 end;
372 got := got + ret;
373 end;
374 Result := got;
375end;
376
377procedure TTransportImpl.Write( const buf: TBytes);
378begin
379 Self.Write( buf, 0, Length(buf) );
380end;
381
382{ THTTPClientImpl }
383
384procedure THTTPClientImpl.Close;
385begin
386 FInputStream := nil;
387 FOutputStream := nil;
388end;
389
390constructor THTTPClientImpl.Create(const AUri: string);
391begin
392 inherited Create;
393 FUri := AUri;
394 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
395 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
396end;
397
398function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
399var
400 pair : TPair<string,string>;
401begin
402{$IF CompilerVersion >= 21.0}
403 Result := CoXMLHTTP.Create;
404{$ELSE}
405 Result := CoXMLHTTPRequest.Create;
406{$IFEND}
407
408 Result.open('POST', FUri, False, '', '');
409 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
410 Result.setRequestHeader( 'Accept', 'application/x-thrift');
411 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
412
413 for pair in FCustomHeaders do
414 begin
415 Result.setRequestHeader( pair.Key, pair.Value );
416 end;
417end;
418
419destructor THTTPClientImpl.Destroy;
420begin
421 Close;
422 inherited;
423end;
424
425procedure THTTPClientImpl.Flush;
426begin
427 try
428 SendRequest;
429 finally
430 FOutputStream := nil;
431 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
432 end;
433end;
434
435function THTTPClientImpl.GetConnectionTimeout: Integer;
436begin
437 Result := FConnectionTimeout;
438end;
439
440function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
441begin
442 Result := FCustomHeaders;
443end;
444
445function THTTPClientImpl.GetIsOpen: Boolean;
446begin
447 Result := True;
448end;
449
450function THTTPClientImpl.GetReadTimeout: Integer;
451begin
452 Result := FReadTimeout;
453end;
454
455procedure THTTPClientImpl.Open;
456begin
457
458end;
459
460function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
461begin
462 if FInputStream = nil then
463 begin
464 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
465 'No request has been sent');
466 end;
467 try
468 Result := FInputStream.Read( buf, off, len )
469 except
470 on E: Exception do
471 begin
472 raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
473 E.Message);
474 end;
475 end;
476end;
477
478procedure THTTPClientImpl.SendRequest;
479var
480 xmlhttp : IXMLHTTPRequest;
481 ms : TMemoryStream;
482 a : TBytes;
483 len : Integer;
484begin
485 xmlhttp := CreateRequest;
486
487 ms := TMemoryStream.Create;
488 try
489 a := FOutputStream.ToArray;
490 len := Length(a);
491 if len > 0 then
492 begin
493 ms.WriteBuffer( Pointer(@a[0])^, len);
494 end;
495 ms.Position := 0;
496 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
497 FInputStream := nil;
498 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
499 finally
500 ms.Free;
501 end;
502end;
503
504procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
505begin
506 FConnectionTimeout := Value;
507end;
508
509procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
510begin
511 FReadTimeout := Value
512end;
513
514procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
515begin
516 FOutputStream.Write( buf, off, len);
517end;
518
519{ TTransportException }
520
521constructor TTransportException.Create(AType: TExceptionType);
522begin
523 //no inherited;
524 Create( AType, '' )
525end;
526
527constructor TTransportException.Create(AType: TExceptionType;
528 const msg: string);
529begin
530 inherited Create(msg);
531 FType := AType;
532end;
533
534constructor TTransportException.Create(const msg: string);
535begin
536 inherited Create(msg);
537end;
538
539{ TTransportFactoryImpl }
540
541function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
542begin
543 Result := ATrans;
544end;
545
546{ TServerSocket }
547
Jens Geyer23d67462015-12-19 11:44:57 +0100548{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200549constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
550begin
551 inherited Create;
552 FServer := AServer;
553 FClientTimeout := AClientTimeout;
554end;
555
556constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
557begin
558 inherited Create;
559 FPort := APort;
560 FClientTimeout := AClientTimeout;
561 FUseBufferedSocket := AUseBufferedSockets;
562 FOwnsServer := True;
563 FServer := TTcpServer.Create( nil );
564 FServer.BlockMode := bmBlocking;
565{$IF CompilerVersion >= 21.0}
566 FServer.LocalPort := AnsiString( IntToStr( FPort));
567{$ELSE}
568 FServer.LocalPort := IntToStr( FPort);
569{$IFEND}
570end;
571
572destructor TServerSocketImpl.Destroy;
573begin
574 if FOwnsServer then begin
575 FServer.Free;
576 FServer := nil;
577 end;
578 inherited;
579end;
580
581function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
582var
583 client : TCustomIpClient;
584 trans : IStreamTransport;
585begin
586 if FServer = nil then
587 begin
588 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
589 'No underlying server socket.');
590 end;
591
592 client := nil;
593 try
594 client := TCustomIpClient.Create(nil);
595
596 if Assigned(fnAccepting)
597 then fnAccepting();
598
599 if not FServer.Accept( client) then
600 begin
601 client.Free;
602 Result := nil;
603 Exit;
604 end;
605
606 if client = nil then
607 begin
608 Result := nil;
609 Exit;
610 end;
611
612 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
613 client := nil; // trans owns it now
614
615 if FUseBufferedSocket
616 then result := TBufferedTransportImpl.Create( trans)
617 else result := trans;
618
619 except
620 on E: Exception do begin
621 client.Free;
622 raise TTransportException.Create( E.ToString );
623 end;
624 end;
625end;
626
627procedure TServerSocketImpl.Listen;
628begin
629 if FServer <> nil then
630 begin
631 try
632 FServer.Active := True;
633 except
634 on E: Exception do
635 begin
636 raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
637 end;
638 end;
639 end;
640end;
641
642procedure TServerSocketImpl.Close;
643begin
644 if FServer <> nil then
645 begin
646 try
647 FServer.Active := False;
648 except
649 on E: Exception do
650 begin
651 raise TTransportException.Create('Error on closing socket : ' + E.Message);
652 end;
653 end;
654 end;
655end;
656
657{ TSocket }
658
659constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
660var stream : IThriftStream;
661begin
662 FClient := AClient;
663 FTimeout := ATimeout;
664 FOwnsClient := aOwnsClient;
665 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
666 inherited Create( stream, stream);
667end;
668
669constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
670begin
671 inherited Create(nil,nil);
672 FHost := AHost;
673 FPort := APort;
674 FTimeout := ATimeout;
675 InitSocket;
676end;
677
678destructor TSocketImpl.Destroy;
679begin
680 if FOwnsClient
681 then FreeAndNil( FClient);
682 inherited;
683end;
684
685procedure TSocketImpl.Close;
686begin
687 inherited Close;
688 if FOwnsClient
689 then FreeAndNil( FClient);
690end;
691
692function TSocketImpl.GetIsOpen: Boolean;
693begin
694 Result := (FClient <> nil) and FClient.Connected;
695end;
696
697procedure TSocketImpl.InitSocket;
698var
699 stream : IThriftStream;
700begin
701 if FOwnsClient
702 then FreeAndNil( FClient)
703 else FClient := nil;
704
705 FClient := TTcpClient.Create( nil);
706 FOwnsClient := True;
707
708 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
709 FInputStream := stream;
710 FOutputStream := stream;
711end;
712
713procedure TSocketImpl.Open;
714begin
715 if IsOpen then
716 begin
717 raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
718 'Socket already connected');
719 end;
720
721 if FHost = '' then
722 begin
723 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
724 'Cannot open null host');
725 end;
726
727 if Port <= 0 then
728 begin
729 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
730 'Cannot open without port');
731 end;
732
733 if FClient = nil then
734 begin
735 InitSocket;
736 end;
737
738 FClient.RemoteHost := TSocketHost( Host);
739 FClient.RemotePort := TSocketPort( IntToStr( Port));
740 FClient.Connect;
741
742 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
743 FOutputStream := FInputStream;
744end;
Jens Geyer23d67462015-12-19 11:44:57 +0100745{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200746
747{ TBufferedStream }
748
749procedure TBufferedStreamImpl.Close;
750begin
751 Flush;
752 FStream := nil;
753
754 FReadBuffer.Free;
755 FReadBuffer := nil;
756
757 FWriteBuffer.Free;
758 FWriteBuffer := nil;
759end;
760
761constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
762begin
763 inherited Create;
764 FStream := AStream;
765 FBufSize := ABufSize;
766 FReadBuffer := TMemoryStream.Create;
767 FWriteBuffer := TMemoryStream.Create;
768end;
769
770destructor TBufferedStreamImpl.Destroy;
771begin
772 Close;
773 inherited;
774end;
775
776procedure TBufferedStreamImpl.Flush;
777var
778 buf : TBytes;
779 len : Integer;
780begin
781 if IsOpen then
782 begin
783 len := FWriteBuffer.Size;
784 if len > 0 then
785 begin
786 SetLength( buf, len );
787 FWriteBuffer.Position := 0;
788 FWriteBuffer.Read( Pointer(@buf[0])^, len );
789 FStream.Write( buf, 0, len );
790 end;
791 FWriteBuffer.Clear;
792 end;
793end;
794
795function TBufferedStreamImpl.IsOpen: Boolean;
796begin
797 Result := (FWriteBuffer <> nil)
798 and (FReadBuffer <> nil)
799 and (FStream <> nil);
800end;
801
802procedure TBufferedStreamImpl.Open;
803begin
804
805end;
806
807function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
808var
809 nRead : Integer;
810 tempbuf : TBytes;
811begin
812 inherited;
813 Result := 0;
814 if IsOpen then
815 begin
816 while count > 0 do begin
817
818 if FReadBuffer.Position >= FReadBuffer.Size then
819 begin
820 FReadBuffer.Clear;
821 SetLength( tempbuf, FBufSize);
822 nRead := FStream.Read( tempbuf, 0, FBufSize );
823 if nRead = 0 then Break; // avoid infinite loop
824
825 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
826 FReadBuffer.Position := 0;
827 end;
828
829 if FReadBuffer.Position < FReadBuffer.Size then
830 begin
831 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
832 Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));
833 Dec( count, nRead);
834 Inc( offset, nRead);
835 end;
836 end;
837 end;
838end;
839
840function TBufferedStreamImpl.ToArray: TBytes;
841var
842 len : Integer;
843begin
844 len := 0;
845
846 if IsOpen then
847 begin
848 len := FReadBuffer.Size;
849 end;
850
851 SetLength( Result, len);
852
853 if len > 0 then
854 begin
855 FReadBuffer.Position := 0;
856 FReadBuffer.Read( Pointer(@Result[0])^, len );
857 end;
858end;
859
860procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
861begin
862 inherited;
863 if count > 0 then
864 begin
865 if IsOpen then
866 begin
867 FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
868 if FWriteBuffer.Size > FBufSize then
869 begin
870 Flush;
871 end;
872 end;
873 end;
874end;
875
876{ TStreamTransportImpl }
877
878procedure TStreamTransportImpl.Close;
879begin
880 if FInputStream <> FOutputStream then
881 begin
882 if FInputStream <> nil then
883 begin
884 FInputStream := nil;
885 end;
886 if FOutputStream <> nil then
887 begin
888 FOutputStream := nil;
889 end;
890 end else
891 begin
892 FInputStream := nil;
893 FOutputStream := nil;
894 end;
895end;
896
897constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
898begin
899 inherited Create;
900 FInputStream := AInputStream;
901 FOutputStream := AOutputStream;
902end;
903
904destructor TStreamTransportImpl.Destroy;
905begin
906 FInputStream := nil;
907 FOutputStream := nil;
908 inherited;
909end;
910
911procedure TStreamTransportImpl.Flush;
912begin
913 if FOutputStream = nil then
914 begin
915 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );
916 end;
917
918 FOutputStream.Flush;
919end;
920
921function TStreamTransportImpl.GetInputStream: IThriftStream;
922begin
923 Result := FInputStream;
924end;
925
926function TStreamTransportImpl.GetIsOpen: Boolean;
927begin
928 Result := True;
929end;
930
931function TStreamTransportImpl.GetOutputStream: IThriftStream;
932begin
933 Result := FInputStream;
934end;
935
936procedure TStreamTransportImpl.Open;
937begin
938
939end;
940
941function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
942begin
943 if FInputStream = nil then
944 begin
945 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );
946 end;
947 Result := FInputStream.Read( buf, off, len );
948end;
949
950procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
951begin
952 if FOutputStream = nil then
953 begin
954 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );
955 end;
956
957 FOutputStream.Write( buf, off, len );
958end;
959
960{ TBufferedTransportImpl }
961
962constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
963begin
964 //no inherited;
965 Create( ATransport, 1024 );
966end;
967
968procedure TBufferedTransportImpl.Close;
969begin
970 FTransport.Close;
971end;
972
973constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
974 ABufSize: Integer);
975begin
976 inherited Create;
977 FTransport := ATransport;
978 FBufSize := ABufSize;
979 InitBuffers;
980end;
981
982procedure TBufferedTransportImpl.Flush;
983begin
984 if FOutputBuffer <> nil then
985 begin
986 FOutputBuffer.Flush;
987 end;
988end;
989
990function TBufferedTransportImpl.GetIsOpen: Boolean;
991begin
992 Result := FTransport.IsOpen;
993end;
994
995function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
996begin
997 Result := FTransport;
998end;
999
1000procedure TBufferedTransportImpl.InitBuffers;
1001begin
1002 if FTransport.InputStream <> nil then
1003 begin
1004 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
1005 end;
1006 if FTransport.OutputStream <> nil then
1007 begin
1008 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
1009 end;
1010end;
1011
1012procedure TBufferedTransportImpl.Open;
1013begin
1014 FTransport.Open
1015end;
1016
1017function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1018begin
1019 Result := 0;
1020 if FInputBuffer <> nil then
1021 begin
1022 Result := FInputBuffer.Read( buf, off, len );
1023 end;
1024end;
1025
1026procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1027begin
1028 if FOutputBuffer <> nil then
1029 begin
1030 FOutputBuffer.Write( buf, off, len );
1031 end;
1032end;
1033
1034{ TFramedTransportImpl }
1035
1036{$IF CompilerVersion < 21.0}
1037procedure TFramedTransportImpl_Initialize;
1038begin
1039 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1040 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1041 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1042end;
1043{$ELSE}
1044class constructor TFramedTransportImpl.Create;
1045begin
1046 SetLength( FHeader_Dummy, FHeaderSize);
1047 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1048end;
1049{$IFEND}
1050
1051constructor TFramedTransportImpl.Create;
1052begin
1053 inherited Create;
1054 InitWriteBuffer;
1055end;
1056
1057procedure TFramedTransportImpl.Close;
1058begin
1059 FTransport.Close;
1060end;
1061
1062constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1063begin
1064 inherited Create;
1065 InitWriteBuffer;
1066 FTransport := ATrans;
1067end;
1068
1069destructor TFramedTransportImpl.Destroy;
1070begin
1071 FWriteBuffer.Free;
1072 FReadBuffer.Free;
1073 inherited;
1074end;
1075
1076procedure TFramedTransportImpl.Flush;
1077var
1078 buf : TBytes;
1079 len : Integer;
1080 data_len : Integer;
1081
1082begin
1083 len := FWriteBuffer.Size;
1084 SetLength( buf, len);
1085 if len > 0 then
1086 begin
1087 System.Move( FWriteBuffer.Memory^, buf[0], len );
1088 end;
1089
1090 data_len := len - FHeaderSize;
1091 if (data_len < 0) then
1092 begin
1093 raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );
1094 end;
1095
1096 InitWriteBuffer;
1097
1098 buf[0] := Byte($FF and (data_len shr 24));
1099 buf[1] := Byte($FF and (data_len shr 16));
1100 buf[2] := Byte($FF and (data_len shr 8));
1101 buf[3] := Byte($FF and data_len);
1102
1103 FTransport.Write( buf, 0, len );
1104 FTransport.Flush;
1105end;
1106
1107function TFramedTransportImpl.GetIsOpen: Boolean;
1108begin
1109 Result := FTransport.IsOpen;
1110end;
1111
1112type
1113 TAccessMemoryStream = class(TMemoryStream)
1114 end;
1115
1116procedure TFramedTransportImpl.InitWriteBuffer;
1117begin
1118 FWriteBuffer.Free;
1119 FWriteBuffer := TMemoryStream.Create;
1120 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1121 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1122end;
1123
1124procedure TFramedTransportImpl.Open;
1125begin
1126 FTransport.Open;
1127end;
1128
1129function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1130var
1131 got : Integer;
1132begin
1133 if FReadBuffer <> nil then
1134 begin
1135 if len > 0
1136 then got := FReadBuffer.Read( Pointer(@buf[off])^, len )
1137 else got := 0;
1138 if got > 0 then
1139 begin
1140 Result := got;
1141 Exit;
1142 end;
1143 end;
1144
1145 ReadFrame;
1146 if len > 0
1147 then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)
1148 else Result := 0;
1149end;
1150
1151procedure TFramedTransportImpl.ReadFrame;
1152var
1153 i32rd : TBytes;
1154 size : Integer;
1155 buff : TBytes;
1156begin
1157 SetLength( i32rd, FHeaderSize );
1158 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1159 size :=
1160 ((i32rd[0] and $FF) shl 24) or
1161 ((i32rd[1] and $FF) shl 16) or
1162 ((i32rd[2] and $FF) shl 8) or
1163 (i32rd[3] and $FF);
1164 SetLength( buff, size );
1165 FTransport.ReadAll( buff, 0, size );
1166 FReadBuffer.Free;
1167 FReadBuffer := TMemoryStream.Create;
1168 FReadBuffer.Write( Pointer(@buff[0])^, size );
1169 FReadBuffer.Position := 0;
1170end;
1171
1172procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1173begin
1174 if len > 0
1175 then FWriteBuffer.Write( Pointer(@buf[off])^, len );
1176end;
1177
1178{ TFramedTransport.TFactory }
1179
1180function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1181begin
1182 Result := TFramedTransportImpl.Create( ATrans );
1183end;
1184
1185{ TTcpSocketStreamImpl }
1186
Jens Geyer23d67462015-12-19 11:44:57 +01001187{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001188procedure TTcpSocketStreamImpl.Close;
1189begin
1190 FTcpClient.Close;
1191end;
1192
1193constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
1194begin
1195 inherited Create;
1196 FTcpClient := ATcpClient;
1197 FTimeout := aTimeout;
1198end;
1199
1200procedure TTcpSocketStreamImpl.Flush;
1201begin
1202
1203end;
1204
1205function TTcpSocketStreamImpl.IsOpen: Boolean;
1206begin
1207 Result := FTcpClient.Active;
1208end;
1209
1210procedure TTcpSocketStreamImpl.Open;
1211begin
1212 FTcpClient.Open;
1213end;
1214
1215
1216function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1217 TimeOut: Integer; var wsaError : Integer): Integer;
1218var
1219 ReadFds: TFDset;
1220 ReadFdsptr: PFDset;
1221 WriteFds: TFDset;
1222 WriteFdsptr: PFDset;
1223 ExceptFds: TFDset;
1224 ExceptFdsptr: PFDset;
1225 tv: timeval;
1226 Timeptr: PTimeval;
1227 socket : TSocket;
1228begin
1229 if not FTcpClient.Active then begin
1230 wsaError := WSAEINVAL;
1231 Exit( SOCKET_ERROR);
1232 end;
1233
1234 socket := FTcpClient.Handle;
1235
1236 if Assigned(ReadReady) then
1237 begin
1238 ReadFdsptr := @ReadFds;
1239 FD_ZERO(ReadFds);
1240 FD_SET(socket, ReadFds);
1241 end
1242 else
1243 ReadFdsptr := nil;
1244
1245 if Assigned(WriteReady) then
1246 begin
1247 WriteFdsptr := @WriteFds;
1248 FD_ZERO(WriteFds);
1249 FD_SET(socket, WriteFds);
1250 end
1251 else
1252 WriteFdsptr := nil;
1253
1254 if Assigned(ExceptFlag) then
1255 begin
1256 ExceptFdsptr := @ExceptFds;
1257 FD_ZERO(ExceptFds);
1258 FD_SET(socket, ExceptFds);
1259 end
1260 else
1261 ExceptFdsptr := nil;
1262
1263 if TimeOut >= 0 then
1264 begin
1265 tv.tv_sec := TimeOut div 1000;
1266 tv.tv_usec := 1000 * (TimeOut mod 1000);
1267 Timeptr := @tv;
1268 end
1269 else
1270 Timeptr := nil; // wait forever
1271
1272 wsaError := 0;
1273 try
1274{$IFDEF MSWINDOWS}
1275 result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1276{$ENDIF}
1277{$IFDEF LINUX}
1278 result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1279{$ENDIF}
1280 if result = SOCKET_ERROR
1281 then wsaError := WSAGetLastError;
1282
1283 except
1284 result := SOCKET_ERROR;
1285 end;
1286
1287 if Assigned(ReadReady) then
1288 ReadReady^ := FD_ISSET(socket, ReadFds);
1289 if Assigned(WriteReady) then
1290 WriteReady^ := FD_ISSET(socket, WriteFds);
1291 if Assigned(ExceptFlag) then
1292 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1293end;
1294
1295function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1296 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001297 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001298var bCanRead, bError : Boolean;
1299 retval : Integer;
1300begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001301 bytesReady := 0;
1302
Jens Geyerd5436f52014-10-03 19:50:38 +02001303 // The select function returns the total number of socket handles that are ready
1304 // and contained in the fd_set structures, zero if the time limit expired,
1305 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1306 // WSAGetLastError can be used to retrieve a specific error code.
1307 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1308 if retval = SOCKET_ERROR
1309 then Exit( TWaitForData.wfd_Error);
1310 if (retval = 0) or not bCanRead
1311 then Exit( TWaitForData.wfd_Timeout);
1312
1313 // recv() returns the number of bytes received, or -1 if an error occurred.
1314 // The return value will be 0 when the peer has performed an orderly shutdown.
1315 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);
1316 if retval <= 0
1317 then Exit( TWaitForData.wfd_Error);
1318
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001319 // at least we have some data
1320 bytesReady := Min( retval, DesiredBytes);
1321 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001322end;
1323
1324function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
1325var wfd : TWaitForData;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001326 wsaError, nBytes : Integer;
1327 pDest : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001328const
1329 SLEEP_TIME = 200;
1330begin
1331 inherited;
1332
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001333 result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001334 pDest := Pointer(@buffer[offset]);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001335 while count > 0 do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001336
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001337 while TRUE do begin
1338 if FTimeout > 0
1339 then wfd := WaitForData( FTimeout, pDest, count, wsaError, nBytes)
1340 else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError, nBytes);
Jens Geyerd5436f52014-10-03 19:50:38 +02001341
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001342 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001343 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001344 TWaitForData.wfd_HaveData : Break;
1345 TWaitForData.wfd_Timeout : begin
1346 if (FTimeout > 0)
1347 then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
1348 SysErrorMessage(Cardinal(wsaError)));
1349 end;
1350 else
1351 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001352 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001353 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001354
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001355 ASSERT( nBytes <= count);
1356 nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);
1357 Inc( pDest, nBytes);
1358 Dec( count, nBytes);
1359 Inc( result, nBytes);
1360 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001361end;
1362
1363function TTcpSocketStreamImpl.ToArray: TBytes;
1364var
1365 len : Integer;
1366begin
1367 len := 0;
1368 if IsOpen then
1369 begin
1370 len := FTcpClient.BytesReceived;
1371 end;
1372
1373 SetLength( Result, len );
1374
1375 if len > 0 then
1376 begin
1377 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1378 end;
1379end;
1380
1381procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
1382var bCanWrite, bError : Boolean;
1383 retval, wsaError : Integer;
1384begin
1385 inherited;
1386
1387 if not FTcpClient.Active
1388 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
1389
1390 // The select function returns the total number of socket handles that are ready
1391 // and contained in the fd_set structures, zero if the time limit expired,
1392 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1393 // WSAGetLastError can be used to retrieve a specific error code.
1394 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1395 if retval = SOCKET_ERROR
1396 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1397 SysErrorMessage(Cardinal(wsaError)));
1398 if (retval = 0)
1399 then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
1400 if bError or not bCanWrite
1401 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
1402
1403 FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
1404end;
Jens Geyer23d67462015-12-19 11:44:57 +01001405{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001406
1407{$IF CompilerVersion < 21.0}
1408initialization
1409begin
1410 TFramedTransportImpl_Initialize;
1411end;
1412{$IFEND}
1413
1414
1415end.