blob: 69d74a30603490a7c19aecc473ba5dc56506cba2 [file] [log] [blame]
Jake Farrell7ae13e12011-10-18 14:35:26 +00001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 {$SCOPEDENUMS ON}
21
22unit Thrift.Transport;
23
24interface
25
26uses
27 Classes,
28 SysUtils,
Jens Geyer6a7463a2013-03-07 20:40:59 +010029 Math,
Jake Farrell7ae13e12011-10-18 14:35:26 +000030 Sockets,
31 Generics.Collections,
32 Thrift.Collections,
33 Thrift.Utils,
34 Thrift.Stream,
35 ActiveX,
36 msxml;
37
38type
39 ITransport = interface
40 ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
41 function GetIsOpen: Boolean;
42 property IsOpen: Boolean read GetIsOpen;
43 function Peek: Boolean;
44 procedure Open;
45 procedure Close;
46 function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
47 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
48 procedure Write( const buf: TBytes); overload;
49 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
50 procedure Flush;
51 end;
52
53 TTransportImpl = class( TInterfacedObject, ITransport)
54 protected
55 function GetIsOpen: Boolean; virtual; abstract;
56 property IsOpen: Boolean read GetIsOpen;
Roger Meier3bef8c22012-10-06 06:58:00 +000057 function Peek: Boolean; virtual;
Jake Farrell7ae13e12011-10-18 14:35:26 +000058 procedure Open(); virtual; abstract;
59 procedure Close(); virtual; abstract;
60 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
61 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
62 procedure Write( const buf: TBytes); overload; virtual;
63 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
64 procedure Flush; virtual;
65 end;
66
67 TTransportException = class( Exception )
68 public
69 type
70 TExceptionType = (
71 Unknown,
72 NotOpen,
73 AlreadyOpen,
74 TimedOut,
75 EndOfFile
76 );
77 private
78 FType : TExceptionType;
79 public
80 constructor Create( AType: TExceptionType); overload;
81 constructor Create( const msg: string); overload;
82 constructor Create( AType: TExceptionType; const msg: string); overload;
83 property Type_: TExceptionType read FType;
84 end;
85
86 IHTTPClient = interface( ITransport )
87 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
88 procedure SetConnectionTimeout(const Value: Integer);
89 function GetConnectionTimeout: Integer;
90 procedure SetReadTimeout(const Value: Integer);
91 function GetReadTimeout: Integer;
92 function GetCustomHeaders: IThriftDictionary<string,string>;
93 procedure SendRequest;
94 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
95 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
96 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
97 end;
98
99 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
100 private
101 FUri : string;
102 FInputStream : IThriftStream;
103 FOutputStream : IThriftStream;
104 FConnectionTimeout : Integer;
105 FReadTimeout : Integer;
106 FCustomHeaders : IThriftDictionary<string,string>;
107
108 function CreateRequest: IXMLHTTPRequest;
109 protected
110 function GetIsOpen: Boolean; override;
111 procedure Open(); override;
112 procedure Close(); override;
113 function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
114 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
115 procedure Flush; override;
116
117 procedure SetConnectionTimeout(const Value: Integer);
118 function GetConnectionTimeout: Integer;
119 procedure SetReadTimeout(const Value: Integer);
120 function GetReadTimeout: Integer;
121 function GetCustomHeaders: IThriftDictionary<string,string>;
122 procedure SendRequest;
123 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
124 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
125 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
126 public
127 constructor Create( const AUri: string);
128 destructor Destroy; override;
129 end;
130
131 IServerTransport = interface
Jens Geyer01640402013-09-25 21:12:21 +0200132 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
Jake Farrell7ae13e12011-10-18 14:35:26 +0000133 procedure Listen;
134 procedure Close;
Jens Geyer01640402013-09-25 21:12:21 +0200135 function Accept( const fnAccepting: TProc): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000136 end;
137
138 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
139 protected
Jake Farrell7ae13e12011-10-18 14:35:26 +0000140 procedure Listen; virtual; abstract;
141 procedure Close; virtual; abstract;
Jens Geyer01640402013-09-25 21:12:21 +0200142 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000143 end;
144
145 ITransportFactory = interface
146 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
Roger Meier333bbf32012-01-08 21:51:08 +0000147 function GetTransport( const ATrans: ITransport): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000148 end;
149
150 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
Roger Meier333bbf32012-01-08 21:51:08 +0000151 function GetTransport( const ATrans: ITransport): ITransport; virtual;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000152 end;
153
154 TTcpSocketStreamImpl = class( TThriftStreamImpl )
155 private
156 FTcpClient : TCustomIpClient;
Jens Geyer684ccab2014-09-11 21:14:44 +0200157 FTimeout : Integer;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000158 protected
159 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
160 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
161 procedure Open; override;
162 procedure Close; override;
163 procedure Flush; override;
164
165 function IsOpen: Boolean; override;
166 function ToArray: TBytes; override;
167 public
Jens Geyer684ccab2014-09-11 21:14:44 +0200168 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000169 end;
170
171 IStreamTransport = interface( ITransport )
172 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
173 function GetInputStream: IThriftStream;
174 function GetOutputStream: IThriftStream;
175 property InputStream : IThriftStream read GetInputStream;
176 property OutputStream : IThriftStream read GetOutputStream;
177 end;
178
179 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
180 protected
181 FInputStream : IThriftStream;
182 FOutputStream : IThriftStream;
183 protected
184 function GetIsOpen: Boolean; override;
185
186 function GetInputStream: IThriftStream;
187 function GetOutputStream: IThriftStream;
188 public
189 property InputStream : IThriftStream read GetInputStream;
190 property OutputStream : IThriftStream read GetOutputStream;
191
192 procedure Open; override;
193 procedure Close; override;
194 procedure Flush; override;
195 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
196 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
Roger Meier333bbf32012-01-08 21:51:08 +0000197 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000198 destructor Destroy; override;
199 end;
200
201 TBufferedStreamImpl = class( TThriftStreamImpl)
202 private
203 FStream : IThriftStream;
204 FBufSize : Integer;
Jens Geyer6a7463a2013-03-07 20:40:59 +0100205 FReadBuffer : TMemoryStream;
206 FWriteBuffer : TMemoryStream;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000207 protected
208 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
209 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
210 procedure Open; override;
211 procedure Close; override;
212 procedure Flush; override;
213 function IsOpen: Boolean; override;
214 function ToArray: TBytes; override;
215 public
Roger Meier333bbf32012-01-08 21:51:08 +0000216 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000217 destructor Destroy; override;
218 end;
219
220 TServerSocketImpl = class( TServerTransportImpl)
221 private
222 FServer : TTcpServer;
223 FPort : Integer;
224 FClientTimeout : Integer;
225 FUseBufferedSocket : Boolean;
226 FOwnsServer : Boolean;
227 protected
Jens Geyer01640402013-09-25 21:12:21 +0200228 function Accept( const fnAccepting: TProc) : ITransport; override;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000229 public
Jens Geyer684ccab2014-09-11 21:14:44 +0200230 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
231 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000232 destructor Destroy; override;
233 procedure Listen; override;
234 procedure Close; override;
235 end;
236
237 TBufferedTransportImpl = class( TTransportImpl )
238 private
239 FInputBuffer : IThriftStream;
240 FOutputBuffer : IThriftStream;
241 FTransport : IStreamTransport;
242 FBufSize : Integer;
243
244 procedure InitBuffers;
245 function GetUnderlyingTransport: ITransport;
246 protected
247 function GetIsOpen: Boolean; override;
248 procedure Flush; override;
249 public
250 procedure Open(); override;
251 procedure Close(); override;
252 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
253 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
Roger Meier333bbf32012-01-08 21:51:08 +0000254 constructor Create( const ATransport : IStreamTransport ); overload;
255 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000256 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
257 property IsOpen: Boolean read GetIsOpen;
258 end;
259
260 TSocketImpl = class(TStreamTransportImpl)
261 private
262 FClient : TCustomIpClient;
263 FOwnsClient : Boolean;
264 FHost : string;
265 FPort : Integer;
266 FTimeout : Integer;
267
268 procedure InitSocket;
269 protected
270 function GetIsOpen: Boolean; override;
271 public
272 procedure Open; override;
Jens Geyer684ccab2014-09-11 21:14:44 +0200273 constructor Create( const AClient : TCustomIpClient; ATimeout: Integer = 0); overload;
274 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000275 destructor Destroy; override;
276 procedure Close; override;
277 property TcpClient: TCustomIpClient read FClient;
278 property Host : string read FHost;
279 property Port: Integer read FPort;
280 end;
281
282 TFramedTransportImpl = class( TTransportImpl)
283 private const
284 FHeaderSize : Integer = 4;
285 private class var
286 FHeader_Dummy : array of Byte;
287 protected
288 FTransport : ITransport;
289 FWriteBuffer : TMemoryStream;
290 FReadBuffer : TMemoryStream;
291
292 procedure InitWriteBuffer;
293 procedure ReadFrame;
294 public
295 type
296 TFactory = class( TTransportFactoryImpl )
297 public
Roger Meier333bbf32012-01-08 21:51:08 +0000298 function GetTransport( const ATrans: ITransport): ITransport; override;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000299 end;
300
301{$IF CompilerVersion >= 21.0}
302 class constructor Create;
303{$IFEND}
304 constructor Create; overload;
Roger Meier333bbf32012-01-08 21:51:08 +0000305 constructor Create( const ATrans: ITransport); overload;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000306 destructor Destroy; override;
307
308 procedure Open(); override;
309 function GetIsOpen: Boolean; override;
310
311 procedure Close(); override;
312 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
313 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
314 procedure Flush; override;
315 end;
316
317{$IF CompilerVersion < 21.0}
318procedure TFramedTransportImpl_Initialize;
319{$IFEND}
320
321implementation
322
323{ TTransportImpl }
324
325procedure TTransportImpl.Flush;
326begin
327
328end;
329
330function TTransportImpl.Peek: Boolean;
331begin
332 Result := IsOpen;
333end;
334
// Reads exactly len bytes into buf starting at index off by calling Read
// repeatedly until the requested amount has been received.
// Returns the number of bytes read (equal to len when len > 0; 0 when len <= 0,
// in which case Read is never called).
// Raises TTransportException as soon as Read reports zero or fewer bytes.
function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
var
  nChunk : Integer;
