(*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *)
19
20 {$SCOPEDENUMS ON}
21
22unit Thrift.Transport;
23
24interface
25
26uses
27 Classes,
28 SysUtils,
29 Sockets,
30 Generics.Collections,
31 Thrift.Collections,
32 Thrift.Utils,
33 Thrift.Stream,
34 ActiveX,
35 msxml;
36
37type
// Contract implemented by every transport in this unit: open/close state,
// reads and writes at an offset inside a TBytes buffer, and flushing.
ITransport = interface
  ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
  function  GetIsOpen: Boolean;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean;
  procedure Open;
  procedure Close;
  // Read returns the number of bytes obtained; ReadAll builds on it.
  function  Read( var buf: TBytes; off: Integer; len: Integer): Integer;
  function  ReadAll( var buf: TBytes; off: Integer; len: Integer): Integer;
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
  procedure Flush;
end;
51
// Abstract base class for transports. Descendants must supply GetIsOpen,
// Open, Close, Read and the three-argument Write; Peek, ReadAll, the
// whole-buffer Write and Flush have default implementations here.
TTransportImpl = class( TInterfacedObject, ITransport)
protected
  function  GetIsOpen: Boolean; virtual; abstract;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean;
  procedure Open(); virtual; abstract;
  procedure Close(); virtual; abstract;
  function  Read( var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
  function  ReadAll( var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
  procedure Write( const buf: TBytes); overload; virtual;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
  procedure Flush; virtual;
end;
65
// Exception raised by the transports in this unit. Type_ carries the
// category given to the constructor.
TTransportException = class( Exception )
public
  type
    // Scoped enumeration ({$SCOPEDENUMS ON}); Unknown is the first value.
    TExceptionType = (
      Unknown,
      NotOpen,
      AlreadyOpen,
      TimedOut,
      EndOfFile
    );
private
  FType : TExceptionType;
public
  constructor Create( AType: TExceptionType); overload;
  constructor Create( const msg: string); overload;
  constructor Create( AType: TExceptionType; const msg: string); overload;
  property Type_: TExceptionType read FType;
end;
84
// Transport that additionally exposes timeout settings, custom request
// headers and an explicit SendRequest operation.
IHTTPClient = interface( ITransport )
  ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
  procedure SetConnectionTimeout( const Value: Integer);
  function  GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function  GetReadTimeout: Integer;
  function  GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property  ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property  ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property  CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
end;
97
// IHTTPClient implementation: writes are collected in FOutputStream and
// posted to FUri by SendRequest; the response is then read via FInputStream.
THTTPClientImpl = class( TTransportImpl, IHTTPClient)
private
  FUri               : string;
  FInputStream       : IThriftStream;
  FOutputStream      : IThriftStream;
  FConnectionTimeout : Integer;
  FReadTimeout       : Integer;
  FCustomHeaders     : IThriftDictionary<string,string>;

  function CreateRequest: IXMLHTTPRequest;
protected
  // ITransport
  function  GetIsOpen: Boolean; override;
  procedure Open(); override;
  procedure Close(); override;
  function  Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  procedure Flush; override;

  // IHTTPClient
  procedure SetConnectionTimeout( const Value: Integer);
  function  GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function  GetReadTimeout: Integer;
  function  GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property  ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property  ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property  CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
  constructor Create( const AUri: string);
  destructor Destroy; override;
end;
129
// Server-side endpoint: start listening, stop, and accept client transports.
IServerTransport = interface
  ['{BF6B7043-DA22-47BF-8B11-2B88EC55FE12}']
  procedure Listen;
  procedure Close;
  function  Accept: ITransport;
end;
136
// Base class for server transports. Accept delegates to the abstract
// AcceptImpl that descendants provide.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
  function  AcceptImpl: ITransport; virtual; abstract;
public
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function  Accept: ITransport;
end;
145
// Factory that derives the transport to use from a given base transport.
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( ATrans: ITransport): ITransport;
end;
150
// Default factory; GetTransport is virtual so descendants can wrap ATrans.
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
  function GetTransport( ATrans: ITransport): ITransport; virtual;
end;
154
// Thrift stream that forwards every operation to a TCustomIpClient.
// The client is only referenced here; this class has no destructor.
TTcpSocketStreamImpl = class( TThriftStreamImpl )
private
  FTcpClient : TCustomIpClient;
protected
  procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
  function  Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
public
  constructor Create( ATcpClient: TCustomIpClient);
end;
170
// Transport that exposes the streams it reads from and writes to.
IStreamTransport = interface( ITransport )
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream  : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;
end;
178
// Transport built on a pair of IThriftStream references, one for reading
// and one for writing (both may refer to the same stream).
TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
protected
  FInputStream  : IThriftStream;
  FOutputStream : IThriftStream;
protected
  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
public
  property InputStream  : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;

  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  constructor Create( AInputStream : IThriftStream; AOutputStream : IThriftStream);
  destructor Destroy; override;
end;
200
// Stream decorator that stages data in a TMemoryStream (FBuffer) in front
// of the wrapped stream FStream; FBufSize is the configured buffer size.
TBufferedStreamImpl = class( TThriftStreamImpl)
private
  FStream  : IThriftStream;
  FBufSize : Integer;
  FBuffer  : TMemoryStream;
protected
  procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
  function  Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
public
  constructor Create( AStream: IThriftStream; ABufSize: Integer);
  destructor Destroy; override;
end;
218
// Server transport backed by a TTcpServer. The server component is either
// supplied by the caller or created from a port number; FOwnsServer is set
// only in the latter case.
TServerSocketImpl = class( TServerTransportImpl)
private
  FServer            : TTcpServer;
  FPort              : Integer;
  FClientTimeout     : Integer;
  FUseBufferedSocket : Boolean;
  FOwnsServer        : Boolean;
protected
  function AcceptImpl: ITransport; override;
public
  constructor Create( AServer: TTcpServer ); overload;
  constructor Create( AServer: TTcpServer; AClientTimeout: Integer); overload;
  constructor Create( APort: Integer); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer;
                      AUseBufferedSockets: Boolean); overload;
  destructor Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
239
// Transport decorator that places buffered streams of FBufSize bytes in
// front of the input and output streams of an IStreamTransport.
TBufferedTransportImpl = class( TTransportImpl )
private
  FInputBuffer  : IThriftStream;
  FOutputBuffer : IThriftStream;
  FTransport    : IStreamTransport;
  FBufSize      : Integer;

  procedure InitBuffers;
  function  GetUnderlyingTransport: ITransport;
protected
  function  GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  procedure Open(); override;
  procedure Close(); override;
  function  Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  constructor Create( ATransport : IStreamTransport ); overload;
  constructor Create( ATransport : IStreamTransport; ABufSize: Integer); overload;
  property UnderlyingTransport: ITransport read GetUnderlyingTransport;
  property IsOpen: Boolean read GetIsOpen;
end;
262
// Stream transport over a TCP client component. The client is either
// handed in by the caller or created internally for a host/port pair
// (FOwnsClient is set when it is created internally).
TSocketImpl = class( TStreamTransportImpl)
private
  FClient     : TCustomIpClient;
  FOwnsClient : Boolean;
  FHost       : string;
  FPort       : Integer;
  FTimeout    : Integer;

  procedure InitSocket;
protected
  function GetIsOpen: Boolean; override;
public
  procedure Open; override;
  constructor Create( AClient : TCustomIpClient); overload;
  constructor Create( const AHost: string; APort: Integer); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Integer); overload;
  destructor Destroy; override;
  procedure Close; override;
  property TcpClient: TCustomIpClient read FClient;
  property Host : string read FHost;
  property Port: Integer read FPort;
