blob: 8668e5a239ae88fb16bdb629df5aa95c76d6f07b [file] [log] [blame]
Jake Farrell7ae13e12011-10-18 14:35:26 +00001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 {$SCOPEDENUMS ON}
21
22unit Thrift.Transport;
23
24interface
25
26uses
27 Classes,
28 SysUtils,
Jens Geyer6a7463a2013-03-07 20:40:59 +010029 Math,
Jake Farrell7ae13e12011-10-18 14:35:26 +000030 Sockets,
31 Generics.Collections,
32 Thrift.Collections,
33 Thrift.Utils,
34 Thrift.Stream,
35 ActiveX,
36 msxml;
37
38type
// Byte-oriented transport contract: open/close the channel, read into and
// write from TBytes buffers at a given offset, and flush pending output.
ITransport = interface
  ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
  function GetIsOpen: Boolean;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean;
  procedure Open;
  procedure Close;
  // Read: up to len bytes into buf starting at off; returns the byte count.
  function Read( var buf: TBytes; off, len: Integer): Integer;
  // ReadAll: like Read, but for exactly len bytes.
  function ReadAll( var buf: TBytes; off, len: Integer): Integer;
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off, len: Integer); overload;
  procedure Flush;
end;
52
// Abstract base for ITransport implementations. Descendants must supply
// GetIsOpen, Open, Close, Read and the offset/length Write; Peek, ReadAll,
// the whole-buffer Write and Flush have default implementations.
TTransportImpl = class( TInterfacedObject, ITransport)
protected
  function GetIsOpen: Boolean; virtual; abstract;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean; virtual;
  procedure Open; virtual; abstract;
  procedure Close; virtual; abstract;
  function Read( var buf: TBytes; off, len: Integer): Integer; virtual; abstract;
  function ReadAll( var buf: TBytes; off, len: Integer): Integer; virtual;
  procedure Write( const buf: TBytes); overload; virtual;
  procedure Write( const buf: TBytes; off, len: Integer); overload; virtual; abstract;
  procedure Flush; virtual;
end;
66
// Exception raised by the transports in this unit. Carries a category
// (Type_) in addition to the message; constructors that take no category
// leave it at its zero value, Unknown.
TTransportException = class( Exception)
public
  type
    TExceptionType = ( Unknown, NotOpen, AlreadyOpen, TimedOut, EndOfFile);
private
  FType : TExceptionType;
public
  constructor Create( AType: TExceptionType); overload;
  constructor Create( const msg: string); overload;
  constructor Create( AType: TExceptionType; const msg: string); overload;
  property Type_: TExceptionType read FType;
end;
85
// Transport that talks HTTP: adds timeout settings, custom request headers
// and an explicit SendRequest on top of ITransport.
IHTTPClient = interface( ITransport)
  ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
  procedure SetConnectionTimeout( const Value: Integer);
  function GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function GetReadTimeout: Integer;
  function GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
end;
98
// IHTTPClient implementation on top of MSXML's IXMLHTTPRequest. Written bytes
// are collected in FOutputStream and POSTed to FUri when the transport is
// flushed; the response body is then exposed through FInputStream.
THTTPClientImpl = class( TTransportImpl, IHTTPClient)
private
  FUri               : string;
  FInputStream       : IThriftStream;
  FOutputStream      : IThriftStream;
  FConnectionTimeout : Integer;
  FReadTimeout       : Integer;
  FCustomHeaders     : IThriftDictionary<string,string>;

  function CreateRequest: IXMLHTTPRequest;
protected
  function GetIsOpen: Boolean; override;
  procedure Open; override;
  procedure Close; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  procedure Flush; override;

  procedure SetConnectionTimeout( const Value: Integer);
  function GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function GetReadTimeout: Integer;
  function GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
  constructor Create( const AUri: string);
  destructor Destroy; override;
end;
130
// Server-side endpoint: start listening, accept client transports, close.
IServerTransport = interface
  ['{BF6B7043-DA22-47BF-8B11-2B88EC55FE12}']
  procedure Listen;
  procedure Close;
  function Accept: ITransport;
end;
137
// Base class for server transports. Accept delegates to the abstract
// AcceptImpl and rejects a nil result.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
  function AcceptImpl: ITransport; virtual; abstract;
public
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function Accept: ITransport;
end;
146
// Produces the transport to use for a given base transport
// (for example a wrapping/decorating transport).
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( const ATrans: ITransport): ITransport;
end;
151
// Default factory: GetTransport hands back the transport it was given.
// Descendants override GetTransport to wrap it.
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
  function GetTransport( const ATrans: ITransport): ITransport; virtual;
end;
155
// IThriftStream adapter over a TCustomIpClient socket. The client instance is
// supplied by the caller through the constructor.
TTcpSocketStreamImpl = class( TThriftStreamImpl)
private
  FTcpClient : TCustomIpClient;