begin
  Result := 0;
  while Result < len do
  begin
    // ask only for what is still missing, appended behind the bytes already received
    nChunk := Read( buf, off + Result, len - Result);
    if nChunk <= 0
    then raise TTransportException.Create( 'Cannot read, Remote side has closed' );
    Inc( Result, nChunk);
  end;
end;
352
353procedure TTransportImpl.Write( const buf: TBytes);
354begin
355 Self.Write( buf, 0, Length(buf) );
356end;
357
358{ THTTPClientImpl }
359
360procedure THTTPClientImpl.Close;
361begin
362 FInputStream := nil;
363 FOutputStream := nil;
364end;
365
366constructor THTTPClientImpl.Create(const AUri: string);
367begin
368 inherited Create;
369 FUri := AUri;
370 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
371 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
372end;
373
// Creates an MSXML request object, opens it as a POST to FUri and applies the
// fixed Thrift headers followed by every entry of FCustomHeaders.
// The request is opened with the third argument False and empty credentials.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  header : TPair<string,string>;
begin
  // the COM co-class is named differently depending on the compiler version
{$IF CompilerVersion >= 21.0}
  Result := CoXMLHTTP.Create;
{$ELSE}
  Result := CoXMLHTTPRequest.Create;
{$IFEND}

  Result.open('POST', FUri, False, '', '');

  // standard headers first ...
  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
  Result.setRequestHeader( 'Accept', 'application/x-thrift');
  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  // ... then the caller-supplied ones, set after the defaults above
  for header in FCustomHeaders
  do Result.setRequestHeader( header.Key, header.Value);
