blob: 0d5b384d5e8a3ac49e440ee90b13561a2d1f400f [file] [log] [blame]
Jake Farrell7ae13e12011-10-18 14:35:26 +00001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 {$SCOPEDENUMS ON}
21
22unit Thrift.Transport;
23
24interface
25
26uses
27 Classes,
28 SysUtils,
Jens Geyer6a7463a2013-03-07 20:40:59 +010029 Math,
Jake Farrell7ae13e12011-10-18 14:35:26 +000030 Sockets,
31 Generics.Collections,
32 Thrift.Collections,
33 Thrift.Utils,
34 Thrift.Stream,
35 ActiveX,
36 msxml;
37
38type
// Contract shared by all transports: open/close lifecycle, byte-oriented
// reading and writing on TBytes buffers, and flushing.
ITransport = interface
  ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
  function GetIsOpen: Boolean;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean;
  procedure Open;
  procedure Close;
  // Read/ReadAll fill buf starting at index off with up to len bytes and
  // return the number of bytes read.
  function Read( var buf: TBytes; off, len: Integer): Integer;
  function ReadAll( var buf: TBytes; off, len: Integer): Integer;
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off, len: Integer); overload;
  procedure Flush;
end;
52
// Abstract base class implementing ITransport. Descendants must supply
// GetIsOpen, Open, Close, Read and the three-argument Write; Peek, ReadAll,
// the one-argument Write and Flush have default implementations.
TTransportImpl = class( TInterfacedObject, ITransport)
protected
  function GetIsOpen: Boolean; virtual; abstract;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean; virtual;
  procedure Open; virtual; abstract;
  procedure Close; virtual; abstract;
  function Read( var buf: TBytes; off, len: Integer): Integer; virtual; abstract;
  function ReadAll( var buf: TBytes; off, len: Integer): Integer; virtual;
  procedure Write( const buf: TBytes); overload; virtual;
  procedure Write( const buf: TBytes; off, len: Integer); overload; virtual; abstract;
  procedure Flush; virtual;
end;
66
// Exception raised by the transports in this unit. Carries a category
// (Type_) in addition to the message text.
TTransportException = class( Exception)
public
  type
    // Failure categories; Unknown is the first (ordinal 0) value.
    TExceptionType = (
      Unknown,
      NotOpen,
      AlreadyOpen,
      TimedOut,
      EndOfFile
    );
private
  FType : TExceptionType;
public
  constructor Create( AType: TExceptionType); overload;
  constructor Create( const msg: string); overload;
  constructor Create( AType: TExceptionType; const msg: string); overload;
  property Type_: TExceptionType read FType;
end;
85
// Transport that talks to an HTTP endpoint. Adds timeout settings, custom
// request headers and an explicit SendRequest on top of ITransport.
IHTTPClient = interface( ITransport)
  ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
  procedure SetConnectionTimeout( const Value: Integer);
  function GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function GetReadTimeout: Integer;
  function GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
end;
98
// IHTTPClient implementation based on the MSXML IXMLHTTPRequest object.
// Written bytes are collected in FOutputStream; the response of the most
// recent request is exposed through FInputStream.
THTTPClientImpl = class( TTransportImpl, IHTTPClient)
private
  FUri               : string;
  FInputStream       : IThriftStream;
  FOutputStream      : IThriftStream;
  FConnectionTimeout : Integer;
  FReadTimeout       : Integer;
  FCustomHeaders     : IThriftDictionary<string,string>;

  function CreateRequest: IXMLHTTPRequest;
protected
  function GetIsOpen: Boolean; override;
  procedure Open; override;
  procedure Close; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  procedure Flush; override;

  procedure SetConnectionTimeout( const Value: Integer);
  function GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function GetReadTimeout: Integer;
  function GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
  constructor Create( const AUri: string);
  destructor Destroy; override;
end;
130
// Server-side endpoint: start listening, stop, and hand out one client
// transport per Accept call.
IServerTransport = interface
  ['{BF6B7043-DA22-47BF-8B11-2B88EC55FE12}']
  procedure Listen;
  procedure Close;
  function Accept: ITransport;
end;
137
// Abstract base for server transports. Accept is implemented here in terms
// of AcceptImpl, which descendants provide together with Listen and Close.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
  function AcceptImpl: ITransport; virtual; abstract;
public
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function Accept: ITransport;
end;
146
// Factory that maps a given transport to the transport that should actually
// be used (for example a wrapping transport).
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( const ATrans: ITransport): ITransport;
end;
151
// Default ITransportFactory; GetTransport is virtual so descendants can
// substitute a different transport.
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
  function GetTransport( const ATrans: ITransport): ITransport; virtual;
end;
155
// IThriftStream implementation on top of a TCustomIpClient socket.
// The client instance is handed in through the constructor.
TTcpSocketStreamImpl = class( TThriftStreamImpl)
private
  FTcpClient : TCustomIpClient;