protected
  procedure Write( const buffer: TBytes; offset, count: Integer); override;
  function Read( var buffer: TBytes; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
  constructor Create( const ATcpClient: TCustomIpClient);
end;
171
// Transport that exposes the input and output streams it works on.
IStreamTransport = interface( ITransport)
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;
end;
179
// Transport that reads from one IThriftStream and writes to another
// (the two may be the same stream object).
TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
protected
  FInputStream  : IThriftStream;
  FOutputStream : IThriftStream;
protected
  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
public
  property InputStream : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;

  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  constructor Create( const AInputStream, AOutputStream : IThriftStream);
  destructor Destroy; override;
end;
201
// Buffering decorator around another IThriftStream. Reads are served from
// FReadBuffer, which is refilled from FStream in chunks of FBufSize; writes
// are gathered in FWriteBuffer and forwarded to FStream on Flush.
TBufferedStreamImpl = class( TThriftStreamImpl)
private
  FStream      : IThriftStream;
  FBufSize     : Integer;
  FReadBuffer  : TMemoryStream;
  FWriteBuffer : TMemoryStream;
protected
  procedure Write( const buffer: TBytes; offset, count: Integer); override;
  function Read( var buffer: TBytes; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
  constructor Create( const AStream: IThriftStream; ABufSize: Integer);
  destructor Destroy; override;
end;
220
// Server transport backed by a TTcpServer. The server object is either
// supplied by the caller or created (and then owned) by the port-based
// constructors.
TServerSocketImpl = class( TServerTransportImpl)
private
  FServer            : TTcpServer;
  FPort              : Integer;
  FClientTimeout     : Integer;
  FUseBufferedSocket : Boolean;
  FOwnsServer        : Boolean;   // True only when FServer was created here
protected
  function AcceptImpl: ITransport; override;
public
  constructor Create( const AServer: TTcpServer); overload;
  constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload;
  constructor Create( APort: Integer); overload;
  constructor Create( APort, AClientTimeout: Integer); overload;
  constructor Create( APort, AClientTimeout: Integer;
                      AUseBufferedSockets: Boolean); overload;
  destructor Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
241
// Transport decorator that puts buffered streams (FBufSize bytes each) in
// front of the input and output streams of an IStreamTransport.
TBufferedTransportImpl = class( TTransportImpl)
private
  FInputBuffer  : IThriftStream;
  FOutputBuffer : IThriftStream;
  FTransport    : IStreamTransport;
  FBufSize      : Integer;

  procedure InitBuffers;
  function GetUnderlyingTransport: ITransport;
protected
  function GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  procedure Open; override;
  procedure Close; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  constructor Create( const ATransport : IStreamTransport); overload;
  constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
  property UnderlyingTransport: ITransport read GetUnderlyingTransport;
  property IsOpen: Boolean read GetIsOpen;
end;
264
// Client socket transport. Works either on a caller-supplied TCustomIpClient
// or on a TTcpClient it creates itself for a host/port pair.
TSocketImpl = class( TStreamTransportImpl)
private
  FClient     : TCustomIpClient;
  FOwnsClient : Boolean;          // set when InitSocket created FClient
  FHost       : string;
  FPort       : Integer;
  FTimeout    : Integer;

  procedure InitSocket;
protected
  function GetIsOpen: Boolean; override;
public
  procedure Open; override;
  constructor Create( const AClient : TCustomIpClient); overload;
  constructor Create( const AHost: string; APort: Integer); overload;
  constructor Create( const AHost: string; APort, ATimeout: Integer); overload;
  destructor Destroy; override;
  procedure Close; override;
  property TcpClient: TCustomIpClient read FClient;
  property Host : string read FHost;
  property Port: Integer read FPort;
end;
287
// Transport decorator that sends data in frames: each flushed block is
// prefixed with a FHeaderSize-byte big-endian length header.
TFramedTransportImpl = class( TTransportImpl)
private const
  FHeaderSize : Integer = 4;
private class var
  FHeader_Dummy : array of Byte;   // zero-filled placeholder for the header
protected
  FTransport   : ITransport;
  FWriteBuffer : TMemoryStream;
  FReadBuffer  : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;
public
  type
    // Factory that wraps a given transport in a TFramedTransportImpl.
    TFactory = class( TTransportFactoryImpl)
    public
      function GetTransport( const ATrans: ITransport): ITransport; override;
    end;

{$IF CompilerVersion >= 21.0}
  class constructor Create;
{$IFEND}
  constructor Create; overload;
  constructor Create( const ATrans: ITransport); overload;
  destructor Destroy; override;

  procedure Open; override;
  function GetIsOpen: Boolean; override;

  procedure Close; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  procedure Flush; override;
end;
322
// Compilers older than version 21.0 get an explicit initialisation routine
// in place of the class constructor of TFramedTransportImpl.
{$IF CompilerVersion < 21.0}
procedure TFramedTransportImpl_Initialize;
{$IFEND}
326
327implementation
328
329{ TTransportImpl }
330
// Default flush: the base transport holds no pending output, so this is a
// no-op. Buffering descendants override it.
procedure TTransportImpl.Flush;
begin
end;
335
// Default Peek: reports the open state of the transport.
function TTransportImpl.Peek: Boolean;
begin
  Result := GetIsOpen;
end;
340
// Reads exactly len bytes into buf starting at off by calling Read until
// the request is satisfied. Returns the number of bytes read (len).
// Raises TTransportException when Read delivers no data (<= 0).
function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
var
  chunk : Integer;
begin
  Result := 0;
  while Result < len do
  begin
    chunk := Read( buf, off + Result, len - Result);
    if chunk <= 0 then
      raise TTransportException.Create( 'Cannot read, Remote side has closed' );
    Inc( Result, chunk);
  end;
end;
358
// Convenience overload: writes the whole buffer.
procedure TTransportImpl.Write( const buf: TBytes);
begin
  Write( buf, 0, Length( buf));
end;
363
364{ THTTPClientImpl }
365
// Drops the references to the response stream and the pending-output stream.
procedure THTTPClientImpl.Close;
begin
  FInputStream := nil;   // response of the last request
  FOutputStream := nil;  // data written but not yet sent
end;
371
// Creates a client for the given URI with an empty custom-header dictionary
// and an empty in-memory output stream (owned by its adapter).
constructor THTTPClientImpl.Create( const AUri: string);
begin
  inherited Create;
  FUri := AUri;
  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
  FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
end;
379
// Builds a synchronous POST request to FUri with the Thrift content headers
// plus every entry of FCustomHeaders. The request is opened but not sent.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  header : TPair<string,string>;
begin
{$IF CompilerVersion >= 21.0}
  Result := CoXMLHTTP.Create;
{$ELSE}
  Result := CoXMLHTTPRequest.Create;
{$IFEND}

  // third argument False = not asynchronous
  Result.open('POST', FUri, False, '', '');
  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
  Result.setRequestHeader( 'Accept', 'application/x-thrift');
  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  for header in FCustomHeaders do
    Result.setRequestHeader( header.Key, header.Value);
end;
400
// Releases the streams via Close before the inherited teardown.
destructor THTTPClientImpl.Destroy;
begin
  Close;
  inherited Destroy;
end;
406
// Sends everything written so far as one HTTP request. Whether or not the
// request succeeds, the output stream is replaced by a fresh empty one.
procedure THTTPClientImpl.Flush;
begin
  try
    SendRequest;
  finally
    // release the old buffer first, then start a new one
    FOutputStream := nil;
    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
  end;
end;
416
// Getter for the ConnectionTimeout property.
function THTTPClientImpl.GetConnectionTimeout: Integer;
begin
  Result := Self.FConnectionTimeout;
end;
421
// Getter for the CustomHeaders property; returns the live dictionary.
function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
begin
  Result := Self.FCustomHeaders;
end;
426
// The HTTP client always reports itself as open.
function THTTPClientImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
431
// Getter for the ReadTimeout property.
function THTTPClientImpl.GetReadTimeout: Integer;
begin
  Result := Self.FReadTimeout;
end;
436
// Nothing to do here: a request is only created in SendRequest.
procedure THTTPClientImpl.Open;
begin
end;
441
// Reads up to len bytes of the last response into buf at off.
// Raises TTransportException(NotOpen) when no response stream exists yet;
// any exception from the stream is re-raised as TTransportException(Unknown)
// carrying the original message.
function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if FInputStream = nil then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'No request has been sent');

  try
    Result := FInputStream.Read( buf, off, len);
  except
    on E: Exception do
      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
                                        E.Message);
  end;