end;
394
395destructor THTTPClientImpl.Destroy;
396begin
397 Close;
398 inherited;
399end;
400
401procedure THTTPClientImpl.Flush;
402begin
403 try
404 SendRequest;
405 finally
406 FOutputStream := nil;
407 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
408 end;
409end;
410
411function THTTPClientImpl.GetConnectionTimeout: Integer;
412begin
413 Result := FConnectionTimeout;
414end;
415
416function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
417begin
418 Result := FCustomHeaders;
419end;
420
421function THTTPClientImpl.GetIsOpen: Boolean;
422begin
423 Result := True;
424end;
425
426function THTTPClientImpl.GetReadTimeout: Integer;
427begin
428 Result := FReadTimeout;
429end;
430
431procedure THTTPClientImpl.Open;
432begin
433
434end;
435
// Reads up to len bytes of the last response into buf at index off.
// Raises TTransportException (NotOpen) when no response stream exists yet,
// i.e. FInputStream is nil. Any exception raised by the underlying stream is
// re-raised as TTransportException (Unknown) carrying the original message.
function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if FInputStream = nil
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'No request has been sent');

  try
    Result := FInputStream.Read( buf, off, len);
  except
    on E: Exception
    do raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
                                         E.Message);
  end;
end;
453
// Sends everything collected in FOutputStream as the body of a new request
// (see CreateRequest) and replaces FInputStream with an adapter around the
// response stream of that request.
procedure THTTPClientImpl.SendRequest;
var
  request : IXMLHTTPRequest;
  payload : TMemoryStream;
  data    : TBytes;
  size    : Integer;
begin
  request := CreateRequest;

  payload := TMemoryStream.Create;
  try
    // copy the pending output into a memory stream that can be handed to COM
    data := FOutputStream.ToArray;
    size := Length( data);
    if size > 0
    then payload.WriteBuffer( Pointer(@data[0])^, size);
    payload.Position := 0;

    // soReference: the adapter does not take ownership, payload is freed below
    request.send( IUnknown( TStreamAdapter.Create( payload, soReference )));

    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( request.responseStream) as IStream);
  finally
    payload.Free;
  end;
end;
479
480procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
481begin
482 FConnectionTimeout := Value;
483end;
484
485procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
486begin
487 FReadTimeout := Value
488end;
489
// Appends len bytes of buf, starting at index off, to the pending request body.
// Nothing is transmitted until Flush is called.
// Raises TTransportException (NotOpen) when the output stream is gone, which
// is the case after Close has set FOutputStream to nil. Without this guard the
// call would dereference a nil interface.
procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if FOutputStream = nil
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'Cannot write to null outputstream');

  FOutputStream.Write( buf, off, len);