end;
285
// Transport decorator that exchanges data in frames: each frame written to
// FTransport starts with an FHeaderSize-byte length field. Outgoing data is
// collected in FWriteBuffer; a received frame is held in FReadBuffer.
TFramedTransportImpl = class( TTransportImpl)
private const
  FHeaderSize : Integer = 4;
private class var
  // Placeholder header bytes written at the start of every write buffer.
  FHeader_Dummy : array of Byte;
protected
  FTransport   : ITransport;
  FWriteBuffer : TMemoryStream;
  FReadBuffer  : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;
public
  type
    // Factory that wraps the given transport in a TFramedTransportImpl.
    TFactory = class( TTransportFactoryImpl )
    public
      function GetTransport( ATrans: ITransport): ITransport; override;
    end;

{$IF CompilerVersion >= 21.0}
  class constructor Create;
{$IFEND}
  constructor Create; overload;
  constructor Create( ATrans: ITransport); overload;
  destructor Destroy; override;

  procedure Open(); override;
  function  GetIsOpen: Boolean; override;

  procedure Close(); override;
  function  Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  procedure Flush; override;
end;
320
{$IF CompilerVersion < 21.0}
// Stand-in for the class constructor on compilers below version 21.0;
// it is called from this unit's initialization section.
procedure TFramedTransportImpl_Initialize;
{$IFEND}
324
325implementation
326
327{ TTransportImpl }
328
// Default flush does nothing; descendants override it as needed.
procedure TTransportImpl.Flush;
begin
  // intentionally empty
