blob: fd088375d42552439df4f29caf28d46e5dfe0eb9 [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
Jens Geyer02230912019-04-03 01:12:51 +020032 WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer02230912019-04-03 01:12:51 +020034 Winapi.WinSock,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020035 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyera019cda2019-11-09 23:24:52 +010041 Thrift.Configuration,
Jens Geyerd5436f52014-10-03 19:50:38 +020042 Thrift.Collections,
Jens Geyer606f1ef2018-04-09 23:09:41 +020043 Thrift.Exception,
Jens Geyerd5436f52014-10-03 19:50:38 +020044 Thrift.Utils,
Jens Geyer02230912019-04-03 01:12:51 +020045 Thrift.WinHTTP,
Nick4f5229e2016-04-14 16:43:22 +030046 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020047
48type
Jens Geyera019cda2019-11-09 23:24:52 +010049 IStreamTransport = interface;
Jens Geyer41f47af2019-11-09 23:24:52 +010050
Jens Geyerd5436f52014-10-03 19:50:38 +020051 ITransport = interface
Jens Geyera019cda2019-11-09 23:24:52 +010052 ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
Jens Geyerd5436f52014-10-03 19:50:38 +020053 function GetIsOpen: Boolean;
54 property IsOpen: Boolean read GetIsOpen;
55 function Peek: Boolean;
56 procedure Open;
57 procedure Close;
Jens Geyer41f47af2019-11-09 23:24:52 +010058
Jens Geyer17c3ad92017-09-05 20:31:27 +020059 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
60 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
61 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
62 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020063 procedure Write( const buf: TBytes); overload;
64 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
Jens Geyer17c3ad92017-09-05 20:31:27 +020065 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
66 procedure Write( const pBuf : Pointer; len : Integer); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020067 procedure Flush;
Jens Geyer41f47af2019-11-09 23:24:52 +010068
Jens Geyera019cda2019-11-09 23:24:52 +010069 function Configuration : IThriftConfiguration;
70 function MaxMessageSize : Integer;
71 procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
72 procedure CheckReadBytesAvailable( const numBytes : Int64);
73 procedure UpdateKnownMessageSize( const size : Int64);
Jens Geyerd5436f52014-10-03 19:50:38 +020074 end;
75
Jens Geyera019cda2019-11-09 23:24:52 +010076 TTransportBase = class abstract( TInterfacedObject)
Jens Geyerfad7fd32019-11-09 23:24:52 +010077 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +020078 function GetIsOpen: Boolean; virtual; abstract;
79 property IsOpen: Boolean read GetIsOpen;
80 function Peek: Boolean; virtual;
81 procedure Open(); virtual; abstract;
82 procedure Close(); virtual; abstract;
Jens Geyer41f47af2019-11-09 23:24:52 +010083
Jens Geyer17c3ad92017-09-05 20:31:27 +020084 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
85 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
86 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
87 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
88 procedure Write( const buf: TBytes); overload; inline;
89 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
90 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
91 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020092 procedure Flush; virtual;
Jens Geyer41f47af2019-11-09 23:24:52 +010093
Jens Geyera019cda2019-11-09 23:24:52 +010094 function Configuration : IThriftConfiguration; virtual; abstract;
95 procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
96 end;
Jens Geyer41f47af2019-11-09 23:24:52 +010097
Jens Geyera019cda2019-11-09 23:24:52 +010098 // base class for all endpoint transports, e.g. sockets, pipes or HTTP
99 TEndpointTransportBase = class abstract( TTransportBase, ITransport)
100 strict private
101 FRemainingMessageSize : Int64;
102 FKnownMessageSize : Int64;
103 FConfiguration : IThriftConfiguration;
104 strict protected
105 function Configuration : IThriftConfiguration; override;
106 function MaxMessageSize : Integer;
107 property RemainingMessageSize : Int64 read FRemainingMessageSize;
108 property KnownMessageSize : Int64 read FKnownMessageSize;
Jens Geyer6762cad2020-10-30 17:15:18 +0100109 procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
Jens Geyera019cda2019-11-09 23:24:52 +0100110 procedure UpdateKnownMessageSize(const size : Int64); override;
111 procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
112 procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
Jens Geyer41f47af2019-11-09 23:24:52 +0100113 public
Jens Geyera019cda2019-11-09 23:24:52 +0100114 constructor Create( const aConfig : IThriftConfiguration); reintroduce;
115 end;
116
117 // base class for all layered transports, e.g. framed
118 TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
119 strict private
120 FTransport : T;
121 strict protected
122 property InnerTransport : T read FTransport;
123 function GetUnderlyingTransport: ITransport;
124 function Configuration : IThriftConfiguration; override;
125 procedure UpdateKnownMessageSize( const size : Int64); override;
126 function MaxMessageSize : Integer; inline;
127 procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
128 procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
129 public
130 constructor Create( const aTransport: T); reintroduce;
131 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200132 end;
133
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100134 TTransportException = class abstract( TException)
Jens Geyerd5436f52014-10-03 19:50:38 +0200135 public
136 type
137 TExceptionType = (
138 Unknown,
139 NotOpen,
140 AlreadyOpen,
141 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200142 EndOfFile,
143 BadArgs,
Jens Geyer2646bd62019-11-09 23:24:52 +0100144 Interrupted,
145 CorruptedData
Jens Geyerd5436f52014-10-03 19:50:38 +0200146 );
Jens Geyerfad7fd32019-11-09 23:24:52 +0100147 strict protected
Jens Geyere0e32402016-04-20 21:50:48 +0200148 constructor HiddenCreate(const Msg: string);
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100149 class function GetType: TExceptionType; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +0200150 public
Jens Geyer41f47af2019-11-09 23:24:52 +0100151 class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyere0e32402016-04-20 21:50:48 +0200152 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyer41f47af2019-11-09 23:24:52 +0100153 class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyere0e32402016-04-20 21:50:48 +0200154 property Type_: TExceptionType read GetType;
Jens Geyerd5436f52014-10-03 19:50:38 +0200155 end;
156
Jens Geyere0e32402016-04-20 21:50:48 +0200157 // Needed to remove deprecation warning
158 TTransportExceptionSpecialized = class abstract (TTransportException)
159 public
160 constructor Create(const Msg: string);
161 end;
162
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100163 TTransportExceptionUnknown = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100164 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100165 class function GetType: TTransportException.TExceptionType; override;
166 end;
167
168 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100169 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100170 class function GetType: TTransportException.TExceptionType; override;
171 end;
172
173 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100174 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100175 class function GetType: TTransportException.TExceptionType; override;
176 end;
177
178 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100179 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100180 class function GetType: TTransportException.TExceptionType; override;
181 end;
182
183 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100184 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100185 class function GetType: TTransportException.TExceptionType; override;
186 end;
187
188 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100189 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100190 class function GetType: TTransportException.TExceptionType; override;
191 end;
192
193 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100194 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100195 class function GetType: TTransportException.TExceptionType; override;
196 end;
Jens Geyere0e32402016-04-20 21:50:48 +0200197
Jens Geyer2646bd62019-11-09 23:24:52 +0100198 TTransportExceptionCorruptedData = class (TTransportExceptionSpecialized)
199 protected
200 class function GetType: TTransportException.TExceptionType; override;
201 end;
202
Jens Geyer47f63172019-06-06 22:42:58 +0200203 TSecureProtocol = (
204 SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
205 TLS_1_1, TLS_1_2 // secure (as of today)
206 );
207
208 TSecureProtocols = set of TSecureProtocol;
209
Jens Geyerd5436f52014-10-03 19:50:38 +0200210 IHTTPClient = interface( ITransport )
Jens Geyer47f63172019-06-06 22:42:58 +0200211 ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
Jens Geyer20e727e2018-06-22 22:39:57 +0200212 procedure SetDnsResolveTimeout(const Value: Integer);
213 function GetDnsResolveTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200214 procedure SetConnectionTimeout(const Value: Integer);
215 function GetConnectionTimeout: Integer;
Jens Geyer20e727e2018-06-22 22:39:57 +0200216 procedure SetSendTimeout(const Value: Integer);
217 function GetSendTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200218 procedure SetReadTimeout(const Value: Integer);
219 function GetReadTimeout: Integer;
220 function GetCustomHeaders: IThriftDictionary<string,string>;
221 procedure SendRequest;
Jens Geyer47f63172019-06-06 22:42:58 +0200222 function GetSecureProtocols : TSecureProtocols;
223 procedure SetSecureProtocols( const value : TSecureProtocols);
Jens Geyer20e727e2018-06-22 22:39:57 +0200224
225 property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200226 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
Jens Geyer20e727e2018-06-22 22:39:57 +0200227 property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200228 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
229 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
Jens Geyer47f63172019-06-06 22:42:58 +0200230 property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
Jens Geyerd5436f52014-10-03 19:50:38 +0200231 end;
232
Jens Geyerd5436f52014-10-03 19:50:38 +0200233 IServerTransport = interface
Jens Geyera019cda2019-11-09 23:24:52 +0100234 ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
Jens Geyerd5436f52014-10-03 19:50:38 +0200235 procedure Listen;
236 procedure Close;
237 function Accept( const fnAccepting: TProc): ITransport;
Jens Geyera019cda2019-11-09 23:24:52 +0100238 function Configuration : IThriftConfiguration;
Jens Geyerd5436f52014-10-03 19:50:38 +0200239 end;
240
241 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
Jens Geyera019cda2019-11-09 23:24:52 +0100242 strict private
243 FConfig : IThriftConfiguration;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100244 strict protected
Jens Geyera019cda2019-11-09 23:24:52 +0100245 function Configuration : IThriftConfiguration;
Jens Geyerd5436f52014-10-03 19:50:38 +0200246 procedure Listen; virtual; abstract;
247 procedure Close; virtual; abstract;
Jens Geyera019cda2019-11-09 23:24:52 +0100248 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
249 public
250 constructor Create( const aConfig : IThriftConfiguration);
Jens Geyerd5436f52014-10-03 19:50:38 +0200251 end;
252
253 ITransportFactory = interface
254 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
Jens Geyer41f47af2019-11-09 23:24:52 +0100255 function GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200256 end;
257
Jens Geyera019cda2019-11-09 23:24:52 +0100258 TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
259 strict protected
Jens Geyered994552019-11-09 23:24:52 +0100260 function GetTransport( const aTransport: ITransport): ITransport; virtual;
Jens Geyerd5436f52014-10-03 19:50:38 +0200261 end;
262
Jens Geyera019cda2019-11-09 23:24:52 +0100263
264 TTcpSocketStreamImpl = class( TThriftStreamImpl)
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200265{$IFDEF OLD_SOCKETS}
Jens Geyerfad7fd32019-11-09 23:24:52 +0100266 strict private type
Jens Geyerd5436f52014-10-03 19:50:38 +0200267 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
Jens Geyerfad7fd32019-11-09 23:24:52 +0100268 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200269 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200270 FTimeout : Integer;
271 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
272 TimeOut: Integer; var wsaError : Integer): Integer;
273 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200274 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200275{$ELSE}
276 FTcpClient: TSocket;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100277 strict protected const
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200278 SLEEP_TIME = 200;
279{$ENDIF}
Jens Geyerfad7fd32019-11-09 23:24:52 +0100280 strict protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200281 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
282 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200283 procedure Open; override;
284 procedure Close; override;
285 procedure Flush; override;
286
287 function IsOpen: Boolean; override;
288 function ToArray: TBytes; override;
289 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200290{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100291 constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200292{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100293 constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200294{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200295 end;
296
297 IStreamTransport = interface( ITransport )
298 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
299 function GetInputStream: IThriftStream;
300 function GetOutputStream: IThriftStream;
301 property InputStream : IThriftStream read GetInputStream;
302 property OutputStream : IThriftStream read GetOutputStream;
303 end;
304
Jens Geyera019cda2019-11-09 23:24:52 +0100305 TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100306 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200307 FInputStream : IThriftStream;
308 FOutputStream : IThriftStream;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100309 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200310 function GetIsOpen: Boolean; override;
311
312 function GetInputStream: IThriftStream;
313 function GetOutputStream: IThriftStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200314
Jens Geyer41f47af2019-11-09 23:24:52 +0100315 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200316 procedure Open; override;
317 procedure Close; override;
318 procedure Flush; override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200319 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
320 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyered994552019-11-09 23:24:52 +0100321 public
Jens Geyera019cda2019-11-09 23:24:52 +0100322 constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
Jens Geyerd5436f52014-10-03 19:50:38 +0200323 destructor Destroy; override;
Jens Geyered994552019-11-09 23:24:52 +0100324
325 property InputStream : IThriftStream read GetInputStream;
326 property OutputStream : IThriftStream read GetOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200327 end;
328
329 TBufferedStreamImpl = class( TThriftStreamImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100330 strict private
Jens Geyerd5436f52014-10-03 19:50:38 +0200331 FStream : IThriftStream;
332 FBufSize : Integer;
Jens Geyerf726ae32021-06-04 11:17:26 +0200333 FReadBuffer : TThriftMemoryStream;
334 FWriteBuffer : TThriftMemoryStream;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100335 strict protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200336 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
337 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200338 procedure Open; override;
339 procedure Close; override;
340 procedure Flush; override;
341 function IsOpen: Boolean; override;
342 function ToArray: TBytes; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100343 function Size : Int64; override;
344 function Position : Int64; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200345 public
Jens Geyered994552019-11-09 23:24:52 +0100346 constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200347 destructor Destroy; override;
348 end;
349
350 TServerSocketImpl = class( TServerTransportImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100351 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200352{$IFDEF OLD_SOCKETS}
353 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200354 FPort : Integer;
355 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200356{$ELSE}
357 FServer: TServerSocket;
358{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200359 FUseBufferedSocket : Boolean;
360 FOwnsServer : Boolean;
Jens Geyer41f47af2019-11-09 23:24:52 +0100361
Jens Geyerfad7fd32019-11-09 23:24:52 +0100362 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200363 function Accept( const fnAccepting: TProc) : ITransport; override;
Jens Geyer41f47af2019-11-09 23:24:52 +0100364
Jens Geyerd5436f52014-10-03 19:50:38 +0200365 public
Jens Geyera019cda2019-11-09 23:24:52 +0100366 {$IFDEF OLD_SOCKETS}
367 constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
368 constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
369 {$ELSE}
370 constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
371 constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
372 {$ENDIF}
373
Jens Geyerd5436f52014-10-03 19:50:38 +0200374 destructor Destroy; override;
375 procedure Listen; override;
376 procedure Close; override;
377 end;
378
Jens Geyera019cda2019-11-09 23:24:52 +0100379 TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100380 strict private
Jens Geyerd5436f52014-10-03 19:50:38 +0200381 FInputBuffer : IThriftStream;
382 FOutputBuffer : IThriftStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200383 FBufSize : Integer;
384
385 procedure InitBuffers;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100386 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200387 function GetIsOpen: Boolean; override;
388 procedure Flush; override;
389 public
Jens Geyered994552019-11-09 23:24:52 +0100390 type
391 TFactory = class( TTransportFactoryImpl )
392 public
393 function GetTransport( const aTransport: ITransport): ITransport; override;
394 end;
395
396 constructor Create( const aTransport : IStreamTransport; const aBufSize: Integer = 1024);
Jens Geyerd5436f52014-10-03 19:50:38 +0200397 procedure Open(); override;
398 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200399 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
400 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100401 procedure CheckReadBytesAvailable( const value : Int64); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200402 property IsOpen: Boolean read GetIsOpen;
403 end;
404
405 TSocketImpl = class(TStreamTransportImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100406 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200407{$IFDEF OLD_SOCKETS}
408 FClient : TCustomIpClient;
409{$ELSE}
410 FClient: TSocket;
411{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200412 FOwnsClient : Boolean;
413 FHost : string;
414 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200415{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200416 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200417{$ELSE}
418 FTimeout : Longword;
419{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200420
421 procedure InitSocket;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100422 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200423 function GetIsOpen: Boolean; override;
424 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200425{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100426 constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
427 constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200428{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100429 constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
430 constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200431{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200432 destructor Destroy; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100433
434 procedure Open; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200435 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200436{$IFDEF OLD_SOCKETS}
437 property TcpClient: TCustomIpClient read FClient;
438{$ELSE}
439 property TcpClient: TSocket read FClient;
440{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200441 property Host : string read FHost;
442 property Port: Integer read FPort;
443 end;
444
Jens Geyera019cda2019-11-09 23:24:52 +0100445 TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
Jens Geyer2646bd62019-11-09 23:24:52 +0100446 strict protected type
447 TFramedHeader = Int32;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100448 strict protected
Jens Geyerf726ae32021-06-04 11:17:26 +0200449 FWriteBuffer : TThriftMemoryStream;
450 FReadBuffer : TThriftMemoryStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200451
452 procedure InitWriteBuffer;
453 procedure ReadFrame;
Jens Geyerd5436f52014-10-03 19:50:38 +0200454
455 procedure Open(); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100456 function GetIsOpen: Boolean; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200457
458 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200459 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
460 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100461 procedure CheckReadBytesAvailable( const value : Int64); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200462 procedure Flush; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100463
Jens Geyered994552019-11-09 23:24:52 +0100464 public
465 type
466 TFactory = class( TTransportFactoryImpl )
467 public
468 function GetTransport( const aTransport: ITransport): ITransport; override;
469 end;
470
471 constructor Create( const aTransport: ITransport); overload;
472 destructor Destroy; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200473 end;
474
Jens Geyerd5436f52014-10-03 19:50:38 +0200475
476const
Jens Geyer47f63172019-06-06 22:42:58 +0200477 DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
478
Jens Geyerd5436f52014-10-03 19:50:38 +0200479implementation
480
Jens Geyered994552019-11-09 23:24:52 +0100481
Jens Geyera019cda2019-11-09 23:24:52 +0100482{ TTransportBase }
Jens Geyer41f47af2019-11-09 23:24:52 +0100483
Jens Geyera019cda2019-11-09 23:24:52 +0100484procedure TTransportBase.Flush;
Jens Geyerd5436f52014-10-03 19:50:38 +0200485begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200486 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200487end;
488
Jens Geyera019cda2019-11-09 23:24:52 +0100489function TTransportBase.Peek: Boolean;
Jens Geyerd5436f52014-10-03 19:50:38 +0200490begin
491 Result := IsOpen;
492end;
493
Jens Geyera019cda2019-11-09 23:24:52 +0100494function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200495begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200496 if Length(buf) > 0
497 then result := Read( @buf[0], Length(buf), off, len)
498 else result := 0;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200499end;
500
Jens Geyera019cda2019-11-09 23:24:52 +0100501function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200502begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200503 if Length(buf) > 0
504 then result := ReadAll( @buf[0], Length(buf), off, len)
505 else result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +0200506end;
507
Jens Geyera019cda2019-11-09 23:24:52 +0100508function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200509var ret : Integer;
510begin
511 result := 0;
512 while result < len do begin
513 ret := Read( pBuf, buflen, off + result, len - result);
514 if ret > 0
515 then Inc( result, ret)
516 else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
517 end;
518end;
519
Jens Geyera019cda2019-11-09 23:24:52 +0100520procedure TTransportBase.Write( const buf: TBytes);
Jens Geyered994552019-11-09 23:24:52 +0100521begin
522 if Length(buf) > 0
523 then Write( @buf[0], 0, Length(buf));
524end;
525
Jens Geyera019cda2019-11-09 23:24:52 +0100526procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
Jens Geyered994552019-11-09 23:24:52 +0100527begin
528 if Length(buf) > 0
529 then Write( @buf[0], off, len);
530end;
531
Jens Geyera019cda2019-11-09 23:24:52 +0100532procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
Jens Geyer17c3ad92017-09-05 20:31:27 +0200533begin
534 Self.Write( pBuf, 0, len);
Jens Geyerd5436f52014-10-03 19:50:38 +0200535end;
536
Jens Geyered994552019-11-09 23:24:52 +0100537
Jens Geyera019cda2019-11-09 23:24:52 +0100538{ TEndpointTransportBase }
539
540constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
Jens Geyer41f47af2019-11-09 23:24:52 +0100541begin
Jens Geyera019cda2019-11-09 23:24:52 +0100542 inherited Create;
543
544 if aConfig <> nil
545 then FConfiguration := aConfig
546 else FConfiguration := TThriftConfigurationImpl.Create;
547
548 ResetConsumedMessageSize;
Jens Geyer41f47af2019-11-09 23:24:52 +0100549end;
550
551
Jens Geyera019cda2019-11-09 23:24:52 +0100552function TEndpointTransportBase.Configuration : IThriftConfiguration;
Jens Geyer41f47af2019-11-09 23:24:52 +0100553begin
Jens Geyera019cda2019-11-09 23:24:52 +0100554 result := FConfiguration;
Jens Geyer41f47af2019-11-09 23:24:52 +0100555end;
556
557
Jens Geyera019cda2019-11-09 23:24:52 +0100558function TEndpointTransportBase.MaxMessageSize : Integer;
559begin
560 ASSERT( Configuration <> nil);
561 result := Configuration.MaxMessageSize;
562end;
563
564
565procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
566// Resets RemainingMessageSize to the configured maximum
567begin
568 // full reset
569 if newSize < 0 then begin
570 FKnownMessageSize := MaxMessageSize;
571 FRemainingMessageSize := MaxMessageSize;
572 Exit;
573 end;
574
575 // update only: message size can shrink, but not grow
576 ASSERT( KnownMessageSize <= MaxMessageSize);
577 if newSize > KnownMessageSize
Jens Geyerb0123182020-02-12 12:16:19 +0100578 then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
Jens Geyera019cda2019-11-09 23:24:52 +0100579
580 FKnownMessageSize := newSize;
581 FRemainingMessageSize := newSize;
582end;
583
584
585procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
586// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
587// Will throw if we already consumed too many bytes.
588var consumed : Int64;
589begin
590 consumed := KnownMessageSize - RemainingMessageSize;
591 ResetConsumedMessageSize(size);
592 CountConsumedMessageBytes(consumed);
593end;
594
595
596procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
597// Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
598begin
Jens Geyer73f5bd42022-09-03 14:19:31 +0200599 if (RemainingMessageSize < numBytes) or (numBytes < 0)
Jens Geyera019cda2019-11-09 23:24:52 +0100600 then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
601end;
602
603
604procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
605// Consumes numBytes from the RemainingMessageSize.
606begin
Jens Geyer73f5bd42022-09-03 14:19:31 +0200607 if (RemainingMessageSize >= numBytes) and (numBytes >= 0)
Jens Geyera019cda2019-11-09 23:24:52 +0100608 then Dec( FRemainingMessageSize, numBytes)
609 else begin
610 FRemainingMessageSize := 0;
611 raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
612 end;
613end;
614
615{ TLayeredTransportBase }
616
617constructor TLayeredTransportBase<T>.Create( const aTransport: T);
618begin
619 inherited Create;
620 FTransport := aTransport;
621end;
622
623function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
624begin
625 result := InnerTransport;
626end;
627
628function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
629begin
630 result := InnerTransport.Configuration;
631end;
632
633procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
634begin
635 InnerTransport.UpdateKnownMessageSize( size);
636end;
637
638
639function TLayeredTransportBase<T>.MaxMessageSize : Integer;
640begin
641 result := InnerTransport.MaxMessageSize;
642end;
643
644
645procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
646begin
647 InnerTransport.ResetConsumedMessageSize( knownSize);
648end;
649
650
651procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
652begin
653 InnerTransport.CheckReadBytesAvailable( numBytes);
654end;
655
656
657
Jens Geyerd5436f52014-10-03 19:50:38 +0200658{ TTransportException }
659
Jens Geyere0e32402016-04-20 21:50:48 +0200660constructor TTransportException.HiddenCreate(const Msg: string);
661begin
662 inherited Create(Msg);
663end;
664
Jens Geyered994552019-11-09 23:24:52 +0100665class function TTransportException.Create(aType: TExceptionType): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200666begin
667 //no inherited;
Jens Geyere0e32402016-04-20 21:50:48 +0200668{$WARN SYMBOL_DEPRECATED OFF}
Jens Geyered994552019-11-09 23:24:52 +0100669 Result := Create(aType, '')
Jens Geyere0e32402016-04-20 21:50:48 +0200670{$WARN SYMBOL_DEPRECATED DEFAULT}
Jens Geyerd5436f52014-10-03 19:50:38 +0200671end;
672
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100673class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200674begin
Jens Geyered994552019-11-09 23:24:52 +0100675 case aType of
Jens Geyere0e32402016-04-20 21:50:48 +0200676 TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
677 TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
678 TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
679 TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
680 TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
681 TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
682 else
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100683 ASSERT( TExceptionType.Unknown = aType);
Jens Geyere0e32402016-04-20 21:50:48 +0200684 Result := TTransportExceptionUnknown.Create(msg);
685 end;
Jens Geyerd5436f52014-10-03 19:50:38 +0200686end;
687
Jens Geyere0e32402016-04-20 21:50:48 +0200688class function TTransportException.Create(const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200689begin
Jens Geyere0e32402016-04-20 21:50:48 +0200690 Result := TTransportExceptionUnknown.Create(Msg);
691end;
692
693{ TTransportExceptionSpecialized }
694
695constructor TTransportExceptionSpecialized.Create(const Msg: string);
696begin
697 inherited HiddenCreate(Msg);
Jens Geyerd5436f52014-10-03 19:50:38 +0200698end;
699
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100700{ specialized TTransportExceptions }
701
702class function TTransportExceptionUnknown.GetType: TTransportException.TExceptionType;
703begin
704 result := TExceptionType.Unknown;
705end;
706
707class function TTransportExceptionNotOpen.GetType: TTransportException.TExceptionType;
708begin
709 result := TExceptionType.NotOpen;
710end;
711
712class function TTransportExceptionAlreadyOpen.GetType: TTransportException.TExceptionType;
713begin
714 result := TExceptionType.AlreadyOpen;
715end;
716
717class function TTransportExceptionTimedOut.GetType: TTransportException.TExceptionType;
718begin
719 result := TExceptionType.TimedOut;
720end;
721
722class function TTransportExceptionEndOfFile.GetType: TTransportException.TExceptionType;
723begin
724 result := TExceptionType.EndOfFile;
725end;
726
727class function TTransportExceptionBadArgs.GetType: TTransportException.TExceptionType;
728begin
729 result := TExceptionType.BadArgs;
730end;
731
732class function TTransportExceptionInterrupted.GetType: TTransportException.TExceptionType;
733begin
734 result := TExceptionType.Interrupted;
735end;
736
Jens Geyer2646bd62019-11-09 23:24:52 +0100737class function TTransportExceptionCorruptedData.GetType: TTransportException.TExceptionType;
738begin
739 result := TExceptionType.CorruptedData;
740end;
741
Jens Geyerd5436f52014-10-03 19:50:38 +0200742{ TTransportFactoryImpl }
743
Jens Geyered994552019-11-09 23:24:52 +0100744function TTransportFactoryImpl.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200745begin
Jens Geyered994552019-11-09 23:24:52 +0100746 Result := aTransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200747end;
748
Jens Geyera019cda2019-11-09 23:24:52 +0100749
750{ TServerTransportImpl }
751
752constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
753begin
754 inherited Create;
755 if aConfig <> nil
756 then FConfig := aConfig
757 else FConfig := TThriftConfigurationImpl.Create;
758end;
759
760function TServerTransportImpl.Configuration : IThriftConfiguration;
761begin
762 result := FConfig;
763end;
764
Jens Geyerd5436f52014-10-03 19:50:38 +0200765{ TServerSocket }
766
Jens Geyer23d67462015-12-19 11:44:57 +0100767{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100768constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200769{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100770constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100771{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200772begin
Jens Geyera019cda2019-11-09 23:24:52 +0100773 inherited Create( aConfig);
Jens Geyered994552019-11-09 23:24:52 +0100774 FServer := aServer;
Jens Geyera019cda2019-11-09 23:24:52 +0100775
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200776
777{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +0100778 FClientTimeout := aClientTimeout;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200779{$ELSE}
Jens Geyered994552019-11-09 23:24:52 +0100780 FServer.RecvTimeout := aClientTimeout;
781 FServer.SendTimeout := aClientTimeout;
782{$ENDIF}
783end;
784
785
786{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100787constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100788{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100789constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200790{$ENDIF}
791begin
Jens Geyera019cda2019-11-09 23:24:52 +0100792 inherited Create( aConfig);
Jens Geyer41f47af2019-11-09 23:24:52 +0100793
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200794{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +0100795 FPort := aPort;
796 FClientTimeout := aClientTimeout;
797
798 FOwnsServer := True;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200799 FServer := TTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200800 FServer.BlockMode := bmBlocking;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200801 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200802 FServer.LocalPort := AnsiString( IntToStr( FPort));
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200803 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200804 FServer.LocalPort := IntToStr( FPort);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200805 {$IFEND}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200806{$ELSE}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200807 FOwnsServer := True;
Jens Geyered994552019-11-09 23:24:52 +0100808 FServer := TServerSocket.Create(aPort, aClientTimeout, aClientTimeout);
809{$ENDIF}
810
811 FUseBufferedSocket := aUseBufferedSockets;
Jens Geyerd5436f52014-10-03 19:50:38 +0200812end;
813
814destructor TServerSocketImpl.Destroy;
815begin
816 if FOwnsServer then begin
817 FServer.Free;
818 FServer := nil;
819 end;
820 inherited;
821end;
822
823function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
824var
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200825{$IFDEF OLD_SOCKETS}
826 client : TCustomIpClient;
827{$ELSE}
828 client: TSocket;
829{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200830 trans : IStreamTransport;
831begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100832 if FServer = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200833 raise TTransportExceptionNotOpen.Create('No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200834 end;
835
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200836{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200837 client := nil;
838 try
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200839 client := TCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200840
841 if Assigned(fnAccepting)
842 then fnAccepting();
843
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100844 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200845 client.Free;
846 Result := nil;
847 Exit;
848 end;
849
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100850 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200851 Result := nil;
852 Exit;
853 end;
854
Jens Geyera019cda2019-11-09 23:24:52 +0100855 trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
Jens Geyerd5436f52014-10-03 19:50:38 +0200856 client := nil; // trans owns it now
857
858 if FUseBufferedSocket
859 then result := TBufferedTransportImpl.Create( trans)
860 else result := trans;
861
862 except
863 on E: Exception do begin
864 client.Free;
Jens Geyere0e32402016-04-20 21:50:48 +0200865 raise TTransportExceptionUnknown.Create(E.ToString);
Jens Geyerd5436f52014-10-03 19:50:38 +0200866 end;
867 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200868{$ELSE}
869 if Assigned(fnAccepting) then
870 fnAccepting();
871
872 client := FServer.Accept;
873 try
Jens Geyera019cda2019-11-09 23:24:52 +0100874 trans := TSocketImpl.Create(client, TRUE, Configuration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200875 client := nil;
876
877 if FUseBufferedSocket then
878 Result := TBufferedTransportImpl.Create(trans)
879 else
880 Result := trans;
881 except
882 client.Free;
883 raise;
884 end;
885{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200886end;
887
888procedure TServerSocketImpl.Listen;
889begin
890 if FServer <> nil then
891 begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200892{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200893 try
894 FServer.Active := True;
895 except
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200896 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200897 do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200898 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200899{$ELSE}
900 FServer.Listen;
901{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200902 end;
903end;
904
905procedure TServerSocketImpl.Close;
906begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200907 if FServer <> nil then
908{$IFDEF OLD_SOCKETS}
909 try
910 FServer.Active := False;
911 except
912 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200913 do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200914 end;
915{$ELSE}
916 FServer.Close;
917{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200918end;
919
920{ TSocket }
921
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200922{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100923constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100924{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100925constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100926{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200927var stream : IThriftStream;
928begin
Jens Geyered994552019-11-09 23:24:52 +0100929 FClient := aClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200930 FOwnsClient := aOwnsClient;
Jens Geyered994552019-11-09 23:24:52 +0100931
932{$IFDEF OLD_SOCKETS}
933 FTimeout := aTimeout;
934{$ELSE}
935 FTimeout := aClient.RecvTimeout;
936{$ENDIF}
937
Jens Geyerd5436f52014-10-03 19:50:38 +0200938 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
Jens Geyera019cda2019-11-09 23:24:52 +0100939 inherited Create( stream, stream, aConfig);
Jens Geyerd5436f52014-10-03 19:50:38 +0200940end;
941
Jens Geyera019cda2019-11-09 23:24:52 +0100942
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200943{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100944constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200945{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100946constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200947{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200948begin
Jens Geyera019cda2019-11-09 23:24:52 +0100949 inherited Create(nil,nil, aConfig);
Jens Geyered994552019-11-09 23:24:52 +0100950 FHost := aHost;
951 FPort := aPort;
952 FTimeout := aTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200953 InitSocket;
954end;
955
956destructor TSocketImpl.Destroy;
957begin
958 if FOwnsClient
959 then FreeAndNil( FClient);
960 inherited;
961end;
962
963procedure TSocketImpl.Close;
964begin
965 inherited Close;
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200966
967 FInputStream := nil;
968 FOutputStream := nil;
969
Jens Geyerd5436f52014-10-03 19:50:38 +0200970 if FOwnsClient
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200971 then FreeAndNil( FClient)
972 else FClient := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +0200973end;
974
975function TSocketImpl.GetIsOpen: Boolean;
976begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200977{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200978 Result := (FClient <> nil) and FClient.Connected;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200979{$ELSE}
980 Result := (FClient <> nil) and FClient.IsOpen
981{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200982end;
983
984procedure TSocketImpl.InitSocket;
985var
986 stream : IThriftStream;
987begin
988 if FOwnsClient
989 then FreeAndNil( FClient)
990 else FClient := nil;
991
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200992{$IFDEF OLD_SOCKETS}
993 FClient := TTcpClient.Create( nil);
994{$ELSE}
995 FClient := TSocket.Create(FHost, FPort);
996{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200997 FOwnsClient := True;
998
999 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
1000 FInputStream := stream;
1001 FOutputStream := stream;
1002end;
1003
1004procedure TSocketImpl.Open;
1005begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001006 if IsOpen then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001007 raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +02001008 end;
1009
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001010 if FHost = '' then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001011 raise TTransportExceptionNotOpen.Create('Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +02001012 end;
1013
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001014 if Port <= 0 then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001015 raise TTransportExceptionNotOpen.Create('Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +02001016 end;
1017
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001018 if FClient = nil
1019 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +02001020
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001021{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001022 FClient.RemoteHost := TSocketHost( Host);
1023 FClient.RemotePort := TSocketPort( IntToStr( Port));
1024 FClient.Connect;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001025{$ELSE}
1026 FClient.Open;
1027{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001028
1029 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
1030 FOutputStream := FInputStream;
1031end;
1032
1033{ TBufferedStream }
1034
1035procedure TBufferedStreamImpl.Close;
1036begin
1037 Flush;
1038 FStream := nil;
1039
1040 FReadBuffer.Free;
1041 FReadBuffer := nil;
1042
1043 FWriteBuffer.Free;
1044 FWriteBuffer := nil;
1045end;
1046
Jens Geyered994552019-11-09 23:24:52 +01001047constructor TBufferedStreamImpl.Create( const aStream: IThriftStream; const aBufSize : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001048begin
1049 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001050 FStream := aStream;
1051 FBufSize := aBufSize;
Jens Geyerf726ae32021-06-04 11:17:26 +02001052 FReadBuffer := TThriftMemoryStream.Create(FBufSize);
1053 FWriteBuffer := TThriftMemoryStream.Create(FBufSize);
Jens Geyerd5436f52014-10-03 19:50:38 +02001054end;
1055
1056destructor TBufferedStreamImpl.Destroy;
1057begin
1058 Close;
1059 inherited;
1060end;
1061
1062procedure TBufferedStreamImpl.Flush;
1063var
1064 buf : TBytes;
1065 len : Integer;
1066begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001067 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001068 len := FWriteBuffer.Size;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001069 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001070 SetLength( buf, len );
1071 FWriteBuffer.Position := 0;
1072 FWriteBuffer.Read( Pointer(@buf[0])^, len );
1073 FStream.Write( buf, 0, len );
1074 end;
1075 FWriteBuffer.Clear;
1076 end;
1077end;
1078
1079function TBufferedStreamImpl.IsOpen: Boolean;
1080begin
1081 Result := (FWriteBuffer <> nil)
1082 and (FReadBuffer <> nil)
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001083 and (FStream <> nil)
1084 and FStream.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001085end;
1086
1087procedure TBufferedStreamImpl.Open;
1088begin
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001089 FStream.Open;
Jens Geyerd5436f52014-10-03 19:50:38 +02001090end;
1091
Jens Geyer17c3ad92017-09-05 20:31:27 +02001092function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001093var
1094 nRead : Integer;
1095 tempbuf : TBytes;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001096 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001097begin
1098 inherited;
1099 Result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001100
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001101 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001102 while count > 0 do begin
1103
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001104 if FReadBuffer.Position >= FReadBuffer.Size then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001105 FReadBuffer.Clear;
1106 SetLength( tempbuf, FBufSize);
1107 nRead := FStream.Read( tempbuf, 0, FBufSize );
1108 if nRead = 0 then Break; // avoid infinite loop
1109
1110 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
1111 FReadBuffer.Position := 0;
1112 end;
1113
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001114 if FReadBuffer.Position < FReadBuffer.Size then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001115 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
1116 pTmp := pBuf;
1117 Inc( pTmp, offset);
1118 Inc( Result, FReadBuffer.Read( pTmp^, nRead));
Jens Geyerd5436f52014-10-03 19:50:38 +02001119 Dec( count, nRead);
1120 Inc( offset, nRead);
1121 end;
1122 end;
1123 end;
1124end;
1125
Jens Geyered994552019-11-09 23:24:52 +01001126
Jens Geyerd5436f52014-10-03 19:50:38 +02001127function TBufferedStreamImpl.ToArray: TBytes;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001128var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001129begin
Jens Geyera019cda2019-11-09 23:24:52 +01001130 if IsOpen
1131 then len := FReadBuffer.Size
1132 else len := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001133
1134 SetLength( Result, len);
1135
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001136 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001137 FReadBuffer.Position := 0;
1138 FReadBuffer.Read( Pointer(@Result[0])^, len );
1139 end;
1140end;
1141
Jens Geyer17c3ad92017-09-05 20:31:27 +02001142procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001143var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001144begin
1145 inherited;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001146 if count > 0 then begin
1147 if IsOpen then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001148 pTmp := pBuf;
1149 Inc( pTmp, offset);
1150 FWriteBuffer.Write( pTmp^, count );
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001151 if FWriteBuffer.Size > FBufSize then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001152 Flush;
1153 end;
1154 end;
1155 end;
1156end;
1157
Jens Geyera019cda2019-11-09 23:24:52 +01001158
1159function TBufferedStreamImpl.Size : Int64;
1160begin
1161 result := FReadBuffer.Size;
1162end;
1163
1164
1165function TBufferedStreamImpl.Position : Int64;
1166begin
1167 result := FReadBuffer.Position;
1168end;
1169
1170
Jens Geyerd5436f52014-10-03 19:50:38 +02001171{ TStreamTransportImpl }
1172
Jens Geyera019cda2019-11-09 23:24:52 +01001173constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
Jens Geyerd5436f52014-10-03 19:50:38 +02001174begin
Jens Geyera019cda2019-11-09 23:24:52 +01001175 inherited Create( aConfig);
Jens Geyered994552019-11-09 23:24:52 +01001176 FInputStream := aInputStream;
1177 FOutputStream := aOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +02001178end;
1179
1180destructor TStreamTransportImpl.Destroy;
1181begin
1182 FInputStream := nil;
1183 FOutputStream := nil;
1184 inherited;
1185end;
1186
Jens Geyer20e727e2018-06-22 22:39:57 +02001187procedure TStreamTransportImpl.Close;
1188begin
1189 FInputStream := nil;
1190 FOutputStream := nil;
1191end;
1192
Jens Geyerd5436f52014-10-03 19:50:38 +02001193procedure TStreamTransportImpl.Flush;
1194begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001195 if FOutputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001196 raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001197 end;
1198
1199 FOutputStream.Flush;
1200end;
1201
1202function TStreamTransportImpl.GetInputStream: IThriftStream;
1203begin
1204 Result := FInputStream;
1205end;
1206
1207function TStreamTransportImpl.GetIsOpen: Boolean;
1208begin
1209 Result := True;
1210end;
1211
1212function TStreamTransportImpl.GetOutputStream: IThriftStream;
1213begin
Jens Geyer02fbe0e2018-03-19 17:35:44 +01001214 Result := FOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +02001215end;
1216
1217procedure TStreamTransportImpl.Open;
1218begin
Jens Geyer2646bd62019-11-09 23:24:52 +01001219 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +02001220end;
1221
Jens Geyer17c3ad92017-09-05 20:31:27 +02001222function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001223begin
Jens Geyered994552019-11-09 23:24:52 +01001224 if FInputStream = nil
1225 then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001226
Jens Geyer17c3ad92017-09-05 20:31:27 +02001227 Result := FInputStream.Read( pBuf,buflen, off, len );
Jens Geyera019cda2019-11-09 23:24:52 +01001228 CountConsumedMessageBytes( result);
Jens Geyerd5436f52014-10-03 19:50:38 +02001229end;
1230
Jens Geyer17c3ad92017-09-05 20:31:27 +02001231procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001232begin
Jens Geyered994552019-11-09 23:24:52 +01001233 if FOutputStream = nil
1234 then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001235
Jens Geyer17c3ad92017-09-05 20:31:27 +02001236 FOutputStream.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001237end;
1238
1239{ TBufferedTransportImpl }
1240
Jens Geyered994552019-11-09 23:24:52 +01001241constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001242begin
Jens Geyered994552019-11-09 23:24:52 +01001243 ASSERT( aTransport <> nil);
Jens Geyera019cda2019-11-09 23:24:52 +01001244 inherited Create( aTransport);
Jens Geyered994552019-11-09 23:24:52 +01001245 FBufSize := aBufSize;
Jens Geyerd5436f52014-10-03 19:50:38 +02001246 InitBuffers;
1247end;
1248
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001249procedure TBufferedTransportImpl.Close;
1250begin
Jens Geyera019cda2019-11-09 23:24:52 +01001251 InnerTransport.Close;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001252 FInputBuffer := nil;
Jens Geyered994552019-11-09 23:24:52 +01001253 FOutputBuffer := nil;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001254end;
1255
Jens Geyerd5436f52014-10-03 19:50:38 +02001256procedure TBufferedTransportImpl.Flush;
1257begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001258 if FOutputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001259 FOutputBuffer.Flush;
1260 end;
1261end;
1262
1263function TBufferedTransportImpl.GetIsOpen: Boolean;
1264begin
Jens Geyera019cda2019-11-09 23:24:52 +01001265 Result := InnerTransport.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001266end;
1267
1268procedure TBufferedTransportImpl.InitBuffers;
1269begin
Jens Geyera019cda2019-11-09 23:24:52 +01001270 if InnerTransport.InputStream <> nil then begin
1271 FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
Jens Geyerd5436f52014-10-03 19:50:38 +02001272 end;
Jens Geyera019cda2019-11-09 23:24:52 +01001273 if InnerTransport.OutputStream <> nil then begin
1274 FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
Jens Geyerd5436f52014-10-03 19:50:38 +02001275 end;
1276end;
1277
1278procedure TBufferedTransportImpl.Open;
1279begin
Jens Geyera019cda2019-11-09 23:24:52 +01001280 InnerTransport.Open;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001281 InitBuffers; // we need to get the buffers to match FTransport substreams again
Jens Geyerd5436f52014-10-03 19:50:38 +02001282end;
1283
Jens Geyer17c3ad92017-09-05 20:31:27 +02001284function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001285begin
Jens Geyered994552019-11-09 23:24:52 +01001286 if FInputBuffer <> nil
Jens Geyera019cda2019-11-09 23:24:52 +01001287 then Result := FInputBuffer.Read( pBuf,buflen, off, len )
Jens Geyered994552019-11-09 23:24:52 +01001288 else Result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001289end;
1290
Jens Geyer17c3ad92017-09-05 20:31:27 +02001291procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001292begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001293 if FOutputBuffer <> nil then begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001294 FOutputBuffer.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001295 end;
1296end;
1297
Jens Geyera019cda2019-11-09 23:24:52 +01001298procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1299var buffered, need : Int64;
Jens Geyer41f47af2019-11-09 23:24:52 +01001300begin
1301 need := value;
1302
1303 // buffered bytes
Jens Geyera019cda2019-11-09 23:24:52 +01001304 buffered := FInputBuffer.Size - FInputBuffer.Position;
1305 if buffered < need
1306 then InnerTransport.CheckReadBytesAvailable( need - buffered);
Jens Geyer41f47af2019-11-09 23:24:52 +01001307end;
1308
Jens Geyera019cda2019-11-09 23:24:52 +01001309
Jens Geyered994552019-11-09 23:24:52 +01001310{ TBufferedTransportImpl.TFactory }
Jens Geyerd5436f52014-10-03 19:50:38 +02001311
Jens Geyered994552019-11-09 23:24:52 +01001312function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +02001313begin
Jens Geyered994552019-11-09 23:24:52 +01001314 Result := TFramedTransportImpl.Create( aTransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001315end;
1316
Jens Geyered994552019-11-09 23:24:52 +01001317
1318{ TFramedTransportImpl }
1319
1320constructor TFramedTransportImpl.Create( const aTransport: ITransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001321begin
Jens Geyered994552019-11-09 23:24:52 +01001322 ASSERT( aTransport <> nil);
Jens Geyera019cda2019-11-09 23:24:52 +01001323 inherited Create( aTransport);
Jens Geyer2646bd62019-11-09 23:24:52 +01001324
Jens Geyerd5436f52014-10-03 19:50:38 +02001325 InitWriteBuffer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001326end;
1327
1328destructor TFramedTransportImpl.Destroy;
1329begin
1330 FWriteBuffer.Free;
Jens Geyera019cda2019-11-09 23:24:52 +01001331 FWriteBuffer := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001332 FReadBuffer.Free;
Jens Geyera019cda2019-11-09 23:24:52 +01001333 FReadBuffer := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001334 inherited;
1335end;
1336
Jens Geyer2646bd62019-11-09 23:24:52 +01001337procedure TFramedTransportImpl.Close;
1338begin
Jens Geyera019cda2019-11-09 23:24:52 +01001339 InnerTransport.Close;
Jens Geyer2646bd62019-11-09 23:24:52 +01001340end;
1341
Jens Geyerd5436f52014-10-03 19:50:38 +02001342procedure TFramedTransportImpl.Flush;
1343var
1344 buf : TBytes;
1345 len : Integer;
Jens Geyera019cda2019-11-09 23:24:52 +01001346 data_len : Int64;
Jens Geyerd5436f52014-10-03 19:50:38 +02001347begin
Jens Geyer528a0f02019-11-18 20:17:03 +01001348 if not IsOpen
1349 then raise TTransportExceptionNotOpen.Create('not open');
1350
Jens Geyerd5436f52014-10-03 19:50:38 +02001351 len := FWriteBuffer.Size;
1352 SetLength( buf, len);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001353 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001354 System.Move( FWriteBuffer.Memory^, buf[0], len );
1355 end;
1356
Jens Geyer2646bd62019-11-09 23:24:52 +01001357 data_len := len - SizeOf(TFramedHeader);
Jens Geyera019cda2019-11-09 23:24:52 +01001358 if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
1359 then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
1360 else UpdateKnownMessageSize( len);
Jens Geyerd5436f52014-10-03 19:50:38 +02001361
1362 InitWriteBuffer;
1363
1364 buf[0] := Byte($FF and (data_len shr 24));
1365 buf[1] := Byte($FF and (data_len shr 16));
1366 buf[2] := Byte($FF and (data_len shr 8));
1367 buf[3] := Byte($FF and data_len);
1368
Jens Geyera019cda2019-11-09 23:24:52 +01001369 InnerTransport.Write( buf, 0, len );
1370 InnerTransport.Flush;
Jens Geyerd5436f52014-10-03 19:50:38 +02001371end;
1372
1373function TFramedTransportImpl.GetIsOpen: Boolean;
1374begin
Jens Geyera019cda2019-11-09 23:24:52 +01001375 Result := InnerTransport.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001376end;
1377
Jens Geyerd5436f52014-10-03 19:50:38 +02001378procedure TFramedTransportImpl.InitWriteBuffer;
Jens Geyer2646bd62019-11-09 23:24:52 +01001379const DUMMY_HEADER : TFramedHeader = 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001380begin
Jens Geyer528a0f02019-11-18 20:17:03 +01001381 FreeAndNil( FWriteBuffer);
Jens Geyerf726ae32021-06-04 11:17:26 +02001382 FWriteBuffer := TThriftMemoryStream.Create(1024);
Jens Geyer2646bd62019-11-09 23:24:52 +01001383 FWriteBuffer.Write( DUMMY_HEADER, SizeOf(DUMMY_HEADER));
Jens Geyerd5436f52014-10-03 19:50:38 +02001384end;
1385
1386procedure TFramedTransportImpl.Open;
1387begin
Jens Geyera019cda2019-11-09 23:24:52 +01001388 InnerTransport.Open;
Jens Geyerd5436f52014-10-03 19:50:38 +02001389end;
1390
Jens Geyer17c3ad92017-09-05 20:31:27 +02001391function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001392var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001393begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001394 if len > (buflen-off)
1395 then len := buflen-off;
1396
Jens Geyer5089b0a2018-02-01 22:37:18 +01001397 pTmp := pBuf;
1398 Inc( pTmp, off);
1399
Jens Geyer17c3ad92017-09-05 20:31:27 +02001400 if (FReadBuffer <> nil) and (len > 0) then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001401 result := FReadBuffer.Read( pTmp^, len);
Jens Geyered994552019-11-09 23:24:52 +01001402 if result > 0 then Exit;
Jens Geyerd5436f52014-10-03 19:50:38 +02001403 end;
1404
1405 ReadFrame;
1406 if len > 0
Jens Geyer5089b0a2018-02-01 22:37:18 +01001407 then Result := FReadBuffer.Read( pTmp^, len)
Jens Geyerd5436f52014-10-03 19:50:38 +02001408 else Result := 0;
1409end;
1410
1411procedure TFramedTransportImpl.ReadFrame;
1412var
Jens Geyer2646bd62019-11-09 23:24:52 +01001413 i32rd : packed array[0..SizeOf(TFramedHeader)-1] of Byte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001414 size : Integer;
1415 buff : TBytes;
1416begin
Jens Geyera019cda2019-11-09 23:24:52 +01001417 InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
Jens Geyerd5436f52014-10-03 19:50:38 +02001418 size :=
1419 ((i32rd[0] and $FF) shl 24) or
1420 ((i32rd[1] and $FF) shl 16) or
1421 ((i32rd[2] and $FF) shl 8) or
1422 (i32rd[3] and $FF);
Jens Geyer2646bd62019-11-09 23:24:52 +01001423
1424 if size < 0 then begin
1425 Close();
1426 raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
1427 end;
1428
Jens Geyera019cda2019-11-09 23:24:52 +01001429 if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
Jens Geyer2646bd62019-11-09 23:24:52 +01001430 Close();
Jens Geyer589ee5b2021-03-29 21:40:55 +02001431 if CharUtils.IsHtmlDoctype(size)
1432 then raise TTransportExceptionCorruptedData.Create('Remote end sends HTML instead of data')
1433 else raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
Jens Geyer2646bd62019-11-09 23:24:52 +01001434 end;
1435
Jens Geyera019cda2019-11-09 23:24:52 +01001436 UpdateKnownMessageSize(size + SizeOf(size));
1437
Jens Geyerd5436f52014-10-03 19:50:38 +02001438 SetLength( buff, size );
Jens Geyera019cda2019-11-09 23:24:52 +01001439 InnerTransport.ReadAll( buff, 0, size );
Jens Geyered994552019-11-09 23:24:52 +01001440
1441 FreeAndNil( FReadBuffer);
Jens Geyerf726ae32021-06-04 11:17:26 +02001442 FReadBuffer := TThriftMemoryStream.Create(1024);
Jens Geyera76e6c72017-09-08 21:03:30 +02001443 if Length(buff) > 0
1444 then FReadBuffer.Write( Pointer(@buff[0])^, size );
Jens Geyerd5436f52014-10-03 19:50:38 +02001445 FReadBuffer.Position := 0;
1446end;
1447
Jens Geyer17c3ad92017-09-05 20:31:27 +02001448procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001449var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001450begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001451 if len > 0 then begin
1452 pTmp := pBuf;
1453 Inc( pTmp, off);
1454
1455 FWriteBuffer.Write( pTmp^, len );
1456 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001457end;
1458
Jens Geyered994552019-11-09 23:24:52 +01001459
Jens Geyera019cda2019-11-09 23:24:52 +01001460procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1461var buffered, need : Int64;
Jens Geyer41f47af2019-11-09 23:24:52 +01001462begin
Jens Geyera019cda2019-11-09 23:24:52 +01001463 need := value;
Jens Geyer41f47af2019-11-09 23:24:52 +01001464
Jens Geyera019cda2019-11-09 23:24:52 +01001465 // buffered bytes
1466 buffered := FReadBuffer.Size - FReadBuffer.Position;
1467 if buffered < need
1468 then InnerTransport.CheckReadBytesAvailable( need - buffered);
Jens Geyer41f47af2019-11-09 23:24:52 +01001469end;
1470
1471
Jens Geyerd5436f52014-10-03 19:50:38 +02001472{ TFramedTransport.TFactory }
1473
Jens Geyered994552019-11-09 23:24:52 +01001474function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +02001475begin
Jens Geyered994552019-11-09 23:24:52 +01001476 Result := TFramedTransportImpl.Create( aTransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001477end;
1478
1479{ TTcpSocketStreamImpl }
1480
1481procedure TTcpSocketStreamImpl.Close;
1482begin
1483 FTcpClient.Close;
1484end;
1485
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001486{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +01001487constructor TTcpSocketStreamImpl.Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001488begin
1489 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001490 FTcpClient := aTcpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +02001491 FTimeout := aTimeout;
1492end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001493{$ELSE}
Jens Geyered994552019-11-09 23:24:52 +01001494constructor TTcpSocketStreamImpl.Create( const aTcpClient: TSocket; const aTimeout : Longword);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001495begin
1496 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001497 FTcpClient := aTcpClient;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001498 if aTimeout = 0 then
1499 FTcpClient.RecvTimeout := SLEEP_TIME
1500 else
1501 FTcpClient.RecvTimeout := aTimeout;
1502 FTcpClient.SendTimeout := aTimeout;
1503end;
1504{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001505
1506procedure TTcpSocketStreamImpl.Flush;
1507begin
Jens Geyera019cda2019-11-09 23:24:52 +01001508 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +02001509end;
1510
Jens Geyera019cda2019-11-09 23:24:52 +01001511
Jens Geyerd5436f52014-10-03 19:50:38 +02001512function TTcpSocketStreamImpl.IsOpen: Boolean;
1513begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001514{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001515 Result := FTcpClient.Active;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001516{$ELSE}
1517 Result := FTcpClient.IsOpen;
1518{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001519end;
1520
1521procedure TTcpSocketStreamImpl.Open;
1522begin
1523 FTcpClient.Open;
1524end;
1525
1526
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001527{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001528function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1529 TimeOut: Integer; var wsaError : Integer): Integer;
1530var
1531 ReadFds: TFDset;
1532 ReadFdsptr: PFDset;
1533 WriteFds: TFDset;
1534 WriteFdsptr: PFDset;
1535 ExceptFds: TFDset;
1536 ExceptFdsptr: PFDset;
1537 tv: timeval;
1538 Timeptr: PTimeval;
1539 socket : TSocket;
1540begin
1541 if not FTcpClient.Active then begin
1542 wsaError := WSAEINVAL;
1543 Exit( SOCKET_ERROR);
1544 end;
1545
1546 socket := FTcpClient.Handle;
1547
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001548 if Assigned(ReadReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001549 ReadFdsptr := @ReadFds;
1550 FD_ZERO(ReadFds);
1551 FD_SET(socket, ReadFds);
1552 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001553 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001554 ReadFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001555 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001556
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001557 if Assigned(WriteReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001558 WriteFdsptr := @WriteFds;
1559 FD_ZERO(WriteFds);
1560 FD_SET(socket, WriteFds);
1561 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001562 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001563 WriteFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001564 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001565
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001566 if Assigned(ExceptFlag) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001567 ExceptFdsptr := @ExceptFds;
1568 FD_ZERO(ExceptFds);
1569 FD_SET(socket, ExceptFds);
1570 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001571 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001572 ExceptFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001573 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001574
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001575 if TimeOut >= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001576 tv.tv_sec := TimeOut div 1000;
1577 tv.tv_usec := 1000 * (TimeOut mod 1000);
1578 Timeptr := @tv;
1579 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001580 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001581 Timeptr := nil; // wait forever
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001582 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001583
1584 wsaError := 0;
1585 try
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001586 {$IFDEF MSWINDOWS}
1587 {$IFDEF OLD_UNIT_NAMES}
1588 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1589 {$ELSE}
1590 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1591 {$ENDIF}
1592 {$ENDIF}
1593 {$IFDEF LINUX}
1594 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1595 {$ENDIF}
Jens Geyera019cda2019-11-09 23:24:52 +01001596
Jens Geyerd5436f52014-10-03 19:50:38 +02001597 if result = SOCKET_ERROR
1598 then wsaError := WSAGetLastError;
1599
1600 except
1601 result := SOCKET_ERROR;
1602 end;
1603
1604 if Assigned(ReadReady) then
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001605 ReadReady^ := FD_ISSET(socket, ReadFds);
1606
Jens Geyerd5436f52014-10-03 19:50:38 +02001607 if Assigned(WriteReady) then
1608 WriteReady^ := FD_ISSET(socket, WriteFds);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001609
Jens Geyerd5436f52014-10-03 19:50:38 +02001610 if Assigned(ExceptFlag) then
1611 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1612end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001613{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001614
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001615{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001616function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1617 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001618 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001619var bCanRead, bError : Boolean;
1620 retval : Integer;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001621const
1622 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
Jens Geyerd5436f52014-10-03 19:50:38 +02001623begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001624 bytesReady := 0;
1625
Jens Geyerd5436f52014-10-03 19:50:38 +02001626 // The select function returns the total number of socket handles that are ready
1627 // and contained in the fd_set structures, zero if the time limit expired,
1628 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1629 // WSAGetLastError can be used to retrieve a specific error code.
1630 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1631 if retval = SOCKET_ERROR
1632 then Exit( TWaitForData.wfd_Error);
1633 if (retval = 0) or not bCanRead
1634 then Exit( TWaitForData.wfd_Timeout);
1635
1636 // recv() returns the number of bytes received, or -1 if an error occurred.
1637 // The return value will be 0 when the peer has performed an orderly shutdown.
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001638
1639 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
Jens Geyerd5436f52014-10-03 19:50:38 +02001640 if retval <= 0
1641 then Exit( TWaitForData.wfd_Error);
1642
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001643 // at least we have some data
1644 bytesReady := Min( retval, DesiredBytes);
1645 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001646end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001647{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001648
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001649{$IFDEF OLD_SOCKETS}
Jens Geyer17c3ad92017-09-05 20:31:27 +02001650function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001651// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001652var wfd : TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001653 wsaError,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001654 msecs : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001655 nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001656 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001657begin
1658 inherited;
1659
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001660 if FTimeout > 0
1661 then msecs := FTimeout
1662 else msecs := DEFAULT_THRIFT_TIMEOUT;
1663
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001664 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001665 pTmp := pBuf;
1666 Inc( pTmp, offset);
Jens Geyerc140bb92019-11-27 22:18:12 +01001667 while (count > 0) and (result = 0) do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001668
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001669 while TRUE do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001670 wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001671 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001672 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001673 TWaitForData.wfd_HaveData : Break;
1674 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001675 if (FTimeout = 0)
1676 then Exit
Jens Geyera019cda2019-11-09 23:24:52 +01001677 else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001678 end;
1679 else
1680 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001681 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001682 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001683
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001684 // reduce the timeout once we got data
1685 if FTimeout > 0
1686 then msecs := FTimeout div 10
1687 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1688 msecs := Max( msecs, 200);
1689
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001690 ASSERT( nBytes <= count);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001691 nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1692 Inc( pTmp, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001693 Dec( count, nBytes);
1694 Inc( result, nBytes);
1695 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001696end;
1697
1698function TTcpSocketStreamImpl.ToArray: TBytes;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001699// old sockets version
1700var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001701begin
1702 len := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001703 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001704 len := FTcpClient.BytesReceived;
1705 end;
1706
1707 SetLength( Result, len );
1708
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001709 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001710 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1711 end;
1712end;
1713
Jens Geyer17c3ad92017-09-05 20:31:27 +02001714procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001715// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001716var bCanWrite, bError : Boolean;
1717 retval, wsaError : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001718 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001719begin
1720 inherited;
1721
1722 if not FTcpClient.Active
Jens Geyere0e32402016-04-20 21:50:48 +02001723 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerd5436f52014-10-03 19:50:38 +02001724
1725 // The select function returns the total number of socket handles that are ready
1726 // and contained in the fd_set structures, zero if the time limit expired,
1727 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1728 // WSAGetLastError can be used to retrieve a specific error code.
1729 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1730 if retval = SOCKET_ERROR
Jens Geyere0e32402016-04-20 21:50:48 +02001731 then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001732
Jens Geyerd5436f52014-10-03 19:50:38 +02001733 if (retval = 0)
Jens Geyere0e32402016-04-20 21:50:48 +02001734 then raise TTransportExceptionTimedOut.Create('timed out');
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001735
Jens Geyerd5436f52014-10-03 19:50:38 +02001736 if bError or not bCanWrite
Jens Geyere0e32402016-04-20 21:50:48 +02001737 then raise TTransportExceptionUnknown.Create('unknown error');
Jens Geyerd5436f52014-10-03 19:50:38 +02001738
Jens Geyer5089b0a2018-02-01 22:37:18 +01001739 pTmp := pBuf;
1740 Inc( pTmp, offset);
1741 FTcpClient.SendBuf( pTmp^, count);
Jens Geyerd5436f52014-10-03 19:50:38 +02001742end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001743
1744{$ELSE}
1745
Jens Geyer17c3ad92017-09-05 20:31:27 +02001746function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001747// new sockets version
1748var nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001749 pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001750begin
1751 inherited;
1752
1753 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001754 pTmp := pBuf;
1755 Inc( pTmp, offset);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001756 while count > 0 do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001757 nBytes := FTcpClient.Read( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001758 if nBytes = 0 then Exit;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001759 Inc( pTmp, nBytes);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001760 Dec( count, nBytes);
1761 Inc( result, nBytes);
1762 end;
1763end;
1764
1765function TTcpSocketStreamImpl.ToArray: TBytes;
1766// new sockets version
1767var len : Integer;
1768begin
1769 len := 0;
1770 try
1771 if FTcpClient.Peek then
1772 repeat
1773 SetLength(Result, Length(Result) + 1024);
1774 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1775 until len < 1024;
1776 except
1777 on TTransportException do begin { don't allow default exceptions } end;
1778 else raise;
1779 end;
1780 if len > 0 then
1781 SetLength(Result, Length(Result) - 1024 + len);
1782end;
1783
Jens Geyer17c3ad92017-09-05 20:31:27 +02001784procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001785// new sockets version
Jens Geyer5089b0a2018-02-01 22:37:18 +01001786var pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001787begin
1788 inherited;
1789
1790 if not FTcpClient.IsOpen
Kyle Johnsone363a342016-04-22 19:11:16 -05001791 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001792
Jens Geyer5089b0a2018-02-01 22:37:18 +01001793 pTmp := pBuf;
1794 Inc( pTmp, offset);
1795 FTcpClient.Write( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001796end;
1797
Jens Geyer23d67462015-12-19 11:44:57 +01001798{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001799
Jens Geyerd5436f52014-10-03 19:50:38 +02001800
1801end.