blob: c485e70bb0b3f657ceb92d65b803c020cf71257f [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 {$SCOPEDENUMS ON}
21
22unit Thrift.Transport;
23
24interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
30 Sockets, WinSock,
31 Generics.Collections,
32 Thrift.Collections,
33 Thrift.Utils,
34 Thrift.Stream,
35 ActiveX,
36 msxml;
37
38type
39 ITransport = interface
40 ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
41 function GetIsOpen: Boolean;
42 property IsOpen: Boolean read GetIsOpen;
43 function Peek: Boolean;
44 procedure Open;
45 procedure Close;
46 function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
47 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
48 procedure Write( const buf: TBytes); overload;
49 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
50 procedure Flush;
51 end;
52
53 TTransportImpl = class( TInterfacedObject, ITransport)
54 protected
55 function GetIsOpen: Boolean; virtual; abstract;
56 property IsOpen: Boolean read GetIsOpen;
57 function Peek: Boolean; virtual;
58 procedure Open(); virtual; abstract;
59 procedure Close(); virtual; abstract;
60 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
61 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
62 procedure Write( const buf: TBytes); overload; virtual;
63 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
64 procedure Flush; virtual;
65 end;
66
67 TTransportException = class( Exception )
68 public
69 type
70 TExceptionType = (
71 Unknown,
72 NotOpen,
73 AlreadyOpen,
74 TimedOut,
75 EndOfFile
76 );
77 private
78 FType : TExceptionType;
79 public
80 constructor Create( AType: TExceptionType); overload;
81 constructor Create( const msg: string); overload;
82 constructor Create( AType: TExceptionType; const msg: string); overload;
83 property Type_: TExceptionType read FType;
84 end;
85
86 IHTTPClient = interface( ITransport )
87 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
88 procedure SetConnectionTimeout(const Value: Integer);
89 function GetConnectionTimeout: Integer;
90 procedure SetReadTimeout(const Value: Integer);
91 function GetReadTimeout: Integer;
92 function GetCustomHeaders: IThriftDictionary<string,string>;
93 procedure SendRequest;
94 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
95 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
96 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
97 end;
98
99 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
100 private
101 FUri : string;
102 FInputStream : IThriftStream;
103 FOutputStream : IThriftStream;
104 FConnectionTimeout : Integer;
105 FReadTimeout : Integer;
106 FCustomHeaders : IThriftDictionary<string,string>;
107
108 function CreateRequest: IXMLHTTPRequest;
109 protected
110 function GetIsOpen: Boolean; override;
111 procedure Open(); override;
112 procedure Close(); override;
113 function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
114 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
115 procedure Flush; override;
116
117 procedure SetConnectionTimeout(const Value: Integer);
118 function GetConnectionTimeout: Integer;
119 procedure SetReadTimeout(const Value: Integer);
120 function GetReadTimeout: Integer;
121 function GetCustomHeaders: IThriftDictionary<string,string>;
122 procedure SendRequest;
123 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
124 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
125 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
126 public
127 constructor Create( const AUri: string);
128 destructor Destroy; override;
129 end;
130
131 IServerTransport = interface
132 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
133 procedure Listen;
134 procedure Close;
135 function Accept( const fnAccepting: TProc): ITransport;
136 end;
137
138 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
139 protected
140 procedure Listen; virtual; abstract;
141 procedure Close; virtual; abstract;
142 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
143 end;
144
145 ITransportFactory = interface
146 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
147 function GetTransport( const ATrans: ITransport): ITransport;
148 end;
149
150 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
151 function GetTransport( const ATrans: ITransport): ITransport; virtual;
152 end;
153
154 TTcpSocketStreamImpl = class( TThriftStreamImpl )
155 private type
156 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
157 private
158 FTcpClient : TCustomIpClient;
159 FTimeout : Integer;
160 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
161 TimeOut: Integer; var wsaError : Integer): Integer;
162 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200163 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +0200164 protected
165 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
166 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
167 procedure Open; override;
168 procedure Close; override;
169 procedure Flush; override;
170
171 function IsOpen: Boolean; override;
172 function ToArray: TBytes; override;
173 public
174 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
175 end;
176
177 IStreamTransport = interface( ITransport )
178 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
179 function GetInputStream: IThriftStream;
180 function GetOutputStream: IThriftStream;
181 property InputStream : IThriftStream read GetInputStream;
182 property OutputStream : IThriftStream read GetOutputStream;
183 end;
184
185 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
186 protected
187 FInputStream : IThriftStream;
188 FOutputStream : IThriftStream;
189 protected
190 function GetIsOpen: Boolean; override;
191
192 function GetInputStream: IThriftStream;
193 function GetOutputStream: IThriftStream;
194 public
195 property InputStream : IThriftStream read GetInputStream;
196 property OutputStream : IThriftStream read GetOutputStream;
197
198 procedure Open; override;
199 procedure Close; override;
200 procedure Flush; override;
201 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
202 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
203 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
204 destructor Destroy; override;
205 end;
206
207 TBufferedStreamImpl = class( TThriftStreamImpl)
208 private
209 FStream : IThriftStream;
210 FBufSize : Integer;
211 FReadBuffer : TMemoryStream;
212 FWriteBuffer : TMemoryStream;
213 protected
214 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
215 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
216 procedure Open; override;
217 procedure Close; override;
218 procedure Flush; override;
219 function IsOpen: Boolean; override;
220 function ToArray: TBytes; override;
221 public
222 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
223 destructor Destroy; override;
224 end;
225
226 TServerSocketImpl = class( TServerTransportImpl)
227 private
228 FServer : TTcpServer;
229 FPort : Integer;
230 FClientTimeout : Integer;
231 FUseBufferedSocket : Boolean;
232 FOwnsServer : Boolean;
233 protected
234 function Accept( const fnAccepting: TProc) : ITransport; override;
235 public
236 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
237 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
238 destructor Destroy; override;
239 procedure Listen; override;
240 procedure Close; override;
241 end;
242
243 TBufferedTransportImpl = class( TTransportImpl )
244 private
245 FInputBuffer : IThriftStream;
246 FOutputBuffer : IThriftStream;
247 FTransport : IStreamTransport;
248 FBufSize : Integer;
249
250 procedure InitBuffers;
251 function GetUnderlyingTransport: ITransport;
252 protected
253 function GetIsOpen: Boolean; override;
254 procedure Flush; override;
255 public
256 procedure Open(); override;
257 procedure Close(); override;
258 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
259 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
260 constructor Create( const ATransport : IStreamTransport ); overload;
261 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
262 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
263 property IsOpen: Boolean read GetIsOpen;
264 end;
265
266 TSocketImpl = class(TStreamTransportImpl)
267 private
268 FClient : TCustomIpClient;
269 FOwnsClient : Boolean;
270 FHost : string;
271 FPort : Integer;
272 FTimeout : Integer;
273
274 procedure InitSocket;
275 protected
276 function GetIsOpen: Boolean; override;
277 public
278 procedure Open; override;
279 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
280 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
281 destructor Destroy; override;
282 procedure Close; override;
283 property TcpClient: TCustomIpClient read FClient;
284 property Host : string read FHost;
285 property Port: Integer read FPort;
286 end;
287
288 TFramedTransportImpl = class( TTransportImpl)
289 private const
290 FHeaderSize : Integer = 4;
291 private class var
292 FHeader_Dummy : array of Byte;
293 protected
294 FTransport : ITransport;
295 FWriteBuffer : TMemoryStream;
296 FReadBuffer : TMemoryStream;
297
298 procedure InitWriteBuffer;
299 procedure ReadFrame;
300 public
301 type
302 TFactory = class( TTransportFactoryImpl )
303 public
304 function GetTransport( const ATrans: ITransport): ITransport; override;
305 end;
306
307{$IF CompilerVersion >= 21.0}
308 class constructor Create;
309{$IFEND}
310 constructor Create; overload;
311 constructor Create( const ATrans: ITransport); overload;
312 destructor Destroy; override;
313
314 procedure Open(); override;
315 function GetIsOpen: Boolean; override;
316
317 procedure Close(); override;
318 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
319 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
320 procedure Flush; override;
321 end;
322
323{$IF CompilerVersion < 21.0}
324procedure TFramedTransportImpl_Initialize;
325{$IFEND}
326
327const
328 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
329
330
331implementation
332
333{ TTransportImpl }
334
335procedure TTransportImpl.Flush;
336begin
337
338end;
339
340function TTransportImpl.Peek: Boolean;
341begin
342 Result := IsOpen;
343end;
344
345function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
346var
347 got : Integer;
348 ret : Integer;
349begin
350 got := 0;
351 while ( got < len) do
352 begin
353 ret := Read( buf, off + got, len - got);
354 if ( ret <= 0 ) then
355 begin
356 raise TTransportException.Create( 'Cannot read, Remote side has closed' );
357 end;
358 got := got + ret;
359 end;
360 Result := got;
361end;
362
363procedure TTransportImpl.Write( const buf: TBytes);
364begin
365 Self.Write( buf, 0, Length(buf) );
366end;
367
368{ THTTPClientImpl }
369
370procedure THTTPClientImpl.Close;
371begin
372 FInputStream := nil;
373 FOutputStream := nil;
374end;
375
376constructor THTTPClientImpl.Create(const AUri: string);
377begin
378 inherited Create;
379 FUri := AUri;
380 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
381 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
382end;
383
384function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
385var
386 pair : TPair<string,string>;
387begin
388{$IF CompilerVersion >= 21.0}
389 Result := CoXMLHTTP.Create;
390{$ELSE}
391 Result := CoXMLHTTPRequest.Create;
392{$IFEND}
393
394 Result.open('POST', FUri, False, '', '');
395 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
396 Result.setRequestHeader( 'Accept', 'application/x-thrift');
397 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
398
399 for pair in FCustomHeaders do
400 begin
401 Result.setRequestHeader( pair.Key, pair.Value );
402 end;
403end;
404
405destructor THTTPClientImpl.Destroy;
406begin
407 Close;
408 inherited;
409end;
410
411procedure THTTPClientImpl.Flush;
412begin
413 try
414 SendRequest;
415 finally
416 FOutputStream := nil;
417 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
418 end;
419end;
420
421function THTTPClientImpl.GetConnectionTimeout: Integer;
422begin
423 Result := FConnectionTimeout;
424end;
425
426function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
427begin
428 Result := FCustomHeaders;
429end;
430
431function THTTPClientImpl.GetIsOpen: Boolean;
432begin
433 Result := True;
434end;
435
436function THTTPClientImpl.GetReadTimeout: Integer;
437begin
438 Result := FReadTimeout;
439end;
440
441procedure THTTPClientImpl.Open;
442begin
443
444end;
445
446function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
447begin
448 if FInputStream = nil then
449 begin
450 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
451 'No request has been sent');
452 end;
453 try
454 Result := FInputStream.Read( buf, off, len )
455 except
456 on E: Exception do
457 begin
458 raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
459 E.Message);
460 end;
461 end;
462end;
463
464procedure THTTPClientImpl.SendRequest;
465var
466 xmlhttp : IXMLHTTPRequest;
467 ms : TMemoryStream;
468 a : TBytes;
469 len : Integer;
470begin
471 xmlhttp := CreateRequest;
472
473 ms := TMemoryStream.Create;
474 try
475 a := FOutputStream.ToArray;
476 len := Length(a);
477 if len > 0 then
478 begin
479 ms.WriteBuffer( Pointer(@a[0])^, len);
480 end;
481 ms.Position := 0;
482 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
483 FInputStream := nil;
484 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
485 finally
486 ms.Free;
487 end;
488end;
489
490procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
491begin
492 FConnectionTimeout := Value;
493end;
494
495procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
496begin
497 FReadTimeout := Value
498end;
499
500procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
501begin
502 FOutputStream.Write( buf, off, len);
503end;
504
505{ TTransportException }
506
507constructor TTransportException.Create(AType: TExceptionType);
508begin
509 //no inherited;
510 Create( AType, '' )
511end;
512
513constructor TTransportException.Create(AType: TExceptionType;
514 const msg: string);
515begin
516 inherited Create(msg);
517 FType := AType;
518end;
519
520constructor TTransportException.Create(const msg: string);
521begin
522 inherited Create(msg);
523end;
524
525{ TTransportFactoryImpl }
526
527function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
528begin
529 Result := ATrans;
530end;
531
532{ TServerSocket }
533
534constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
535begin
536 inherited Create;
537 FServer := AServer;
538 FClientTimeout := AClientTimeout;
539end;
540
541constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
542begin
543 inherited Create;
544 FPort := APort;
545 FClientTimeout := AClientTimeout;
546 FUseBufferedSocket := AUseBufferedSockets;
547 FOwnsServer := True;
548 FServer := TTcpServer.Create( nil );
549 FServer.BlockMode := bmBlocking;
550{$IF CompilerVersion >= 21.0}
551 FServer.LocalPort := AnsiString( IntToStr( FPort));
552{$ELSE}
553 FServer.LocalPort := IntToStr( FPort);
554{$IFEND}
555end;
556
557destructor TServerSocketImpl.Destroy;
558begin
559 if FOwnsServer then begin
560 FServer.Free;
561 FServer := nil;
562 end;
563 inherited;
564end;
565
566function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
567var
568 client : TCustomIpClient;
569 trans : IStreamTransport;
570begin
571 if FServer = nil then
572 begin
573 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
574 'No underlying server socket.');
575 end;
576
577 client := nil;
578 try
579 client := TCustomIpClient.Create(nil);
580
581 if Assigned(fnAccepting)
582 then fnAccepting();
583
584 if not FServer.Accept( client) then
585 begin
586 client.Free;
587 Result := nil;
588 Exit;
589 end;
590
591 if client = nil then
592 begin
593 Result := nil;
594 Exit;
595 end;
596
597 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
598 client := nil; // trans owns it now
599
600 if FUseBufferedSocket
601 then result := TBufferedTransportImpl.Create( trans)
602 else result := trans;
603
604 except
605 on E: Exception do begin
606 client.Free;
607 raise TTransportException.Create( E.ToString );
608 end;
609 end;
610end;
611
612procedure TServerSocketImpl.Listen;
613begin
614 if FServer <> nil then
615 begin
616 try
617 FServer.Active := True;
618 except
619 on E: Exception do
620 begin
621 raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
622 end;
623 end;
624 end;
625end;
626
627procedure TServerSocketImpl.Close;
628begin
629 if FServer <> nil then
630 begin
631 try
632 FServer.Active := False;
633 except
634 on E: Exception do
635 begin
636 raise TTransportException.Create('Error on closing socket : ' + E.Message);
637 end;
638 end;
639 end;
640end;
641
642{ TSocket }
643
644constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
645var stream : IThriftStream;
646begin
647 FClient := AClient;
648 FTimeout := ATimeout;
649 FOwnsClient := aOwnsClient;
650 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
651 inherited Create( stream, stream);
652end;
653
654constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
655begin
656 inherited Create(nil,nil);
657 FHost := AHost;
658 FPort := APort;
659 FTimeout := ATimeout;
660 InitSocket;
661end;
662
663destructor TSocketImpl.Destroy;
664begin
665 if FOwnsClient
666 then FreeAndNil( FClient);
667 inherited;
668end;
669
670procedure TSocketImpl.Close;
671begin
672 inherited Close;
673 if FOwnsClient
674 then FreeAndNil( FClient);
675end;
676
677function TSocketImpl.GetIsOpen: Boolean;
678begin
679 Result := (FClient <> nil) and FClient.Connected;
680end;
681
682procedure TSocketImpl.InitSocket;
683var
684 stream : IThriftStream;
685begin
686 if FOwnsClient
687 then FreeAndNil( FClient)
688 else FClient := nil;
689
690 FClient := TTcpClient.Create( nil);
691 FOwnsClient := True;
692
693 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
694 FInputStream := stream;
695 FOutputStream := stream;
696end;
697
698procedure TSocketImpl.Open;
699begin
700 if IsOpen then
701 begin
702 raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
703 'Socket already connected');
704 end;
705
706 if FHost = '' then
707 begin
708 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
709 'Cannot open null host');
710 end;
711
712 if Port <= 0 then
713 begin
714 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
715 'Cannot open without port');
716 end;
717
718 if FClient = nil then
719 begin
720 InitSocket;
721 end;
722
723 FClient.RemoteHost := TSocketHost( Host);
724 FClient.RemotePort := TSocketPort( IntToStr( Port));
725 FClient.Connect;
726
727 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
728 FOutputStream := FInputStream;
729end;
730
731{ TBufferedStream }
732
733procedure TBufferedStreamImpl.Close;
734begin
735 Flush;
736 FStream := nil;
737
738 FReadBuffer.Free;
739 FReadBuffer := nil;
740
741 FWriteBuffer.Free;
742 FWriteBuffer := nil;
743end;
744
745constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
746begin
747 inherited Create;
748 FStream := AStream;
749 FBufSize := ABufSize;
750 FReadBuffer := TMemoryStream.Create;
751 FWriteBuffer := TMemoryStream.Create;
752end;
753
754destructor TBufferedStreamImpl.Destroy;
755begin
756 Close;
757 inherited;
758end;
759
760procedure TBufferedStreamImpl.Flush;
761var
762 buf : TBytes;
763 len : Integer;
764begin
765 if IsOpen then
766 begin
767 len := FWriteBuffer.Size;
768 if len > 0 then
769 begin
770 SetLength( buf, len );
771 FWriteBuffer.Position := 0;
772 FWriteBuffer.Read( Pointer(@buf[0])^, len );
773 FStream.Write( buf, 0, len );
774 end;
775 FWriteBuffer.Clear;
776 end;
777end;
778
779function TBufferedStreamImpl.IsOpen: Boolean;
780begin
781 Result := (FWriteBuffer <> nil)
782 and (FReadBuffer <> nil)
783 and (FStream <> nil);
784end;
785
786procedure TBufferedStreamImpl.Open;
787begin
788
789end;
790
791function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
792var
793 nRead : Integer;
794 tempbuf : TBytes;
795begin
796 inherited;
797 Result := 0;
798 if IsOpen then
799 begin
800 while count > 0 do begin
801
802 if FReadBuffer.Position >= FReadBuffer.Size then
803 begin
804 FReadBuffer.Clear;
805 SetLength( tempbuf, FBufSize);
806 nRead := FStream.Read( tempbuf, 0, FBufSize );
807 if nRead = 0 then Break; // avoid infinite loop
808
809 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
810 FReadBuffer.Position := 0;
811 end;
812
813 if FReadBuffer.Position < FReadBuffer.Size then
814 begin
815 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
816 Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));
817 Dec( count, nRead);
818 Inc( offset, nRead);
819 end;
820 end;
821 end;
822end;
823
824function TBufferedStreamImpl.ToArray: TBytes;
825var
826 len : Integer;
827begin
828 len := 0;
829
830 if IsOpen then
831 begin
832 len := FReadBuffer.Size;
833 end;
834
835 SetLength( Result, len);
836
837 if len > 0 then
838 begin
839 FReadBuffer.Position := 0;
840 FReadBuffer.Read( Pointer(@Result[0])^, len );
841 end;
842end;
843
844procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
845begin
846 inherited;
847 if count > 0 then
848 begin
849 if IsOpen then
850 begin
851 FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
852 if FWriteBuffer.Size > FBufSize then
853 begin
854 Flush;
855 end;
856 end;
857 end;
858end;
859
860{ TStreamTransportImpl }
861
862procedure TStreamTransportImpl.Close;
863begin
864 if FInputStream <> FOutputStream then
865 begin
866 if FInputStream <> nil then
867 begin
868 FInputStream := nil;
869 end;
870 if FOutputStream <> nil then
871 begin
872 FOutputStream := nil;
873 end;
874 end else
875 begin
876 FInputStream := nil;
877 FOutputStream := nil;
878 end;
879end;
880
881constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
882begin
883 inherited Create;
884 FInputStream := AInputStream;
885 FOutputStream := AOutputStream;
886end;
887
888destructor TStreamTransportImpl.Destroy;
889begin
890 FInputStream := nil;
891 FOutputStream := nil;
892 inherited;
893end;
894
895procedure TStreamTransportImpl.Flush;
896begin
897 if FOutputStream = nil then
898 begin
899 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );
900 end;
901
902 FOutputStream.Flush;
903end;
904
905function TStreamTransportImpl.GetInputStream: IThriftStream;
906begin
907 Result := FInputStream;
908end;
909
910function TStreamTransportImpl.GetIsOpen: Boolean;
911begin
912 Result := True;
913end;
914
915function TStreamTransportImpl.GetOutputStream: IThriftStream;
916begin
917 Result := FInputStream;
918end;
919
920procedure TStreamTransportImpl.Open;
921begin
922
923end;
924
925function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
926begin
927 if FInputStream = nil then
928 begin
929 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );
930 end;
931 Result := FInputStream.Read( buf, off, len );
932end;
933
934procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
935begin
936 if FOutputStream = nil then
937 begin
938 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );
939 end;
940
941 FOutputStream.Write( buf, off, len );
942end;
943
944{ TBufferedTransportImpl }
945
946constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
947begin
948 //no inherited;
949 Create( ATransport, 1024 );
950end;
951
952procedure TBufferedTransportImpl.Close;
953begin
954 FTransport.Close;
955end;
956
957constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
958 ABufSize: Integer);
959begin
960 inherited Create;
961 FTransport := ATransport;
962 FBufSize := ABufSize;
963 InitBuffers;
964end;
965
966procedure TBufferedTransportImpl.Flush;
967begin
968 if FOutputBuffer <> nil then
969 begin
970 FOutputBuffer.Flush;
971 end;
972end;
973
974function TBufferedTransportImpl.GetIsOpen: Boolean;
975begin
976 Result := FTransport.IsOpen;
977end;
978
979function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
980begin
981 Result := FTransport;
982end;
983
984procedure TBufferedTransportImpl.InitBuffers;
985begin
986 if FTransport.InputStream <> nil then
987 begin
988 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
989 end;
990 if FTransport.OutputStream <> nil then
991 begin
992 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
993 end;
994end;
995
996procedure TBufferedTransportImpl.Open;
997begin
998 FTransport.Open
999end;
1000
1001function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1002begin
1003 Result := 0;
1004 if FInputBuffer <> nil then
1005 begin
1006 Result := FInputBuffer.Read( buf, off, len );
1007 end;
1008end;
1009
1010procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1011begin
1012 if FOutputBuffer <> nil then
1013 begin
1014 FOutputBuffer.Write( buf, off, len );
1015 end;
1016end;
1017
1018{ TFramedTransportImpl }
1019
1020{$IF CompilerVersion < 21.0}
1021procedure TFramedTransportImpl_Initialize;
1022begin
1023 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1024 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1025 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1026end;
1027{$ELSE}
1028class constructor TFramedTransportImpl.Create;
1029begin
1030 SetLength( FHeader_Dummy, FHeaderSize);
1031 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1032end;
1033{$IFEND}
1034
1035constructor TFramedTransportImpl.Create;
1036begin
1037 inherited Create;
1038 InitWriteBuffer;
1039end;
1040
1041procedure TFramedTransportImpl.Close;
1042begin
1043 FTransport.Close;
1044end;
1045
1046constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1047begin
1048 inherited Create;
1049 InitWriteBuffer;
1050 FTransport := ATrans;
1051end;
1052
1053destructor TFramedTransportImpl.Destroy;
1054begin
1055 FWriteBuffer.Free;
1056 FReadBuffer.Free;
1057 inherited;
1058end;
1059
1060procedure TFramedTransportImpl.Flush;
1061var
1062 buf : TBytes;
1063 len : Integer;
1064 data_len : Integer;
1065
1066begin
1067 len := FWriteBuffer.Size;
1068 SetLength( buf, len);
1069 if len > 0 then
1070 begin
1071 System.Move( FWriteBuffer.Memory^, buf[0], len );
1072 end;
1073
1074 data_len := len - FHeaderSize;
1075 if (data_len < 0) then
1076 begin
1077 raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );
1078 end;
1079
1080 InitWriteBuffer;
1081
1082 buf[0] := Byte($FF and (data_len shr 24));
1083 buf[1] := Byte($FF and (data_len shr 16));
1084 buf[2] := Byte($FF and (data_len shr 8));
1085 buf[3] := Byte($FF and data_len);
1086
1087 FTransport.Write( buf, 0, len );
1088 FTransport.Flush;
1089end;
1090
1091function TFramedTransportImpl.GetIsOpen: Boolean;
1092begin
1093 Result := FTransport.IsOpen;
1094end;
1095
1096type
1097 TAccessMemoryStream = class(TMemoryStream)
1098 end;
1099
1100procedure TFramedTransportImpl.InitWriteBuffer;
1101begin
1102 FWriteBuffer.Free;
1103 FWriteBuffer := TMemoryStream.Create;
1104 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1105 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1106end;
1107
1108procedure TFramedTransportImpl.Open;
1109begin
1110 FTransport.Open;
1111end;
1112
1113function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1114var
1115 got : Integer;
1116begin
1117 if FReadBuffer <> nil then
1118 begin
1119 if len > 0
1120 then got := FReadBuffer.Read( Pointer(@buf[off])^, len )
1121 else got := 0;
1122 if got > 0 then
1123 begin
1124 Result := got;
1125 Exit;
1126 end;
1127 end;
1128
1129 ReadFrame;
1130 if len > 0
1131 then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)
1132 else Result := 0;
1133end;
1134
1135procedure TFramedTransportImpl.ReadFrame;
1136var
1137 i32rd : TBytes;
1138 size : Integer;
1139 buff : TBytes;
1140begin
1141 SetLength( i32rd, FHeaderSize );
1142 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1143 size :=
1144 ((i32rd[0] and $FF) shl 24) or
1145 ((i32rd[1] and $FF) shl 16) or
1146 ((i32rd[2] and $FF) shl 8) or
1147 (i32rd[3] and $FF);
1148 SetLength( buff, size );
1149 FTransport.ReadAll( buff, 0, size );
1150 FReadBuffer.Free;
1151 FReadBuffer := TMemoryStream.Create;
1152 FReadBuffer.Write( Pointer(@buff[0])^, size );
1153 FReadBuffer.Position := 0;
1154end;
1155
1156procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1157begin
1158 if len > 0
1159 then FWriteBuffer.Write( Pointer(@buf[off])^, len );
1160end;
1161
1162{ TFramedTransport.TFactory }
1163
1164function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1165begin
1166 Result := TFramedTransportImpl.Create( ATrans );
1167end;
1168
1169{ TTcpSocketStreamImpl }
1170
1171procedure TTcpSocketStreamImpl.Close;
1172begin
1173 FTcpClient.Close;
1174end;
1175
1176constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
1177begin
1178 inherited Create;
1179 FTcpClient := ATcpClient;
1180 FTimeout := aTimeout;
1181end;
1182
1183procedure TTcpSocketStreamImpl.Flush;
1184begin
1185
1186end;
1187
1188function TTcpSocketStreamImpl.IsOpen: Boolean;
1189begin
1190 Result := FTcpClient.Active;
1191end;
1192
1193procedure TTcpSocketStreamImpl.Open;
1194begin
1195 FTcpClient.Open;
1196end;
1197
1198
1199function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1200 TimeOut: Integer; var wsaError : Integer): Integer;
1201var
1202 ReadFds: TFDset;
1203 ReadFdsptr: PFDset;
1204 WriteFds: TFDset;
1205 WriteFdsptr: PFDset;
1206 ExceptFds: TFDset;
1207 ExceptFdsptr: PFDset;
1208 tv: timeval;
1209 Timeptr: PTimeval;
1210 socket : TSocket;
1211begin
1212 if not FTcpClient.Active then begin
1213 wsaError := WSAEINVAL;
1214 Exit( SOCKET_ERROR);
1215 end;
1216
1217 socket := FTcpClient.Handle;
1218
1219 if Assigned(ReadReady) then
1220 begin
1221 ReadFdsptr := @ReadFds;
1222 FD_ZERO(ReadFds);
1223 FD_SET(socket, ReadFds);
1224 end
1225 else
1226 ReadFdsptr := nil;
1227
1228 if Assigned(WriteReady) then
1229 begin
1230 WriteFdsptr := @WriteFds;
1231 FD_ZERO(WriteFds);
1232 FD_SET(socket, WriteFds);
1233 end
1234 else
1235 WriteFdsptr := nil;
1236
1237 if Assigned(ExceptFlag) then
1238 begin
1239 ExceptFdsptr := @ExceptFds;
1240 FD_ZERO(ExceptFds);
1241 FD_SET(socket, ExceptFds);
1242 end
1243 else
1244 ExceptFdsptr := nil;
1245
1246 if TimeOut >= 0 then
1247 begin
1248 tv.tv_sec := TimeOut div 1000;
1249 tv.tv_usec := 1000 * (TimeOut mod 1000);
1250 Timeptr := @tv;
1251 end
1252 else
1253 Timeptr := nil; // wait forever
1254
1255 wsaError := 0;
1256 try
1257{$IFDEF MSWINDOWS}
1258 result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1259{$ENDIF}
1260{$IFDEF LINUX}
1261 result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1262{$ENDIF}
1263 if result = SOCKET_ERROR
1264 then wsaError := WSAGetLastError;
1265
1266 except
1267 result := SOCKET_ERROR;
1268 end;
1269
1270 if Assigned(ReadReady) then
1271 ReadReady^ := FD_ISSET(socket, ReadFds);
1272 if Assigned(WriteReady) then
1273 WriteReady^ := FD_ISSET(socket, WriteFds);
1274 if Assigned(ExceptFlag) then
1275 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1276end;
1277
1278function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1279 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001280 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001281var bCanRead, bError : Boolean;
1282 retval : Integer;
1283begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001284 bytesReady := 0;
1285
Jens Geyerd5436f52014-10-03 19:50:38 +02001286 // The select function returns the total number of socket handles that are ready
1287 // and contained in the fd_set structures, zero if the time limit expired,
1288 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1289 // WSAGetLastError can be used to retrieve a specific error code.
1290 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1291 if retval = SOCKET_ERROR
1292 then Exit( TWaitForData.wfd_Error);
1293 if (retval = 0) or not bCanRead
1294 then Exit( TWaitForData.wfd_Timeout);
1295
1296 // recv() returns the number of bytes received, or -1 if an error occurred.
1297 // The return value will be 0 when the peer has performed an orderly shutdown.
1298 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);
1299 if retval <= 0
1300 then Exit( TWaitForData.wfd_Error);
1301
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001302 // at least we have some data
1303 bytesReady := Min( retval, DesiredBytes);
1304 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001305end;
1306
1307function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
1308var wfd : TWaitForData;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001309 wsaError, nBytes : Integer;
1310 pDest : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001311const
1312 SLEEP_TIME = 200;
1313begin
1314 inherited;
1315
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001316 result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001317 pDest := Pointer(@buffer[offset]);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001318 while count > 0 do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001319
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001320 while TRUE do begin
1321 if FTimeout > 0
1322 then wfd := WaitForData( FTimeout, pDest, count, wsaError, nBytes)
1323 else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError, nBytes);
Jens Geyerd5436f52014-10-03 19:50:38 +02001324
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001325 case wfd of
1326 TWaitForData.wfd_Error : Exit(0);
1327 TWaitForData.wfd_HaveData : Break;
1328 TWaitForData.wfd_Timeout : begin
1329 if (FTimeout > 0)
1330 then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
1331 SysErrorMessage(Cardinal(wsaError)));
1332 end;
1333 else
1334 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001335 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001336 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001337
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001338 ASSERT( nBytes <= count);
1339 nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);
1340 Inc( pDest, nBytes);
1341 Dec( count, nBytes);
1342 Inc( result, nBytes);
1343 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001344end;
1345
1346function TTcpSocketStreamImpl.ToArray: TBytes;
1347var
1348 len : Integer;
1349begin
1350 len := 0;
1351 if IsOpen then
1352 begin
1353 len := FTcpClient.BytesReceived;
1354 end;
1355
1356 SetLength( Result, len );
1357
1358 if len > 0 then
1359 begin
1360 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1361 end;
1362end;
1363
1364procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
1365var bCanWrite, bError : Boolean;
1366 retval, wsaError : Integer;
1367begin
1368 inherited;
1369
1370 if not FTcpClient.Active
1371 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
1372
1373 // The select function returns the total number of socket handles that are ready
1374 // and contained in the fd_set structures, zero if the time limit expired,
1375 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1376 // WSAGetLastError can be used to retrieve a specific error code.
1377 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1378 if retval = SOCKET_ERROR
1379 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1380 SysErrorMessage(Cardinal(wsaError)));
1381 if (retval = 0)
1382 then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
1383 if bError or not bCanWrite
1384 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
1385
1386 FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
1387end;
1388
1389{$IF CompilerVersion < 21.0}
1390initialization
1391begin
1392 TFramedTransportImpl_Initialize;
1393end;
1394{$IFEND}
1395
1396
1397end.