end;
459
// Sends the content of FOutputStream as the body of a new request and makes
// the response stream available through FInputStream.
procedure THTTPClientImpl.SendRequest;
var
  request : IXMLHTTPRequest;
  body    : TMemoryStream;
  payload : TBytes;
  size    : Integer;
begin
  request := CreateRequest;

  body := TMemoryStream.Create;
  try
    // copy the pending output into a stream that can be handed to COM
    payload := FOutputStream.ToArray;
    size := Length( payload);
    if size > 0 then
      body.WriteBuffer( payload[0], size);
    body.Position := 0;

    // soReference: the adapter does not take ownership of body
    request.send( IUnknown( TStreamAdapter.Create( body, soReference)));

    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( request.responseStream) as IStream);
  finally
    body.Free;
  end;
end;
485
// Setter for the ConnectionTimeout property; only stores the value.
procedure THTTPClientImpl.SetConnectionTimeout( const Value: Integer);
begin
  Self.FConnectionTimeout := Value;
end;
490
// Setter for the ReadTimeout property; only stores the value.
procedure THTTPClientImpl.SetReadTimeout( const Value: Integer);
begin
  Self.FReadTimeout := Value;
end;
495
// Appends len bytes of buf (starting at off) to the pending request body.
// Nothing is transmitted before Flush / SendRequest.
procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
begin
  FOutputStream.Write( buf, off, len);