end;
494
495{ TTransportException }
496
497constructor TTransportException.Create(AType: TExceptionType);
498begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200499 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000500 Create( AType, '' )
501end;
502
503constructor TTransportException.Create(AType: TExceptionType;
504 const msg: string);
505begin
506 inherited Create(msg);
507 FType := AType;
508end;
509
510constructor TTransportException.Create(const msg: string);
511begin
512 inherited Create(msg);
513end;
514
Jake Farrell7ae13e12011-10-18 14:35:26 +0000515{ TTransportFactoryImpl }
516
Roger Meier333bbf32012-01-08 21:51:08 +0000517function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000518begin
519 Result := ATrans;
520end;
521
{ TServerSocketImpl }
523
Roger Meier333bbf32012-01-08 21:51:08 +0000524constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000525begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200526 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000527 FServer := AServer;
528 FClientTimeout := AClientTimeout;
529end;
530
Jens Geyer684ccab2014-09-11 21:14:44 +0200531constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000532begin
Jens Geyer684ccab2014-09-11 21:14:44 +0200533 inherited Create;
534 FPort := APort;
535 FClientTimeout := AClientTimeout;
536 FUseBufferedSocket := AUseBufferedSockets;
537 FOwnsServer := True;
538 FServer := TTcpServer.Create( nil );
539 FServer.BlockMode := bmBlocking;
540{$IF CompilerVersion >= 21.0}
541 FServer.LocalPort := AnsiString( IntToStr( FPort));
542{$ELSE}
543 FServer.LocalPort := IntToStr( FPort);
544{$IFEND}
Jake Farrell7ae13e12011-10-18 14:35:26 +0000545end;
546
Jens Geyer684ccab2014-09-11 21:14:44 +0200547destructor TServerSocketImpl.Destroy;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000548begin
Jens Geyer684ccab2014-09-11 21:14:44 +0200549 if FOwnsServer then begin
550 FServer.Free;
551 FServer := nil;
552 end;
553 inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000554end;
555
// Waits for an incoming connection on the server socket and wraps it into a
// transport.
//   fnAccepting - optional callback, invoked right before the call to
//                 FServer.Accept; may be nil.
// Returns a TSocketImpl (wrapped in a TBufferedTransportImpl when
// FUseBufferedSocket is set), or nil when FServer.Accept reports failure.
// Raises TTransportException (NotOpen) when there is no server socket; any
// other exception raised while accepting is re-raised as TTransportException
// carrying E.ToString.
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
  client : TCustomIpClient;
  trans  : IStreamTransport;
begin
  Result := nil;

  if FServer = nil then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'No underlying server socket.');
  end;

  client := nil;
  try
    client := TCustomIpClient.Create(nil);

    if Assigned(fnAccepting)
    then fnAccepting();

    if ( not FServer.Accept( client)) then
    begin
      FreeAndNil( client);
      Exit;
    end;

    // kept from the original code: defensive check after Accept
    if client = nil then
    begin
      Exit;
    end;

    trans := TSocketImpl.Create( client, FClientTimeout);
    client := nil;  // from here on the socket is reachable (and closed) via trans

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( trans)
    else Result := trans;

  except
    on E: Exception do
    begin
      // Fix: the client object used to leak when the callback, Accept or the
      // TSocketImpl constructor raised. Free is a no-op once client is nil.
      client.Free;
      raise TTransportException.Create( E.ToString );
    end;
  end;
end;
598
// Activates the server socket. Does nothing when there is no server object.
// Any exception raised during activation is re-raised as TTransportException.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil
  then Exit;

  try
    FServer.Active := True;
  except
    on E: Exception
    do raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
  end;
end;
613
// Deactivates the server socket. Does nothing when there is no server object.
// Any exception raised during deactivation is re-raised as TTransportException.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

  try
    FServer.Active := False;
  except
    on E: Exception
    do raise TTransportException.Create('Error on closing socket : ' + E.Message);
  end;
end;
628
{ TSocketImpl }
630
Jens Geyer684ccab2014-09-11 21:14:44 +0200631constructor TSocketImpl.Create( const AClient : TCustomIpClient; ATimeout: Integer = 0);
632var stream : IThriftStream;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000633begin
634 FClient := AClient;
Jens Geyer684ccab2014-09-11 21:14:44 +0200635 FTimeout := ATimeout;
636 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
Jens Geyer718f6ee2013-09-06 21:02:34 +0200637 inherited Create( stream, stream);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000638end;
639
Jake Farrell7ae13e12011-10-18 14:35:26 +0000640constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
641begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200642 inherited Create(nil,nil);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000643 FHost := AHost;
644 FPort := APort;
645 FTimeout := ATimeout;
646 InitSocket;
647end;
648
649destructor TSocketImpl.Destroy;
650begin
651 if FOwnsClient then
652 begin
653 FClient.Free;
654 end;
655 inherited;
656end;
657
// Releases the input/output streams (inherited Close) and then destroys the
// socket client object, leaving FClient = nil.
// NOTE(review): the client is freed here regardless of FOwnsClient, i.e. also
// when it was passed in through the constructor - confirm this is intended.
procedure TSocketImpl.Close;
begin
  inherited Close;
  FreeAndNil( FClient);  // safe on nil, so no explicit check is needed