protected
  procedure Write( const buffer: TBytes; offset, count: Integer); override;
  function Read( var buffer: TBytes; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
  constructor Create( const ATcpClient: TCustomIpClient);
end;
171
// Transport that exposes the pair of streams it reads from and writes to.
IStreamTransport = interface( ITransport)
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream  : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;
end;
179
// Transport that forwards reads to an input stream and writes/flushes to an
// output stream. Both streams are supplied to the constructor and may be
// the same object.
TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
protected
  FInputStream  : IThriftStream;
  FOutputStream : IThriftStream;
protected
  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
public
  property InputStream  : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;

  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  constructor Create( const AInputStream, AOutputStream: IThriftStream);
  destructor Destroy; override;
end;
201
// Stream decorator that buffers reads and writes of a wrapped IThriftStream
// in two separate memory streams. FBufSize is the refill size for reads and
// the threshold above which writes are flushed.
TBufferedStreamImpl = class( TThriftStreamImpl)
private
  FStream      : IThriftStream;
  FBufSize     : Integer;
  FReadBuffer  : TMemoryStream;
  FWriteBuffer : TMemoryStream;
protected
  procedure Write( const buffer: TBytes; offset, count: Integer); override;
  function Read( var buffer: TBytes; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
  constructor Create( const AStream: IThriftStream; ABufSize: Integer);
  destructor Destroy; override;
end;
220
// Server transport based on TTcpServer. The server object is either passed
// in by the caller or created (and then owned) by the port-based
// constructors.
TServerSocketImpl = class( TServerTransportImpl)
private
  FServer            : TTcpServer;
  FPort              : Integer;
  FClientTimeout     : Integer;
  FUseBufferedSocket : Boolean;
  FOwnsServer        : Boolean;   // True only when FServer was created by this object
protected
  function AcceptImpl: ITransport; override;
public
  constructor Create( const AServer: TTcpServer); overload;
  constructor Create( const AServer: TTcpServer; AClientTimeout: Integer); overload;
  constructor Create( APort: Integer); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer;
                      AUseBufferedSockets: Boolean); overload;
  destructor Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
241
// Transport decorator that puts buffered streams in front of the input and
// output streams of an IStreamTransport. The single-argument constructor
// uses a buffer size of 1024.
TBufferedTransportImpl = class( TTransportImpl)
private
  FInputBuffer  : IThriftStream;
  FOutputBuffer : IThriftStream;
  FTransport    : IStreamTransport;
  FBufSize      : Integer;

  procedure InitBuffers;
  function GetUnderlyingTransport: ITransport;
protected
  function GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  procedure Open; override;
  procedure Close; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  constructor Create( const ATransport: IStreamTransport); overload;
  constructor Create( const ATransport: IStreamTransport; ABufSize: Integer); overload;
  property UnderlyingTransport: ITransport read GetUnderlyingTransport;
  property IsOpen: Boolean read GetIsOpen;
end;
264
// Client socket transport. Either wraps an existing TCustomIpClient or
// creates its own TTcpClient for a host/port pair.
TSocketImpl = class( TStreamTransportImpl)
private
  FClient     : TCustomIpClient;
  FOwnsClient : Boolean;   // set when the client was created by InitSocket
  FHost       : string;
  FPort       : Integer;
  FTimeout    : Integer;

  procedure InitSocket;
protected
  function GetIsOpen: Boolean; override;
public
  procedure Open; override;
  constructor Create( const AClient: TCustomIpClient); overload;
  constructor Create( const AHost: string; APort: Integer); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Integer); overload;
  destructor Destroy; override;
  procedure Close; override;
  property TcpClient: TCustomIpClient read FClient;
  property Host: string read FHost;
  property Port: Integer read FPort;
end;
287
// Transport decorator that works on frames: data is collected in a write
// buffer and read through a read buffer, on top of another ITransport.
// FHeaderSize (4) is the size of the per-frame header; FHeader_Dummy is a
// zero-filled placeholder of that size, set up once per class.
TFramedTransportImpl = class( TTransportImpl)
private const
  FHeaderSize : Integer = 4;
private class var
  FHeader_Dummy : array of Byte;
protected
  FTransport   : ITransport;
  FWriteBuffer : TMemoryStream;
  FReadBuffer  : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;
public
  type
    // Factory producing framed transports for a given transport.
    TFactory = class( TTransportFactoryImpl)
    public
      function GetTransport( const ATrans: ITransport): ITransport; override;
    end;

{$IF CompilerVersion >= 21.0}
  class constructor Create;
{$IFEND}
  constructor Create; overload;
  constructor Create( const ATrans: ITransport); overload;
  destructor Destroy; override;

  procedure Open; override;
  function GetIsOpen: Boolean; override;

  procedure Close; override;
  function Read( var buf: TBytes; off, len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off, len: Integer); override;
  procedure Flush; override;