end;
500
501{ TTransportException }
502
// Category-only constructor: delegates to the (type, message) overload with
// an empty message.
constructor TTransportException.Create( AType: TExceptionType);
begin
  Create( AType, '');
end;
507
// Full constructor: sets the exception message and records the category.
constructor TTransportException.Create( AType: TExceptionType; const msg: string);
begin
  inherited Create( msg);
  Self.FType := AType;
end;
514
// Message-only constructor. FType is not assigned here, so it keeps the
// zero value of the enumeration (Unknown).
constructor TTransportException.Create( const msg: string);
begin
  inherited Create( msg);
end;
519
520{ TServerTransportImpl }
521
// Accepts a client through AcceptImpl.
// Raises TTransportException when AcceptImpl yields nil.
function TServerTransportImpl.Accept: ITransport;
begin
  Result := AcceptImpl;
  if Result = nil then
    raise TTransportException.Create( 'accept() may not return NULL' );
end;
530
531{ TTransportFactoryImpl }
532
// Identity factory: the given transport is returned unchanged.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := ATrans;
end;
537
{ TServerSocketImpl }
539
// Wraps an existing TTcpServer. FOwnsServer is not set here, so the
// destructor will not free the supplied server.
constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
begin
  Self.FServer := AServer;
  Self.FClientTimeout := AClientTimeout;
end;
545
// Wraps an existing TTcpServer with a client timeout of 0.
constructor TServerSocketImpl.Create( const AServer: TTcpServer);
begin
  Create( AServer, 0);
end;
550
// Creates a server for the given port with a client timeout of 0.
constructor TServerSocketImpl.Create( APort: Integer);
begin
  Create( APort, 0);
end;
555
// Accepts one client connection from FServer and wraps it in a transport.
//
// Returns a TSocketImpl around the accepted client, additionally wrapped in a
// TBufferedTransportImpl when FUseBufferedSocket is set, or nil when
// FServer.Accept reports failure.
//
// Raises TTransportException(NotOpen) when there is no server object, and a
// plain TTransportException (carrying E.ToString) for any exception raised
// while accepting or wrapping the client.
//
// The client object created here is released again on every failure path,
// including the exception path, so a failed accept no longer leaks it.
// Note that FClientTimeout is not applied by this method.
function TServerSocketImpl.AcceptImpl: ITransport;
var
  client    : TCustomIpClient;
  sockTrans : IStreamTransport;
begin
  if FServer = nil then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'No underlying server socket.');

  client := nil;  // defined value for the exception handler below
  try
    client := TCustomIpClient.Create( nil);
    if not FServer.Accept( client) then
    begin
      FreeAndNil( client);
      Result := nil;
      Exit;
    end;

    sockTrans := TSocketImpl.Create( client);
    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( sockTrans)
    else Result := sockTrans;

  except
    on E: Exception do
    begin
      // Drop any transport built so far before releasing the client it
      // refers to. TSocketImpl created from an existing client does not own
      // it (FOwnsClient stays False), so freeing it here cannot double-free.
      Result := nil;
      sockTrans := nil;
      client.Free;
      raise TTransportException.Create( E.ToString );
    end;
  end;
end;
600
// Deactivates the server socket, if there is one.
// Any exception raised while doing so is re-raised as TTransportException.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil then
    Exit;

  try
    FServer.Active := False;
  except
    on E: Exception do
      raise TTransportException.Create('Error on closing socket : ' + E.Message);
  end;
end;
615
// Creates and owns a blocking TTcpServer bound to APort. The server is not
// activated here; that happens in Listen.
constructor TServerSocketImpl.Create( APort, AClientTimeout: Integer;
                                      AUseBufferedSockets: Boolean);
begin
  Self.FPort := APort;
  Self.FClientTimeout := AClientTimeout;
  Self.FUseBufferedSocket := AUseBufferedSockets;

  Self.FOwnsServer := True;
  Self.FServer := TTcpServer.Create( nil);
  Self.FServer.BlockMode := bmBlocking;
  // LocalPort is assigned as text; newer compilers need the AnsiString cast
{$IF CompilerVersion >= 21.0}
  Self.FServer.LocalPort := AnsiString( IntToStr( FPort));
{$ELSE}
  Self.FServer.LocalPort := IntToStr( FPort);
{$IFEND}
end;
631
// Frees the server object only when this instance created it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer then
    FServer.Free;
  inherited Destroy;