end;
664
// True only when a client object exists and reports itself as connected.
function TSocketImpl.GetIsOpen: Boolean;
begin
  // short-circuit evaluation: Connected is not touched when FClient is nil
  Result := (FClient <> nil) and FClient.Connected;
end;
673
674procedure TSocketImpl.InitSocket;
675var
676 stream : IThriftStream;
677begin
Jens Geyer684ccab2014-09-11 21:14:44 +0200678 if (FClient <> nil) and FOwnsClient then begin
679 FClient.Free;
680 FClient := nil;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000681 end;
Jens Geyer684ccab2014-09-11 21:14:44 +0200682
683 FClient := TTcpClient.Create( nil);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000684 FOwnsClient := True;
685
Jens Geyer684ccab2014-09-11 21:14:44 +0200686 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000687 FInputStream := stream;
688 FOutputStream := stream;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000689end;
690
// Connects the socket to Host:Port and (re)creates the socket stream that is
// used for both reading and writing.
// Raises TTransportException
//   - AlreadyOpen when the socket is connected already,
//   - NotOpen     when no host or no valid port has been set,
//   - NotOpen     when the connection attempt did not succeed.
procedure TSocketImpl.Open;
begin
  if IsOpen then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
                                      'Socket already connected');
  end;

  if FHost = '' then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open null host');
  end;

  if Port <= 0 then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot open without port');
  end;

  // Close destroys the client object, so it may have to be created again
  if FClient = nil then
  begin
    InitSocket;
  end;

  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;

  // Fix: a failed connection attempt used to be ignored silently, leaving the
  // caller with streams on top of an unconnected socket.
  if not FClient.Connected then
  begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Could not connect to ' + Host + ':' + IntToStr( Port));
  end;

  FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
723
{ TBufferedStreamImpl }
725
// Flushes pending output to the underlying stream, then drops the reference
// to that stream and destroys both internal buffers. After Close, IsOpen
// returns False. Safe to call more than once (Flush does nothing when the
// stream is not open, and FreeAndNil tolerates nil).
// An exception raised by Flush still propagates to the caller, but the
// buffers are released regardless. This matters because Close is also
// called from the destructor.
procedure TBufferedStreamImpl.Close;
begin
  try
    Flush;
  finally
    // Fix: previously a failing Flush skipped this cleanup and leaked both
    // memory streams.
    FStream := nil;
    FreeAndNil( FReadBuffer);
    FreeAndNil( FWriteBuffer);
  end;
end;
737
Roger Meier333bbf32012-01-08 21:51:08 +0000738constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000739begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200740 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000741 FStream := AStream;
742 FBufSize := ABufSize;
Jens Geyer6a7463a2013-03-07 20:40:59 +0100743 FReadBuffer := TMemoryStream.Create;
744 FWriteBuffer := TMemoryStream.Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000745end;
746
747destructor TBufferedStreamImpl.Destroy;
748begin
749 Close;
750 inherited;
751end;
752
// Writes the whole content of the write buffer to the underlying stream in
// one call and empties the buffer afterwards. Does nothing when the stream
// is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  nBytes  : Integer;
begin
  if not IsOpen
  then Exit;

  nBytes := FWriteBuffer.Size;
  if nBytes > 0 then
  begin
    // copy the buffered data into a byte array, as required by IThriftStream.Write
    SetLength( pending, nBytes);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( Pointer(@pending[0])^, nBytes);
    FStream.Write( pending, 0, nBytes);
  end;

  FWriteBuffer.Clear;
end;
771
772function TBufferedStreamImpl.IsOpen: Boolean;
773begin
Jens Geyer6a7463a2013-03-07 20:40:59 +0100774 Result := (FWriteBuffer <> nil)
775 and (FReadBuffer <> nil)
776 and (FStream <> nil);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000777end;
778
779procedure TBufferedStreamImpl.Open;
780begin
781
782end;
783
// Copies up to count bytes into buffer, starting at index offset, serving
// them from the internal read buffer. Whenever that buffer is exhausted it is
// refilled with up to FBufSize bytes from the underlying stream.
// Returns the number of bytes actually copied. This is less than count when
// the underlying stream delivers no more data, and 0 when the stream is not
// open.
function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
var
  nChunk : Integer;
  refill : TBytes;