end;
333
// Returns the current IsOpen state of the transport.
function TTransportImpl.Peek: Boolean;
begin
  Result := Self.IsOpen;
end;
338
// Calls Read repeatedly until exactly len bytes have been stored in buf,
// starting at index off. Returns the number of bytes read (len, or 0 when
// len <= 0). Raises TTransportException as soon as a Read call returns a
// value <= 0.
function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
var
  total : Integer;
  chunk : Integer;
begin
  total := 0;
  while total < len do
  begin
    // continue right behind the bytes that have already arrived
    chunk := Read( buf, off + total, len - total);
    if chunk <= 0 then
      raise TTransportException.Create( 'Cannot read, Remote side has closed' );
    Inc( total, chunk);
  end;
  Result := total;
end;
356
// Convenience overload: writes the whole of buf via the three-argument Write.
procedure TTransportImpl.Write( const buf: TBytes);
var
  count : Integer;
begin
  count := Length( buf);
  Self.Write( buf, 0, count);
end;
361
362{ THTTPClientImpl }
363
// Drops the references to the response stream and the request buffer.
procedure THTTPClientImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
369
// Remembers the target URI and creates an empty header dictionary and an
// empty in-memory output stream (the adapter is created with its second
// argument set to True).
constructor THTTPClientImpl.Create( const AUri: string);
begin
  inherited Create;
  FUri           := AUri;
  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
  FOutputStream  := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
end;
377
// Creates the XMLHTTP request object, opens a synchronous POST to FUri and
// sets the fixed Thrift headers followed by all entries of FCustomHeaders.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  header : TPair<string,string>;
begin
{$IF CompilerVersion >= 21.0}
  Result := CoXMLHTTP.Create;
{$ELSE}
  Result := CoXMLHTTPRequest.Create;
{$IFEND}

  // third argument False: the request is not asynchronous
  Result.open('POST', FUri, False, '', '');
  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
  Result.setRequestHeader( 'Accept', 'application/x-thrift');
  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  for header in FCustomHeaders do
    Result.setRequestHeader( header.Key, header.Value );
end;
398
// Closes the client (releasing both streams) before the inherited cleanup.
destructor THTTPClientImpl.Destroy;
begin
  Self.Close;
  inherited;
end;
404
// Sends the buffered request. Whether or not SendRequest raises, the
// output buffer is replaced by a fresh empty in-memory stream afterwards.
procedure THTTPClientImpl.Flush;
begin
  try
    SendRequest;
  finally
    // release the old buffer first, then start over with an empty one
    FOutputStream := nil;
    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
  end;
end;
414
// Getter for the ConnectionTimeout property.
function THTTPClientImpl.GetConnectionTimeout: Integer;
begin
  Result := Self.FConnectionTimeout;
end;
419
// Getter for the CustomHeaders property; returns the dictionary itself.
function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
begin
  Result := Self.FCustomHeaders;
end;
424
// This transport always reports itself as open.
function THTTPClientImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
429
// Getter for the ReadTimeout property.
function THTTPClientImpl.GetReadTimeout: Integer;
begin
  Result := Self.FReadTimeout;
end;
434
// Opening is a no-op for this transport.
procedure THTTPClientImpl.Open;
begin
  // intentionally empty
end;
439
// Reads from the response stream of the last request.
// Raises TTransportException (NotOpen) when there is no response stream,
// and re-raises any exception from the stream as TTransportException
// (Unknown) carrying the original message.
function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if not Assigned( FInputStream) then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'No request has been sent');

  try
    Result := FInputStream.Read( buf, off, len );
  except
    on E: Exception do
      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
                                        E.Message);
  end;
end;
457
// Posts the bytes collected in FOutputStream and installs the response
// stream of the request as the new FInputStream.
procedure THTTPClientImpl.SendRequest;
var
  request : IXMLHTTPRequest;
  body    : TMemoryStream;
  payload : TBytes;
  count   : Integer;
begin
  request := CreateRequest;

  body := TMemoryStream.Create;
  try
    // copy the pending output into a memory stream for the request
    payload := FOutputStream.ToArray;
    count   := Length( payload);
    if count > 0 then
      body.WriteBuffer( payload[0], count);
    body.Position := 0;

    // soReference: the adapter does not take ownership of body
    request.send( IUnknown( TStreamAdapter.Create( body, soReference )));

    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( request.responseStream) as IStream);
  finally
    body.Free;
  end;
end;
483
// Setter for the ConnectionTimeout property; stores the value only.
procedure THTTPClientImpl.SetConnectionTimeout( const Value: Integer);
begin
  Self.FConnectionTimeout := Value;