end;
640
// Activates the server socket, if there is one.
// Any exception raised while doing so is re-raised as TTransportException.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil then
    Exit;

  try
    FServer.Active := True;
  except
    on E: Exception do
      raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
  end;
end;
655
// Creates a server for the given port without buffered client sockets.
constructor TServerSocketImpl.Create( APort, AClientTimeout: Integer);
begin
  Create( APort, AClientTimeout, False);
end;
660
{ TSocketImpl }
662
// Wraps an existing socket client; one socket stream serves as both input
// and output stream. FOwnsClient is not set by this constructor.
constructor TSocketImpl.Create( const AClient : TCustomIpClient);
var
  sockStream : IThriftStream;
begin
  FClient := AClient;
  sockStream := TTcpSocketStreamImpl.Create( FClient);
  FInputStream := sockStream;
  FOutputStream := sockStream;
end;
672
// Host/port constructor with a timeout of 0.
constructor TSocketImpl.Create( const AHost: string; APort: Integer);
begin
  Create( AHost, APort, 0);
end;
677
// Releases the streams (inherited) and then frees the socket client.
// Note: the client is freed here regardless of FOwnsClient.
procedure TSocketImpl.Close;
begin
  inherited Close;
  // Free is nil-safe, so no explicit nil check is required
  FClient.Free;
  FClient := nil;
end;
687
// Stores the connection parameters and creates the socket client and its
// streams; the connection itself is made in Open.
constructor TSocketImpl.Create( const AHost: string; APort, ATimeout: Integer);
begin
  Self.FHost := AHost;
  Self.FPort := APort;
  Self.FTimeout := ATimeout;
  InitSocket;
end;
695
// Frees the socket client only when it was created by this instance.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then
    FClient.Free;
  inherited Destroy;
end;
704
// Open means: a client object exists and it reports being connected.
function TSocketImpl.GetIsOpen: Boolean;
begin
  if FClient = nil
  then Result := False
  else Result := FClient.Connected;
end;
713
// (Re)creates the socket client and the stream on top of it. A previously
// owned client is freed first; a client that is not owned is just replaced.
procedure TSocketImpl.InitSocket;
var
  sockStream : IThriftStream;
begin
  if (FClient <> nil) and FOwnsClient then
  begin
    FClient.Free;
    FClient := nil;
  end;

  FClient := TTcpClient.Create( nil);
  FOwnsClient := True;

  // input and output share one socket stream
  sockStream := TTcpSocketStreamImpl.Create( FClient);
  FInputStream := sockStream;
  FOutputStream := sockStream;
end;
734
// Connects the socket to Host:Port and installs a fresh socket stream as
// input and output stream.
// Raises TTransportException(AlreadyOpen) when already connected, and
// TTransportException(NotOpen) when the host is empty or the port is <= 0.
procedure TSocketImpl.Open;
begin
  if IsOpen then
    raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
                                      'Socket already connected');

  if FHost = '' then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open null host');

  if FPort <= 0 then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open without port');

  // the client is gone after a Close, so recreate it on demand
  if FClient = nil then
    InitSocket;

  FClient.RemoteHost := TSocketHost( FHost);
  FClient.RemotePort := TSocketPort( IntToStr( FPort));
  FClient.Connect;

  FInputStream := TTcpSocketStreamImpl.Create( FClient);
  FOutputStream := FInputStream;
end;
767
{ TBufferedStreamImpl }
769
// Flushes pending writes, then lets go of the wrapped stream and frees both
// buffers. Afterwards IsOpen returns False.
procedure TBufferedStreamImpl.Close;
begin
  Flush;
  FStream := nil;

  FReadBuffer.Free;
  FReadBuffer := nil;

  FWriteBuffer.Free;
  FWriteBuffer := nil;
end;
781
// Wraps AStream with empty read and write buffers; ABufSize is the chunk
// size used when refilling and the threshold for automatic flushing.
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  Self.FStream := AStream;
  Self.FBufSize := ABufSize;
  Self.FReadBuffer := TMemoryStream.Create;
  Self.FWriteBuffer := TMemoryStream.Create;
end;
789
// Closes the stream (flush + release of buffers) before inherited teardown.
destructor TBufferedStreamImpl.Destroy;
begin
  Close;
  inherited Destroy;