begin
  inherited;
  Result := 0;
  if not IsOpen
  then Exit;

  while count > 0 do
  begin
    // read buffer used up: fetch the next portion from the underlying stream
    if FReadBuffer.Position >= FReadBuffer.Size then
    begin
      FReadBuffer.Clear;
      SetLength( refill, FBufSize);
      nChunk := FStream.Read( refill, 0, FBufSize);
      if nChunk = 0
      then Break; // nothing more available, avoid infinite loop

      FReadBuffer.WriteBuffer( Pointer(@refill[0])^, nChunk);
      FReadBuffer.Position := 0;
    end;

    // hand out as much as is buffered, but never more than still requested
    if FReadBuffer.Position < FReadBuffer.Size then
    begin
      nChunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nChunk));
      Dec( count, nChunk);
      Inc( offset, nChunk);
    end;
  end;
end;
816
// Returns a copy of the complete content of the internal read buffer, or an
// empty array when the stream is not open.
// Side effect: the read buffer is rewound and read to its end, so its
// position points behind the last byte afterwards.
function TBufferedStreamImpl.ToArray: TBytes;
var
  nBytes : Integer;
begin
  if IsOpen
  then nBytes := FReadBuffer.Size
  else nBytes := 0;

  SetLength( Result, nBytes);

  if nBytes > 0 then
  begin
    FReadBuffer.Position := 0;
    FReadBuffer.Read( Pointer(@Result[0])^, nBytes);
  end;
end;
836
// Appends count bytes of buffer, starting at index offset, to the write
// buffer. Once the write buffer holds more than FBufSize bytes it is flushed
// to the underlying stream. Nothing happens when count is not positive or
// the stream is not open.
procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
begin
  inherited;
  if (count <= 0) or (not IsOpen)
  then Exit;

  FWriteBuffer.Write( Pointer(@buffer[offset])^, count);
  if FWriteBuffer.Size > FBufSize
  then Flush;
end;
852
853{ TStreamTransportImpl }
854
// Drops the references to the input and the output stream.
// Both fields are interface references, so assigning nil is all that is
// needed, no matter whether they point to the same stream or to different
// ones.
procedure TStreamTransportImpl.Close;
begin
  FInputStream := nil;
  FOutputStream := nil;
end;
873
Roger Meier333bbf32012-01-08 21:51:08 +0000874constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000875begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200876 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000877 FInputStream := AInputStream;
878 FOutputStream := AOutputStream;
879end;
880
881destructor TStreamTransportImpl.Destroy;
882begin
883 FInputStream := nil;
884 FOutputStream := nil;
885 inherited;
886end;
887
888procedure TStreamTransportImpl.Flush;
889begin
890 if FOutputStream = nil then
891 begin
892 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );
893 end;
894
895 FOutputStream.Flush;
896end;
897
898function TStreamTransportImpl.GetInputStream: IThriftStream;
899begin
900 Result := FInputStream;
901end;
902
903function TStreamTransportImpl.GetIsOpen: Boolean;
904begin
905 Result := True;
906end;
907
// Getter of the OutputStream property: returns the stream that Write and
// Flush operate on. The result is nil after Close.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  // Fix: this used to return FInputStream, which is wrong whenever the
  // transport was created with two different streams.
  Result := FOutputStream;
end;
912
913procedure TStreamTransportImpl.Open;
914begin
915
916end;
917
918function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
919begin
920 if FInputStream = nil then
921 begin
922 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );
923 end;
924 Result := FInputStream.Read( buf, off, len );
925end;
926
927procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
928begin
929 if FOutputStream = nil then
930 begin
Jake Farrelld09362c2011-10-26 02:25:07 +0000931 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );
Jake Farrell7ae13e12011-10-18 14:35:26 +0000932 end;
933
934 FOutputStream.Write( buf, off, len );
935end;
936
937{ TBufferedTransportImpl }
938
Roger Meier333bbf32012-01-08 21:51:08 +0000939constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
Jake Farrell7ae13e12011-10-18 14:35:26 +0000940begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200941 //no inherited;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000942 Create( ATransport, 1024 );
943end;
944
945procedure TBufferedTransportImpl.Close;
946begin
947 FTransport.Close;
948end;
949
Roger Meier333bbf32012-01-08 21:51:08 +0000950constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000951 ABufSize: Integer);
952begin
Jens Geyer718f6ee2013-09-06 21:02:34 +0200953 inherited Create;
Jake Farrell7ae13e12011-10-18 14:35:26 +0000954 FTransport := ATransport;
955 FBufSize := ABufSize;
956 InitBuffers;
957end;
958
959procedure TBufferedTransportImpl.Flush;
960begin
961 if FOutputBuffer <> nil then
962 begin
963 FOutputBuffer.Flush;
964 end;
965end;
966
967function TBufferedTransportImpl.GetIsOpen: Boolean;
968begin
969 Result := FTransport.IsOpen;
970end;
971
972function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
973begin
974 Result := FTransport;
975end;
976
// Creates the buffering stream wrappers around the input and output stream
// of the underlying transport, using FBufSize as buffer size. A direction
// whose stream is nil gets no buffer.
procedure TBufferedTransportImpl.InitBuffers;
var
  stm : IThriftStream;