end;
488
// Setter for the ReadTimeout property; stores the value only.
procedure THTTPClientImpl.SetReadTimeout( const Value: Integer);
begin
  Self.FReadTimeout := Value;
end;
493
// Appends len bytes of buf, starting at off, to the pending request body.
procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
begin
  Self.FOutputStream.Write( buf, off, len);
end;
498
499{ TTransportException }
500
// Creates the exception with the given type and an empty message.
constructor TTransportException.Create( AType: TExceptionType);
begin
  Create( AType, '' );
end;
505
// Creates the exception with both an explicit type and a message.
constructor TTransportException.Create( AType: TExceptionType; const msg: string);
begin
  inherited Create( msg);
  Self.FType := AType;
end;
512
// Creates the exception with a message only; Type_ is Unknown.
constructor TTransportException.Create( const msg: string);
begin
  inherited Create( msg);
  // Same value the zero-initialized field already holds (first enum member).
  FType := TExceptionType.Unknown;
end;
517
518{ TServerTransportImpl }
519
// Returns the transport produced by AcceptImpl.
// Raises TTransportException when AcceptImpl yields nil.
function TServerTransportImpl.Accept: ITransport;
begin
  Result := AcceptImpl;
  if not Assigned( Result) then
    raise TTransportException.Create( 'accept() may not return NULL' );
end;
528
529{ TTransportFactoryImpl }
530
// Default factory behaviour: hands back the given transport unchanged.
function TTransportFactoryImpl.GetTransport( ATrans: ITransport): ITransport;
begin
  Result := ATrans;
end;
535
536{ TServerSocket }
537
// Uses a server component supplied by the caller. FOwnsServer is left at
// its default (False), so Destroy does not free AServer.
constructor TServerSocketImpl.Create( AServer: TTcpServer; AClientTimeout: Integer);
begin
  FClientTimeout := AClientTimeout;
  FServer        := AServer;
end;
543
// Same as Create( AServer, 0): caller-supplied server, client timeout 0.
constructor TServerSocketImpl.Create( AServer: TTcpServer);
begin
  Create( AServer, 0 );
end;
548
// Same as Create( APort, 0): own server on APort, client timeout 0.
constructor TServerSocketImpl.Create( APort: Integer);
begin
  Create( APort, 0 );
end;
553
// Waits for a client on FServer and wraps the accepted connection in a
// TSocketImpl (additionally in a TBufferedTransportImpl when
// FUseBufferedSocket is set).
//
// Returns nil when FServer.Accept reports failure.
// Raises TTransportException (NotOpen) when there is no server component;
// any other exception is re-raised as TTransportException carrying
// E.ToString.
//
// The client component is freed here whenever no TSocketImpl has taken it
// over, including when Accept or the TSocketImpl constructor raises.
// Previously it leaked on those paths.
function TServerSocketImpl.AcceptImpl: ITransport;
var
  client   : TCustomIpClient;
  accepted : Boolean;
  sock     : IStreamTransport;
begin
  if FServer = nil then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'No underlying server socket.');
  end;

  try
    accepted := False;
    client   := TCustomIpClient.Create( nil);
    try
      accepted := FServer.Accept( client);
      if accepted then
      begin
        sock := TSocketImpl.Create( client);
      end;
    finally
      // sock stays nil unless the wrapper was created successfully
      if sock = nil then
      begin
        client.Free;
      end;
    end;

    if not accepted then
    begin
      Result := nil;
      Exit;
    end;

    if FUseBufferedSocket then
    begin
      Result := TBufferedTransportImpl.Create( sock);
    end else
    begin
      Result := sock;
    end;

  except
    on E: Exception do
    begin
      raise TTransportException.Create( E.ToString );
    end;
  end;
end;
598
// Deactivates the server component, if there is one. Any exception raised
// while doing so is converted into a TTransportException.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil then
    Exit;

  try
    FServer.Active := False;
  except
    on E: Exception do
      raise TTransportException.Create('Error on closing socket : ' + E.Message);
  end;
end;
613
// Creates and owns a blocking TTcpServer bound to local port APort.
// AUseBufferedSockets selects whether accepted clients get wrapped in a
// buffered transport.
constructor TServerSocketImpl.Create( APort, AClientTimeout: Integer;
                                      AUseBufferedSockets: Boolean);