end;
322
// Compilers older than version 21.0 get an explicit initialization routine
// in place of the class constructor declared for newer compilers.
{$IF CompilerVersion < 21.0}
procedure TFramedTransportImpl_Initialize;
{$IFEND}
326
327implementation
328
329{ TTransportImpl }
330
// Default Flush does nothing; descendants override it when they have data
// to push out.
procedure TTransportImpl.Flush;
begin
  // intentionally empty
end;
335
// Default Peek simply reports whether the transport is open.
function TTransportImpl.Peek: Boolean;
begin
  Result := GetIsOpen;
end;
340
// Reads exactly len bytes into buf starting at index off by calling Read
// repeatedly. Returns the number of bytes read (len, or 0 when len <= 0).
// Raises TTransportException as soon as a Read call returns 0 or less.
function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
var
  chunk : Integer;
begin
  Result := 0;
  while Result < len do
  begin
    // continue behind what has been received so far
    chunk := Read( buf, off + Result, len - Result);
    if chunk <= 0
    then raise TTransportException.Create( 'Cannot read, Remote side has closed' );
    Inc( Result, chunk);
  end;
end;
358
// Convenience overload: writes the complete buffer.
procedure TTransportImpl.Write( const buf: TBytes);
begin
  Write( buf, 0, Length( buf));
end;
363
364{ THTTPClientImpl }
365
// Drops the references to the response stream and the pending request data.
procedure THTTPClientImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
371
// Creates a client for the given URI with an empty custom header dictionary
// and an empty in-memory output stream.
constructor THTTPClientImpl.Create( const AUri: string);
begin
  inherited Create;
  FUri           := AUri;
  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
  // the adapter is created with True as second argument for the new memory stream
  FOutputStream  := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
end;
379
// Builds a synchronous POST request to FUri with the Thrift content headers,
// the fixed user agent and all entries of FCustomHeaders.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  header : TPair<string,string>;
begin
  // the CoClass used depends on the compiler version
{$IF CompilerVersion >= 21.0}
  Result := CoXMLHTTP.Create;
{$ELSE}
  Result := CoXMLHTTPRequest.Create;
{$IFEND}

  // third argument False = not asynchronous; user and password are empty
  Result.open( 'POST', FUri, False, '', '');
  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
  Result.setRequestHeader( 'Accept', 'application/x-thrift');
  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  for header in FCustomHeaders
  do Result.setRequestHeader( header.Key, header.Value);
end;
400
// Releases both streams via Close before the inherited cleanup runs.
destructor THTTPClientImpl.Destroy;
begin
  Close;
  inherited;
end;
406
// Sends everything written so far as one HTTP request. Whether or not the
// request succeeds, the output stream is replaced by a fresh, empty one.
procedure THTTPClientImpl.Flush;
begin
  try
    SendRequest;
  finally
    // release the old stream first, then start over with an empty one
    FOutputStream := nil;
    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
  end;
end;
416
// Getter for the ConnectionTimeout property.
function THTTPClientImpl.GetConnectionTimeout: Integer;
begin
  Result := FConnectionTimeout;
end;
421
// Getter for the CustomHeaders property; returns the dictionary itself, so
// callers can add or change headers through it.
function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
begin
  Result := FCustomHeaders;
end;
426
// The HTTP client always reports itself as open.
function THTTPClientImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
431
// Getter for the ReadTimeout property.
function THTTPClientImpl.GetReadTimeout: Integer;
begin
  Result := FReadTimeout;
end;
436
// Open is a no-op for the HTTP client.
procedure THTTPClientImpl.Open;
begin
  // intentionally empty
end;
441
// Reads from the response stream of the last request.
// Raises TTransportException (NotOpen) when there is no response stream, and
// converts any exception raised while reading into a TTransportException of
// type Unknown carrying the original message.
function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if FInputStream = nil
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'No request has been sent');

  try
    Result := FInputStream.Read( buf, off, len);
  except
    on E: Exception
    do raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
                                         E.Message);
  end;
end;
459
// Posts the contents of the output stream and makes the response stream
// available for reading through FInputStream.
procedure THTTPClientImpl.SendRequest;
var
  request : IXMLHTTPRequest;
  payload : TMemoryStream;
  data    : TBytes;
  count   : Integer;
begin
  request := CreateRequest;

  payload := TMemoryStream.Create;
  try
    // copy the pending bytes into a memory stream that can be handed to COM
    data  := FOutputStream.ToArray;
    count := Length( data);
    if count > 0
    then payload.WriteBuffer( Pointer(@data[0])^, count);
    payload.Position := 0;

    // soReference: the adapter does not own payload, it is freed below
    request.send( IUnknown( TStreamAdapter.Create( payload, soReference )));

    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( request.responseStream) as IStream);
  finally
    payload.Free;
  end;