begin
  stm := FTransport.InputStream;
  if stm <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( stm, FBufSize);

  stm := FTransport.OutputStream;
  if stm <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( stm, FBufSize);
end;
988
// Opens the wrapped transport and (re)builds the buffers around its streams.
//
// InitBuffers only creates a buffer for a stream that is non-nil at the time of
// the call. When the wrapped transport exposed no streams at construction time,
// both buffer fields stay nil; Read would then keep returning 0 and Write would
// silently discard data even after a successful Open. Re-running InitBuffers
// here makes the buffers match the transport's streams again.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;
  InitBuffers;
end;
993
// Reads through the input buffer, passing buf/off/len through unchanged.
// Returns the buffer's result, or 0 when no input buffer exists.
function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
  if Assigned( FInputBuffer)
  then Result := FInputBuffer.Read( buf, off, len )
  else Result := 0;
end;
1002
// Writes through the output buffer, passing buf/off/len through unchanged.
// When no output buffer exists the call is a no-op.
procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  if not Assigned( FOutputBuffer)
  then Exit;

  FOutputBuffer.Write( buf, off, len );
end;
1010
1011{ TFramedTransportImpl }
1012
// One-time setup of FHeader_Dummy: an array of FHeaderSize zero bytes.
// Compilers below version 21.0 get a plain procedure, everything newer uses a
// class constructor.
{$IF CompilerVersion < 21.0}

procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ),
            0);
end;

{$ELSE}

class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0],
            Length( FHeader_Dummy) * SizeOf( Byte ),
            0);
end;

{$IFEND}
1027
// Creates a framed transport without assigning FTransport.
// Only the write buffer is prepared.
constructor TFramedTransportImpl.Create;
begin
  inherited Create;

  // start the first outgoing frame
  InitWriteBuffer;
end;
1033
// Closes the wrapped transport.
procedure TFramedTransportImpl.Close;
begin
  FTransport.Close;
end;
1038
// Creates a framed transport layered on top of ATrans and prepares the write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;

  FTransport := ATrans;
  InitWriteBuffer;
end;
1045
// Releases both memory streams owned by this instance.
destructor TFramedTransportImpl.Destroy;
begin
  // Free is nil-safe, so a read buffer that was never created is fine
  FReadBuffer.Free;
  FWriteBuffer.Free;

  inherited;
end;
1052
// Sends everything collected in the write buffer as one frame.
//
// The write buffer starts with FHeaderSize placeholder bytes (see
// InitWriteBuffer). They are overwritten here with the payload length as a
// 32-bit value, most significant byte first. Then the frame is passed to the
// wrapped transport and that transport is flushed.
//
// Raises Exception when the buffer is shorter than the header placeholder.
procedure TFramedTransportImpl.Flush;
var
  frame   : TBytes;
  total   : Integer;
  payload : Integer;
begin
  total   := FWriteBuffer.Size;
  payload := total - FHeaderSize;

  if payload < 0
  then raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );

  // take a private copy of the buffered bytes ...
  SetLength( frame, total);
  if total > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], total );

  // ... so the write buffer can be reset before anything is sent
  InitWriteBuffer;

  // patch the real payload length into the placeholder
  frame[0] := Byte( (payload shr 24) and $FF);
  frame[1] := Byte( (payload shr 16) and $FF);
  frame[2] := Byte( (payload shr  8) and $FF);
  frame[3] := Byte(  payload         and $FF);

  FTransport.Write( frame, 0, total );
  FTransport.Flush;
end;
1083
// Reports the open state of the wrapped transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1088
type
  // Member-less descendant of TMemoryStream. InitWriteBuffer casts to this type
  // in order to set the stream's Capacity property.
  TAccessMemoryStream = class(TMemoryStream);
1092
// Replaces the write buffer with a fresh memory stream that already contains
// the FHeaderSize placeholder bytes taken from FHeader_Dummy.
procedure TFramedTransportImpl.InitWriteBuffer;
const
  INITIAL_CAPACITY = 1024;
begin
  FWriteBuffer.Free;   // nil-safe, so this also works for the very first call
  FWriteBuffer := TMemoryStream.Create;

  // Capacity is set through the TAccessMemoryStream cast
  TAccessMemoryStream(FWriteBuffer).Capacity := INITIAL_CAPACITY;

  // reserve room for the frame header
  FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
