blob: df087a1e4c6b48ebf96733461315db3fcb07688a [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
Jens Geyer02230912019-04-03 01:12:51 +020032 WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer02230912019-04-03 01:12:51 +020034 Winapi.WinSock,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020035 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
Jens Geyer606f1ef2018-04-09 23:09:41 +020042 Thrift.Exception,
Jens Geyerd5436f52014-10-03 19:50:38 +020043 Thrift.Utils,
Jens Geyer02230912019-04-03 01:12:51 +020044 Thrift.WinHTTP,
Nick4f5229e2016-04-14 16:43:22 +030045 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020046
47type
48 ITransport = interface
Jens Geyer17c3ad92017-09-05 20:31:27 +020049 ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
Jens Geyerd5436f52014-10-03 19:50:38 +020050 function GetIsOpen: Boolean;
51 property IsOpen: Boolean read GetIsOpen;
52 function Peek: Boolean;
53 procedure Open;
54 procedure Close;
Jens Geyer17c3ad92017-09-05 20:31:27 +020055 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
56 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
57 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
58 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020059 procedure Write( const buf: TBytes); overload;
60 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
Jens Geyer17c3ad92017-09-05 20:31:27 +020061 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
62 procedure Write( const pBuf : Pointer; len : Integer); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020063 procedure Flush;
64 end;
65
66 TTransportImpl = class( TInterfacedObject, ITransport)
67 protected
68 function GetIsOpen: Boolean; virtual; abstract;
69 property IsOpen: Boolean read GetIsOpen;
70 function Peek: Boolean; virtual;
71 procedure Open(); virtual; abstract;
72 procedure Close(); virtual; abstract;
Jens Geyer17c3ad92017-09-05 20:31:27 +020073 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
74 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
75 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
76 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
77 procedure Write( const buf: TBytes); overload; inline;
78 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
79 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
80 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020081 procedure Flush; virtual;
82 end;
83
Jens Geyer9f11c1e2019-11-09 19:39:20 +010084 TTransportException = class abstract( TException)
Jens Geyerd5436f52014-10-03 19:50:38 +020085 public
86 type
87 TExceptionType = (
88 Unknown,
89 NotOpen,
90 AlreadyOpen,
91 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +020092 EndOfFile,
93 BadArgs,
94 Interrupted
Jens Geyerd5436f52014-10-03 19:50:38 +020095 );
Jens Geyere0e32402016-04-20 21:50:48 +020096 protected
97 constructor HiddenCreate(const Msg: string);
Jens Geyer9f11c1e2019-11-09 19:39:20 +010098 class function GetType: TExceptionType; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020099 public
Jens Geyere0e32402016-04-20 21:50:48 +0200100 class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
101 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
102 class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
103 property Type_: TExceptionType read GetType;
Jens Geyerd5436f52014-10-03 19:50:38 +0200104 end;
105
Jens Geyere0e32402016-04-20 21:50:48 +0200106 // Needed to remove deprecation warning
107 TTransportExceptionSpecialized = class abstract (TTransportException)
108 public
109 constructor Create(const Msg: string);
110 end;
111
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100112 TTransportExceptionUnknown = class (TTransportExceptionSpecialized)
113 protected
114 class function GetType: TTransportException.TExceptionType; override;
115 end;
116
117 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized)
118 protected
119 class function GetType: TTransportException.TExceptionType; override;
120 end;
121
122 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized)
123 protected
124 class function GetType: TTransportException.TExceptionType; override;
125 end;
126
127 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized)
128 protected
129 class function GetType: TTransportException.TExceptionType; override;
130 end;
131
132 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized)
133 protected
134 class function GetType: TTransportException.TExceptionType; override;
135 end;
136
137 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized)
138 protected
139 class function GetType: TTransportException.TExceptionType; override;
140 end;
141
142 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized)
143 protected
144 class function GetType: TTransportException.TExceptionType; override;
145 end;
Jens Geyere0e32402016-04-20 21:50:48 +0200146
Jens Geyer47f63172019-06-06 22:42:58 +0200147 TSecureProtocol = (
148 SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
149 TLS_1_1, TLS_1_2 // secure (as of today)
150 );
151
152 TSecureProtocols = set of TSecureProtocol;
153
Jens Geyerd5436f52014-10-03 19:50:38 +0200154 IHTTPClient = interface( ITransport )
Jens Geyer47f63172019-06-06 22:42:58 +0200155 ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
Jens Geyer20e727e2018-06-22 22:39:57 +0200156 procedure SetDnsResolveTimeout(const Value: Integer);
157 function GetDnsResolveTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200158 procedure SetConnectionTimeout(const Value: Integer);
159 function GetConnectionTimeout: Integer;
Jens Geyer20e727e2018-06-22 22:39:57 +0200160 procedure SetSendTimeout(const Value: Integer);
161 function GetSendTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200162 procedure SetReadTimeout(const Value: Integer);
163 function GetReadTimeout: Integer;
164 function GetCustomHeaders: IThriftDictionary<string,string>;
165 procedure SendRequest;
Jens Geyer47f63172019-06-06 22:42:58 +0200166 function GetSecureProtocols : TSecureProtocols;
167 procedure SetSecureProtocols( const value : TSecureProtocols);
Jens Geyer20e727e2018-06-22 22:39:57 +0200168
169 property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200170 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
Jens Geyer20e727e2018-06-22 22:39:57 +0200171 property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200172 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
173 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
Jens Geyer47f63172019-06-06 22:42:58 +0200174 property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
Jens Geyerd5436f52014-10-03 19:50:38 +0200175 end;
176
Jens Geyerd5436f52014-10-03 19:50:38 +0200177 IServerTransport = interface
178 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
179 procedure Listen;
180 procedure Close;
181 function Accept( const fnAccepting: TProc): ITransport;
182 end;
183
184 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
185 protected
186 procedure Listen; virtual; abstract;
187 procedure Close; virtual; abstract;
188 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
189 end;
190
191 ITransportFactory = interface
192 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
193 function GetTransport( const ATrans: ITransport): ITransport;
194 end;
195
196 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
197 function GetTransport( const ATrans: ITransport): ITransport; virtual;
198 end;
199
200 TTcpSocketStreamImpl = class( TThriftStreamImpl )
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200201{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200202 private type
203 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
204 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200205 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200206 FTimeout : Integer;
207 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
208 TimeOut: Integer; var wsaError : Integer): Integer;
209 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200210 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200211{$ELSE}
212 FTcpClient: TSocket;
213 protected const
214 SLEEP_TIME = 200;
215{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200216 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200217 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
218 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200219 procedure Open; override;
220 procedure Close; override;
221 procedure Flush; override;
222
223 function IsOpen: Boolean; override;
224 function ToArray: TBytes; override;
225 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200226{$IFDEF OLD_SOCKETS}
227 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
228{$ELSE}
229 constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
230{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200231 end;
232
233 IStreamTransport = interface( ITransport )
234 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
235 function GetInputStream: IThriftStream;
236 function GetOutputStream: IThriftStream;
237 property InputStream : IThriftStream read GetInputStream;
238 property OutputStream : IThriftStream read GetOutputStream;
239 end;
240
241 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
242 protected
243 FInputStream : IThriftStream;
244 FOutputStream : IThriftStream;
245 protected
246 function GetIsOpen: Boolean; override;
247
248 function GetInputStream: IThriftStream;
249 function GetOutputStream: IThriftStream;
250 public
251 property InputStream : IThriftStream read GetInputStream;
252 property OutputStream : IThriftStream read GetOutputStream;
253
254 procedure Open; override;
255 procedure Close; override;
256 procedure Flush; override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200257 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
258 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200259 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
260 destructor Destroy; override;
261 end;
262
263 TBufferedStreamImpl = class( TThriftStreamImpl)
264 private
265 FStream : IThriftStream;
266 FBufSize : Integer;
267 FReadBuffer : TMemoryStream;
268 FWriteBuffer : TMemoryStream;
269 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200270 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
271 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200272 procedure Open; override;
273 procedure Close; override;
274 procedure Flush; override;
275 function IsOpen: Boolean; override;
276 function ToArray: TBytes; override;
277 public
278 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
279 destructor Destroy; override;
280 end;
281
282 TServerSocketImpl = class( TServerTransportImpl)
283 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200284{$IFDEF OLD_SOCKETS}
285 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200286 FPort : Integer;
287 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200288{$ELSE}
289 FServer: TServerSocket;
290{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200291 FUseBufferedSocket : Boolean;
292 FOwnsServer : Boolean;
293 protected
294 function Accept( const fnAccepting: TProc) : ITransport; override;
295 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200296{$IFDEF OLD_SOCKETS}
297 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200298 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200299{$ELSE}
300 constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
301 constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
302{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200303 destructor Destroy; override;
304 procedure Listen; override;
305 procedure Close; override;
306 end;
307
308 TBufferedTransportImpl = class( TTransportImpl )
309 private
310 FInputBuffer : IThriftStream;
311 FOutputBuffer : IThriftStream;
312 FTransport : IStreamTransport;
313 FBufSize : Integer;
314
315 procedure InitBuffers;
316 function GetUnderlyingTransport: ITransport;
317 protected
318 function GetIsOpen: Boolean; override;
319 procedure Flush; override;
320 public
321 procedure Open(); override;
322 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200323 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
324 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200325 constructor Create( const ATransport : IStreamTransport ); overload;
326 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
327 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
328 property IsOpen: Boolean read GetIsOpen;
329 end;
330
331 TSocketImpl = class(TStreamTransportImpl)
332 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200333{$IFDEF OLD_SOCKETS}
334 FClient : TCustomIpClient;
335{$ELSE}
336 FClient: TSocket;
337{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200338 FOwnsClient : Boolean;
339 FHost : string;
340 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200341{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200342 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200343{$ELSE}
344 FTimeout : Longword;
345{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200346
347 procedure InitSocket;
348 protected
349 function GetIsOpen: Boolean; override;
350 public
351 procedure Open; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200352{$IFDEF OLD_SOCKETS}
353 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200354 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200355{$ELSE}
356 constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
357 constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
358{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200359 destructor Destroy; override;
360 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200361{$IFDEF OLD_SOCKETS}
362 property TcpClient: TCustomIpClient read FClient;
363{$ELSE}
364 property TcpClient: TSocket read FClient;
365{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200366 property Host : string read FHost;
367 property Port: Integer read FPort;
368 end;
369
370 TFramedTransportImpl = class( TTransportImpl)
371 private const
372 FHeaderSize : Integer = 4;
373 private class var
374 FHeader_Dummy : array of Byte;
375 protected
376 FTransport : ITransport;
377 FWriteBuffer : TMemoryStream;
378 FReadBuffer : TMemoryStream;
379
380 procedure InitWriteBuffer;
381 procedure ReadFrame;
382 public
383 type
384 TFactory = class( TTransportFactoryImpl )
385 public
386 function GetTransport( const ATrans: ITransport): ITransport; override;
387 end;
388
Jens Geyere0e32402016-04-20 21:50:48 +0200389 {$IFDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200390 class constructor Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200391 {$ENDIF}
Jens Geyere0e32402016-04-20 21:50:48 +0200392
Jens Geyerd5436f52014-10-03 19:50:38 +0200393 constructor Create; overload;
394 constructor Create( const ATrans: ITransport); overload;
395 destructor Destroy; override;
396
397 procedure Open(); override;
398 function GetIsOpen: Boolean; override;
399
400 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200401 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
402 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200403 procedure Flush; override;
404 end;
405
// Only declared when the compiler offers no class constructors
// (TFramedTransportImpl declares one when HAVE_CLASS_CTOR is defined).
{$IFNDEF HAVE_CLASS_CTOR}
procedure TFramedTransportImpl_Initialize;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200409
const
  // default timeout, in milliseconds
  DEFAULT_THRIFT_TIMEOUT = 5 * 1000;
  // default set of secure protocols
  DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1,
                                     TSecureProtocol.TLS_1_2 ];
413
Jens Geyerd5436f52014-10-03 19:50:38 +0200414
415
416implementation
417
418{ TTransportImpl }
419
// Virtual no-op; descendants that buffer data override it.
procedure TTransportImpl.Flush;
begin
  { nothing to do }
end;
424
// Default implementation: reports the open state of the transport.
function TTransportImpl.Peek: Boolean;
begin
  Result := GetIsOpen;  // same getter the IsOpen property reads
end;
429
// TBytes convenience overload: forwards to the pointer-based Read using
// the address of the first element and the array length as buffer size.
// Returns 0 without reading when the array is empty.
function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
  // an empty array has no first element whose address could be taken
  if Length(buf) <= 0
  then Exit( 0);

  Result := Read( @buf[0], Length(buf), off, len);
end;
436
// TBytes convenience overload: forwards to the pointer-based ReadAll.
// Returns 0 without reading when the array is empty.
function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
  // an empty array has no first element whose address could be taken
  if Length(buf) <= 0
  then Exit( 0);

  Result := ReadAll( @buf[0], Length(buf), off, len);
end;
443
// Writes the entire array; an empty array is silently ignored.
procedure TTransportImpl.Write( const buf: TBytes);
begin
  if Length(buf) <= 0
  then Exit;

  Write( @buf[0], 0, Length(buf));
end;
449
// Writes len bytes starting at offset off of the array by forwarding to
// the pointer-based Write; an empty array is silently ignored.
procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
begin
  if Length(buf) <= 0
  then Exit;

  Write( @buf[0], off, len);
end;
455
// Calls Read repeatedly until exactly len bytes have been received and
// returns that count. Raises TTransportExceptionNotOpen as soon as a
// Read call delivers no data.
function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var chunk : Integer;
begin
  Result := 0;
  while Result < len do begin
    // continue right behind the bytes that were already received
    chunk := Read( pBuf, buflen, off + Result, len - Result);
    if chunk <= 0
    then raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );

    Inc( Result, chunk);
  end;
end;
467
// Writes len bytes from the start of the memory block (offset 0).
procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
begin
  Write( pBuf, 0, len);
end;
472
Jens Geyerd5436f52014-10-03 19:50:38 +0200473{ TTransportException }
474
// Constructor under a non-deprecated name; passes the message on to
// the inherited Create.
constructor TTransportException.HiddenCreate(const Msg: string);
begin
  inherited Create( Msg);
end;
479
// Deprecated factory: creates the exception matching AType with an
// empty message.
class function TTransportException.Create(AType: TExceptionType): TTransportException;
begin
  // no inherited call here, this is a class function rather than a constructor
{$WARN SYMBOL_DEPRECATED OFF}
  Exit( Create( AType, ''));
{$WARN SYMBOL_DEPRECATED DEFAULT}
end;
487
// Deprecated factory: maps the requested exception type onto the
// corresponding specialized class and creates an instance of it.
class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
begin
  case aType of
    TExceptionType.NotOpen     : Exit( TTransportExceptionNotOpen.Create( msg));
    TExceptionType.AlreadyOpen : Exit( TTransportExceptionAlreadyOpen.Create( msg));
    TExceptionType.TimedOut    : Exit( TTransportExceptionTimedOut.Create( msg));
    TExceptionType.EndOfFile   : Exit( TTransportExceptionEndOfFile.Create( msg));
    TExceptionType.BadArgs     : Exit( TTransportExceptionBadArgs.Create( msg));
    TExceptionType.Interrupted : Exit( TTransportExceptionInterrupted.Create( msg));
  end;

  // whatever was not handled above is expected to be Unknown
  ASSERT( TExceptionType.Unknown = aType);
  Result := TTransportExceptionUnknown.Create( msg);
end;
502
// Deprecated factory: a message without a type yields the Unknown kind.
class function TTransportException.Create(const msg: string): TTransportException;
begin
  Exit( TTransportExceptionUnknown.Create( msg));
end;
507
508{ TTransportExceptionSpecialized }
509
// Non-deprecated constructor; delegates to the inherited HiddenCreate.
constructor TTransportExceptionSpecialized.Create(const Msg: string);
begin
  inherited HiddenCreate( Msg);
end;
514
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100515{ specialized TTransportExceptions }
516
// Reports the exception kind represented by this class.
class function TTransportExceptionUnknown.GetType: TTransportException.TExceptionType;
begin
  Exit( TExceptionType.Unknown);
end;
521
// Reports the exception kind represented by this class.
class function TTransportExceptionNotOpen.GetType: TTransportException.TExceptionType;
begin
  Exit( TExceptionType.NotOpen);
end;
526
// Reports the exception kind represented by this class.
class function TTransportExceptionAlreadyOpen.GetType: TTransportException.TExceptionType;
begin
  Exit( TExceptionType.AlreadyOpen);
end;
531
// Reports the exception kind represented by this class.
class function TTransportExceptionTimedOut.GetType: TTransportException.TExceptionType;
begin
  Exit( TExceptionType.TimedOut);
end;
536
// Reports the exception kind represented by this class.
class function TTransportExceptionEndOfFile.GetType: TTransportException.TExceptionType;
begin
  Exit( TExceptionType.EndOfFile);
end;
541
// Reports the exception kind represented by this class.
class function TTransportExceptionBadArgs.GetType: TTransportException.TExceptionType;
begin
  Exit( TExceptionType.BadArgs);
end;
546
// Reports the exception kind represented by this class.
class function TTransportExceptionInterrupted.GetType: TTransportException.TExceptionType;
begin
  Exit( TExceptionType.Interrupted);
end;
551
Jens Geyerd5436f52014-10-03 19:50:38 +0200552{ TTransportFactoryImpl }
553
// Default factory behaviour: the transport passed in is returned as is.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Exit( ATrans);
end;
558
559{ TServerSocket }
560
// Wraps a server object supplied by the caller. FOwnsServer is not set
// here, so Destroy leaves that server object alone.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
{$ELSE}
constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
{$ENDIF}
begin
  inherited Create;
  FServer := AServer;
{$IFDEF OLD_SOCKETS}
  // remembered here, handed to each accepted client in Accept
  FClientTimeout := AClientTimeout;
{$ELSE}
  // the same timeout applies to both directions
  FServer.RecvTimeout := AClientTimeout;
  FServer.SendTimeout := AClientTimeout;
{$ENDIF}
end;
577
// Creates an own server object for the given port. That object is
// owned by this instance and gets freed in Destroy.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
{$ELSE}
constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
{$ENDIF}
begin
  inherited Create;

{$IFDEF OLD_SOCKETS}
  FPort          := APort;
  FClientTimeout := AClientTimeout;

  FServer := TTcpServer.Create( nil );
  FServer.BlockMode := bmBlocking;
  // the explicit AnsiString cast is applied from compiler version 21.0 on
  {$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
  {$ELSE}
  FServer.LocalPort := IntToStr( FPort);
  {$IFEND}
{$ELSE}
  // the client timeout is used for receiving as well as for sending
  FServer := TServerSocket.Create( APort, AClientTimeout, AClientTimeout);
{$ENDIF}

  FOwnsServer        := True;
  FUseBufferedSocket := AUseBufferedSockets;
end;
601
// Frees the server object, but only if this instance owns it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer
  then FreeAndNil( FServer);

  inherited;
end;
610
// Accepts one client connection and returns the transport for it.
// fnAccepting, when assigned, is invoked right before the accept call.
// Raises TTransportExceptionNotOpen when there is no server object.
// With OLD_SOCKETS the result is nil when the accept call reports
// failure, and any exception is re-raised as TTransportExceptionUnknown.
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
{$IFDEF OLD_SOCKETS}
  client : TCustomIpClient;
{$ELSE}
  client: TSocket;
{$ENDIF}
  trans : IStreamTransport;
begin
  if not Assigned( FServer)
  then raise TTransportExceptionNotOpen.Create('No underlying server socket.');

{$IFDEF OLD_SOCKETS}
  client := nil;
  try
    client := TCustomIpClient.Create(nil);

    if Assigned(fnAccepting)
    then fnAccepting();

    if not FServer.Accept( client) then begin
      client.Free;
      Exit( nil);
    end;

    if not Assigned( client)
    then Exit( nil);

    trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
    client := nil; // from here on the transport is responsible for it

    if not FUseBufferedSocket
    then Result := trans
    else Result := TBufferedTransportImpl.Create( trans);

  except
    on E: Exception do begin
      client.Free;
      raise TTransportExceptionUnknown.Create(E.ToString);
    end;
  end;
{$ELSE}
  if Assigned(fnAccepting)
  then fnAccepting();

  client := FServer.Accept;
  try
    trans := TSocketImpl.Create( client, True);
    client := nil; // from here on the transport is responsible for it

    if not FUseBufferedSocket
    then Result := trans
    else Result := TBufferedTransportImpl.Create( trans);
  except
    client.Free;
    raise;
  end;
{$ENDIF}
end;
675
// Starts listening; does nothing when there is no server object.
// With OLD_SOCKETS a failure to activate the server is re-raised as
// TTransportExceptionUnknown.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  try
    FServer.Active := True;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
    end;
  end;
{$ELSE}
  FServer.Listen;
{$ENDIF}
end;
692
// Stops the server; does nothing when there is no server object.
// With OLD_SOCKETS a failure to deactivate the server is re-raised as
// TTransportExceptionUnknown.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  try
    FServer.Active := False;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
    end;
  end;
{$ELSE}
  FServer.Close;
{$ENDIF}
end;
707
708{ TSocket }
709
// Creates a transport on top of an existing client socket.
//   AClient     - the socket to communicate over
//   aOwnsClient - when TRUE, this transport frees AClient in Close/Destroy
//   ATimeout    - (OLD_SOCKETS only) timeout handed to the socket stream;
//                 otherwise the timeout is taken from AClient.RecvTimeout
//
// Ownership is taken over only after construction has succeeded. When a
// constructor raises, Delphi runs the destructor automatically. If
// FOwnsClient were already set at that point, Destroy would free AClient
// and the caller's own cleanup (see TServerSocketImpl.Accept, which frees
// the client in its except handler) would free it a second time.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
{$ELSE}
constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
{$ENDIF}
var stream : IThriftStream;
begin
  FClient := AClient;
{$IFDEF OLD_SOCKETS}
  FTimeout := ATimeout;
{$ELSE}
  FTimeout := AClient.RecvTimeout;
{$ENDIF}

  // one stream object serves as input and output
  stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( stream, stream);

  // nothing can fail anymore, now it is safe to take ownership
  FOwnsClient := aOwnsClient;
end;
Jens Geyerd5436f52014-10-03 19:50:38 +0200731
// Creates a transport for the given host and port. The base class is
// set up without streams first; InitSocket then creates the client
// object and the stream working on it.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
{$ELSE}
constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
{$ENDIF}
begin
  inherited Create( nil, nil);

  FTimeout := ATimeout;
  FPort    := APort;
  FHost    := AHost;

  InitSocket;
end;
744
// Releases the stream references first and then, if this transport owns
// it, the client socket. This is the same order Close uses.
//
// The socket stream is constructed with a plain reference to FClient.
// Dropping our stream references before freeing the socket avoids a
// window in which a stream we still hold points at a destroyed socket.
// NOTE(review): whether the stream touches the socket during its own
// teardown is not visible here - confirm in TTcpSocketStreamImpl.
destructor TSocketImpl.Destroy;
begin
  FInputStream  := nil;
  FOutputStream := nil;

  if FOwnsClient
  then FreeAndNil( FClient);

  inherited;
end;
751
// Closes the transport: inherited close, then the stream references are
// dropped, finally the client is freed (if owned) or just forgotten.
procedure TSocketImpl.Close;
begin
  inherited Close;

  // let go of the streams before the socket they work on disappears
  FInputStream  := nil;
  FOutputStream := nil;

  if not FOwnsClient
  then FClient := nil
  else FreeAndNil( FClient);
end;
763
// Returns True when a client socket exists and reports itself as connected/open.
function TSocketImpl.GetIsOpen: Boolean;
begin
  Result := False;
  if FClient = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  Result := FClient.Connected;
{$ELSE}
  Result := FClient.IsOpen;
{$ENDIF}
end;
772
// Discards the current client socket (freeing it when owned), creates a fresh
// owned one and attaches a new socket stream as input and output stream.
procedure TSocketImpl.InitSocket;
var sockStream : IThriftStream;
begin
  if not FOwnsClient
  then FClient := nil
  else FreeAndNil( FClient);

{$IFDEF OLD_SOCKETS}
  FClient := TTcpClient.Create( nil);
{$ELSE}
  FClient := TSocket.Create( FHost, FPort);
{$ENDIF}
  FOwnsClient := True;   // the socket created above belongs to this instance

  sockStream    := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FInputStream  := sockStream;
  FOutputStream := sockStream;
end;
792
// Connects the client socket and (re-)creates the socket stream.
// Raises
//   TTransportExceptionAlreadyOpen - the transport is open already
//   TTransportExceptionNotOpen     - host is empty or port is not positive
procedure TSocketImpl.Open;
begin
  if IsOpen
  then raise TTransportExceptionAlreadyOpen.Create('Socket already connected');

  if FHost = ''
  then raise TTransportExceptionNotOpen.Create('Cannot open null host');

  if Port <= 0
  then raise TTransportExceptionNotOpen.Create('Cannot open without port');

  // a preceding Close drops the client, so it may have to be created again
  if FClient = nil then begin
    InitSocket;
  end;

{$IFDEF OLD_SOCKETS}
  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;
{$ELSE}
  FClient.Open;
{$ENDIF}

  // input and output share one stream object
  FInputStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
821
822{ TBufferedStream }
823
// Flushes pending output, releases the wrapped stream and frees both buffers.
// With the buffers gone, IsOpen reports False from here on.
procedure TBufferedStreamImpl.Close;
begin
  Flush;
  FStream := nil;

  FreeAndNil( FReadBuffer);
  FreeAndNil( FWriteBuffer);
end;
835
// Creates a buffering wrapper around AStream.
//   AStream  - the stream that is read from / written to
//   ABufSize - chunk size for reads and flush threshold for writes
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  inherited Create;

  FBufSize := ABufSize;
  FStream  := AStream;

  FWriteBuffer := TMemoryStream.Create;
  FReadBuffer  := TMemoryStream.Create;
end;
844
// Close does all the cleanup work (flush, release stream, free buffers).
destructor TBufferedStreamImpl.Destroy;
begin
  Self.Close;
  inherited;
end;
850
// Writes everything collected in the write buffer to the wrapped stream and
// empties the buffer afterwards. Does nothing while the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  size    : Integer;
begin
  if not IsOpen
  then Exit;

  size := FWriteBuffer.Size;
  if size > 0 then begin
    // copy the buffered bytes out, then hand them over in one go
    SetLength( pending, size);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( pending[0], size);
    FStream.Write( pending, 0, size);
  end;

  FWriteBuffer.Clear;
end;
867
// Open means: both buffers exist, a stream is attached and that stream is open.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := False;

  if (FWriteBuffer = nil) or (FReadBuffer = nil)
  then Exit;

  if FStream <> nil
  then Result := FStream.IsOpen;
end;
875
// Opening is delegated to the wrapped stream.
procedure TBufferedStreamImpl.Open;
begin
  FStream.Open;
end;
880
// Reads up to "count" bytes into pBuf^ starting at "offset", refilling the
// read buffer from the wrapped stream in chunks of FBufSize as needed.
// Returns the number of bytes delivered; stops early when the wrapped stream
// yields no more data. Returns 0 when the stream is not open.
function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  chunk   : Integer;
  fetched : TBytes;
  pDest   : PByte;
begin
  inherited;
  Result := 0;

  if not IsOpen
  then Exit;

  while count > 0 do begin

    // read buffer used up: fetch the next chunk from the wrapped stream
    if FReadBuffer.Position >= FReadBuffer.Size then begin
      FReadBuffer.Clear;
      SetLength( fetched, FBufSize);
      chunk := FStream.Read( fetched, 0, FBufSize);
      if chunk = 0
      then Break; // avoid infinite loop

      FReadBuffer.WriteBuffer( fetched[0], chunk);
      FReadBuffer.Position := 0;
    end;

    // hand out as much as is buffered, but never more than requested
    if FReadBuffer.Position < FReadBuffer.Size then begin
      chunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      pDest := pBuf;
      Inc( pDest, offset);
      Inc( Result, FReadBuffer.Read( pDest^, chunk));
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
914
// Returns a copy of the complete read buffer contents, or an empty array
// when the stream is not open. The read buffer position is moved by this call.
function TBufferedStreamImpl.ToArray: TBytes;
var size : Integer;
begin
  if IsOpen
  then size := FReadBuffer.Size
  else size := 0;

  SetLength( Result, size);
  if size <= 0
  then Exit;

  FReadBuffer.Position := 0;
  FReadBuffer.Read( Result[0], size);
end;
931
// Appends "count" bytes from pBuf^ (starting at "offset") to the write buffer
// and flushes once the buffer has grown beyond FBufSize.
// Nothing happens for count <= 0 or while the stream is not open.
procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
var pSrc : PByte;
begin
  inherited;

  if (count <= 0) or (not IsOpen)
  then Exit;

  pSrc := pBuf;
  Inc( pSrc, offset);
  FWriteBuffer.Write( pSrc^, count);

  if FWriteBuffer.Size > FBufSize
  then Flush;
end;
947
948{ TStreamTransportImpl }
949
// Creates a transport on top of the given input and output streams.
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  inherited Create;

  FOutputStream := AOutputStream;
  FInputStream  := AInputStream;
end;
956
// Drops both stream references before the inherited destructor runs.
destructor TStreamTransportImpl.Destroy;
begin
  FInputStream  := nil;
  FOutputStream := nil;

  inherited Destroy;
end;
963
// Closing only drops the references to both streams.
procedure TStreamTransportImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
969
// Flushes the output stream.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if FOutputStream <> nil
  then FOutputStream.Flush
  else raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
end;
978
// Getter: the stream this transport reads from (may be nil).
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := FInputStream;
end;
983
// This implementation unconditionally reports the transport as open.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
988
// Getter: the stream this transport writes to (may be nil).
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
993
// Intentionally empty: this implementation has nothing to do on Open.
procedure TStreamTransportImpl.Open;
begin
  // nothing to do
end;
998
// Forwards the read request to the input stream and returns its result.
// Raises TTransportExceptionNotOpen when there is no input stream.
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );

  Result := FInputStream.Read( pBuf, buflen, off, len);
end;
1007
// Forwards the write request to the output stream.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );

  FOutputStream.Write( pBuf, off, len);
end;
1016
1017{ TBufferedTransportImpl }
1018
// Convenience constructor: same as the two-argument overload with a
// buffer size of 1024.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
begin
  // no inherited call here, the overload below takes care of that
  Create( ATransport, 1024);
end;
1024
// Creates a buffered transport around ATransport.
//   ATransport - the stream transport to be wrapped
//   ABufSize   - buffer size passed on to the buffered streams
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
begin
  inherited Create;

  FBufSize   := ABufSize;
  FTransport := ATransport;

  InitBuffers;
end;
1032
// Closes the wrapped transport first, then drops both buffer references.
procedure TBufferedTransportImpl.Close;
begin
  FTransport.Close;

  FInputBuffer  := nil;
  FOutputBuffer := nil;
end;
1039
// Flushes the output buffer, if there is one.
procedure TBufferedTransportImpl.Flush;
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Flush;
end;
1046
// The open state is whatever the wrapped transport reports.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1051
// Getter: the transport wrapped by this buffered transport.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := FTransport;
end;
1056
// Creates a buffered stream for each of the wrapped transport's streams
// that is present; a missing stream leaves the matching buffer untouched.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if FTransport.InputStream <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize);

  if FTransport.OutputStream <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize);
end;
1066
// Opens the wrapped transport and rebuilds the buffers afterwards.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;

  // the buffers have to match the substreams of FTransport again
  InitBuffers;
end;
1072
// Reads through the input buffer; yields 0 when there is no input buffer.
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputBuffer = nil
  then Result := 0
  else Result := FInputBuffer.Read( pBuf, buflen, off, len);
end;
1080
// Writes through the output buffer; silently does nothing without one.
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Write( pBuf, off, len);
end;
1087
1088{ TFramedTransportImpl }
1089
// One-time setup of the shared, zero-filled header placeholder FHeader_Dummy
// (FHeaderSize bytes). Done by a class constructor where available, otherwise
// by a plain procedure that has to be called explicitly.
{$IFDEF HAVE_CLASS_CTOR}
class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0], SizeOf( Byte) * Length( FHeader_Dummy), 0);
end;
{$ELSE}
procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            SizeOf( Byte) * Length( TFramedTransportImpl.FHeader_Dummy),
            0);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001104
// Creates a framed transport without assigning a wrapped transport;
// only the write buffer gets prepared.
constructor TFramedTransportImpl.Create;
begin
  inherited Create;

  InitWriteBuffer;
end;
1110
// Closing is delegated to the wrapped transport.
procedure TFramedTransportImpl.Close;
begin
  FTransport.Close;
end;
1115
// Creates a framed transport on top of ATrans and prepares the write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;

  InitWriteBuffer;
  FTransport := ATrans;
end;
1122
// Frees the write buffer and the read buffer.
destructor TFramedTransportImpl.Destroy;
begin
  FWriteBuffer.Free;
  FReadBuffer.Free;

  inherited Destroy;
end;
1129
// Sends the collected frame: copies the write buffer, overwrites the first
// four bytes with the payload length (most significant byte first), resets
// the write buffer and passes the frame to the wrapped transport.
// Raises TTransportExceptionUnknown when the buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame      : TBytes;
  frameLen   : Integer;
  payloadLen : Integer;
begin
  frameLen := FWriteBuffer.Size;
  SetLength( frame, frameLen);
  if frameLen > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], frameLen);

  payloadLen := frameLen - FHeaderSize;
  if payloadLen < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );

  // start a new frame before the current one goes out
  InitWriteBuffer;

  // fill in the size header
  frame[0] := Byte( (payloadLen shr 24) and $FF);
  frame[1] := Byte( (payloadLen shr 16) and $FF);
  frame[2] := Byte( (payloadLen shr 8) and $FF);
  frame[3] := Byte( payloadLen and $FF);

  FTransport.Write( frame, 0, frameLen);
  FTransport.Flush;
end;
1158
// The open state is whatever the wrapped transport reports.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1163
type
  // Empty descendant, used solely as a cast target in InitWriteBuffer
  // in order to set the Capacity of a TMemoryStream.
  TAccessMemoryStream = class(TMemoryStream);
1167
// Replaces the write buffer by a new one (capacity 1024) that already
// contains the FHeaderSize placeholder bytes for the frame header.
procedure TFramedTransportImpl.InitWriteBuffer;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;

  // Capacity is reached via the TAccessMemoryStream cast
  TAccessMemoryStream( FWriteBuffer).Capacity := 1024;

  // reserve room for the header, Flush fills in the real value
  FWriteBuffer.Write( FHeader_Dummy[0], FHeaderSize);
end;
1175
// Opening is delegated to the wrapped transport.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open;
end;
1180
// Reads up to "len" bytes of frame payload into pBuf^ starting at "off".
//   pBuf   - destination memory
//   buflen - total size of the destination memory
//   off    - byte offset into pBuf^ where the data goes
//   len    - number of bytes wanted; clipped to (buflen - off)
// Returns the number of bytes copied. Data still available from the current
// frame is served first; only when that yields nothing the next frame is
// read from the wrapped transport.
//
// A request that ends up with len <= 0 returns 0 immediately. It must neither
// block on the transport nor replace the read buffer, because ReadFrame frees
// FReadBuffer and any unread data of the current frame along with it.
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var pTmp : PByte;
begin
  Result := 0;

  // never deliver more than fits behind "off"
  if len > (buflen-off)
  then len := buflen-off;

  // nothing to deliver: leave transport and buffered frame alone
  if len <= 0
  then Exit;

  pTmp := pBuf;
  Inc( pTmp, off);

  // serve from the current frame, if there is anything left
  if FReadBuffer <> nil then begin
    Result := FReadBuffer.Read( pTmp^, len);
    if Result > 0
    then Exit;
  end;

  // current frame exhausted (or none yet): get the next one
  ReadFrame;
  Result := FReadBuffer.Read( pTmp^, len);
end;
1202
// Reads one complete frame from the wrapped transport into a new read buffer:
// first the FHeaderSize header bytes holding the payload size (most
// significant byte first), then the payload itself. The previous read buffer
// is freed, the new one is positioned at its start.
//
// Raises TTransportExceptionUnknown when the header decodes to a negative
// size. The value comes from the peer, so it is checked before it is used
// as an array length and read count.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  // a set top bit yields a negative Integer - that is not a valid frame
  if size < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.ReadFrame: negative frame size (' + IntToStr(size) + ')');

  SetLength( buff, size );
  FTransport.ReadAll( buff, 0, size );

  FReadBuffer.Free;
  FReadBuffer := TMemoryStream.Create;
  if size > 0   // buff[0] does not exist for an empty frame
  then FReadBuffer.Write( Pointer(@buff[0])^, size );
  FReadBuffer.Position := 0;
end;
1224
// Appends "len" bytes from pBuf^ (starting at "off") to the write buffer.
// Nothing is sent here; the data goes out with the next Flush.
procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
var pSrc : PByte;
begin
  if len <= 0
  then Exit;

  pSrc := pBuf;
  Inc( pSrc, off);
  FWriteBuffer.Write( pSrc^, len);
end;
1235
1236{ TFramedTransport.TFactory }
1237
// Factory method: wraps ATrans into a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := TFramedTransportImpl.Create( ATrans);
end;
1242
1243{ TTcpSocketStreamImpl }
1244
// Closing is delegated to the socket.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close;
end;
1249
{$IFDEF OLD_SOCKETS}
// Old sockets flavour: stores the socket and the timeout value.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;

  FTimeout   := aTimeout;
  FTcpClient := ATcpClient;
end;
{$ELSE}
// New sockets flavour: stores the socket and applies the timeout to it.
// A timeout of 0 selects SLEEP_TIME as receive timeout, whereas the send
// timeout always receives aTimeout as given.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
begin
  inherited Create;
  FTcpClient := ATcpClient;

  if aTimeout <> 0
  then FTcpClient.RecvTimeout := aTimeout
  else FTcpClient.RecvTimeout := SLEEP_TIME;

  FTcpClient.SendTimeout := aTimeout;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001269
// Intentionally empty: this stream does no buffering of its own.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1274
// Reports the socket's own state (Active resp. IsOpen, depending on the
// sockets flavour compiled in).
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  {$IFDEF OLD_SOCKETS}
  Result := FTcpClient.Active;
  {$ELSE}
  Result := FTcpClient.IsOpen;
  {$ENDIF}
end;
1283
// Opening is delegated to the socket.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open;
end;
1288
1289
{$IFDEF OLD_SOCKETS}
// Thin wrapper around select() for the one socket of this stream.
//   ReadReady, WriteReady, ExceptFlag
//              - optional; each non-nil pointer puts the socket into the
//                matching fd_set and receives the FD_ISSET outcome afterwards
//   TimeOut    - milliseconds; a negative value waits without time limit
//   wsaError   - WSAEINVAL if the socket is not active, WSAGetLastError if
//                select() reported SOCKET_ERROR, otherwise 0
// Returns what select() returned, or SOCKET_ERROR when the socket is not
// active or the call raised an exception.
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                                      TimeOut: Integer; var wsaError : Integer): Integer;
var
  rdSet, wrSet, exSet    : TFDset;
  pRdSet, pWrSet, pExSet : PFDset;
  tmo   : timeval;
  pTmo  : PTimeval;
  hSock : TSocket;