end;
485
// Setter for the ConnectionTimeout property; only stores the value.
procedure THTTPClientImpl.SetConnectionTimeout( const Value: Integer);
begin
  FConnectionTimeout := Value;
end;
490
// Setter for the ReadTimeout property; only stores the value.
procedure THTTPClientImpl.SetReadTimeout( const Value: Integer);
begin
  FReadTimeout := Value;
end;
495
// Appends len bytes of buf, starting at index off, to the pending request
// data. Nothing is sent before Flush/SendRequest.
procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
begin
  FOutputStream.Write( buf, off, len);
end;
500
501{ TTransportException }
502
// Creates an exception of the given type with an empty message.
constructor TTransportException.Create( AType: TExceptionType);
begin
  // no inherited call here: the two-argument overload does that
  Create( AType, '');
end;
508
// Creates an exception with both a type and a message.
constructor TTransportException.Create( AType: TExceptionType; const msg: string);
begin
  inherited Create( msg);
  FType := AType;
end;
515
// Creates an exception with a message only. FType is not assigned here and
// therefore keeps its initial value, the first enum member (Unknown).
constructor TTransportException.Create( const msg: string);
begin
  inherited Create( msg);
end;
520
521{ TServerTransportImpl }
522
// Returns the transport produced by AcceptImpl.
// Raises TTransportException when AcceptImpl yields nil.
function TServerTransportImpl.Accept: ITransport;
begin
  Result := AcceptImpl;
  if not Assigned( Result)
  then raise TTransportException.Create( 'accept() may not return NULL' );
end;
531
532{ TTransportFactoryImpl }
533
// Default factory behaviour: the transport is returned unchanged.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := ATrans;
end;
538
539{ TServerSocket }
540
// Wraps an existing TTcpServer. FOwnsServer is not set, so the server object
// is not freed by the destructor.
constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
begin
  inherited Create;
  FServer        := AServer;
  FClientTimeout := AClientTimeout;
end;
547
// Wraps an existing TTcpServer with a client timeout of 0.
constructor TServerSocketImpl.Create( const AServer: TTcpServer);
begin
  // no inherited call here: done by the delegate constructor
  Create( AServer, 0);
end;
553
// Creates a server socket for the given port with a client timeout of 0.
constructor TServerSocketImpl.Create( APort: Integer);
begin
  // no inherited call here: done by the delegate constructor
  Create( APort, 0);
end;
559
// Accepts one client connection and wraps it into a transport.
//
// Returns a TSocketImpl for the accepted connection, wrapped into a
// TBufferedTransportImpl when FUseBufferedSocket is set, or nil when
// FServer.Accept reports failure.
// Raises TTransportException (NotOpen) when there is no server object; any
// other exception is re-raised as a TTransportException carrying E.ToString.
//
// The client object is freed on every path on which no TSocketImpl has taken
// it over, so a failing Accept or constructor no longer leaks it.
// NOTE(review): FClientTimeout is not applied to the accepted client here.
function TServerSocketImpl.AcceptImpl: ITransport;
var
  client : TCustomIpClient;
  trans  : IStreamTransport;
begin
  if FServer = nil then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'No underlying server socket.');
  end;

  client := nil;
  try
    client := TCustomIpClient.Create( nil);
    if not FServer.Accept( client) then
    begin
      FreeAndNil( client);
      Result := nil;
      Exit;
    end;

    trans  := TSocketImpl.Create( client);
    client := nil;  // from here on the socket transport holds the client

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( trans)
    else Result := trans;

  except
    on E: Exception do
    begin
      client.Free;  // nil-safe; only non-nil if nobody took the client over
      raise TTransportException.Create( E.ToString );
    end;
  end;
end;
604
// Deactivates the server socket, if there is one. Failures are reported as
// TTransportException.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

  try
    FServer.Active := False;
  except
    on E: Exception
    do raise TTransportException.Create('Error on closing socket : ' + E.Message);
  end;
end;
619
// Creates and owns a blocking TTcpServer bound to the given local port.
constructor TServerSocketImpl.Create( APort, AClientTimeout: Integer;
                                      AUseBufferedSockets: Boolean);