begin
  FPort              := APort;
  FClientTimeout     := AClientTimeout;
  FUseBufferedSocket := AUseBufferedSockets;

  FOwnsServer := True;
  FServer     := TTcpServer.Create( nil );
  FServer.BlockMode := bmBlocking;
{$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
{$ELSE}
  FServer.LocalPort := IntToStr( FPort);
{$IFEND}
end;
629
// Frees the server component only when this object created it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer then
    FServer.Free;
  inherited;
end;
638
// Activates the server component, if there is one. Any exception raised
// while doing so is converted into a TTransportException.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil then
    Exit;

  try
    FServer.Active := True;
  except
    on E: Exception do
      raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
  end;
end;
653
// Same as the three-argument constructor with buffered sockets disabled.
constructor TServerSocketImpl.Create( APort, AClientTimeout: Integer);
begin
  Create( APort, AClientTimeout, False );
end;
658
659{ TSocket }
660
// Wraps an existing client component; a single socket stream over it is
// used for both reading and writing. FOwnsClient keeps its default (False).
constructor TSocketImpl.Create( AClient : TCustomIpClient);
var
  socketStream : IThriftStream;
begin
  FClient       := AClient;
  socketStream  := TTcpSocketStreamImpl.Create( FClient);
  FInputStream  := socketStream;
  FOutputStream := socketStream;
end;
670
// Same as Create( AHost, APort, 0).
constructor TSocketImpl.Create( const AHost: string; APort: Integer);
begin
  Create( AHost, APort, 0);
end;
675
// Releases the streams (inherited Close) and then frees the client
// component, regardless of FOwnsClient, and clears the reference.
procedure TSocketImpl.Close;
begin
  inherited Close;
  // Free is a no-op on a nil reference, so no explicit nil check is needed
  FClient.Free;
  FClient := nil;
end;
685
// Stores host, port and timeout and prepares an internally owned client
// via InitSocket. No connection is made here.
constructor TSocketImpl.Create( const AHost: string; APort, ATimeout: Integer);
begin
  FHost    := AHost;
  FPort    := APort;
  FTimeout := ATimeout;
  Self.InitSocket;
end;
693
// Frees the client component only when it was created by InitSocket.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then
    FClient.Free;
  inherited;
end;
702
// Open means: a client component exists and reports Connected.
function TSocketImpl.GetIsOpen: Boolean;
begin
  if FClient = nil then
    Result := False
  else
    Result := FClient.Connected;
end;
711
// Replaces the current client by a new, owned TTcpClient and points both
// streams at a socket stream over it. A previous client is freed only if
// this object owned it.
procedure TSocketImpl.InitSocket;
var
  socketStream : IThriftStream;
begin
  if (FClient <> nil) and FOwnsClient then
  begin
    FClient.Free;
    FClient := nil;
  end;

  FClient     := TTcpClient.Create( nil );
  FOwnsClient := True;

  socketStream  := TTcpSocketStreamImpl.Create( FClient);
  FInputStream  := socketStream;
  FOutputStream := socketStream;
end;
732
// Connects the client component to Host:Port and installs a fresh socket
// stream for reading and writing.
//
// Raises TTransportException:
//   AlreadyOpen - the socket is already connected
//   NotOpen     - the host is empty, the port is <= 0, or Connect reports
//                 failure
//
// The result of FClient.Connect used to be ignored, so a failed connect
// was indistinguishable from a successful Open.
procedure TSocketImpl.Open;
begin
  if IsOpen then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
                                      'Socket already connected');
  end;

  if FHost = '' then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open null host');
  end;

  if Port <= 0 then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open without port');
  end;

  // Close frees the client, so it may have to be re-created here
  if FClient = nil then
  begin
    InitSocket;
  end;

  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  if not FClient.Connect then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Could not connect to ' + Host + ':' + IntToStr( Port));
  end;

  FInputStream  := TTcpSocketStreamImpl.Create( FClient);
  FOutputStream := FInputStream;
end;
765
766{ TBufferedStream }
767
// Flushes pending data, then releases the wrapped stream and the buffer.
procedure TBufferedStreamImpl.Close;
begin
  Self.Flush;
  FStream := nil;
  FBuffer.Free;
  FBuffer := nil;
end;
775
// Wraps AStream and creates the (initially empty) memory buffer.
constructor TBufferedStreamImpl.Create( AStream: IThriftStream; ABufSize: Integer);
begin
  FBufSize := ABufSize;
  FStream  := AStream;
  FBuffer  := TMemoryStream.Create;
end;
782
// Closes the stream (which flushes and frees the buffer) before the
// inherited cleanup.
destructor TBufferedStreamImpl.Destroy;
begin
  Self.Close;
  inherited;
end;
788
// Writes the whole buffer content to the wrapped stream and empties the
// buffer. Does nothing when the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  count   : Integer;
begin
  if not IsOpen then
    Exit;

  count := FBuffer.Size;
  if count > 0 then
  begin
    SetLength( pending, count );
    FBuffer.Position := 0;
    FBuffer.Read( pending[0], count );
    FStream.Write( pending, 0, count );
  end;
  FBuffer.Clear;