begin
  if not FTcpClient.Active then begin
    wsaError := WSAEINVAL;
    Result := SOCKET_ERROR;
    Exit;
  end;

  hSock := FTcpClient.Handle;

  // set up only those fd_sets the caller is interested in
  pRdSet := nil;
  if Assigned( ReadReady) then begin
    FD_ZERO( rdSet);
    FD_SET( hSock, rdSet);
    pRdSet := @rdSet;
  end;

  pWrSet := nil;
  if Assigned( WriteReady) then begin
    FD_ZERO( wrSet);
    FD_SET( hSock, wrSet);
    pWrSet := @wrSet;
  end;

  pExSet := nil;
  if Assigned( ExceptFlag) then begin
    FD_ZERO( exSet);
    FD_SET( hSock, exSet);
    pExSet := @exSet;
  end;

  // milliseconds -> seconds + microseconds; nil means wait forever
  pTmo := nil;
  if TimeOut >= 0 then begin
    tmo.tv_sec  := TimeOut div 1000;
    tmo.tv_usec := 1000 * (TimeOut mod 1000);
    pTmo := @tmo;
  end;

  wsaError := 0;
  try
    {$IFDEF MSWINDOWS}
      {$IFDEF OLD_UNIT_NAMES}
      Result := WinSock.select( hSock + 1, pRdSet, pWrSet, pExSet, pTmo);
      {$ELSE}
      Result := Winapi.WinSock.select( hSock + 1, pRdSet, pWrSet, pExSet, pTmo);
      {$ENDIF}
    {$ENDIF}
    {$IFDEF LINUX}
    Result := Libc.select( hSock + 1, pRdSet, pWrSet, pExSet, pTmo);
    {$ENDIF}

    if Result = SOCKET_ERROR
    then wsaError := WSAGetLastError;

  except
    Result := SOCKET_ERROR;
  end;

  // report the state of each requested set back to the caller
  if Assigned( ReadReady)
  then ReadReady^ := FD_ISSET( hSock, rdSet);

  if Assigned( WriteReady)
  then WriteReady^ := FD_ISSET( hSock, wrSet);

  if Assigned( ExceptFlag)
  then ExceptFlag^ := FD_ISSET( hSock, exSet);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001377
{$IFDEF OLD_SOCKETS}
// Waits up to TimeOut for the socket to become readable and then peeks
// (MSG_PEEK, i.e. without consuming) at up to DesiredBytes bytes.
//   pBuf       - memory that receives the peeked bytes
//   wsaError   - error code as set by Select
//   bytesReady - number of bytes found (0 unless the result is wfd_HaveData)
// Returns wfd_Error if Select failed or recv() returned <= 0, wfd_Timeout if
// nothing became readable, wfd_HaveData otherwise.
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
                                           DesiredBytes : Integer;
                                           var wsaError, bytesReady : Integer): TWaitForData;