begin
  inherited Create;
  FPort              := APort;
  FClientTimeout     := AClientTimeout;
  FUseBufferedSocket := AUseBufferedSockets;

  FOwnsServer := True;  // the destructor frees FServer
  FServer     := TTcpServer.Create( nil);
  FServer.BlockMode := bmBlocking;

  // newer compilers need an explicit AnsiString conversion for the port
{$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
{$ELSE}
  FServer.LocalPort := IntToStr( FPort);
{$IFEND}
end;
636
// Frees the server object only if it was created by this instance.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer
  then FServer.Free;
  inherited;
end;
645
// Activates the server socket, if there is one. Failures are reported as
// TTransportException.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil
  then Exit;

  try
    FServer.Active := True;
  except
    on E: Exception
    do raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
  end;
end;
660
// Creates a server socket for the given port without buffered client
// sockets.
constructor TServerSocketImpl.Create( APort, AClientTimeout: Integer);
begin
  // no inherited call here: done by the delegate constructor
  Create( APort, AClientTimeout, False);
end;
666
667{ TSocket }
668
// Wraps an existing client object. One socket stream serves as both input
// and output stream. FOwnsClient is left unset.
constructor TSocketImpl.Create( const AClient: TCustomIpClient);
var
  sock_stream : IThriftStream;
begin
  FClient     := AClient;
  sock_stream := TTcpSocketStreamImpl.Create( AClient);
  inherited Create( sock_stream, sock_stream);
end;
677
// Creates a socket transport for host/port with a timeout of 0.
constructor TSocketImpl.Create( const AHost: string; APort: Integer);
begin
  // no inherited call here: done by the delegate constructor
  Create( AHost, APort, 0);
end;
683
// Releases the streams (inherited Close), then frees the client object and
// resets the reference. The client is freed regardless of FOwnsClient.
procedure TSocketImpl.Close;
begin
  inherited Close;
  FreeAndNil( FClient);  // nil-safe
end;
690
// Creates a socket transport for host/port. The client object and its
// streams are set up by InitSocket; no connection is made yet.
constructor TSocketImpl.Create( const AHost: string; APort, ATimeout: Integer);
begin
  inherited Create( nil, nil);
  FHost    := AHost;
  FPort    := APort;
  FTimeout := ATimeout;
  InitSocket;
end;
699
// Frees the client object if this instance owns it.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient
  then FClient.Free;
  inherited;
end;
708
// Open means: there is a client object and it reports being connected.
function TSocketImpl.GetIsOpen: Boolean;
begin
  if FClient = nil
  then Result := False
  else Result := FClient.Connected;
end;
717
// Replaces the client by a new, owned TTcpClient and points both streams to
// a socket stream on top of it. A previous client is freed only if owned.
procedure TSocketImpl.InitSocket;
var
  sock_stream : IThriftStream;
begin
  if (FClient <> nil) and FOwnsClient
  then FreeAndNil( FClient);

  FClient     := TTcpClient.Create( nil);
  FOwnsClient := True;

  sock_stream   := TTcpSocketStreamImpl.Create( FClient);
  FInputStream  := sock_stream;
  FOutputStream := sock_stream;
end;
738
// Connects the socket to Host:Port.
//
// Raises TTransportException
//   - AlreadyOpen  if the socket is connected already
//   - NotOpen      if no host or no valid port (> 0) is set
//   - NotOpen      if the connection attempt fails
//
// The result of Connect is checked so that a failed connection attempt is
// reported to the caller instead of returning from Open with an unconnected
// socket.
procedure TSocketImpl.Open;
begin
  if IsOpen then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
                                      'Socket already connected');
  end;

  if FHost = '' then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open null host');
  end;

  if Port <= 0 then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open without port');
  end;

  // Close frees the client, so it may have to be re-created here
  if FClient = nil then
  begin
    InitSocket;
  end;

  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  if not FClient.Connect then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Could not connect to ' + Host + ':' + IntToStr( Port));
  end;

  // one socket stream serves both directions
  FInputStream  := TTcpSocketStreamImpl.Create( FClient);
  FOutputStream := FInputStream;
end;
771
772{ TBufferedStream }
773
// Flushes pending writes, then drops the wrapped stream and both buffers.
// Afterwards IsOpen returns False.
procedure TBufferedStreamImpl.Close;
begin
  Flush;
  FStream := nil;

  FreeAndNil( FReadBuffer);
  FreeAndNil( FWriteBuffer);
end;
785
// Wraps AStream and creates empty read and write buffers.
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  inherited Create;
  FStream      := AStream;
  FBufSize     := ABufSize;
  FReadBuffer  := TMemoryStream.Create;
  FWriteBuffer := TMemoryStream.Create;
end;
794
// Closes the stream (which flushes and frees the buffers) before the
// inherited cleanup.
destructor TBufferedStreamImpl.Destroy;
begin
  Close;
  inherited;
end;
800
// Writes the contents of the write buffer to the wrapped stream and empties
// the buffer. Does nothing when the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  count   : Integer;
begin
  if not IsOpen
  then Exit;

  count := FWriteBuffer.Size;
  if count > 0 then
  begin
    // copy the buffered bytes out, starting at the beginning of the buffer
    SetLength( pending, count);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( Pointer(@pending[0])^, count);
    FStream.Write( pending, 0, count);
  end;
  FWriteBuffer.Clear;