end;
807
// Open as long as both the buffer and the wrapped stream are present.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := Assigned( FBuffer) and Assigned( FStream);
end;
812
// Opening is a no-op for the buffered stream.
procedure TBufferedStreamImpl.Open;
begin
  // intentionally empty
end;
817
// Serves up to count bytes from the buffer into buffer[offset..]. When
// the buffer is exhausted it is refilled first with a single read of at
// most FBufSize bytes from the wrapped stream. Returns the number of
// bytes delivered; 0 when count <= 0, the stream is not open, or the
// refill produced no data.
function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
var
  fetched : Integer;
  chunk   : TBytes;
begin
  inherited;
  Result := 0;
  if (count <= 0) or not IsOpen then
    Exit;

  // everything buffered has been consumed -> fetch the next chunk
  if FBuffer.Position >= FBuffer.Size then
  begin
    FBuffer.Clear;
    SetLength( chunk, FBufSize);
    fetched := FStream.Read( chunk, 0, FBufSize );
    if fetched > 0 then
    begin
      FBuffer.WriteBuffer( chunk[0], fetched );
      FBuffer.Position := 0;
    end;
  end;

  if FBuffer.Position < FBuffer.Size then
    Result := FBuffer.Read( buffer[offset], count );
end;
848
// Returns a copy of the complete buffer content (an empty array when the
// stream is not open). Reading starts at position 0 of the buffer, so the
// buffer position is changed by this call.
function TBufferedStreamImpl.ToArray: TBytes;
var
  count : Integer;
begin
  if IsOpen then
    count := FBuffer.Size
  else
    count := 0;

  SetLength( Result, count);
  if count <= 0 then
    Exit;

  FBuffer.Position := 0;
  FBuffer.Read( Result[0], count );
end;
868
// Appends count bytes of buffer, starting at offset, to the memory buffer
// and flushes once the buffer has grown beyond FBufSize. Ignored when
// count <= 0 or the stream is not open.
procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
begin
  inherited;
  if (count <= 0) or not IsOpen then
    Exit;

  FBuffer.Write( buffer[offset], count );
  if FBuffer.Size > FBufSize then
    Flush;
end;
884
885{ TStreamTransportImpl }
886
// Releases both stream references, input first. Both branches of the
// original ended up doing exactly this, whether or not the two references
// pointed at the same stream.
procedure TStreamTransportImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
905
// Stores the two streams used for reading and writing.
constructor TStreamTransportImpl.Create( AInputStream : IThriftStream; AOutputStream : IThriftStream);
begin
  Self.FInputStream  := AInputStream;
  Self.FOutputStream := AOutputStream;
end;
911
// Releases both stream references before the inherited cleanup.
destructor TStreamTransportImpl.Destroy;
begin
  Self.FInputStream  := nil;
  Self.FOutputStream := nil;
  inherited;
end;
918
// Flushes the output stream.
// Raises TTransportException (NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if not Assigned( FOutputStream) then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
928
// Getter for the InputStream property.
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := Self.FInputStream;
end;
933
// A plain stream transport always reports itself as open.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
938
// Getter for the OutputStream property.
// Fix: this used to return FInputStream, so callers asking for the output
// stream received the input stream whenever the two differ.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
943
// Opening is a no-op at this level; descendants may override it.
procedure TStreamTransportImpl.Open;
begin
  // intentionally empty
end;
948
// Reads from the input stream and returns its result.
// Raises TTransportException (NotOpen) when there is no input stream.
function TStreamTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if not Assigned( FInputStream) then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );

  Result := FInputStream.Read( buf, off, len );
end;
957
// Writes to the output stream.
// Raises TTransportException (NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if not Assigned( FOutputStream) then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );

  FOutputStream.Write( buf, off, len );
end;
967
968{ TBufferedTransportImpl }
969
// Same as Create( ATransport, 1024): uses a buffer size of 1024.
constructor TBufferedTransportImpl.Create( ATransport: IStreamTransport);
begin
  Create( ATransport, 1024 );
end;
974
// Closes the wrapped transport. The buffer objects are left untouched.
procedure TBufferedTransportImpl.Close;
begin
  Self.FTransport.Close;
end;
979
// Wraps ATransport and creates the buffered streams of size ABufSize.
constructor TBufferedTransportImpl.Create( ATransport: IStreamTransport;
                                           ABufSize: Integer);
begin
  FBufSize   := ABufSize;
  FTransport := ATransport;
  Self.InitBuffers;