const
  MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
var
  readable, failed : Boolean;
  rc : Integer;
begin
  bytesReady := 0;

  // select() yields the number of ready sockets, zero once the time limit
  // expired, or SOCKET_ERROR (details are then found in wsaError)
  rc := Self.Select( @readable, nil, @failed, TimeOut, wsaError);
  if rc = SOCKET_ERROR then begin
    Result := TWaitForData.wfd_Error;
    Exit;
  end;

  if (rc = 0) or (not readable) then begin
    Result := TWaitForData.wfd_Timeout;
    Exit;
  end;

  // recv() yields the byte count, -1 on error, and 0 after the peer has
  // performed an orderly shutdown
  rc := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
  if rc <= 0 then begin
    Result := TWaitForData.wfd_Error;
    Exit;
  end;

  // there is at least some data
  bytesReady := Min( rc, DesiredBytes);
  Result := TWaitForData.wfd_HaveData;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001411
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001412{$IFDEF OLD_SOCKETS}
// Old sockets version.
// Reads up to "count" bytes into pBuf^ starting at "offset" and returns the
// number of bytes actually received.
//
// The first wait uses FTimeout (DEFAULT_THRIFT_TIMEOUT if FTimeout is 0);
// once data arrived, subsequent waits use a tenth of that, at least 200.
// The call returns early with what it has when WaitForData reports an error,
// when a wait times out while FTimeout is 0, or when ReceiveBuf delivers
// nothing.
// Raises TTransportExceptionTimedOut when a wait times out and FTimeout <> 0.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var wfd : TWaitForData;
    wsaError,
    msecs : Integer;
    nBytes : Integer;
    pTmp : PByte;