end;
795
// Forwards everything collected in the write buffer to the wrapped stream in
// a single Write call and empties the buffer. Does nothing when not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  size    : Integer;
begin
  if not IsOpen then
    Exit;

  size := FWriteBuffer.Size;
  if size > 0 then
  begin
    SetLength( pending, size);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( pending[0], size);
    FStream.Write( pending, 0, size);
  end;
  FWriteBuffer.Clear;
end;
814
// Open as long as the wrapped stream and both buffers exist
// (all three are released by Close).
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := not ( (FWriteBuffer = nil)
               or (FReadBuffer = nil)
               or (FStream = nil));
end;
821
// No-op: the stream is usable right after construction.
procedure TBufferedStreamImpl.Open;
begin
end;
826
// Reads up to count bytes into buffer at offset, serving them from the read
// buffer and refilling it from the wrapped stream (FBufSize bytes at a time)
// whenever it is exhausted. Returns the number of bytes delivered, which is
// less than count when the wrapped stream returns no more data, and 0 when
// the stream is not open.
function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
var
  fetched : Integer;
  chunk   : Integer;
  scratch : TBytes;
begin
  inherited;
  Result := 0;
  if not IsOpen then
    Exit;

  while count > 0 do
  begin
    // buffer used up -> fetch the next chunk from the wrapped stream
    if FReadBuffer.Position >= FReadBuffer.Size then
    begin
      FReadBuffer.Clear;
      SetLength( scratch, FBufSize);
      fetched := FStream.Read( scratch, 0, FBufSize);
      if fetched = 0 then
        Break; // avoid infinite loop

      FReadBuffer.WriteBuffer( scratch[0], fetched);
      FReadBuffer.Position := 0;
    end;

    // hand out as much as is buffered, but not more than requested
    if FReadBuffer.Position < FReadBuffer.Size then
    begin
      chunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      Inc( Result, FReadBuffer.Read( buffer[offset], chunk));
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
859
// Returns a copy of the complete read buffer (from its start, independent of
// the current read position), or an empty array when the stream is not open.
// The read buffer's position is moved by this call.
function TBufferedStreamImpl.ToArray: TBytes;
var
  size : Integer;
begin
  if IsOpen
  then size := FReadBuffer.Size
  else size := 0;

  SetLength( Result, size);

  if size > 0 then
  begin
    FReadBuffer.Position := 0;
    FReadBuffer.Read( Result[0], size);
  end;
end;
879
// Appends count bytes of buffer (from offset) to the write buffer and
// flushes once the buffered amount exceeds FBufSize. Ignored when count is
// not positive or the stream is not open.
procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
begin
  inherited;
  if (count <= 0) or (not IsOpen) then
    Exit;

  FWriteBuffer.Write( buffer[offset], count);
  if FWriteBuffer.Size > FBufSize then
    Flush;
end;
895
896{ TStreamTransportImpl }
897
// Releases both stream references, input first. This holds whether the two
// fields refer to the same stream or to different ones.
procedure TStreamTransportImpl.Close;
begin
  FInputStream := nil;
  FOutputStream := nil;
end;
916
// Stores the streams to read from and write to.
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  Self.FInputStream := AInputStream;
  Self.FOutputStream := AOutputStream;
end;
922
// Explicitly releases both streams before the inherited teardown.
destructor TStreamTransportImpl.Destroy;
begin
  FInputStream := nil;
  FOutputStream := nil;
  inherited Destroy;
end;
929
// Flushes the output stream.
// Raises TTransportException(NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if FOutputStream = nil then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
939
// Getter for the InputStream property.
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := Self.FInputStream;
end;
944
// A plain stream transport always reports itself as open; descendants
// override this where the state can actually be determined.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
949
// Getter for the OutputStream property.
// Returns the stream this transport writes to. (Previously this returned
// FInputStream, which was only correct when both fields referred to the
// same stream, and gave callers the wrong stream whenever distinct
// input/output streams had been passed to the constructor.)
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
954
// No-op: the streams are supplied ready for use.
procedure TStreamTransportImpl.Open;
begin
end;
959
// Reads up to len bytes from the input stream into buf at off and returns
// the count reported by the stream.
// Raises TTransportException(NotOpen) when there is no input stream.
function TStreamTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if FInputStream = nil then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );

  Result := FInputStream.Read( buf, off, len);
end;
968
// Writes len bytes of buf (from off) to the output stream.
// Raises TTransportException(NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if FOutputStream = nil then
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );

  FOutputStream.Write( buf, off, len);
end;
978
979{ TBufferedTransportImpl }
980
// Wraps ATransport using the default buffer size of 1024 bytes.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
begin
  Create( ATransport, 1024);
end;
985
// Closes the wrapped transport. The buffers themselves are left untouched.
procedure TBufferedTransportImpl.Close;
begin
  Self.FTransport.Close;