end;
987
// Flushes the output buffer, if there is one.
procedure TBufferedTransportImpl.Flush;
begin
  if Assigned( FOutputBuffer) then
    FOutputBuffer.Flush;
end;
995
// Open state is that of the wrapped transport.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := Self.FTransport.IsOpen;
end;
1000
// Getter for the UnderlyingTransport property: the wrapped transport.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := Self.FTransport;
end;
1005
// Creates a buffered stream of FBufSize in front of each stream the
// wrapped transport currently exposes. A buffer is left unchanged when the
// corresponding stream is nil.
procedure TBufferedTransportImpl.InitBuffers;
var
  source : IThriftStream;
  sink   : IThriftStream;
begin
  source := FTransport.InputStream;
  if source <> nil then
    FInputBuffer := TBufferedStreamImpl.Create( source, FBufSize );

  sink := FTransport.OutputStream;
  if sink <> nil then
    FOutputBuffer := TBufferedStreamImpl.Create( sink, FBufSize );
end;
1017
// Opens the wrapped transport and rebuilds the buffers afterwards.
//
// The buffers capture the wrapped transport's streams at the time
// InitBuffers runs. TSocketImpl.Open installs new stream objects (and, after
// a Close, a newly created client), so buffers created before Open would
// keep talking to the stale streams. Re-initializing here makes them match
// the streams that are current after opening.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;
  InitBuffers;
end;
1022
// Reads through the input buffer; returns 0 when there is no input buffer.
function TBufferedTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if Assigned( FInputBuffer) then
    Result := FInputBuffer.Read( buf, off, len )
  else
    Result := 0;
end;
1031
// Writes through the output buffer; silently ignored when there is none.
procedure TBufferedTransportImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if Assigned( FOutputBuffer) then
    FOutputBuffer.Write( buf, off, len );
end;
1039
1040{ TFramedTransportImpl }
1041
// One-time setup of the shared placeholder header: FHeaderSize zero bytes.
// Compilers below version 21.0 get a plain procedure instead of the class
// constructor.
{$IF CompilerVersion < 21.0}
procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ),
            0);
end;
{$ELSE}
class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0],
            Length( FHeader_Dummy) * SizeOf( Byte ),
            0);
end;
{$IFEND}
1056
// Prepares the write buffer only; no wrapped transport is assigned.
constructor TFramedTransportImpl.Create;
begin
  Self.InitWriteBuffer;
end;
1061
// Closes the wrapped transport.
procedure TFramedTransportImpl.Close;
begin
  Self.FTransport.Close;
end;
1066
// Prepares the write buffer and remembers the transport to wrap.
constructor TFramedTransportImpl.Create( ATrans: ITransport);
begin
  Self.InitWriteBuffer;
  Self.FTransport := ATrans;
end;
1072
// Frees both memory buffers before the inherited cleanup.
destructor TFramedTransportImpl.Destroy;
begin
  Self.FWriteBuffer.Free;
  Self.FReadBuffer.Free;
  inherited;
end;
1079
// Sends the collected data as one frame: the first four bytes of the
// write buffer are overwritten with the payload length (most significant
// byte first), then everything is written to the wrapped transport and
// flushed. The write buffer is reset before sending.
// Raises Exception when the buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame   : TBytes;
  total   : Integer;
  payload : Integer;
begin
  total := FWriteBuffer.Size;
  SetLength( frame, total);
  if total > 0 then
    System.Move( FWriteBuffer.Memory^, frame[0], total );

  payload := total - FHeaderSize;
  if payload < 0 then
    raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );

  // start a new frame; the copy in `frame` is what gets sent
  InitWriteBuffer;

  frame[0] := Byte( (payload shr 24) and $FF);
  frame[1] := Byte( (payload shr 16) and $FF);
  frame[2] := Byte( (payload shr 8) and $FF);
  frame[3] := Byte( payload and $FF);

  FTransport.Write( frame, 0, total );
  FTransport.Flush;
end;
1110
// Open state is that of the wrapped transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := Self.FTransport.IsOpen;
end;
1115
type
  // Empty descendant of TMemoryStream, used only as a cast target within
  // this unit.
  TAccessMemoryStream = class( TMemoryStream)
  end;
1119
// Replaces the write buffer by a new memory stream with a capacity of
// 1024 bytes that already contains the placeholder header.
procedure TFramedTransportImpl.InitWriteBuffer;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;
  // Capacity is set through the TAccessMemoryStream cast
  TAccessMemoryStream( FWriteBuffer).Capacity := 1024;
  FWriteBuffer.Write( FHeader_Dummy[0], FHeaderSize);