begin
  inherited;

  if FTimeout > 0
  then msecs := FTimeout
  else msecs := DEFAULT_THRIFT_TIMEOUT;

  result := 0;
  pTmp := pBuf;
  Inc( pTmp, offset);
  while count > 0 do begin

    while TRUE do begin
      wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
      case wfd of
        TWaitForData.wfd_Error    :  Exit;
        TWaitForData.wfd_HaveData :  Break;
        TWaitForData.wfd_Timeout  :  begin
          if (FTimeout = 0)
          then Exit
          else begin
            raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
          end;
        end;
      else
        ASSERT( FALSE);
      end;
    end;

    // reduce the timeout once we got data
    if FTimeout > 0
    then msecs := FTimeout div 10
    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
    msecs := Max( msecs, 200);

    ASSERT( nBytes <= count);
    nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);

    // ReceiveBuf hands back the recv() result: a negative value (error) or 0
    // must not be added to the pointer and the counters below
    if nBytes <= 0
    then Exit;

    Inc( pTmp, nBytes);
    Dec( count, nBytes);
    Inc( result, nBytes);
  end;
end;
1463
// Old sockets version.
// Sizes the result by the socket's BytesReceived value (0 when not open)
// and fills it via ReceiveBuf.
function TTcpSocketStreamImpl.ToArray: TBytes;
var size : Integer;
begin
  if IsOpen
  then size := FTcpClient.BytesReceived
  else size := 0;

  SetLength( Result, size);
  if size <= 0
  then Exit;

  FTcpClient.ReceiveBuf( Result[0], size);