end;
990
// Wraps ATransport and sets up buffered streams of ABufSize bytes on top of
// its input and output streams.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
                                           ABufSize: Integer);
begin
  Self.FTransport := ATransport;
  Self.FBufSize := ABufSize;
  InitBuffers;
end;
998
// Flushes the output buffer, if one exists.
procedure TBufferedTransportImpl.Flush;
begin
  if FOutputBuffer = nil then
    Exit;
  FOutputBuffer.Flush;
end;
1006
// The open state is that of the wrapped transport.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := Self.FTransport.IsOpen;
end;
1011
// Getter for the UnderlyingTransport property: the wrapped transport.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := Self.FTransport;
end;
1016
// Creates a buffered stream for each stream the wrapped transport exposes;
// a missing (nil) stream leaves the corresponding buffer unassigned.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if FTransport.InputStream <> nil then
    FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize);

  if FTransport.OutputStream <> nil then
    FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize);
end;
1028
// Opens the wrapped transport.
procedure TBufferedTransportImpl.Open;
begin
  Self.FTransport.Open;
end;
1033
// Reads through the input buffer; returns 0 when there is no input buffer.
function TBufferedTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if FInputBuffer = nil
  then Result := 0
  else Result := FInputBuffer.Read( buf, off, len);
end;
1042
// Writes through the output buffer; the data is silently dropped when there
// is no output buffer.
procedure TBufferedTransportImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if FOutputBuffer = nil then
    Exit;
  FOutputBuffer.Write( buf, off, len);
end;
1050
1051{ TFramedTransportImpl }
1052
// One-time setup of the shared header placeholder: FHeaderSize zero bytes
// that InitWriteBuffer writes at the start of each new frame. Compilers
// older than version 21.0 use a standalone routine, newer ones a class
// constructor.
{$IF CompilerVersion < 21.0}
procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            SizeOf( Byte) * Length( TFramedTransportImpl.FHeader_Dummy),
            0);
end;
{$ELSE}
class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0], SizeOf( Byte) * Length( FHeader_Dummy), 0);
end;
{$IFEND}
1067
// Creates a framed transport without an underlying transport; only the
// write buffer is prepared.
constructor TFramedTransportImpl.Create;
begin
  Self.InitWriteBuffer;
end;
1072
// Closes the wrapped transport.
procedure TFramedTransportImpl.Close;
begin
  Self.FTransport.Close;
end;
1077
// Creates a framed transport on top of ATrans and prepares the write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  Self.InitWriteBuffer;
  Self.FTransport := ATrans;
end;
1083
// Frees both frame buffers (Free is nil-safe, so an unused read buffer is
// fine) before the inherited teardown.
destructor TFramedTransportImpl.Destroy;
begin
  FWriteBuffer.Free;
  FReadBuffer.Free;
  inherited Destroy;
end;
1090
// Sends the buffered data as one frame: the placeholder at the start of the
// buffer is overwritten with the payload length (4 bytes, most significant
// byte first), the frame is written to the wrapped transport and that
// transport is flushed. The write buffer is reset before sending.
// Raises Exception when the buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame   : TBytes;
  total   : Integer;
  payload : Integer;
begin
  // snapshot header + payload
  total := FWriteBuffer.Size;
  SetLength( frame, total);
  if total > 0 then
    System.Move( FWriteBuffer.Memory^, frame[0], total);

  payload := total - FHeaderSize;
  if payload < 0 then
    raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );

  // start the next frame before handing this one over
  InitWriteBuffer;

  frame[0] := Byte( (payload shr 24) and $FF);
  frame[1] := Byte( (payload shr 16) and $FF);
  frame[2] := Byte( (payload shr 8) and $FF);
  frame[3] := Byte( payload and $FF);

  FTransport.Write( frame, 0, total);
  FTransport.Flush;
end;
1121
// Reports the open state of the underlying transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1126
type
  // Empty descendant declared in this unit; InitWriteBuffer casts to it in
  // order to set the stream's Capacity property.
  TAccessMemoryStream = class(TMemoryStream)
  end;

// Discards the current write buffer and creates a new one that is
// pre-sized to 1024 bytes and already contains the placeholder header
// (FHeaderSize bytes taken from FHeader_Dummy).
procedure TFramedTransportImpl.InitWriteBuffer;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;

  // reserve room up front
  TAccessMemoryStream( FWriteBuffer).Capacity := 1024;

  // placeholder for the frame length, overwritten in Flush
  FWriteBuffer.Write( Pointer( @FHeader_Dummy[0])^, FHeaderSize);
