blob: a46fe5c08f7044975c090e14929de300fa6aa06a [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX, msxml, WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer9f7f11e2016-04-14 21:37:11 +020034 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
42 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030043 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020044
45type
46 ITransport = interface
47 ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
48 function GetIsOpen: Boolean;
49 property IsOpen: Boolean read GetIsOpen;
50 function Peek: Boolean;
51 procedure Open;
52 procedure Close;
53 function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
54 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
55 procedure Write( const buf: TBytes); overload;
56 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
57 procedure Flush;
58 end;
59
60 TTransportImpl = class( TInterfacedObject, ITransport)
61 protected
62 function GetIsOpen: Boolean; virtual; abstract;
63 property IsOpen: Boolean read GetIsOpen;
64 function Peek: Boolean; virtual;
65 procedure Open(); virtual; abstract;
66 procedure Close(); virtual; abstract;
67 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
68 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
69 procedure Write( const buf: TBytes); overload; virtual;
70 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
71 procedure Flush; virtual;
72 end;
73
74 TTransportException = class( Exception )
75 public
76 type
77 TExceptionType = (
78 Unknown,
79 NotOpen,
80 AlreadyOpen,
81 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +020082 EndOfFile,
83 BadArgs,
84 Interrupted
Jens Geyerd5436f52014-10-03 19:50:38 +020085 );
86 private
87 FType : TExceptionType;
88 public
89 constructor Create( AType: TExceptionType); overload;
90 constructor Create( const msg: string); overload;
91 constructor Create( AType: TExceptionType; const msg: string); overload;
92 property Type_: TExceptionType read FType;
93 end;
94
95 IHTTPClient = interface( ITransport )
96 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
97 procedure SetConnectionTimeout(const Value: Integer);
98 function GetConnectionTimeout: Integer;
99 procedure SetReadTimeout(const Value: Integer);
100 function GetReadTimeout: Integer;
101 function GetCustomHeaders: IThriftDictionary<string,string>;
102 procedure SendRequest;
103 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
104 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
105 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
106 end;
107
108 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
109 private
110 FUri : string;
111 FInputStream : IThriftStream;
112 FOutputStream : IThriftStream;
113 FConnectionTimeout : Integer;
114 FReadTimeout : Integer;
115 FCustomHeaders : IThriftDictionary<string,string>;
116
117 function CreateRequest: IXMLHTTPRequest;
118 protected
119 function GetIsOpen: Boolean; override;
120 procedure Open(); override;
121 procedure Close(); override;
122 function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
123 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
124 procedure Flush; override;
125
126 procedure SetConnectionTimeout(const Value: Integer);
127 function GetConnectionTimeout: Integer;
128 procedure SetReadTimeout(const Value: Integer);
129 function GetReadTimeout: Integer;
130 function GetCustomHeaders: IThriftDictionary<string,string>;
131 procedure SendRequest;
132 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
133 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
134 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
135 public
136 constructor Create( const AUri: string);
137 destructor Destroy; override;
138 end;
139
140 IServerTransport = interface
141 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
142 procedure Listen;
143 procedure Close;
144 function Accept( const fnAccepting: TProc): ITransport;
145 end;
146
147 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
148 protected
149 procedure Listen; virtual; abstract;
150 procedure Close; virtual; abstract;
151 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
152 end;
153
154 ITransportFactory = interface
155 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
156 function GetTransport( const ATrans: ITransport): ITransport;
157 end;
158
159 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
160 function GetTransport( const ATrans: ITransport): ITransport; virtual;
161 end;
162
163 TTcpSocketStreamImpl = class( TThriftStreamImpl )
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200164{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200165 private type
166 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
167 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200168 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200169 FTimeout : Integer;
170 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
171 TimeOut: Integer; var wsaError : Integer): Integer;
172 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200173 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200174{$ELSE}
175 FTcpClient: TSocket;
176 protected const
177 SLEEP_TIME = 200;
178{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200179 protected
180 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
181 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
182 procedure Open; override;
183 procedure Close; override;
184 procedure Flush; override;
185
186 function IsOpen: Boolean; override;
187 function ToArray: TBytes; override;
188 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200189{$IFDEF OLD_SOCKETS}
190 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
191{$ELSE}
192 constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
193{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200194 end;
195
196 IStreamTransport = interface( ITransport )
197 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
198 function GetInputStream: IThriftStream;
199 function GetOutputStream: IThriftStream;
200 property InputStream : IThriftStream read GetInputStream;
201 property OutputStream : IThriftStream read GetOutputStream;
202 end;
203
204 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
205 protected
206 FInputStream : IThriftStream;
207 FOutputStream : IThriftStream;
208 protected
209 function GetIsOpen: Boolean; override;
210
211 function GetInputStream: IThriftStream;
212 function GetOutputStream: IThriftStream;
213 public
214 property InputStream : IThriftStream read GetInputStream;
215 property OutputStream : IThriftStream read GetOutputStream;
216
217 procedure Open; override;
218 procedure Close; override;
219 procedure Flush; override;
220 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
221 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
222 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
223 destructor Destroy; override;
224 end;
225
226 TBufferedStreamImpl = class( TThriftStreamImpl)
227 private
228 FStream : IThriftStream;
229 FBufSize : Integer;
230 FReadBuffer : TMemoryStream;
231 FWriteBuffer : TMemoryStream;
232 protected
233 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
234 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
235 procedure Open; override;
236 procedure Close; override;
237 procedure Flush; override;
238 function IsOpen: Boolean; override;
239 function ToArray: TBytes; override;
240 public
241 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
242 destructor Destroy; override;
243 end;
244
245 TServerSocketImpl = class( TServerTransportImpl)
246 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200247{$IFDEF OLD_SOCKETS}
248 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200249 FPort : Integer;
250 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200251{$ELSE}
252 FServer: TServerSocket;
253{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200254 FUseBufferedSocket : Boolean;
255 FOwnsServer : Boolean;
256 protected
257 function Accept( const fnAccepting: TProc) : ITransport; override;
258 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200259{$IFDEF OLD_SOCKETS}
260 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200261 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200262{$ELSE}
263 constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
264 constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
265{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200266 destructor Destroy; override;
267 procedure Listen; override;
268 procedure Close; override;
269 end;
270
271 TBufferedTransportImpl = class( TTransportImpl )
272 private
273 FInputBuffer : IThriftStream;
274 FOutputBuffer : IThriftStream;
275 FTransport : IStreamTransport;
276 FBufSize : Integer;
277
278 procedure InitBuffers;
279 function GetUnderlyingTransport: ITransport;
280 protected
281 function GetIsOpen: Boolean; override;
282 procedure Flush; override;
283 public
284 procedure Open(); override;
285 procedure Close(); override;
286 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
287 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
288 constructor Create( const ATransport : IStreamTransport ); overload;
289 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
290 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
291 property IsOpen: Boolean read GetIsOpen;
292 end;
293
294 TSocketImpl = class(TStreamTransportImpl)
295 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200296{$IFDEF OLD_SOCKETS}
297 FClient : TCustomIpClient;
298{$ELSE}
299 FClient: TSocket;
300{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200301 FOwnsClient : Boolean;
302 FHost : string;
303 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200304{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200305 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200306{$ELSE}
307 FTimeout : Longword;
308{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200309
310 procedure InitSocket;
311 protected
312 function GetIsOpen: Boolean; override;
313 public
314 procedure Open; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200315{$IFDEF OLD_SOCKETS}
316 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200317 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200318{$ELSE}
319 constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
320 constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
321{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200322 destructor Destroy; override;
323 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200324{$IFDEF OLD_SOCKETS}
325 property TcpClient: TCustomIpClient read FClient;
326{$ELSE}
327 property TcpClient: TSocket read FClient;
328{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200329 property Host : string read FHost;
330 property Port: Integer read FPort;
331 end;
332
333 TFramedTransportImpl = class( TTransportImpl)
334 private const
335 FHeaderSize : Integer = 4;
336 private class var
337 FHeader_Dummy : array of Byte;
338 protected
339 FTransport : ITransport;
340 FWriteBuffer : TMemoryStream;
341 FReadBuffer : TMemoryStream;
342
343 procedure InitWriteBuffer;
344 procedure ReadFrame;
345 public
346 type
347 TFactory = class( TTransportFactoryImpl )
348 public
349 function GetTransport( const ATrans: ITransport): ITransport; override;
350 end;
351
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200352 {$IFDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200353 class constructor Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200354 {$ENDIF}
355
Jens Geyerd5436f52014-10-03 19:50:38 +0200356 constructor Create; overload;
357 constructor Create( const ATrans: ITransport); overload;
358 destructor Destroy; override;
359
360 procedure Open(); override;
361 function GetIsOpen: Boolean; override;
362
363 procedure Close(); override;
364 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
365 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
366 procedure Flush; override;
367 end;
368
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200369{$IFNDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200370procedure TFramedTransportImpl_Initialize;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200371{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200372
373const
374 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
375
376
377implementation
378
379{ TTransportImpl }
380
381procedure TTransportImpl.Flush;
382begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200383 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200384end;
385
386function TTransportImpl.Peek: Boolean;
387begin
388 Result := IsOpen;
389end;
390
391function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
392var
393 got : Integer;
394 ret : Integer;
395begin
396 got := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200397 while got < len do begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200398 ret := Read( buf, off + got, len - got);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200399 if ret > 0
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200400 then Inc( got, ret)
401 else raise TTransportException.Create( 'Cannot read, Remote side has closed' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200402 end;
403 Result := got;
404end;
405
406procedure TTransportImpl.Write( const buf: TBytes);
407begin
408 Self.Write( buf, 0, Length(buf) );
409end;
410
411{ THTTPClientImpl }
412
413procedure THTTPClientImpl.Close;
414begin
415 FInputStream := nil;
416 FOutputStream := nil;
417end;
418
419constructor THTTPClientImpl.Create(const AUri: string);
420begin
421 inherited Create;
422 FUri := AUri;
423 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
424 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
425end;
426
427function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
428var
429 pair : TPair<string,string>;
430begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200431 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200432 Result := CoXMLHTTP.Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200433 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200434 Result := CoXMLHTTPRequest.Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200435 {$IFEND}
Jens Geyerd5436f52014-10-03 19:50:38 +0200436
437 Result.open('POST', FUri, False, '', '');
438 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
439 Result.setRequestHeader( 'Accept', 'application/x-thrift');
440 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
441
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200442 for pair in FCustomHeaders do begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200443 Result.setRequestHeader( pair.Key, pair.Value );
444 end;
445end;
446
447destructor THTTPClientImpl.Destroy;
448begin
449 Close;
450 inherited;
451end;
452
453procedure THTTPClientImpl.Flush;
454begin
455 try
456 SendRequest;
457 finally
458 FOutputStream := nil;
459 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
460 end;
461end;
462
463function THTTPClientImpl.GetConnectionTimeout: Integer;
464begin
465 Result := FConnectionTimeout;
466end;
467
468function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
469begin
470 Result := FCustomHeaders;
471end;
472
473function THTTPClientImpl.GetIsOpen: Boolean;
474begin
475 Result := True;
476end;
477
478function THTTPClientImpl.GetReadTimeout: Integer;
479begin
480 Result := FReadTimeout;
481end;
482
483procedure THTTPClientImpl.Open;
484begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200485 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200486end;
487
488function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
489begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100490 if FInputStream = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200491 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100492 'No request has been sent');
Jens Geyerd5436f52014-10-03 19:50:38 +0200493 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100494
Jens Geyerd5436f52014-10-03 19:50:38 +0200495 try
496 Result := FInputStream.Read( buf, off, len )
497 except
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100498 on E: Exception
499 do raise TTransportException.Create( TTransportException.TExceptionType.Unknown, E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200500 end;
501end;
502
503procedure THTTPClientImpl.SendRequest;
504var
505 xmlhttp : IXMLHTTPRequest;
506 ms : TMemoryStream;
507 a : TBytes;
508 len : Integer;
509begin
510 xmlhttp := CreateRequest;
511
512 ms := TMemoryStream.Create;
513 try
514 a := FOutputStream.ToArray;
515 len := Length(a);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200516 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200517 ms.WriteBuffer( Pointer(@a[0])^, len);
518 end;
519 ms.Position := 0;
520 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
521 FInputStream := nil;
522 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
523 finally
524 ms.Free;
525 end;
526end;
527
528procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
529begin
530 FConnectionTimeout := Value;
531end;
532
533procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
534begin
535 FReadTimeout := Value
536end;
537
538procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
539begin
540 FOutputStream.Write( buf, off, len);
541end;
542
543{ TTransportException }
544
545constructor TTransportException.Create(AType: TExceptionType);
546begin
547 //no inherited;
548 Create( AType, '' )
549end;
550
551constructor TTransportException.Create(AType: TExceptionType;
552 const msg: string);
553begin
554 inherited Create(msg);
555 FType := AType;
556end;
557
558constructor TTransportException.Create(const msg: string);
559begin
560 inherited Create(msg);
561end;
562
563{ TTransportFactoryImpl }
564
565function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
566begin
567 Result := ATrans;
568end;
569
570{ TServerSocket }
571
Jens Geyer23d67462015-12-19 11:44:57 +0100572{$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200573constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200574begin
575 inherited Create;
576 FServer := AServer;
577 FClientTimeout := AClientTimeout;
578end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200579{$ELSE}
580constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
Jens Geyerd5436f52014-10-03 19:50:38 +0200581begin
582 inherited Create;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200583 FServer := AServer;
584 FServer.RecvTimeout := AClientTimeout;
585 FServer.SendTimeout := AClientTimeout;
586end;
587{$ENDIF}
588
589{$IFDEF OLD_SOCKETS}
590constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
591{$ELSE}
592constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
593{$ENDIF}
594begin
595 inherited Create;
596{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200597 FPort := APort;
598 FClientTimeout := AClientTimeout;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200599 FServer := TTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200600 FServer.BlockMode := bmBlocking;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200601 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200602 FServer.LocalPort := AnsiString( IntToStr( FPort));
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200603 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200604 FServer.LocalPort := IntToStr( FPort);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200605 {$IFEND}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200606{$ELSE}
607 FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
608{$ENDIF}
609 FUseBufferedSocket := AUseBufferedSockets;
610 FOwnsServer := True;
Jens Geyerd5436f52014-10-03 19:50:38 +0200611end;
612
613destructor TServerSocketImpl.Destroy;
614begin
615 if FOwnsServer then begin
616 FServer.Free;
617 FServer := nil;
618 end;
619 inherited;
620end;
621
622function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
623var
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200624{$IFDEF OLD_SOCKETS}
625 client : TCustomIpClient;
626{$ELSE}
627 client: TSocket;
628{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200629 trans : IStreamTransport;
630begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100631 if FServer = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200632 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100633 'No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200634 end;
635
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200636{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200637 client := nil;
638 try
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200639 client := TCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200640
641 if Assigned(fnAccepting)
642 then fnAccepting();
643
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100644 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200645 client.Free;
646 Result := nil;
647 Exit;
648 end;
649
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100650 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200651 Result := nil;
652 Exit;
653 end;
654
655 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
656 client := nil; // trans owns it now
657
658 if FUseBufferedSocket
659 then result := TBufferedTransportImpl.Create( trans)
660 else result := trans;
661
662 except
663 on E: Exception do begin
664 client.Free;
665 raise TTransportException.Create( E.ToString );
666 end;
667 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200668{$ELSE}
669 if Assigned(fnAccepting) then
670 fnAccepting();
671
672 client := FServer.Accept;
673 try
674 trans := TSocketImpl.Create(client, True);
675 client := nil;
676
677 if FUseBufferedSocket then
678 Result := TBufferedTransportImpl.Create(trans)
679 else
680 Result := trans;
681 except
682 client.Free;
683 raise;
684 end;
685{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200686end;
687
688procedure TServerSocketImpl.Listen;
689begin
690 if FServer <> nil then
691 begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200692{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200693 try
694 FServer.Active := True;
695 except
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200696 on E: Exception
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200697 do raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200698 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200699{$ELSE}
700 FServer.Listen;
701{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200702 end;
703end;
704
705procedure TServerSocketImpl.Close;
706begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200707 if FServer <> nil then
708{$IFDEF OLD_SOCKETS}
709 try
710 FServer.Active := False;
711 except
712 on E: Exception
713 do raise TTransportException.Create('Error on closing socket : ' + E.Message);
714 end;
715{$ELSE}
716 FServer.Close;
717{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200718end;
719
720{ TSocket }
721
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200722{$IFDEF OLD_SOCKETS}
723constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
Jens Geyerd5436f52014-10-03 19:50:38 +0200724var stream : IThriftStream;
725begin
726 FClient := AClient;
727 FTimeout := ATimeout;
728 FOwnsClient := aOwnsClient;
729 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
730 inherited Create( stream, stream);
731end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200732{$ELSE}
733constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
734var stream : IThriftStream;
735begin
736 FClient := AClient;
737 FTimeout := AClient.RecvTimeout;
738 FOwnsClient := aOwnsClient;
739 stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
740 inherited Create(stream, stream);
741end;
742{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200743
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200744{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200745constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200746{$ELSE}
747constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
748{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200749begin
750 inherited Create(nil,nil);
751 FHost := AHost;
752 FPort := APort;
753 FTimeout := ATimeout;
754 InitSocket;
755end;
756
757destructor TSocketImpl.Destroy;
758begin
759 if FOwnsClient
760 then FreeAndNil( FClient);
761 inherited;
762end;
763
764procedure TSocketImpl.Close;
765begin
766 inherited Close;
767 if FOwnsClient
768 then FreeAndNil( FClient);
769end;
770
771function TSocketImpl.GetIsOpen: Boolean;
772begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200773{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200774 Result := (FClient <> nil) and FClient.Connected;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200775{$ELSE}
776 Result := (FClient <> nil) and FClient.IsOpen
777{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200778end;
779
780procedure TSocketImpl.InitSocket;
781var
782 stream : IThriftStream;
783begin
784 if FOwnsClient
785 then FreeAndNil( FClient)
786 else FClient := nil;
787
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200788{$IFDEF OLD_SOCKETS}
789 FClient := TTcpClient.Create( nil);
790{$ELSE}
791 FClient := TSocket.Create(FHost, FPort);
792{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200793 FOwnsClient := True;
794
795 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
796 FInputStream := stream;
797 FOutputStream := stream;
798end;
799
800procedure TSocketImpl.Open;
801begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100802 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200803 raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100804 'Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +0200805 end;
806
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100807 if FHost = '' then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200808 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100809 'Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +0200810 end;
811
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100812 if Port <= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200813 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100814 'Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +0200815 end;
816
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100817 if FClient = nil
818 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +0200819
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200820{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200821 FClient.RemoteHost := TSocketHost( Host);
822 FClient.RemotePort := TSocketPort( IntToStr( Port));
823 FClient.Connect;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200824{$ELSE}
825 FClient.Open;
826{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200827
828 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
829 FOutputStream := FInputStream;
830end;
831
832{ TBufferedStream }
833
834procedure TBufferedStreamImpl.Close;
835begin
836 Flush;
837 FStream := nil;
838
839 FReadBuffer.Free;
840 FReadBuffer := nil;
841
842 FWriteBuffer.Free;
843 FWriteBuffer := nil;
844end;
845
846constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
847begin
848 inherited Create;
849 FStream := AStream;
850 FBufSize := ABufSize;
851 FReadBuffer := TMemoryStream.Create;
852 FWriteBuffer := TMemoryStream.Create;
853end;
854
855destructor TBufferedStreamImpl.Destroy;
856begin
857 Close;
858 inherited;
859end;
860
861procedure TBufferedStreamImpl.Flush;
862var
863 buf : TBytes;
864 len : Integer;
865begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200866 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200867 len := FWriteBuffer.Size;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200868 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200869 SetLength( buf, len );
870 FWriteBuffer.Position := 0;
871 FWriteBuffer.Read( Pointer(@buf[0])^, len );
872 FStream.Write( buf, 0, len );
873 end;
874 FWriteBuffer.Clear;
875 end;
876end;
877
878function TBufferedStreamImpl.IsOpen: Boolean;
879begin
880 Result := (FWriteBuffer <> nil)
881 and (FReadBuffer <> nil)
882 and (FStream <> nil);
883end;
884
885procedure TBufferedStreamImpl.Open;
886begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200887 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200888end;
889
890function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
891var
892 nRead : Integer;
893 tempbuf : TBytes;
894begin
895 inherited;
896 Result := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200897
898 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200899 while count > 0 do begin
900
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200901 if FReadBuffer.Position >= FReadBuffer.Size then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200902 FReadBuffer.Clear;
903 SetLength( tempbuf, FBufSize);
904 nRead := FStream.Read( tempbuf, 0, FBufSize );
905 if nRead = 0 then Break; // avoid infinite loop
906
907 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
908 FReadBuffer.Position := 0;
909 end;
910
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200911 if FReadBuffer.Position < FReadBuffer.Size then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200912 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
913 Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));
914 Dec( count, nRead);
915 Inc( offset, nRead);
916 end;
917 end;
918 end;
919end;
920
921function TBufferedStreamImpl.ToArray: TBytes;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200922var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200923begin
924 len := 0;
925
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200926 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200927 len := FReadBuffer.Size;
928 end;
929
930 SetLength( Result, len);
931
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200932 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200933 FReadBuffer.Position := 0;
934 FReadBuffer.Read( Pointer(@Result[0])^, len );
935 end;
936end;
937
938procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
939begin
940 inherited;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200941 if count > 0 then begin
942 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200943 FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200944 if FWriteBuffer.Size > FBufSize then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200945 Flush;
946 end;
947 end;
948 end;
949end;
950
951{ TStreamTransportImpl }
952
953procedure TStreamTransportImpl.Close;
954begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100955 FInputStream := nil;
956 FOutputStream := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +0200957end;
958
959constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
960begin
961 inherited Create;
962 FInputStream := AInputStream;
963 FOutputStream := AOutputStream;
964end;
965
966destructor TStreamTransportImpl.Destroy;
967begin
968 FInputStream := nil;
969 FOutputStream := nil;
970 inherited;
971end;
972
973procedure TStreamTransportImpl.Flush;
974begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100975 if FOutputStream = nil then begin
976 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
977 'Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200978 end;
979
980 FOutputStream.Flush;
981end;
982
983function TStreamTransportImpl.GetInputStream: IThriftStream;
984begin
985 Result := FInputStream;
986end;
987
988function TStreamTransportImpl.GetIsOpen: Boolean;
989begin
990 Result := True;
991end;
992
993function TStreamTransportImpl.GetOutputStream: IThriftStream;
994begin
995 Result := FInputStream;
996end;
997
998procedure TStreamTransportImpl.Open;
999begin
1000
1001end;
1002
1003function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1004begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001005 if FInputStream = nil then begin
1006 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
1007 'Cannot read from null inputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001008 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001009
Jens Geyerd5436f52014-10-03 19:50:38 +02001010 Result := FInputStream.Read( buf, off, len );
1011end;
1012
1013procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
1014begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001015 if FOutputStream = nil then begin
1016 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
1017 'Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001018 end;
1019
1020 FOutputStream.Write( buf, off, len );
1021end;
1022
1023{ TBufferedTransportImpl }
1024
1025constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
1026begin
1027 //no inherited;
1028 Create( ATransport, 1024 );
1029end;
1030
1031procedure TBufferedTransportImpl.Close;
1032begin
1033 FTransport.Close;
1034end;
1035
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001036constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001037begin
1038 inherited Create;
1039 FTransport := ATransport;
1040 FBufSize := ABufSize;
1041 InitBuffers;
1042end;
1043
1044procedure TBufferedTransportImpl.Flush;
1045begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001046 if FOutputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001047 FOutputBuffer.Flush;
1048 end;
1049end;
1050
1051function TBufferedTransportImpl.GetIsOpen: Boolean;
1052begin
1053 Result := FTransport.IsOpen;
1054end;
1055
1056function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
1057begin
1058 Result := FTransport;
1059end;
1060
1061procedure TBufferedTransportImpl.InitBuffers;
1062begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001063 if FTransport.InputStream <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001064 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
1065 end;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001066 if FTransport.OutputStream <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001067 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
1068 end;
1069end;
1070
1071procedure TBufferedTransportImpl.Open;
1072begin
1073 FTransport.Open
1074end;
1075
1076function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1077begin
1078 Result := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001079 if FInputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001080 Result := FInputBuffer.Read( buf, off, len );
1081 end;
1082end;
1083
1084procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1085begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001086 if FOutputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001087 FOutputBuffer.Write( buf, off, len );
1088 end;
1089end;
1090
1091{ TFramedTransportImpl }
1092
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001093{$IFDEF HAVE_CLASS_CTOR}
1094class constructor TFramedTransportImpl.Create;
1095begin
1096 SetLength( FHeader_Dummy, FHeaderSize);
1097 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1098end;
1099{$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +02001100procedure TFramedTransportImpl_Initialize;
1101begin
1102 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1103 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1104 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1105end;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001106{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001107
1108constructor TFramedTransportImpl.Create;
1109begin
1110 inherited Create;
1111 InitWriteBuffer;
1112end;
1113
1114procedure TFramedTransportImpl.Close;
1115begin
1116 FTransport.Close;
1117end;
1118
1119constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1120begin
1121 inherited Create;
1122 InitWriteBuffer;
1123 FTransport := ATrans;
1124end;
1125
1126destructor TFramedTransportImpl.Destroy;
1127begin
1128 FWriteBuffer.Free;
1129 FReadBuffer.Free;
1130 inherited;
1131end;
1132
1133procedure TFramedTransportImpl.Flush;
1134var
1135 buf : TBytes;
1136 len : Integer;
1137 data_len : Integer;
1138
1139begin
1140 len := FWriteBuffer.Size;
1141 SetLength( buf, len);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001142 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001143 System.Move( FWriteBuffer.Memory^, buf[0], len );
1144 end;
1145
1146 data_len := len - FHeaderSize;
Jens Geyer30ed90e2016-03-10 20:12:49 +01001147 if (data_len < 0) then begin
1148 raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1149 'TFramedTransport.Flush: data_len < 0' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001150 end;
1151
1152 InitWriteBuffer;
1153
1154 buf[0] := Byte($FF and (data_len shr 24));
1155 buf[1] := Byte($FF and (data_len shr 16));
1156 buf[2] := Byte($FF and (data_len shr 8));
1157 buf[3] := Byte($FF and data_len);
1158
1159 FTransport.Write( buf, 0, len );
1160 FTransport.Flush;
1161end;
1162
1163function TFramedTransportImpl.GetIsOpen: Boolean;
1164begin
1165 Result := FTransport.IsOpen;
1166end;
1167
1168type
1169 TAccessMemoryStream = class(TMemoryStream)
1170 end;
1171
1172procedure TFramedTransportImpl.InitWriteBuffer;
1173begin
1174 FWriteBuffer.Free;
1175 FWriteBuffer := TMemoryStream.Create;
1176 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1177 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1178end;
1179
1180procedure TFramedTransportImpl.Open;
1181begin
1182 FTransport.Open;
1183end;
1184
1185function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1186var
1187 got : Integer;
1188begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001189 if FReadBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001190 if len > 0
1191 then got := FReadBuffer.Read( Pointer(@buf[off])^, len )
1192 else got := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001193
1194 if got > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001195 Result := got;
1196 Exit;
1197 end;
1198 end;
1199
1200 ReadFrame;
1201 if len > 0
1202 then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)
1203 else Result := 0;
1204end;
1205
1206procedure TFramedTransportImpl.ReadFrame;
1207var
1208 i32rd : TBytes;
1209 size : Integer;
1210 buff : TBytes;
1211begin
1212 SetLength( i32rd, FHeaderSize );
1213 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1214 size :=
1215 ((i32rd[0] and $FF) shl 24) or
1216 ((i32rd[1] and $FF) shl 16) or
1217 ((i32rd[2] and $FF) shl 8) or
1218 (i32rd[3] and $FF);
1219 SetLength( buff, size );
1220 FTransport.ReadAll( buff, 0, size );
1221 FReadBuffer.Free;
1222 FReadBuffer := TMemoryStream.Create;
1223 FReadBuffer.Write( Pointer(@buff[0])^, size );
1224 FReadBuffer.Position := 0;
1225end;
1226
1227procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1228begin
1229 if len > 0
1230 then FWriteBuffer.Write( Pointer(@buf[off])^, len );
1231end;
1232
1233{ TFramedTransport.TFactory }
1234
1235function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1236begin
1237 Result := TFramedTransportImpl.Create( ATrans );
1238end;
1239
1240{ TTcpSocketStreamImpl }
1241
1242procedure TTcpSocketStreamImpl.Close;
1243begin
1244 FTcpClient.Close;
1245end;
1246
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001247{$IFDEF OLD_SOCKETS}
1248constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001249begin
1250 inherited Create;
1251 FTcpClient := ATcpClient;
1252 FTimeout := aTimeout;
1253end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001254{$ELSE}
1255constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
1256begin
1257 inherited Create;
1258 FTcpClient := ATcpClient;
1259 if aTimeout = 0 then
1260 FTcpClient.RecvTimeout := SLEEP_TIME
1261 else
1262 FTcpClient.RecvTimeout := aTimeout;
1263 FTcpClient.SendTimeout := aTimeout;
1264end;
1265{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001266
1267procedure TTcpSocketStreamImpl.Flush;
1268begin
1269
1270end;
1271
1272function TTcpSocketStreamImpl.IsOpen: Boolean;
1273begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001274{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001275 Result := FTcpClient.Active;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001276{$ELSE}
1277 Result := FTcpClient.IsOpen;
1278{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001279end;
1280
1281procedure TTcpSocketStreamImpl.Open;
1282begin
1283 FTcpClient.Open;
1284end;
1285
1286
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001287{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001288function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1289 TimeOut: Integer; var wsaError : Integer): Integer;
1290var
1291 ReadFds: TFDset;
1292 ReadFdsptr: PFDset;
1293 WriteFds: TFDset;
1294 WriteFdsptr: PFDset;
1295 ExceptFds: TFDset;
1296 ExceptFdsptr: PFDset;
1297 tv: timeval;
1298 Timeptr: PTimeval;
1299 socket : TSocket;
1300begin
1301 if not FTcpClient.Active then begin
1302 wsaError := WSAEINVAL;
1303 Exit( SOCKET_ERROR);
1304 end;
1305
1306 socket := FTcpClient.Handle;
1307
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001308 if Assigned(ReadReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001309 ReadFdsptr := @ReadFds;
1310 FD_ZERO(ReadFds);
1311 FD_SET(socket, ReadFds);
1312 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001313 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001314 ReadFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001315 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001316
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001317 if Assigned(WriteReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001318 WriteFdsptr := @WriteFds;
1319 FD_ZERO(WriteFds);
1320 FD_SET(socket, WriteFds);
1321 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001322 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001323 WriteFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001324 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001325
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001326 if Assigned(ExceptFlag) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001327 ExceptFdsptr := @ExceptFds;
1328 FD_ZERO(ExceptFds);
1329 FD_SET(socket, ExceptFds);
1330 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001331 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001332 ExceptFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001333 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001334
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001335 if TimeOut >= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001336 tv.tv_sec := TimeOut div 1000;
1337 tv.tv_usec := 1000 * (TimeOut mod 1000);
1338 Timeptr := @tv;
1339 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001340 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001341 Timeptr := nil; // wait forever
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001342 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001343
1344 wsaError := 0;
1345 try
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001346 {$IFDEF MSWINDOWS}
1347 {$IFDEF OLD_UNIT_NAMES}
1348 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1349 {$ELSE}
1350 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1351 {$ENDIF}
1352 {$ENDIF}
1353 {$IFDEF LINUX}
1354 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1355 {$ENDIF}
1356
Jens Geyerd5436f52014-10-03 19:50:38 +02001357 if result = SOCKET_ERROR
1358 then wsaError := WSAGetLastError;
1359
1360 except
1361 result := SOCKET_ERROR;
1362 end;
1363
1364 if Assigned(ReadReady) then
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001365 ReadReady^ := FD_ISSET(socket, ReadFds);
1366
Jens Geyerd5436f52014-10-03 19:50:38 +02001367 if Assigned(WriteReady) then
1368 WriteReady^ := FD_ISSET(socket, WriteFds);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001369
Jens Geyerd5436f52014-10-03 19:50:38 +02001370 if Assigned(ExceptFlag) then
1371 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1372end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001373{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001374
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001375{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001376function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1377 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001378 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001379var bCanRead, bError : Boolean;
1380 retval : Integer;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001381const
1382 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
Jens Geyerd5436f52014-10-03 19:50:38 +02001383begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001384 bytesReady := 0;
1385
Jens Geyerd5436f52014-10-03 19:50:38 +02001386 // The select function returns the total number of socket handles that are ready
1387 // and contained in the fd_set structures, zero if the time limit expired,
1388 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1389 // WSAGetLastError can be used to retrieve a specific error code.
1390 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1391 if retval = SOCKET_ERROR
1392 then Exit( TWaitForData.wfd_Error);
1393 if (retval = 0) or not bCanRead
1394 then Exit( TWaitForData.wfd_Timeout);
1395
1396 // recv() returns the number of bytes received, or -1 if an error occurred.
1397 // The return value will be 0 when the peer has performed an orderly shutdown.
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001398
1399 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
Jens Geyerd5436f52014-10-03 19:50:38 +02001400 if retval <= 0
1401 then Exit( TWaitForData.wfd_Error);
1402
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001403 // at least we have some data
1404 bytesReady := Min( retval, DesiredBytes);
1405 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001406end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001407{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001408
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001409{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001410function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001411// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001412var wfd : TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001413 wsaError,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001414 msecs : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001415 nBytes : Integer;
1416 pDest : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001417begin
1418 inherited;
1419
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001420 if FTimeout > 0
1421 then msecs := FTimeout
1422 else msecs := DEFAULT_THRIFT_TIMEOUT;
1423
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001424 result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001425 pDest := Pointer(@buffer[offset]);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001426 while count > 0 do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001427
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001428 while TRUE do begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001429 wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001430 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001431 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001432 TWaitForData.wfd_HaveData : Break;
1433 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001434 if (FTimeout = 0)
1435 then Exit
1436 else begin
1437 raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
1438 SysErrorMessage(Cardinal(wsaError)));
1439
1440 end;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001441 end;
1442 else
1443 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001444 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001445 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001446
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001447 // reduce the timeout once we got data
1448 if FTimeout > 0
1449 then msecs := FTimeout div 10
1450 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1451 msecs := Max( msecs, 200);
1452
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001453 ASSERT( nBytes <= count);
1454 nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);
1455 Inc( pDest, nBytes);
1456 Dec( count, nBytes);
1457 Inc( result, nBytes);
1458 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001459end;
1460
1461function TTcpSocketStreamImpl.ToArray: TBytes;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001462// old sockets version
1463var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001464begin
1465 len := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001466 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001467 len := FTcpClient.BytesReceived;
1468 end;
1469
1470 SetLength( Result, len );
1471
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001472 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001473 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1474 end;
1475end;
1476
1477procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001478// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001479var bCanWrite, bError : Boolean;
1480 retval, wsaError : Integer;
1481begin
1482 inherited;
1483
1484 if not FTcpClient.Active
1485 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
1486
1487 // The select function returns the total number of socket handles that are ready
1488 // and contained in the fd_set structures, zero if the time limit expired,
1489 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1490 // WSAGetLastError can be used to retrieve a specific error code.
1491 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1492 if retval = SOCKET_ERROR
1493 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1494 SysErrorMessage(Cardinal(wsaError)));
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001495
Jens Geyerd5436f52014-10-03 19:50:38 +02001496 if (retval = 0)
1497 then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001498
Jens Geyerd5436f52014-10-03 19:50:38 +02001499 if bError or not bCanWrite
1500 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
1501
1502 FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
1503end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001504
1505{$ELSE}
1506
1507function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
1508// new sockets version
1509var nBytes : Integer;
1510 pDest : PByte;
1511begin
1512 inherited;
1513
1514 result := 0;
1515 pDest := Pointer(@buffer[offset]);
1516 while count > 0 do begin
1517 nBytes := FTcpClient.Read(pDest^, count);
1518 if nBytes = 0 then Exit;
1519 Inc( pDest, nBytes);
1520 Dec( count, nBytes);
1521 Inc( result, nBytes);
1522 end;
1523end;
1524
1525function TTcpSocketStreamImpl.ToArray: TBytes;
1526// new sockets version
1527var len : Integer;
1528begin
1529 len := 0;
1530 try
1531 if FTcpClient.Peek then
1532 repeat
1533 SetLength(Result, Length(Result) + 1024);
1534 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1535 until len < 1024;
1536 except
1537 on TTransportException do begin { don't allow default exceptions } end;
1538 else raise;
1539 end;
1540 if len > 0 then
1541 SetLength(Result, Length(Result) - 1024 + len);
1542end;
1543
1544procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
1545// new sockets version
1546begin
1547 inherited;
1548
1549 if not FTcpClient.IsOpen
1550 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
1551
1552 FTcpClient.Write(buffer[offset], count);
1553end;
1554
Jens Geyer23d67462015-12-19 11:44:57 +01001555{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001556
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001557
Jens Geyerd5436f52014-10-03 19:50:38 +02001558{$IF CompilerVersion < 21.0}
1559initialization
1560begin
1561 TFramedTransportImpl_Initialize;
1562end;
1563{$IFEND}
1564
1565
1566end.