end;
819
// The stream counts as open while the wrapped stream and both buffers exist.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := Assigned( FWriteBuffer)
        and Assigned( FReadBuffer)
        and Assigned( FStream);
end;
826
// Open is a no-op for the buffered stream.
procedure TBufferedStreamImpl.Open;
begin
  // intentionally empty
end;
831
// Reads up to count bytes into buffer, starting at index offset, and returns
// the number of bytes delivered. Bytes come from the read buffer; whenever
// it is exhausted it is refilled from the wrapped stream with up to FBufSize
// bytes. The loop ends when count bytes were delivered or when the wrapped
// stream returns 0 bytes. Returns 0 when the stream is not open.
function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
var
  chunk  : Integer;
  refill : TBytes;
begin
  inherited;
  Result := 0;
  if not IsOpen
  then Exit;

  while count > 0 do
  begin
    // read buffer used up -> fetch the next block from the wrapped stream
    if FReadBuffer.Position >= FReadBuffer.Size then
    begin
      FReadBuffer.Clear;
      SetLength( refill, FBufSize);
      chunk := FStream.Read( refill, 0, FBufSize);
      if chunk = 0
      then Break;  // avoid infinite loop

      FReadBuffer.WriteBuffer( Pointer(@refill[0])^, chunk);
      FReadBuffer.Position := 0;
    end;

    // hand out as much as is buffered, but not more than still requested
    if FReadBuffer.Position < FReadBuffer.Size then
    begin
      chunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, chunk));
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
864
// Returns a copy of the complete contents of the read buffer, or an empty
// array when the stream is not open.
//
// The bytes are copied straight from the buffer memory so that the buffer's
// Position stays untouched. Read uses that position to keep track of the
// bytes not yet delivered; moving it here would make a later Read skip them.
function TBufferedStreamImpl.ToArray: TBytes;
var
  len : Integer;
begin
  len := 0;
  if IsOpen then
  begin
    len := FReadBuffer.Size;
  end;

  SetLength( Result, len);

  if len > 0 then
  begin
    System.Move( FReadBuffer.Memory^, Result[0], len);
  end;
end;
884
// Appends count bytes of buffer, starting at index offset, to the write
// buffer and flushes once the buffer has grown beyond FBufSize. Nothing is
// stored when count <= 0 or when the stream is not open.
procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
begin
  inherited;
  if (count > 0) and IsOpen then
  begin
    FWriteBuffer.Write( Pointer(@buffer[offset])^, count);
    if FWriteBuffer.Size > FBufSize
    then Flush;
  end;
end;
900
901{ TStreamTransportImpl }
902
// Releases both stream references. Works the same whether input and output
// are one shared stream or two distinct ones.
procedure TStreamTransportImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
921
// Stores the streams to read from and write to; either may be nil.
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  inherited Create;
  FInputStream  := AInputStream;
  FOutputStream := AOutputStream;
end;
928
// Releases both stream references before the inherited cleanup.
destructor TStreamTransportImpl.Destroy;
begin
  FInputStream  := nil;
  FOutputStream := nil;
  inherited;
end;
935
// Flushes the output stream.
// Raises TTransportException (NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if not Assigned( FOutputStream)
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
945
// Getter for the InputStream property.
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := FInputStream;
end;
950
// A plain stream transport always reports itself as open.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
955
// Getter for the OutputStream property: returns the stream that Write and
// Flush operate on. This matters whenever the transport was created with
// two distinct streams.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
960
// Open is a no-op for a plain stream transport.
procedure TStreamTransportImpl.Open;
begin
  // intentionally empty
end;
965
// Reads from the input stream and returns what that stream returns.
// Raises TTransportException (NotOpen) when there is no input stream.
function TStreamTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if not Assigned( FInputStream)
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );

  Result := FInputStream.Read( buf, off, len);
end;
974
// Writes to the output stream.
// Raises TTransportException (NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if not Assigned( FOutputStream)
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );

  FOutputStream.Write( buf, off, len);
end;
984
985{ TBufferedTransportImpl }
986
// Wraps ATransport using the default buffer size of 1024.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
begin
  // no inherited call here: done by the delegate constructor
  Create( ATransport, 1024);
end;
992
// Closes the wrapped transport. The buffers are left as they are.
procedure TBufferedTransportImpl.Close;
begin
  FTransport.Close;
end;
997
// Wraps ATransport and creates the buffers for whichever of its streams
// exist at this point.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
                                           ABufSize: Integer);
begin
  inherited Create;
  FTransport := ATransport;
  FBufSize   := ABufSize;
  InitBuffers;