end;
1138
// Opens the underlying transport.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open;
end;
1143
// Reads up to "len" bytes into buf starting at index "off".
// Data still pending in the current frame buffer is served first; only if
// that yields nothing, the next frame is fetched via ReadFrame and the
// read is retried against the new buffer.
// Returns the number of bytes copied (0 when len <= 0).
function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
  // len is checked before buf[off] is touched, so a zero-length request
  // never indexes into the array
  if (FReadBuffer <> nil) and (len > 0) then
  begin
    Result := FReadBuffer.Read( Pointer( @buf[off])^, len);
    if Result > 0
    then Exit;
  end;

  // current frame exhausted (or none loaded yet): get the next one
  ReadFrame;

  if len > 0
  then Result := FReadBuffer.Read( Pointer( @buf[off])^, len)
  else Result := 0;
end;
1165
// Reads one complete frame from the underlying transport into FReadBuffer.
//
// A frame consists of a header of FHeaderSize bytes holding the payload
// size as a big-endian 32 bit integer, followed by the payload itself.
// After the call, FReadBuffer holds exactly the payload and is positioned
// at its start; a previously held buffer is discarded.
//
// Raises Exception if the header announces a negative payload size.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size : Integer;
  buff : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  // A header with the top bit set produces a negative Integer. Reject it
  // explicitly instead of handing it to SetLength().
  if size < 0 then
  begin
    raise Exception.Create( 'TFramedTransport.ReadFrame: negative frame size ('
                          + IntToStr( size) + ')' );
  end;

  SetLength( buff, size );
  FTransport.ReadAll( buff, 0, size );

  FreeAndNil( FReadBuffer);
  FReadBuffer := TMemoryStream.Create;

  // An empty frame leaves buff without any elements, so buff[0] must not be
  // evaluated in that case (range check error when compiled with $R+).
  if size > 0
  then FReadBuffer.Write( Pointer(@buff[0])^, size );

  FReadBuffer.Position := 0;
end;
1186
// Appends "len" bytes of buf, starting at index "off", to the write buffer.
// Nothing is sent to the underlying transport here; that happens in Flush.
procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  // bail out first, so buf[off] is never evaluated for an empty request
  if len <= 0
  then Exit;

  FWriteBuffer.Write( Pointer( @buf[off])^, len);
end;
1192
{ TFramedTransportImpl.TFactory }
1194
// Wraps ATrans into a new framed transport and returns the wrapper.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := TFramedTransportImpl.Create( ATrans);
end;
1199
1200{ TTcpSocketStreamImpl }
1201
// Closes the TCP client this stream operates on.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close;
end;
1206
// Creates a stream that performs its I/O through the given TCP client.
// Only the reference is stored here.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient);
begin
  FTcpClient := ATcpClient;
end;
1211
// Intentionally empty: this method performs no work.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1216
// Returns the Active state of the TCP client.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := FTcpClient.Active;
end;
1221
// Opens the connection of the TCP client.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open;
end;
1226
// Receives up to "count" bytes from the TCP client into buffer, starting at
// index "offset". Returns the value reported by ReceiveBuf, or 0 when
// count is not positive.
function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset,
  count: Integer): Integer;
begin
  inherited;

  // Same guard as in TFramedTransportImpl.Read: with count = 0 the element
  // buffer[offset] may not exist (empty array, or offset = Length), and
  // evaluating it triggers a range check error when compiled with $R+.
  if count > 0
  then Result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count)
  else Result := 0;
end;
1233
// Returns a byte array sized to FTcpClient.BytesReceived (empty when the
// stream is not open) and fills it through a single ReceiveBuf call.
// NOTE(review): BytesReceived looks like a running total rather than the
// number of bytes currently pending - confirm against the socket component.
// NOTE(review): the ReceiveBuf result is ignored, so the array may be larger
// than the data actually received - verify whether callers depend on that.
function TTcpSocketStreamImpl.ToArray: TBytes;
var
  cbData : Integer;
begin
  if IsOpen
  then cbData := FTcpClient.BytesReceived
  else cbData := 0;

  SetLength( Result, cbData);

  if cbData > 0
  then FTcpClient.ReceiveBuf( Pointer( @Result[0])^, cbData);
end;
1251
// Sends "count" bytes of buffer, starting at index "offset", through the
// TCP client. A non-positive count sends nothing.
// NOTE(review): the SendBuf result is not evaluated here - confirm whether
// short or failed sends need to be reported to the caller.
procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
begin
  inherited;

  // Same guard as in TFramedTransportImpl.Write: with count = 0 the element
  // buffer[offset] may not exist, and evaluating it triggers a range check
  // error when compiled with $R+.
  if count > 0
  then FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
end;
1257
{$IF CompilerVersion < 21.0}
initialization
  // Compilers older than CompilerVersion 21.0 take the branch without the
  // class constructor, so the shared placeholder header is set up here.
  TFramedTransportImpl_Initialize;
{$IFEND}
1264
1265
1266end.