end;
1479
// Old sockets version.
// Waits (up to FTimeout) until the socket is writable, then sends "count"
// bytes from pBuf^ starting at "offset".
// Raises
//   TTransportExceptionNotOpen  - the socket is not active
//   TTransportExceptionTimedOut - the socket did not become writable in time
//   TTransportExceptionUnknown  - Select failed, or flagged an error condition
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
var
  writable, failed : Boolean;
  rc, errCode : Integer;
  pSrc : PByte;
begin
  inherited;

  if not FTcpClient.Active
  then raise TTransportExceptionNotOpen.Create('not open');

  // select() yields the number of ready sockets, zero once the time limit
  // expired, or SOCKET_ERROR (details are then found in errCode)
  rc := Self.Select( nil, @writable, @failed, FTimeOut, errCode);

  if rc = SOCKET_ERROR
  then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(errCode)));

  if rc = 0
  then raise TTransportExceptionTimedOut.Create('timed out');

  if not (writable and (not failed))
  then raise TTransportExceptionUnknown.Create('unknown error');

  pSrc := pBuf;
  Inc( pSrc, offset);
  FTcpClient.SendBuf( pSrc^, count);
end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001509
1510{$ELSE}
1511
// New sockets version.
// Keeps reading from the socket until "count" bytes have been stored at
// pBuf^ + offset, or until a read delivers 0 bytes. Returns the total
// number of bytes stored.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  got   : Integer;
  pDest : PByte;