end;
1127
// Opens the wrapped transport.
procedure TFramedTransportImpl.Open;
begin
  Self.FTransport.Open;
end;
1132
// Delivers up to len bytes of the current frame into buf, starting at
// index off. When the current frame is exhausted (or none has been read
// yet) the next frame is fetched first. Returns the number of bytes copied.
//
// Fix: the data used to be stored at buf[0] regardless of off, so callers
// reading in several steps (e.g. ReadAll, which passes off + got) had
// their earlier bytes overwritten.
function TFramedTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if FReadBuffer <> nil then
  begin
    Result := FReadBuffer.Read( buf[off], len );
    if Result > 0 then
    begin
      Exit;
    end;
  end;

  // nothing left in the current frame: load the next one
  ReadFrame;
  Result := FReadBuffer.Read( buf[off], len );
end;
1150
// Reads one complete frame from the wrapped transport into FReadBuffer:
// first the 4-byte length (most significant byte first), then that many
// payload bytes. FReadBuffer is replaced and positioned at its start.
//
// Raises TTransportException when the decoded length is negative.
//
// Fixes: a negative length is rejected instead of being passed to
// SetLength, and an empty frame no longer indexes element 0 of an empty
// array.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  if size < 0 then
  begin
    raise TTransportException.Create( 'Read a negative frame size (' + IntToStr( size) + ')' );
  end;

  SetLength( buff, size );
  if size > 0 then
  begin
    FTransport.ReadAll( buff, 0, size );
  end;

  FReadBuffer.Free;
  FReadBuffer := TMemoryStream.Create;
  if size > 0 then
  begin
    FReadBuffer.Write( buff[0], size );
  end;
  FReadBuffer.Position := 0;
end;
1171
// Appends len bytes of buf, starting at index off, to the pending frame.
// Nothing is sent before Flush is called.
//
// Fix: the bytes used to be taken from buf[0] regardless of off. A
// non-positive len is now ignored, so an empty buf is never indexed.
procedure TFramedTransportImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if len > 0 then
  begin
    FWriteBuffer.Write( buf[off], len );
  end;
end;
1176
1177{ TFramedTransport.TFactory }
1178
// Wraps the given transport in a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( ATrans: ITransport): ITransport;
var
  framed : ITransport;
begin
  framed := TFramedTransportImpl.Create( ATrans );
  Result := framed;
end;
1183
1184{ TTcpSocketStreamImpl }
1185
// Closes the underlying client component.
procedure TTcpSocketStreamImpl.Close;
begin
  Self.FTcpClient.Close;
end;
1190
// Stores the client component that all stream operations are forwarded to.
constructor TTcpSocketStreamImpl.Create( ATcpClient: TCustomIpClient);
begin
  Self.FTcpClient := ATcpClient;
end;
1195
// Flushing is a no-op: Write hands the data to the client immediately.
procedure TTcpSocketStreamImpl.Flush;
begin
  // intentionally empty
end;
1200
// Open state is the Active flag of the client component.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := Self.FTcpClient.Active;
end;
1205
// Opens the underlying client component.
procedure TTcpSocketStreamImpl.Open;
begin
  Self.FTcpClient.Open;
end;
1210
// Receives up to count bytes from the client into buffer[offset..] and
// returns whatever ReceiveBuf reports.
function TTcpSocketStreamImpl.Read( var buffer: TBytes; offset, count: Integer): Integer;
begin
  inherited;
  Result := FTcpClient.ReceiveBuf( buffer[offset], count);
end;
1217
// Sizes the result from FTcpClient.BytesReceived (0 when not open) and
// then receives that many bytes from the client into it.
// NOTE(review): confirm that BytesReceived is the intended byte count here.
function TTcpSocketStreamImpl.ToArray: TBytes;
var
  count : Integer;
begin
  if IsOpen then
    count := FTcpClient.BytesReceived
  else
    count := 0;

  SetLength( Result, count );
  if count <= 0 then
    Exit;

  FTcpClient.ReceiveBuf( Result[0], count);
end;
1235
// Sends count bytes of buffer, starting at offset, through the client.
// The result of SendBuf is not evaluated.
procedure TTcpSocketStreamImpl.Write( const buffer: TBytes; offset, count: Integer);
begin
  inherited;
  FTcpClient.SendBuf( buffer[offset], count);
end;
1241
{$IF CompilerVersion < 21.0}
initialization
  // Compilers below version 21.0 run the framed-transport setup from the
  // unit initialization instead of a class constructor.
  TFramedTransportImpl_Initialize;
{$IFEND}
1248
1249
1250end.