end;
1006
// Flushes the output buffer, if there is one.
procedure TBufferedTransportImpl.Flush;
begin
  if Assigned( FOutputBuffer)
  then FOutputBuffer.Flush;
end;
1014
// The buffered transport is open exactly when the wrapped transport is.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1019
// Getter for the UnderlyingTransport property: the wrapped transport.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := FTransport;
end;
1024
// Creates a buffered stream of size FBufSize for each stream the wrapped
// transport currently provides. A buffer whose stream is nil is left as is.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if FTransport.InputStream <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize);

  if FTransport.OutputStream <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize);
end;
1036
// Opens the wrapped transport and then re-creates the buffers.
//
// Opening may give the wrapped transport new stream objects (TSocketImpl.Open
// creates a fresh socket stream, and after a Close also a fresh client), so
// the buffers have to be attached to the current streams again. Without
// this, buffers built in the constructor would keep using the old streams
// after a Close/Open cycle.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;
  InitBuffers;
end;
1041
// Reads through the input buffer; returns 0 when there is no input buffer.
function TBufferedTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if Assigned( FInputBuffer)
  then Result := FInputBuffer.Read( buf, off, len)
  else Result := 0;
end;
1050
// Writes len bytes of buf, starting at index off, into the output buffer.
// Without an output buffer the call is silently ignored.
procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Write( buf, off, len);
end;
1058
1059{ TFramedTransportImpl }
1060
// One-time setup of the shared, zero-filled header placeholder
// (FHeader_Dummy, FHeaderSize bytes long).
{$IF CompilerVersion < 21.0}
// Plain-procedure variant for compilers below version 21.0; invoked from
// the unit's initialization section.
procedure TFramedTransportImpl_Initialize;
var
  cbHeader : Integer;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  cbHeader := Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0], cbHeader, 0);
end;
{$ELSE}
// Class-constructor variant for compiler version 21.0 and above.
class constructor TFramedTransportImpl.Create;
var
  cbHeader : Integer;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  cbHeader := Length( FHeader_Dummy) * SizeOf( Byte);
  FillChar( FHeader_Dummy[0], cbHeader, 0);
end;
{$IFEND}
1075
// Creates a framed transport with a fresh write buffer.
// FTransport is not assigned by this overload.
constructor TFramedTransportImpl.Create;
begin
  inherited Create();
  InitWriteBuffer();
end;
1081
// Closes the wrapped transport.
procedure TFramedTransportImpl.Close;
begin
  FTransport.Close();
end;
1086
// Creates a framed transport layered on top of ATrans and prepares a fresh
// write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;
  FTransport := ATrans;
  InitWriteBuffer;  // does not depend on FTransport
end;
1093
// Releases both memory streams owned by this instance, then runs the
// inherited destructor.
destructor TFramedTransportImpl.Destroy;
begin
  FWriteBuffer.Free;  // Free tolerates a nil reference
  FReadBuffer.Free;
  inherited Destroy;
end;
1100
// Sends everything collected in the write buffer as one frame: the leading
// header placeholder is overwritten with the payload length (most
// significant byte first), then the frame is written to the wrapped
// transport and that transport is flushed.
// Raises Exception when the write buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame      : TBytes;
  frameSize  : Integer;
  payloadLen : Integer;
begin
  // take a private copy of the buffered bytes (header placeholder + payload)
  frameSize := FWriteBuffer.Size;
  SetLength( frame, frameSize);
  if frameSize > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], frameSize);

  payloadLen := frameSize - FHeaderSize;
  if payloadLen < 0
  then raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );

  // start a new, empty frame before anything is sent
  InitWriteBuffer;

  // patch the length into the first four bytes
  // (assumes FHeaderSize is 4 - TODO confirm against the declaration)
  frame[0] := Byte( (payloadLen shr 24) and $FF);
  frame[1] := Byte( (payloadLen shr 16) and $FF);
  frame[2] := Byte( (payloadLen shr  8) and $FF);
  frame[3] := Byte(  payloadLen         and $FF);

  FTransport.Write( frame, 0, frameSize);
  FTransport.Flush;
end;
1131
// Reports the open state of the wrapped transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := (FTransport.IsOpen);
end;
1136
type
  // Empty descendant declared in this unit; InitWriteBuffer type-casts a
  // TMemoryStream to it in order to set the stream's Capacity.
  TAccessMemoryStream = class( TMemoryStream);
1140
// Discards the current write buffer and replaces it with a new memory
// stream (capacity 1024) that already contains the header placeholder
// bytes, so payload data is appended right after the header.
procedure TFramedTransportImpl.InitWriteBuffer;
const
  INITIAL_CAPACITY = 1024;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;
  TAccessMemoryStream( FWriteBuffer).Capacity := INITIAL_CAPACITY;

  // reserve room for the frame header; Flush fills in the real value
  FWriteBuffer.Write( FHeader_Dummy[0], FHeaderSize);