begin
  inherited;

  Result := 0;
  pDest := pBuf;
  Inc( pDest, offset);

  while count > 0 do begin
    got := FTcpClient.Read( pDest^, count);
    if got = 0
    then Break;   // nothing more to get

    Inc( Result, got);
    Inc( pDest, got);
    Dec( count, got);
  end;
end;
1530
// New sockets version.
// If Peek reports pending data, reads it from the socket in chunks of 1024
// bytes until a read comes back short, and returns exactly the bytes read.
// A TTransportException during reading is swallowed on purpose; whatever was
// read up to that point is returned. Any other exception is passed on.
//
// The result is explicitly reset first and its final length is derived from
// the number of bytes really read, so neither a previous value of the result
// variable nor the unused tail of the last chunk can end up in the output.
function TTcpSocketStreamImpl.ToArray: TBytes;
const
  CHUNK_SIZE = 1024;
var
  total, got : Integer;
begin
  SetLength( Result, 0);
  total := 0;

  try
    if FTcpClient.Peek then
      repeat
        // make room for one more chunk behind the data collected so far
        SetLength( Result, total + CHUNK_SIZE);
        got := FTcpClient.Read( Result[total], CHUNK_SIZE);
        if got > 0
        then Inc( total, got);
      until got < CHUNK_SIZE;
  except
    on TTransportException do begin { don't allow default exceptions } end;
    else raise;
  end;

  // cut off whatever part of the last chunk was not filled
  SetLength( Result, total);
end;
1549
// New sockets version.
// Sends "count" bytes from pBuf^ starting at "offset" through the socket.
// Raises TTransportExceptionNotOpen when the socket is not open.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
var pSrc : PByte;
begin
  inherited;

  if FTcpClient.IsOpen then begin
    pSrc := pBuf;
    Inc( pSrc, offset);
    FTcpClient.Write( pSrc^, count);
  end
  else begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;
end;
1563
Jens Geyer23d67462015-12-19 11:44:57 +01001564{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001565
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001566
// Without class constructor support the header placeholder has to be set up
// at unit initialization. The guard uses the very same symbol that decides
// whether TFramedTransportImpl_Initialize gets declared at all, so call and
// declaration can never get out of step.
{$IFNDEF HAVE_CLASS_CTOR}
initialization
  TFramedTransportImpl_Initialize;
{$ENDIF}
1573
1574
1575end.