end;
1100
// Opens the wrapped transport.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open;
end;
1105
// Copies up to len bytes of frame payload into buf, starting at index off.
//
// Data is served from the current frame while it lasts. Once the current frame
// is exhausted (or no frame has been read yet) the next frame is fetched from
// the wrapped transport via ReadFrame.
//
// Returns the number of bytes copied. A request for len <= 0 returns 0
// immediately.
function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
  // Nothing requested: do not touch buf (it may be empty) and do not fall
  // through to ReadFrame. ReadFrame replaces FReadBuffer, so calling it here
  // would throw away the unread remainder of the current frame and then wait
  // on the wrapped transport for a frame nobody asked for.
  if len <= 0 then begin
    Result := 0;
    Exit;
  end;

  // serve from the current frame first
  if FReadBuffer <> nil then begin
    Result := FReadBuffer.Read( Pointer(@buf[off])^, len );
    if Result > 0
    then Exit;
  end;

  // current frame is used up (or there is none yet): fetch the next one
  ReadFrame;
  Result := FReadBuffer.Read( Pointer(@buf[off])^, len);
end;
1127
// Reads one complete frame from the wrapped transport into FReadBuffer.
//
// The frame starts with FHeaderSize bytes; the first four hold the payload
// size as a 32-bit value, most significant byte first. The payload follows.
// After the call FReadBuffer holds exactly the payload, positioned at its start.
//
// Raises Exception when the header announces a negative size. The size comes
// straight from the wire, so it is validated before it is used to allocate
// memory.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  // a set top bit yields a negative Integer, reject it instead of passing it
  // on to SetLength. Plain Exception mirrors what Flush raises in this class.
  if size < 0
  then raise Exception.Create( 'TFramedTransport.ReadFrame: invalid frame size');

  SetLength( buff, size );
  if size > 0
  then FTransport.ReadAll( buff, 0, size );

  // FreeAndNil keeps the field from dangling should the constructor below raise
  FreeAndNil( FReadBuffer);
  FReadBuffer := TMemoryStream.Create;

  // an empty frame leaves an empty stream; buff[0] must not be evaluated then
  if size > 0 then begin
    FReadBuffer.Write( Pointer(@buff[0])^, size );
    FReadBuffer.Position := 0;
  end;
end;
1148
// Appends len bytes of buf, starting at index off, to the pending frame.
// Nothing is passed to the wrapped transport here; that happens in Flush.
procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  // buf[off] must not be evaluated when there is nothing to write
  if len <= 0
  then Exit;

  FWriteBuffer.Write( Pointer(@buf[off])^, len );
end;
1154
{ TFramedTransportImpl.TFactory }
1156
// Wraps ATrans into a new framed transport and returns it as ITransport.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := TFramedTransportImpl.Create( ATrans );
end;
1161
1162{ TTcpSocketStreamImpl }
1163
// Closes the TCP client this stream operates on.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close;
end;
1168
// Creates a stream on top of ATcpClient.
//   ATcpClient : the socket client used for all I/O
//   aTimeout   : passed to WaitForData before each read; values <= 0 skip the wait
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;

  FTimeout   := aTimeout;
  FTcpClient := ATcpClient;
end;
1175
// Intentionally empty: this stream keeps no buffer of its own, Write hands the
// data straight to the TCP client.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1180
// Reports the Active state of the TCP client.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := FTcpClient.Active;
end;
1185
// Opens the TCP client this stream operates on.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open;
end;
1190
// Receives up to count bytes from the socket into buffer, starting at index offset.
//
// With a positive FTimeout the call first waits for data to arrive and returns
// 0 if WaitForData reports that none did. Otherwise the result is whatever
// ReceiveBuf returns.
function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset,
  count: Integer): Integer;
begin
  // base class runs first (presumably argument validation, confirm in Thrift.Stream)
  inherited;

  if FTimeout > 0 then begin
    if not FTcpClient.WaitForData( FTimeout) then begin
      Result := 0;
      Exit;
    end;
  end;

  Result := FTcpClient.ReceiveBuf( Pointer(@buffer[offset])^, count);
end;
1203
// Returns an array sized to the client's BytesReceived value and filled via
// ReceiveBuf. The array is empty when the stream is not open or that value is
// not positive.
function TTcpSocketStreamImpl.ToArray: TBytes;
var
  count : Integer;
begin
  if IsOpen
  then count := FTcpClient.BytesReceived
  else count := 0;

  SetLength( Result, count );

  // Result[0] must not be evaluated on an empty array
  if count > 0
  then FTcpClient.ReceiveBuf( Pointer(@Result[0])^, count);
end;
1221
// Sends count bytes of buffer, starting at index offset, through the TCP client.
procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
begin
  // base class runs first (presumably argument validation, confirm in Thrift.Stream)
  inherited;

  FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
end;
1227
{$IF CompilerVersion < 21.0}
initialization
  // these compilers use the plain procedure instead of the class constructor
  // to set up TFramedTransportImpl.FHeader_Dummy
  TFramedTransportImpl_Initialize;
{$IFEND}
1234
1235
1236end.