end;
1148
// Opens the wrapped transport.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open();
end;
1153
// Reads up to len bytes of frame payload into buf, starting at index off.
// Data is served from the current frame first; once that frame is used up
// (or no frame has been read yet), the next frame is fetched from the
// wrapped transport via ReadFrame.
// Returns the number of bytes copied; 0 for a request with len <= 0.
function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
  // An empty request must return right away. Falling through to ReadFrame
  // would replace FReadBuffer - dropping any still unread bytes of the
  // current frame - and wait on the transport for a frame nobody asked for.
  if len <= 0 then
  begin
    Result := 0;
    Exit;
  end;

  // serve from the frame that is already buffered, if it has data left
  if FReadBuffer <> nil then
  begin
    Result := FReadBuffer.Read( Pointer(@buf[off])^, len);
    if Result > 0
    then Exit;
  end;

  // current frame exhausted (or none yet): load the next one
  ReadFrame;
  Result := FReadBuffer.Read( Pointer(@buf[off])^, len);
end;
1175
// Reads one complete frame from the wrapped transport: first the header of
// FHeaderSize bytes, decoded as an integer with the most significant byte
// first, then that many payload bytes. The payload replaces the content of
// FReadBuffer, which is left positioned at its start.
// NOTE(review): the decoded size is used as received; no upper limit is
// enforced here, and a header with the top bit set decodes to a negative
// value.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize);
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  SetLength( buff, size);
  FTransport.ReadAll( buff, 0, size);

  // FreeAndNil keeps the field from dangling should the Create below fail
  FreeAndNil( FReadBuffer);
  FReadBuffer := TMemoryStream.Create;

  // An empty frame has no buff[0]; indexing it would raise a range-check
  // error when range checking is enabled, so only copy real payload.
  if Length( buff) > 0
  then FReadBuffer.Write( Pointer(@buff[0])^, size);

  FReadBuffer.Position := 0;
end;
1196
// Appends len bytes of buf, starting at index off, to the pending frame.
// Nothing reaches the wrapped transport here; requests with len <= 0 are
// ignored.
procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  if len <= 0
  then Exit;

  FWriteBuffer.Write( buf[off], len);
end;
1202
{ TFramedTransportImpl.TFactory }
1204
// Factory method: returns a new framed transport layered on top of ATrans.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
var
  framed : ITransport;
begin
  framed := TFramedTransportImpl.Create( ATrans);
  Result := framed;
end;
1209
1210{ TTcpSocketStreamImpl }
1211
// Closes the underlying TCP client.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close();
end;
1216
// Creates a stream on top of ATcpClient. Only the reference is stored;
// no new client object is created here.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient);
begin
  inherited Create();
  FTcpClient := ATcpClient;
end;
1222
// Intentionally empty: this stream holds no pending output of its own.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1227
// Reports whether the underlying TCP client is active.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := (FTcpClient.Active);
end;
1232
// Opens the underlying TCP client.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open();
end;
1237
// Receives up to count bytes from the socket into buffer, starting at index
// offset, and returns whatever ReceiveBuf reports.
function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset,
  count: Integer): Integer;
begin
  // run the inherited implementation first, with the same arguments
  inherited;

  Result := FTcpClient.ReceiveBuf( buffer[offset], count);
end;
1244
// Returns data received from the socket as a byte array.
// The requested length is FTcpClient.BytesReceived when the stream is open,
// otherwise 0 (empty result). The result is trimmed to the number of bytes
// ReceiveBuf actually delivered, so it never contains padding that was not
// received.
// NOTE(review): BytesReceived looks like a running total rather than the
// number of bytes currently pending - confirm that it is the intended size.
function TTcpSocketStreamImpl.ToArray: TBytes;
var
  len : Integer;
  got : Integer;
begin
  len := 0;
  if IsOpen
  then len := FTcpClient.BytesReceived;

  SetLength( Result, len);

  if len > 0 then
  begin
    got := FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);

    // a negative return value signals a failed receive: nothing was read
    if got < 0
    then got := 0;

    // short read: drop the part of the array that was never filled
    if got < len
    then SetLength( Result, got);
  end;
end;
1262
// Sends count bytes of buffer, starting at index offset, through the socket.
// The value returned by SendBuf is not evaluated.
procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
begin
  // run the inherited implementation first, with the same arguments
  inherited;

  FTcpClient.SendBuf( buffer[offset], count);
end;
1268
{$IF CompilerVersion < 21.0}
// Compilers below version 21.0 get the header placeholder set up here,
// at unit initialization, instead of by a class constructor.
initialization
  TFramedTransportImpl_Initialize;
{$IFEND}
1275
1276
1277end.