blob: 4ca38315d5085917618f26dda588faaea1761684 [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
Jens Geyer02230912019-04-03 01:12:51 +020032 WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer02230912019-04-03 01:12:51 +020034 Winapi.WinSock,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020035 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyera019cda2019-11-09 23:24:52 +010041 Thrift.Configuration,
Jens Geyerd5436f52014-10-03 19:50:38 +020042 Thrift.Collections,
Jens Geyer606f1ef2018-04-09 23:09:41 +020043 Thrift.Exception,
Jens Geyerd5436f52014-10-03 19:50:38 +020044 Thrift.Utils,
Jens Geyer02230912019-04-03 01:12:51 +020045 Thrift.WinHTTP,
Nick4f5229e2016-04-14 16:43:22 +030046 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020047
48type
Jens Geyera019cda2019-11-09 23:24:52 +010049 IStreamTransport = interface;
Jens Geyer41f47af2019-11-09 23:24:52 +010050
Jens Geyerd5436f52014-10-03 19:50:38 +020051 ITransport = interface
Jens Geyera019cda2019-11-09 23:24:52 +010052 ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
Jens Geyerd5436f52014-10-03 19:50:38 +020053 function GetIsOpen: Boolean;
54 property IsOpen: Boolean read GetIsOpen;
55 function Peek: Boolean;
56 procedure Open;
57 procedure Close;
Jens Geyer41f47af2019-11-09 23:24:52 +010058
Jens Geyer17c3ad92017-09-05 20:31:27 +020059 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
60 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
61 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
62 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020063 procedure Write( const buf: TBytes); overload;
64 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
Jens Geyer17c3ad92017-09-05 20:31:27 +020065 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
66 procedure Write( const pBuf : Pointer; len : Integer); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020067 procedure Flush;
Jens Geyer41f47af2019-11-09 23:24:52 +010068
Jens Geyera019cda2019-11-09 23:24:52 +010069 function Configuration : IThriftConfiguration;
70 function MaxMessageSize : Integer;
Jens Geyer5a781c22025-02-04 23:35:55 +010071 procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
Jens Geyera019cda2019-11-09 23:24:52 +010072 procedure CheckReadBytesAvailable( const numBytes : Int64);
73 procedure UpdateKnownMessageSize( const size : Int64);
Jens Geyerd5436f52014-10-03 19:50:38 +020074 end;
75
Jens Geyera019cda2019-11-09 23:24:52 +010076 TTransportBase = class abstract( TInterfacedObject)
Jens Geyerfad7fd32019-11-09 23:24:52 +010077 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +020078 function GetIsOpen: Boolean; virtual; abstract;
79 property IsOpen: Boolean read GetIsOpen;
80 function Peek: Boolean; virtual;
81 procedure Open(); virtual; abstract;
82 procedure Close(); virtual; abstract;
Jens Geyer41f47af2019-11-09 23:24:52 +010083
Jens Geyer17c3ad92017-09-05 20:31:27 +020084 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
85 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
86 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
87 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
88 procedure Write( const buf: TBytes); overload; inline;
89 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
90 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
91 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020092 procedure Flush; virtual;
Jens Geyer41f47af2019-11-09 23:24:52 +010093
Jens Geyera019cda2019-11-09 23:24:52 +010094 function Configuration : IThriftConfiguration; virtual; abstract;
95 procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
96 end;
Jens Geyer41f47af2019-11-09 23:24:52 +010097
Jens Geyera019cda2019-11-09 23:24:52 +010098 // base class for all endpoint transports, e.g. sockets, pipes or HTTP
99 TEndpointTransportBase = class abstract( TTransportBase, ITransport)
100 strict private
101 FRemainingMessageSize : Int64;
102 FKnownMessageSize : Int64;
103 FConfiguration : IThriftConfiguration;
104 strict protected
105 function Configuration : IThriftConfiguration; override;
106 function MaxMessageSize : Integer;
107 property RemainingMessageSize : Int64 read FRemainingMessageSize;
108 property KnownMessageSize : Int64 read FKnownMessageSize;
Jens Geyer5a781c22025-02-04 23:35:55 +0100109 procedure ResetMessageSizeAndConsumedBytes( const newSize : Int64 = -1);
Jens Geyera019cda2019-11-09 23:24:52 +0100110 procedure UpdateKnownMessageSize(const size : Int64); override;
Jens Geyer5a781c22025-02-04 23:35:55 +0100111 procedure CheckReadBytesAvailable(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
112 procedure CountConsumedMessageBytes(const numBytes : Int64); {$IFNDEF Debug} inline; {$ENDIF}
Jens Geyer41f47af2019-11-09 23:24:52 +0100113 public
Jens Geyera019cda2019-11-09 23:24:52 +0100114 constructor Create( const aConfig : IThriftConfiguration); reintroduce;
115 end;
116
117 // base class for all layered transports, e.g. framed
118 TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
119 strict private
120 FTransport : T;
121 strict protected
122 property InnerTransport : T read FTransport;
123 function GetUnderlyingTransport: ITransport;
124 function Configuration : IThriftConfiguration; override;
125 procedure UpdateKnownMessageSize( const size : Int64); override;
126 function MaxMessageSize : Integer; inline;
Jens Geyer5a781c22025-02-04 23:35:55 +0100127 procedure ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1); inline;
Jens Geyera019cda2019-11-09 23:24:52 +0100128 procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
129 public
130 constructor Create( const aTransport: T); reintroduce;
131 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200132 end;
133
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100134 TTransportException = class abstract( TException)
Jens Geyerd5436f52014-10-03 19:50:38 +0200135 public
136 type
137 TExceptionType = (
138 Unknown,
139 NotOpen,
140 AlreadyOpen,
141 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200142 EndOfFile,
143 BadArgs,
Jens Geyer2646bd62019-11-09 23:24:52 +0100144 Interrupted,
145 CorruptedData
Jens Geyerd5436f52014-10-03 19:50:38 +0200146 );
Jens Geyerfad7fd32019-11-09 23:24:52 +0100147 strict protected
Jens Geyere0e32402016-04-20 21:50:48 +0200148 constructor HiddenCreate(const Msg: string);
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100149 class function GetType: TExceptionType; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +0200150 public
Jens Geyer41f47af2019-11-09 23:24:52 +0100151 class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyere0e32402016-04-20 21:50:48 +0200152 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyer41f47af2019-11-09 23:24:52 +0100153 class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyere0e32402016-04-20 21:50:48 +0200154 property Type_: TExceptionType read GetType;
Jens Geyerd5436f52014-10-03 19:50:38 +0200155 end;
156
Jens Geyere0e32402016-04-20 21:50:48 +0200157 // Needed to remove deprecation warning
158 TTransportExceptionSpecialized = class abstract (TTransportException)
159 public
160 constructor Create(const Msg: string);
161 end;
162
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100163 TTransportExceptionUnknown = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100164 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100165 class function GetType: TTransportException.TExceptionType; override;
166 end;
167
168 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100169 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100170 class function GetType: TTransportException.TExceptionType; override;
171 end;
172
173 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100174 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100175 class function GetType: TTransportException.TExceptionType; override;
176 end;
177
178 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100179 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100180 class function GetType: TTransportException.TExceptionType; override;
181 end;
182
183 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100184 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100185 class function GetType: TTransportException.TExceptionType; override;
186 end;
187
188 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100189 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100190 class function GetType: TTransportException.TExceptionType; override;
191 end;
192
193 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100194 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100195 class function GetType: TTransportException.TExceptionType; override;
196 end;
Jens Geyere0e32402016-04-20 21:50:48 +0200197
Jens Geyer2646bd62019-11-09 23:24:52 +0100198 TTransportExceptionCorruptedData = class (TTransportExceptionSpecialized)
199 protected
200 class function GetType: TTransportException.TExceptionType; override;
201 end;
202
Jens Geyer47f63172019-06-06 22:42:58 +0200203 TSecureProtocol = (
204 SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
205 TLS_1_1, TLS_1_2 // secure (as of today)
206 );
207
208 TSecureProtocols = set of TSecureProtocol;
209
Jens Geyerd5436f52014-10-03 19:50:38 +0200210 IHTTPClient = interface( ITransport )
Jens Geyer47f63172019-06-06 22:42:58 +0200211 ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
Jens Geyer20e727e2018-06-22 22:39:57 +0200212 procedure SetDnsResolveTimeout(const Value: Integer);
213 function GetDnsResolveTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200214 procedure SetConnectionTimeout(const Value: Integer);
215 function GetConnectionTimeout: Integer;
Jens Geyer20e727e2018-06-22 22:39:57 +0200216 procedure SetSendTimeout(const Value: Integer);
217 function GetSendTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200218 procedure SetReadTimeout(const Value: Integer);
219 function GetReadTimeout: Integer;
220 function GetCustomHeaders: IThriftDictionary<string,string>;
221 procedure SendRequest;
Jens Geyer47f63172019-06-06 22:42:58 +0200222 function GetSecureProtocols : TSecureProtocols;
223 procedure SetSecureProtocols( const value : TSecureProtocols);
Jens Geyer20e727e2018-06-22 22:39:57 +0200224
225 property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200226 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
Jens Geyer20e727e2018-06-22 22:39:57 +0200227 property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200228 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
229 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
Jens Geyer47f63172019-06-06 22:42:58 +0200230 property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
Jens Geyerd5436f52014-10-03 19:50:38 +0200231 end;
232
Jens Geyerd5436f52014-10-03 19:50:38 +0200233 IServerTransport = interface
Jens Geyera019cda2019-11-09 23:24:52 +0100234 ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
Jens Geyerd5436f52014-10-03 19:50:38 +0200235 procedure Listen;
236 procedure Close;
237 function Accept( const fnAccepting: TProc): ITransport;
Jens Geyera019cda2019-11-09 23:24:52 +0100238 function Configuration : IThriftConfiguration;
Jens Geyerd5436f52014-10-03 19:50:38 +0200239 end;
240
241 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
Jens Geyera019cda2019-11-09 23:24:52 +0100242 strict private
243 FConfig : IThriftConfiguration;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100244 strict protected
Jens Geyera019cda2019-11-09 23:24:52 +0100245 function Configuration : IThriftConfiguration;
Jens Geyerd5436f52014-10-03 19:50:38 +0200246 procedure Listen; virtual; abstract;
247 procedure Close; virtual; abstract;
Jens Geyera019cda2019-11-09 23:24:52 +0100248 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
249 public
250 constructor Create( const aConfig : IThriftConfiguration);
Jens Geyerd5436f52014-10-03 19:50:38 +0200251 end;
252
253 ITransportFactory = interface
254 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
Jens Geyer41f47af2019-11-09 23:24:52 +0100255 function GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200256 end;
257
Jens Geyera019cda2019-11-09 23:24:52 +0100258 TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
259 strict protected
Jens Geyered994552019-11-09 23:24:52 +0100260 function GetTransport( const aTransport: ITransport): ITransport; virtual;
Jens Geyerd5436f52014-10-03 19:50:38 +0200261 end;
262
Jens Geyera019cda2019-11-09 23:24:52 +0100263
264 TTcpSocketStreamImpl = class( TThriftStreamImpl)
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200265{$IFDEF OLD_SOCKETS}
Jens Geyerfad7fd32019-11-09 23:24:52 +0100266 strict private type
Jens Geyerd5436f52014-10-03 19:50:38 +0200267 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
Jens Geyerfad7fd32019-11-09 23:24:52 +0100268 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200269 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200270 FTimeout : Integer;
271 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
272 TimeOut: Integer; var wsaError : Integer): Integer;
273 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200274 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200275{$ELSE}
276 FTcpClient: TSocket;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100277 strict protected const
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200278 SLEEP_TIME = 200;
279{$ENDIF}
Jens Geyerfad7fd32019-11-09 23:24:52 +0100280 strict protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200281 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
282 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200283 procedure Open; override;
284 procedure Close; override;
285 procedure Flush; override;
286
287 function IsOpen: Boolean; override;
288 function ToArray: TBytes; override;
289 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200290{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100291 constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200292{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100293 constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200294{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200295 end;
296
297 IStreamTransport = interface( ITransport )
298 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
299 function GetInputStream: IThriftStream;
300 function GetOutputStream: IThriftStream;
301 property InputStream : IThriftStream read GetInputStream;
302 property OutputStream : IThriftStream read GetOutputStream;
303 end;
304
Jens Geyera019cda2019-11-09 23:24:52 +0100305 TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
Jens Geyer5a781c22025-02-04 23:35:55 +0100306 strict private
307 FInternalInputStream : IThriftStream;
308 FInternalOutputStream : IThriftStream;
309
Jens Geyerfad7fd32019-11-09 23:24:52 +0100310 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200311 function GetIsOpen: Boolean; override;
312
Jens Geyer5a781c22025-02-04 23:35:55 +0100313 function GetInputStream: IThriftStream; inline;
314 procedure SetInputStream( const stream : IThriftStream);
315
316 function GetOutputStream: IThriftStream; inline;
317 procedure SetOutputStream( const stream : IThriftStream);
Jens Geyerd5436f52014-10-03 19:50:38 +0200318
Jens Geyer41f47af2019-11-09 23:24:52 +0100319 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200320 procedure Open; override;
321 procedure Close; override;
322 procedure Flush; override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200323 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
324 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyer5a781c22025-02-04 23:35:55 +0100325
326 procedure UpdateKnownMessageSize(const size : Int64); override;
Jens Geyered994552019-11-09 23:24:52 +0100327 public
Jens Geyera019cda2019-11-09 23:24:52 +0100328 constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
Jens Geyerd5436f52014-10-03 19:50:38 +0200329 destructor Destroy; override;
Jens Geyered994552019-11-09 23:24:52 +0100330
331 property InputStream : IThriftStream read GetInputStream;
332 property OutputStream : IThriftStream read GetOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200333 end;
334
335 TBufferedStreamImpl = class( TThriftStreamImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100336 strict private
Jens Geyerd5436f52014-10-03 19:50:38 +0200337 FStream : IThriftStream;
338 FBufSize : Integer;
Jens Geyerf726ae32021-06-04 11:17:26 +0200339 FReadBuffer : TThriftMemoryStream;
340 FWriteBuffer : TThriftMemoryStream;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100341 strict protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200342 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
343 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200344 procedure Open; override;
345 procedure Close; override;
346 procedure Flush; override;
347 function IsOpen: Boolean; override;
348 function ToArray: TBytes; override;
Jens Geyer5a781c22025-02-04 23:35:55 +0100349 function CanSeek : Boolean; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100350 function Size : Int64; override;
351 function Position : Int64; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200352 public
Jens Geyered994552019-11-09 23:24:52 +0100353 constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200354 destructor Destroy; override;
355 end;
356
357 TServerSocketImpl = class( TServerTransportImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100358 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200359{$IFDEF OLD_SOCKETS}
360 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200361 FPort : Integer;
362 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200363{$ELSE}
364 FServer: TServerSocket;
365{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200366 FUseBufferedSocket : Boolean;
367 FOwnsServer : Boolean;
Jens Geyer41f47af2019-11-09 23:24:52 +0100368
Jens Geyerfad7fd32019-11-09 23:24:52 +0100369 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200370 function Accept( const fnAccepting: TProc) : ITransport; override;
Jens Geyer41f47af2019-11-09 23:24:52 +0100371
Jens Geyerd5436f52014-10-03 19:50:38 +0200372 public
Jens Geyera019cda2019-11-09 23:24:52 +0100373 {$IFDEF OLD_SOCKETS}
374 constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
375 constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
376 {$ELSE}
377 constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
378 constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
379 {$ENDIF}
380
Jens Geyerd5436f52014-10-03 19:50:38 +0200381 destructor Destroy; override;
382 procedure Listen; override;
383 procedure Close; override;
384 end;
385
Jens Geyera019cda2019-11-09 23:24:52 +0100386 TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100387 strict private
Jens Geyerd5436f52014-10-03 19:50:38 +0200388 FInputBuffer : IThriftStream;
389 FOutputBuffer : IThriftStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200390 FBufSize : Integer;
391
392 procedure InitBuffers;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100393 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200394 function GetIsOpen: Boolean; override;
395 procedure Flush; override;
396 public
Jens Geyered994552019-11-09 23:24:52 +0100397 type
398 TFactory = class( TTransportFactoryImpl )
399 public
400 function GetTransport( const aTransport: ITransport): ITransport; override;
401 end;
402
403 constructor Create( const aTransport : IStreamTransport; const aBufSize: Integer = 1024);
Jens Geyerd5436f52014-10-03 19:50:38 +0200404 procedure Open(); override;
405 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200406 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
407 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100408 procedure CheckReadBytesAvailable( const value : Int64); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200409 property IsOpen: Boolean read GetIsOpen;
410 end;
411
412 TSocketImpl = class(TStreamTransportImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100413 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200414{$IFDEF OLD_SOCKETS}
415 FClient : TCustomIpClient;
416{$ELSE}
417 FClient: TSocket;
418{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200419 FOwnsClient : Boolean;
420 FHost : string;
421 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200422{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200423 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200424{$ELSE}
425 FTimeout : Longword;
426{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200427
428 procedure InitSocket;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100429 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200430 function GetIsOpen: Boolean; override;
431 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200432{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100433 constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
434 constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200435{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100436 constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
437 constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200438{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200439 destructor Destroy; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100440
441 procedure Open; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200442 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200443{$IFDEF OLD_SOCKETS}
444 property TcpClient: TCustomIpClient read FClient;
445{$ELSE}
446 property TcpClient: TSocket read FClient;
447{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200448 property Host : string read FHost;
449 property Port: Integer read FPort;
450 end;
451
Jens Geyera019cda2019-11-09 23:24:52 +0100452 TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
Jens Geyer2646bd62019-11-09 23:24:52 +0100453 strict protected type
454 TFramedHeader = Int32;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100455 strict protected
Jens Geyerf726ae32021-06-04 11:17:26 +0200456 FWriteBuffer : TThriftMemoryStream;
457 FReadBuffer : TThriftMemoryStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200458
459 procedure InitWriteBuffer;
460 procedure ReadFrame;
Jens Geyerd5436f52014-10-03 19:50:38 +0200461
462 procedure Open(); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100463 function GetIsOpen: Boolean; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200464
465 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200466 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
467 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100468 procedure CheckReadBytesAvailable( const value : Int64); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200469 procedure Flush; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100470
Jens Geyered994552019-11-09 23:24:52 +0100471 public
472 type
473 TFactory = class( TTransportFactoryImpl )
474 public
475 function GetTransport( const aTransport: ITransport): ITransport; override;
476 end;
477
478 constructor Create( const aTransport: ITransport); overload;
479 destructor Destroy; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200480 end;
481
Jens Geyerd5436f52014-10-03 19:50:38 +0200482
483const
Jens Geyer47f63172019-06-06 22:42:58 +0200484 DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
485
Jens Geyerd5436f52014-10-03 19:50:38 +0200486implementation
487
Jens Geyered994552019-11-09 23:24:52 +0100488
Jens Geyera019cda2019-11-09 23:24:52 +0100489{ TTransportBase }
Jens Geyer41f47af2019-11-09 23:24:52 +0100490
Jens Geyera019cda2019-11-09 23:24:52 +0100491procedure TTransportBase.Flush;
Jens Geyerd5436f52014-10-03 19:50:38 +0200492begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200493 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200494end;
495
Jens Geyera019cda2019-11-09 23:24:52 +0100496function TTransportBase.Peek: Boolean;
Jens Geyerd5436f52014-10-03 19:50:38 +0200497begin
498 Result := IsOpen;
499end;
500
Jens Geyera019cda2019-11-09 23:24:52 +0100501function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200502begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200503 if Length(buf) > 0
504 then result := Read( @buf[0], Length(buf), off, len)
505 else result := 0;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200506end;
507
Jens Geyera019cda2019-11-09 23:24:52 +0100508function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200509begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200510 if Length(buf) > 0
511 then result := ReadAll( @buf[0], Length(buf), off, len)
512 else result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +0200513end;
514
Jens Geyera019cda2019-11-09 23:24:52 +0100515function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200516var ret : Integer;
517begin
518 result := 0;
519 while result < len do begin
520 ret := Read( pBuf, buflen, off + result, len - result);
521 if ret > 0
522 then Inc( result, ret)
523 else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
524 end;
525end;
526
Jens Geyera019cda2019-11-09 23:24:52 +0100527procedure TTransportBase.Write( const buf: TBytes);
Jens Geyered994552019-11-09 23:24:52 +0100528begin
529 if Length(buf) > 0
530 then Write( @buf[0], 0, Length(buf));
531end;
532
Jens Geyera019cda2019-11-09 23:24:52 +0100533procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
Jens Geyered994552019-11-09 23:24:52 +0100534begin
535 if Length(buf) > 0
536 then Write( @buf[0], off, len);
537end;
538
Jens Geyera019cda2019-11-09 23:24:52 +0100539procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
Jens Geyer17c3ad92017-09-05 20:31:27 +0200540begin
541 Self.Write( pBuf, 0, len);
Jens Geyerd5436f52014-10-03 19:50:38 +0200542end;
543
Jens Geyered994552019-11-09 23:24:52 +0100544
Jens Geyera019cda2019-11-09 23:24:52 +0100545{ TEndpointTransportBase }
546
547constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
Jens Geyer41f47af2019-11-09 23:24:52 +0100548begin
Jens Geyera019cda2019-11-09 23:24:52 +0100549 inherited Create;
550
551 if aConfig <> nil
552 then FConfiguration := aConfig
553 else FConfiguration := TThriftConfigurationImpl.Create;
554
Jens Geyer5a781c22025-02-04 23:35:55 +0100555 ResetMessageSizeAndConsumedBytes;
Jens Geyer41f47af2019-11-09 23:24:52 +0100556end;
557
558
Jens Geyera019cda2019-11-09 23:24:52 +0100559function TEndpointTransportBase.Configuration : IThriftConfiguration;
Jens Geyer41f47af2019-11-09 23:24:52 +0100560begin
Jens Geyera019cda2019-11-09 23:24:52 +0100561 result := FConfiguration;
Jens Geyer41f47af2019-11-09 23:24:52 +0100562end;
563
564
Jens Geyera019cda2019-11-09 23:24:52 +0100565function TEndpointTransportBase.MaxMessageSize : Integer;
566begin
567 ASSERT( Configuration <> nil);
568 result := Configuration.MaxMessageSize;
569end;
570
571
Jens Geyer5a781c22025-02-04 23:35:55 +0100572procedure TEndpointTransportBase.ResetMessageSizeAndConsumedBytes( const newSize : Int64);
Jens Geyera019cda2019-11-09 23:24:52 +0100573// Resets RemainingMessageSize to the configured maximum
574begin
575 // full reset
576 if newSize < 0 then begin
577 FKnownMessageSize := MaxMessageSize;
578 FRemainingMessageSize := MaxMessageSize;
579 Exit;
580 end;
581
582 // update only: message size can shrink, but not grow
583 ASSERT( KnownMessageSize <= MaxMessageSize);
584 if newSize > KnownMessageSize
Jens Geyerb0123182020-02-12 12:16:19 +0100585 then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
Jens Geyera019cda2019-11-09 23:24:52 +0100586
587 FKnownMessageSize := newSize;
588 FRemainingMessageSize := newSize;
589end;
590
591
592procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
Jens Geyer5a781c22025-02-04 23:35:55 +0100593// Updates RemainingMessageSize to reflect the known real message size (e.g. framed transport).
Jens Geyera019cda2019-11-09 23:24:52 +0100594// Will throw if we already consumed too many bytes.
595var consumed : Int64;
596begin
597 consumed := KnownMessageSize - RemainingMessageSize;
Jens Geyer5a781c22025-02-04 23:35:55 +0100598 ResetMessageSizeAndConsumedBytes(size);
Jens Geyera019cda2019-11-09 23:24:52 +0100599 CountConsumedMessageBytes(consumed);
600end;
601
602
603procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
604// Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
605begin
Jens Geyer73f5bd42022-09-03 14:19:31 +0200606 if (RemainingMessageSize < numBytes) or (numBytes < 0)
Jens Geyera019cda2019-11-09 23:24:52 +0100607 then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
608end;
609
610
611procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
612// Consumes numBytes from the RemainingMessageSize.
613begin
Jens Geyer73f5bd42022-09-03 14:19:31 +0200614 if (RemainingMessageSize >= numBytes) and (numBytes >= 0)
Jens Geyera019cda2019-11-09 23:24:52 +0100615 then Dec( FRemainingMessageSize, numBytes)
616 else begin
617 FRemainingMessageSize := 0;
618 raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
619 end;
620end;
621
622{ TLayeredTransportBase }
623
624constructor TLayeredTransportBase<T>.Create( const aTransport: T);
625begin
626 inherited Create;
627 FTransport := aTransport;
628end;
629
630function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
631begin
632 result := InnerTransport;
633end;
634
635function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
636begin
637 result := InnerTransport.Configuration;
638end;
639
640procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
641begin
642 InnerTransport.UpdateKnownMessageSize( size);
643end;
644
645
646function TLayeredTransportBase<T>.MaxMessageSize : Integer;
647begin
648 result := InnerTransport.MaxMessageSize;
649end;
650
651
Jens Geyer5a781c22025-02-04 23:35:55 +0100652procedure TLayeredTransportBase<T>.ResetMessageSizeAndConsumedBytes( const knownSize : Int64 = -1);
Jens Geyera019cda2019-11-09 23:24:52 +0100653begin
Jens Geyer5a781c22025-02-04 23:35:55 +0100654 InnerTransport.ResetMessageSizeAndConsumedBytes( knownSize);
Jens Geyera019cda2019-11-09 23:24:52 +0100655end;
656
657
658procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
659begin
660 InnerTransport.CheckReadBytesAvailable( numBytes);
661end;
662
663
664
Jens Geyerd5436f52014-10-03 19:50:38 +0200665{ TTransportException }
666
Jens Geyere0e32402016-04-20 21:50:48 +0200667constructor TTransportException.HiddenCreate(const Msg: string);
668begin
669 inherited Create(Msg);
670end;
671
Jens Geyered994552019-11-09 23:24:52 +0100672class function TTransportException.Create(aType: TExceptionType): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200673begin
674 //no inherited;
Jens Geyere0e32402016-04-20 21:50:48 +0200675{$WARN SYMBOL_DEPRECATED OFF}
Jens Geyered994552019-11-09 23:24:52 +0100676 Result := Create(aType, '')
Jens Geyere0e32402016-04-20 21:50:48 +0200677{$WARN SYMBOL_DEPRECATED DEFAULT}
Jens Geyerd5436f52014-10-03 19:50:38 +0200678end;
679
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100680class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200681begin
Jens Geyered994552019-11-09 23:24:52 +0100682 case aType of
Jens Geyere0e32402016-04-20 21:50:48 +0200683 TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
684 TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
685 TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
686 TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
687 TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
688 TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
689 else
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100690 ASSERT( TExceptionType.Unknown = aType);
Jens Geyere0e32402016-04-20 21:50:48 +0200691 Result := TTransportExceptionUnknown.Create(msg);
692 end;
Jens Geyerd5436f52014-10-03 19:50:38 +0200693end;
694
Jens Geyere0e32402016-04-20 21:50:48 +0200695class function TTransportException.Create(const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200696begin
Jens Geyere0e32402016-04-20 21:50:48 +0200697 Result := TTransportExceptionUnknown.Create(Msg);
698end;
699
700{ TTransportExceptionSpecialized }
701
702constructor TTransportExceptionSpecialized.Create(const Msg: string);
703begin
704 inherited HiddenCreate(Msg);
Jens Geyerd5436f52014-10-03 19:50:38 +0200705end;
706
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100707{ specialized TTransportExceptions }
708
709class function TTransportExceptionUnknown.GetType: TTransportException.TExceptionType;
710begin
711 result := TExceptionType.Unknown;
712end;
713
714class function TTransportExceptionNotOpen.GetType: TTransportException.TExceptionType;
715begin
716 result := TExceptionType.NotOpen;
717end;
718
719class function TTransportExceptionAlreadyOpen.GetType: TTransportException.TExceptionType;
720begin
721 result := TExceptionType.AlreadyOpen;
722end;
723
724class function TTransportExceptionTimedOut.GetType: TTransportException.TExceptionType;
725begin
726 result := TExceptionType.TimedOut;
727end;
728
729class function TTransportExceptionEndOfFile.GetType: TTransportException.TExceptionType;
730begin
731 result := TExceptionType.EndOfFile;
732end;
733
734class function TTransportExceptionBadArgs.GetType: TTransportException.TExceptionType;
735begin
736 result := TExceptionType.BadArgs;
737end;
738
739class function TTransportExceptionInterrupted.GetType: TTransportException.TExceptionType;
740begin
741 result := TExceptionType.Interrupted;
742end;
743
Jens Geyer2646bd62019-11-09 23:24:52 +0100744class function TTransportExceptionCorruptedData.GetType: TTransportException.TExceptionType;
745begin
746 result := TExceptionType.CorruptedData;
747end;
748
Jens Geyerd5436f52014-10-03 19:50:38 +0200749{ TTransportFactoryImpl }
750
Jens Geyered994552019-11-09 23:24:52 +0100751function TTransportFactoryImpl.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200752begin
Jens Geyered994552019-11-09 23:24:52 +0100753 Result := aTransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200754end;
755
Jens Geyera019cda2019-11-09 23:24:52 +0100756
757{ TServerTransportImpl }
758
759constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
760begin
761 inherited Create;
762 if aConfig <> nil
763 then FConfig := aConfig
764 else FConfig := TThriftConfigurationImpl.Create;
765end;
766
767function TServerTransportImpl.Configuration : IThriftConfiguration;
768begin
769 result := FConfig;
770end;
771
Jens Geyerd5436f52014-10-03 19:50:38 +0200772{ TServerSocket }
773
Jens Geyer23d67462015-12-19 11:44:57 +0100774{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100775constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200776{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100777constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100778{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200779begin
Jens Geyera019cda2019-11-09 23:24:52 +0100780 inherited Create( aConfig);
Jens Geyered994552019-11-09 23:24:52 +0100781 FServer := aServer;
Jens Geyera019cda2019-11-09 23:24:52 +0100782
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200783
784{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +0100785 FClientTimeout := aClientTimeout;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200786{$ELSE}
Jens Geyered994552019-11-09 23:24:52 +0100787 FServer.RecvTimeout := aClientTimeout;
788 FServer.SendTimeout := aClientTimeout;
789{$ENDIF}
790end;
791
792
793{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100794constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100795{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100796constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200797{$ENDIF}
798begin
Jens Geyera019cda2019-11-09 23:24:52 +0100799 inherited Create( aConfig);
Jens Geyer41f47af2019-11-09 23:24:52 +0100800
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200801{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +0100802 FPort := aPort;
803 FClientTimeout := aClientTimeout;
804
805 FOwnsServer := True;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200806 FServer := TTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200807 FServer.BlockMode := bmBlocking;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200808 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200809 FServer.LocalPort := AnsiString( IntToStr( FPort));
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200810 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200811 FServer.LocalPort := IntToStr( FPort);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200812 {$IFEND}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200813{$ELSE}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200814 FOwnsServer := True;
Jens Geyered994552019-11-09 23:24:52 +0100815 FServer := TServerSocket.Create(aPort, aClientTimeout, aClientTimeout);
816{$ENDIF}
817
818 FUseBufferedSocket := aUseBufferedSockets;
Jens Geyerd5436f52014-10-03 19:50:38 +0200819end;
820
821destructor TServerSocketImpl.Destroy;
822begin
823 if FOwnsServer then begin
824 FServer.Free;
825 FServer := nil;
826 end;
827 inherited;
828end;
829
830function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
831var
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200832{$IFDEF OLD_SOCKETS}
833 client : TCustomIpClient;
834{$ELSE}
835 client: TSocket;
836{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200837 trans : IStreamTransport;
838begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100839 if FServer = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200840 raise TTransportExceptionNotOpen.Create('No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200841 end;
842
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200843{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200844 client := nil;
845 try
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200846 client := TCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200847
848 if Assigned(fnAccepting)
849 then fnAccepting();
850
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100851 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200852 client.Free;
853 Result := nil;
854 Exit;
855 end;
856
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100857 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200858 Result := nil;
859 Exit;
860 end;
861
Jens Geyera019cda2019-11-09 23:24:52 +0100862 trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
Jens Geyerd5436f52014-10-03 19:50:38 +0200863 client := nil; // trans owns it now
864
865 if FUseBufferedSocket
866 then result := TBufferedTransportImpl.Create( trans)
867 else result := trans;
868
869 except
870 on E: Exception do begin
871 client.Free;
Jens Geyere0e32402016-04-20 21:50:48 +0200872 raise TTransportExceptionUnknown.Create(E.ToString);
Jens Geyerd5436f52014-10-03 19:50:38 +0200873 end;
874 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200875{$ELSE}
876 if Assigned(fnAccepting) then
877 fnAccepting();
878
879 client := FServer.Accept;
880 try
Jens Geyera019cda2019-11-09 23:24:52 +0100881 trans := TSocketImpl.Create(client, TRUE, Configuration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200882 client := nil;
883
884 if FUseBufferedSocket then
885 Result := TBufferedTransportImpl.Create(trans)
886 else
887 Result := trans;
888 except
889 client.Free;
890 raise;
891 end;
892{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200893end;
894
895procedure TServerSocketImpl.Listen;
896begin
897 if FServer <> nil then
898 begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200899{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200900 try
901 FServer.Active := True;
902 except
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200903 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200904 do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200905 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200906{$ELSE}
907 FServer.Listen;
908{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200909 end;
910end;
911
912procedure TServerSocketImpl.Close;
913begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200914 if FServer <> nil then
915{$IFDEF OLD_SOCKETS}
916 try
917 FServer.Active := False;
918 except
919 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200920 do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200921 end;
922{$ELSE}
923 FServer.Close;
924{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200925end;
926
927{ TSocket }
928
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200929{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100930constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100931{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100932constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100933{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200934var stream : IThriftStream;
935begin
Jens Geyered994552019-11-09 23:24:52 +0100936 FClient := aClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200937 FOwnsClient := aOwnsClient;
Jens Geyered994552019-11-09 23:24:52 +0100938
939{$IFDEF OLD_SOCKETS}
940 FTimeout := aTimeout;
941{$ELSE}
942 FTimeout := aClient.RecvTimeout;
943{$ENDIF}
944
Jens Geyerd5436f52014-10-03 19:50:38 +0200945 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
Jens Geyera019cda2019-11-09 23:24:52 +0100946 inherited Create( stream, stream, aConfig);
Jens Geyerd5436f52014-10-03 19:50:38 +0200947end;
948
Jens Geyera019cda2019-11-09 23:24:52 +0100949
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200950{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100951constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200952{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100953constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200954{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200955begin
Jens Geyera019cda2019-11-09 23:24:52 +0100956 inherited Create(nil,nil, aConfig);
Jens Geyered994552019-11-09 23:24:52 +0100957 FHost := aHost;
958 FPort := aPort;
959 FTimeout := aTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200960 InitSocket;
961end;
962
963destructor TSocketImpl.Destroy;
964begin
965 if FOwnsClient
966 then FreeAndNil( FClient);
967 inherited;
968end;
969
970procedure TSocketImpl.Close;
971begin
972 inherited Close;
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200973
Jens Geyer5a781c22025-02-04 23:35:55 +0100974 SetInputStream( nil);
975 SetOutputStream( nil);
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200976
Jens Geyerd5436f52014-10-03 19:50:38 +0200977 if FOwnsClient
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200978 then FreeAndNil( FClient)
979 else FClient := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +0200980end;
981
982function TSocketImpl.GetIsOpen: Boolean;
983begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200984{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200985 Result := (FClient <> nil) and FClient.Connected;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200986{$ELSE}
987 Result := (FClient <> nil) and FClient.IsOpen
988{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200989end;
990
991procedure TSocketImpl.InitSocket;
992var
993 stream : IThriftStream;
994begin
995 if FOwnsClient
996 then FreeAndNil( FClient)
997 else FClient := nil;
998
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200999{$IFDEF OLD_SOCKETS}
1000 FClient := TTcpClient.Create( nil);
1001{$ELSE}
1002 FClient := TSocket.Create(FHost, FPort);
1003{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001004 FOwnsClient := True;
1005
1006 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
Jens Geyer5a781c22025-02-04 23:35:55 +01001007 SetInputStream( stream);
1008 SetOutputStream( stream);
Jens Geyerd5436f52014-10-03 19:50:38 +02001009end;
1010
1011procedure TSocketImpl.Open;
1012begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001013 if IsOpen then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001014 raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +02001015 end;
1016
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001017 if FHost = '' then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001018 raise TTransportExceptionNotOpen.Create('Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +02001019 end;
1020
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001021 if Port <= 0 then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001022 raise TTransportExceptionNotOpen.Create('Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +02001023 end;
1024
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001025 if FClient = nil
1026 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +02001027
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001028{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001029 FClient.RemoteHost := TSocketHost( Host);
1030 FClient.RemotePort := TSocketPort( IntToStr( Port));
1031 FClient.Connect;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001032{$ELSE}
1033 FClient.Open;
1034{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001035
Jens Geyer5a781c22025-02-04 23:35:55 +01001036 SetInputStream( TTcpSocketStreamImpl.Create( FClient, FTimeout));
1037 SetOutputStream( InputStream); // same
Jens Geyerd5436f52014-10-03 19:50:38 +02001038end;
1039
1040{ TBufferedStream }
1041
1042procedure TBufferedStreamImpl.Close;
1043begin
1044 Flush;
1045 FStream := nil;
1046
1047 FReadBuffer.Free;
1048 FReadBuffer := nil;
1049
1050 FWriteBuffer.Free;
1051 FWriteBuffer := nil;
1052end;
1053
Jens Geyered994552019-11-09 23:24:52 +01001054constructor TBufferedStreamImpl.Create( const aStream: IThriftStream; const aBufSize : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001055begin
1056 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001057 FStream := aStream;
1058 FBufSize := aBufSize;
Jens Geyerf726ae32021-06-04 11:17:26 +02001059 FReadBuffer := TThriftMemoryStream.Create(FBufSize);
1060 FWriteBuffer := TThriftMemoryStream.Create(FBufSize);
Jens Geyerd5436f52014-10-03 19:50:38 +02001061end;
1062
1063destructor TBufferedStreamImpl.Destroy;
1064begin
1065 Close;
1066 inherited;
1067end;
1068
1069procedure TBufferedStreamImpl.Flush;
1070var
1071 buf : TBytes;
1072 len : Integer;
1073begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001074 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001075 len := FWriteBuffer.Size;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001076 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001077 SetLength( buf, len );
1078 FWriteBuffer.Position := 0;
1079 FWriteBuffer.Read( Pointer(@buf[0])^, len );
1080 FStream.Write( buf, 0, len );
1081 end;
1082 FWriteBuffer.Clear;
1083 end;
1084end;
1085
1086function TBufferedStreamImpl.IsOpen: Boolean;
1087begin
1088 Result := (FWriteBuffer <> nil)
1089 and (FReadBuffer <> nil)
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001090 and (FStream <> nil)
1091 and FStream.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001092end;
1093
1094procedure TBufferedStreamImpl.Open;
1095begin
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001096 FStream.Open;
Jens Geyerd5436f52014-10-03 19:50:38 +02001097end;
1098
Jens Geyer17c3ad92017-09-05 20:31:27 +02001099function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001100var
1101 nRead : Integer;
1102 tempbuf : TBytes;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001103 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001104begin
1105 inherited;
1106 Result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001107
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001108 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001109 while count > 0 do begin
1110
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001111 if FReadBuffer.Position >= FReadBuffer.Size then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001112 FReadBuffer.Clear;
1113 SetLength( tempbuf, FBufSize);
1114 nRead := FStream.Read( tempbuf, 0, FBufSize );
1115 if nRead = 0 then Break; // avoid infinite loop
1116
1117 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
1118 FReadBuffer.Position := 0;
1119 end;
1120
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001121 if FReadBuffer.Position < FReadBuffer.Size then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001122 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
1123 pTmp := pBuf;
1124 Inc( pTmp, offset);
1125 Inc( Result, FReadBuffer.Read( pTmp^, nRead));
Jens Geyerd5436f52014-10-03 19:50:38 +02001126 Dec( count, nRead);
1127 Inc( offset, nRead);
1128 end;
1129 end;
1130 end;
1131end;
1132
Jens Geyered994552019-11-09 23:24:52 +01001133
Jens Geyerd5436f52014-10-03 19:50:38 +02001134function TBufferedStreamImpl.ToArray: TBytes;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001135var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001136begin
Jens Geyera019cda2019-11-09 23:24:52 +01001137 if IsOpen
1138 then len := FReadBuffer.Size
1139 else len := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001140
1141 SetLength( Result, len);
1142
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001143 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001144 FReadBuffer.Position := 0;
1145 FReadBuffer.Read( Pointer(@Result[0])^, len );
1146 end;
1147end;
1148
Jens Geyer17c3ad92017-09-05 20:31:27 +02001149procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001150var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001151begin
1152 inherited;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001153 if count > 0 then begin
1154 if IsOpen then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001155 pTmp := pBuf;
1156 Inc( pTmp, offset);
1157 FWriteBuffer.Write( pTmp^, count );
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001158 if FWriteBuffer.Size > FBufSize then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001159 Flush;
1160 end;
1161 end;
1162 end;
1163end;
1164
Jens Geyera019cda2019-11-09 23:24:52 +01001165
Jens Geyer5a781c22025-02-04 23:35:55 +01001166function TBufferedStreamImpl.CanSeek : Boolean;
1167begin
1168 result := TRUE;
1169end;
1170
1171
Jens Geyera019cda2019-11-09 23:24:52 +01001172function TBufferedStreamImpl.Size : Int64;
1173begin
1174 result := FReadBuffer.Size;
1175end;
1176
1177
1178function TBufferedStreamImpl.Position : Int64;
1179begin
1180 result := FReadBuffer.Position;
1181end;
1182
1183
Jens Geyerd5436f52014-10-03 19:50:38 +02001184{ TStreamTransportImpl }
1185
Jens Geyera019cda2019-11-09 23:24:52 +01001186constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
Jens Geyerd5436f52014-10-03 19:50:38 +02001187begin
Jens Geyera019cda2019-11-09 23:24:52 +01001188 inherited Create( aConfig);
Jens Geyer5a781c22025-02-04 23:35:55 +01001189 SetInputStream( aInputStream);
1190 SetOutputStream( aOutputStream);
Jens Geyerd5436f52014-10-03 19:50:38 +02001191end;
1192
1193destructor TStreamTransportImpl.Destroy;
1194begin
Jens Geyer5a781c22025-02-04 23:35:55 +01001195 SetInputStream( nil);
1196 SetInputStream( nil);
Jens Geyerd5436f52014-10-03 19:50:38 +02001197 inherited;
1198end;
1199
Jens Geyer20e727e2018-06-22 22:39:57 +02001200procedure TStreamTransportImpl.Close;
1201begin
Jens Geyer5a781c22025-02-04 23:35:55 +01001202 SetInputStream( nil);
1203 SetInputStream( nil);
Jens Geyer20e727e2018-06-22 22:39:57 +02001204end;
1205
Jens Geyerd5436f52014-10-03 19:50:38 +02001206procedure TStreamTransportImpl.Flush;
1207begin
Jens Geyer5a781c22025-02-04 23:35:55 +01001208 if OutputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001209 raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001210 end;
1211
Jens Geyer5a781c22025-02-04 23:35:55 +01001212 OutputStream.Flush;
Jens Geyerd5436f52014-10-03 19:50:38 +02001213end;
1214
1215function TStreamTransportImpl.GetInputStream: IThriftStream;
1216begin
Jens Geyer5a781c22025-02-04 23:35:55 +01001217 Result := FInternalInputStream;
1218end;
1219
1220procedure TStreamTransportImpl.SetInputStream( const stream : IThriftStream);
1221begin
1222 FInternalInputStream := stream;
1223 ResetMessageSizeAndConsumedBytes(-1); // full reset to configured maximum
1224 UpdateKnownMessageSize( -1); // adjust to real stream size
1225end;
1226
1227function TStreamTransportImpl.GetOutputStream: IThriftStream;
1228begin
1229 Result := FInternalOutputStream;
1230end;
1231
1232procedure TStreamTransportImpl.SetOutputStream( const stream : IThriftStream);
1233begin
1234 FInternalOutputStream := stream;
Jens Geyerd5436f52014-10-03 19:50:38 +02001235end;
1236
1237function TStreamTransportImpl.GetIsOpen: Boolean;
1238begin
1239 Result := True;
1240end;
1241
Jens Geyerd5436f52014-10-03 19:50:38 +02001242procedure TStreamTransportImpl.Open;
1243begin
Jens Geyer2646bd62019-11-09 23:24:52 +01001244 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +02001245end;
1246
Jens Geyer17c3ad92017-09-05 20:31:27 +02001247function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001248begin
Jens Geyer5a781c22025-02-04 23:35:55 +01001249 if InputStream = nil
Jens Geyered994552019-11-09 23:24:52 +01001250 then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001251
Jens Geyer5a781c22025-02-04 23:35:55 +01001252 Result := InputStream.Read( pBuf,buflen, off, len );
Jens Geyera019cda2019-11-09 23:24:52 +01001253 CountConsumedMessageBytes( result);
Jens Geyerd5436f52014-10-03 19:50:38 +02001254end;
1255
Jens Geyer17c3ad92017-09-05 20:31:27 +02001256procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001257begin
Jens Geyer5a781c22025-02-04 23:35:55 +01001258 if OutputStream = nil
Jens Geyered994552019-11-09 23:24:52 +01001259 then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001260
Jens Geyer5a781c22025-02-04 23:35:55 +01001261 OutputStream.Write( pBuf, off, len );
1262end;
1263
1264
1265procedure TStreamTransportImpl.UpdateKnownMessageSize(const size : Int64);
1266var adjusted : Int64;
1267begin
1268 if InputStream = nil
1269 then adjusted := 0
1270 else begin
1271 adjusted := MaxMessageSize;
1272 if size > 0
1273 then adjusted := Math.Min( adjusted, size);
1274 if InputStream.CanSeek
1275 then adjusted := Math.Min( adjusted, InputStream.Size);
1276 end;
1277
1278 inherited UpdateKnownMessageSize( adjusted);
Jens Geyerd5436f52014-10-03 19:50:38 +02001279end;
1280
1281{ TBufferedTransportImpl }
1282
Jens Geyered994552019-11-09 23:24:52 +01001283constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001284begin
Jens Geyered994552019-11-09 23:24:52 +01001285 ASSERT( aTransport <> nil);
Jens Geyera019cda2019-11-09 23:24:52 +01001286 inherited Create( aTransport);
Jens Geyered994552019-11-09 23:24:52 +01001287 FBufSize := aBufSize;
Jens Geyerd5436f52014-10-03 19:50:38 +02001288 InitBuffers;
1289end;
1290
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001291procedure TBufferedTransportImpl.Close;
1292begin
Jens Geyera019cda2019-11-09 23:24:52 +01001293 InnerTransport.Close;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001294 FInputBuffer := nil;
Jens Geyered994552019-11-09 23:24:52 +01001295 FOutputBuffer := nil;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001296end;
1297
Jens Geyerd5436f52014-10-03 19:50:38 +02001298procedure TBufferedTransportImpl.Flush;
1299begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001300 if FOutputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001301 FOutputBuffer.Flush;
1302 end;
1303end;
1304
1305function TBufferedTransportImpl.GetIsOpen: Boolean;
1306begin
Jens Geyera019cda2019-11-09 23:24:52 +01001307 Result := InnerTransport.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001308end;
1309
1310procedure TBufferedTransportImpl.InitBuffers;
1311begin
Jens Geyera019cda2019-11-09 23:24:52 +01001312 if InnerTransport.InputStream <> nil then begin
1313 FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
Jens Geyerd5436f52014-10-03 19:50:38 +02001314 end;
Jens Geyera019cda2019-11-09 23:24:52 +01001315 if InnerTransport.OutputStream <> nil then begin
1316 FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
Jens Geyerd5436f52014-10-03 19:50:38 +02001317 end;
1318end;
1319
1320procedure TBufferedTransportImpl.Open;
1321begin
Jens Geyera019cda2019-11-09 23:24:52 +01001322 InnerTransport.Open;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001323 InitBuffers; // we need to get the buffers to match FTransport substreams again
Jens Geyerd5436f52014-10-03 19:50:38 +02001324end;
1325
Jens Geyer17c3ad92017-09-05 20:31:27 +02001326function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001327begin
Jens Geyered994552019-11-09 23:24:52 +01001328 if FInputBuffer <> nil
Jens Geyera019cda2019-11-09 23:24:52 +01001329 then Result := FInputBuffer.Read( pBuf,buflen, off, len )
Jens Geyered994552019-11-09 23:24:52 +01001330 else Result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001331end;
1332
Jens Geyer17c3ad92017-09-05 20:31:27 +02001333procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001334begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001335 if FOutputBuffer <> nil then begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001336 FOutputBuffer.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001337 end;
1338end;
1339
Jens Geyera019cda2019-11-09 23:24:52 +01001340procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1341var buffered, need : Int64;
Jens Geyer41f47af2019-11-09 23:24:52 +01001342begin
1343 need := value;
1344
1345 // buffered bytes
Jens Geyera019cda2019-11-09 23:24:52 +01001346 buffered := FInputBuffer.Size - FInputBuffer.Position;
1347 if buffered < need
1348 then InnerTransport.CheckReadBytesAvailable( need - buffered);
Jens Geyer41f47af2019-11-09 23:24:52 +01001349end;
1350
Jens Geyera019cda2019-11-09 23:24:52 +01001351
Jens Geyered994552019-11-09 23:24:52 +01001352{ TBufferedTransportImpl.TFactory }
Jens Geyerd5436f52014-10-03 19:50:38 +02001353
Jens Geyered994552019-11-09 23:24:52 +01001354function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +02001355begin
Jens Geyered994552019-11-09 23:24:52 +01001356 Result := TFramedTransportImpl.Create( aTransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001357end;
1358
Jens Geyered994552019-11-09 23:24:52 +01001359
1360{ TFramedTransportImpl }
1361
1362constructor TFramedTransportImpl.Create( const aTransport: ITransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001363begin
Jens Geyered994552019-11-09 23:24:52 +01001364 ASSERT( aTransport <> nil);
Jens Geyera019cda2019-11-09 23:24:52 +01001365 inherited Create( aTransport);
Jens Geyer2646bd62019-11-09 23:24:52 +01001366
Jens Geyerd5436f52014-10-03 19:50:38 +02001367 InitWriteBuffer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001368end;
1369
1370destructor TFramedTransportImpl.Destroy;
1371begin
1372 FWriteBuffer.Free;
Jens Geyera019cda2019-11-09 23:24:52 +01001373 FWriteBuffer := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001374 FReadBuffer.Free;
Jens Geyera019cda2019-11-09 23:24:52 +01001375 FReadBuffer := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001376 inherited;
1377end;
1378
Jens Geyer2646bd62019-11-09 23:24:52 +01001379procedure TFramedTransportImpl.Close;
1380begin
Jens Geyera019cda2019-11-09 23:24:52 +01001381 InnerTransport.Close;
Jens Geyer2646bd62019-11-09 23:24:52 +01001382end;
1383
Jens Geyerd5436f52014-10-03 19:50:38 +02001384procedure TFramedTransportImpl.Flush;
1385var
1386 buf : TBytes;
1387 len : Integer;
Jens Geyera019cda2019-11-09 23:24:52 +01001388 data_len : Int64;
Jens Geyerd5436f52014-10-03 19:50:38 +02001389begin
Jens Geyer528a0f02019-11-18 20:17:03 +01001390 if not IsOpen
1391 then raise TTransportExceptionNotOpen.Create('not open');
1392
Jens Geyerd5436f52014-10-03 19:50:38 +02001393 len := FWriteBuffer.Size;
1394 SetLength( buf, len);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001395 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001396 System.Move( FWriteBuffer.Memory^, buf[0], len );
1397 end;
1398
Jens Geyer2646bd62019-11-09 23:24:52 +01001399 data_len := len - SizeOf(TFramedHeader);
Jens Geyera019cda2019-11-09 23:24:52 +01001400 if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
1401 then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
1402 else UpdateKnownMessageSize( len);
Jens Geyerd5436f52014-10-03 19:50:38 +02001403
1404 InitWriteBuffer;
1405
1406 buf[0] := Byte($FF and (data_len shr 24));
1407 buf[1] := Byte($FF and (data_len shr 16));
1408 buf[2] := Byte($FF and (data_len shr 8));
1409 buf[3] := Byte($FF and data_len);
1410
Jens Geyera019cda2019-11-09 23:24:52 +01001411 InnerTransport.Write( buf, 0, len );
1412 InnerTransport.Flush;
Jens Geyerd5436f52014-10-03 19:50:38 +02001413end;
1414
1415function TFramedTransportImpl.GetIsOpen: Boolean;
1416begin
Jens Geyera019cda2019-11-09 23:24:52 +01001417 Result := InnerTransport.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001418end;
1419
Jens Geyerd5436f52014-10-03 19:50:38 +02001420procedure TFramedTransportImpl.InitWriteBuffer;
Jens Geyer2646bd62019-11-09 23:24:52 +01001421const DUMMY_HEADER : TFramedHeader = 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001422begin
Jens Geyer528a0f02019-11-18 20:17:03 +01001423 FreeAndNil( FWriteBuffer);
Jens Geyerf726ae32021-06-04 11:17:26 +02001424 FWriteBuffer := TThriftMemoryStream.Create(1024);
Jens Geyer2646bd62019-11-09 23:24:52 +01001425 FWriteBuffer.Write( DUMMY_HEADER, SizeOf(DUMMY_HEADER));
Jens Geyerd5436f52014-10-03 19:50:38 +02001426end;
1427
1428procedure TFramedTransportImpl.Open;
1429begin
Jens Geyera019cda2019-11-09 23:24:52 +01001430 InnerTransport.Open;
Jens Geyerd5436f52014-10-03 19:50:38 +02001431end;
1432
Jens Geyer17c3ad92017-09-05 20:31:27 +02001433function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001434var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001435begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001436 if len > (buflen-off)
1437 then len := buflen-off;
1438
Jens Geyer5089b0a2018-02-01 22:37:18 +01001439 pTmp := pBuf;
1440 Inc( pTmp, off);
1441
Jens Geyer17c3ad92017-09-05 20:31:27 +02001442 if (FReadBuffer <> nil) and (len > 0) then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001443 result := FReadBuffer.Read( pTmp^, len);
Jens Geyered994552019-11-09 23:24:52 +01001444 if result > 0 then Exit;
Jens Geyerd5436f52014-10-03 19:50:38 +02001445 end;
1446
1447 ReadFrame;
1448 if len > 0
Jens Geyer5089b0a2018-02-01 22:37:18 +01001449 then Result := FReadBuffer.Read( pTmp^, len)
Jens Geyerd5436f52014-10-03 19:50:38 +02001450 else Result := 0;
1451end;
1452
1453procedure TFramedTransportImpl.ReadFrame;
1454var
Jens Geyer2646bd62019-11-09 23:24:52 +01001455 i32rd : packed array[0..SizeOf(TFramedHeader)-1] of Byte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001456 size : Integer;
1457 buff : TBytes;
1458begin
Jens Geyera019cda2019-11-09 23:24:52 +01001459 InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
Jens Geyerd5436f52014-10-03 19:50:38 +02001460 size :=
1461 ((i32rd[0] and $FF) shl 24) or
1462 ((i32rd[1] and $FF) shl 16) or
1463 ((i32rd[2] and $FF) shl 8) or
1464 (i32rd[3] and $FF);
Jens Geyer2646bd62019-11-09 23:24:52 +01001465
1466 if size < 0 then begin
1467 Close();
1468 raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
1469 end;
1470
Jens Geyera019cda2019-11-09 23:24:52 +01001471 if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
Jens Geyer2646bd62019-11-09 23:24:52 +01001472 Close();
Jens Geyer589ee5b2021-03-29 21:40:55 +02001473 if CharUtils.IsHtmlDoctype(size)
1474 then raise TTransportExceptionCorruptedData.Create('Remote end sends HTML instead of data')
1475 else raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
Jens Geyer2646bd62019-11-09 23:24:52 +01001476 end;
1477
Jens Geyera019cda2019-11-09 23:24:52 +01001478 UpdateKnownMessageSize(size + SizeOf(size));
1479
Jens Geyerd5436f52014-10-03 19:50:38 +02001480 SetLength( buff, size );
Jens Geyera019cda2019-11-09 23:24:52 +01001481 InnerTransport.ReadAll( buff, 0, size );
Jens Geyered994552019-11-09 23:24:52 +01001482
1483 FreeAndNil( FReadBuffer);
Jens Geyerf726ae32021-06-04 11:17:26 +02001484 FReadBuffer := TThriftMemoryStream.Create(1024);
Jens Geyera76e6c72017-09-08 21:03:30 +02001485 if Length(buff) > 0
1486 then FReadBuffer.Write( Pointer(@buff[0])^, size );
Jens Geyerd5436f52014-10-03 19:50:38 +02001487 FReadBuffer.Position := 0;
1488end;
1489
Jens Geyer17c3ad92017-09-05 20:31:27 +02001490procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001491var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001492begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001493 if len > 0 then begin
1494 pTmp := pBuf;
1495 Inc( pTmp, off);
1496
1497 FWriteBuffer.Write( pTmp^, len );
1498 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001499end;
1500
Jens Geyered994552019-11-09 23:24:52 +01001501
Jens Geyera019cda2019-11-09 23:24:52 +01001502procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1503var buffered, need : Int64;
Jens Geyer41f47af2019-11-09 23:24:52 +01001504begin
Jens Geyera019cda2019-11-09 23:24:52 +01001505 need := value;
Jens Geyer41f47af2019-11-09 23:24:52 +01001506
Jens Geyera019cda2019-11-09 23:24:52 +01001507 // buffered bytes
1508 buffered := FReadBuffer.Size - FReadBuffer.Position;
1509 if buffered < need
1510 then InnerTransport.CheckReadBytesAvailable( need - buffered);
Jens Geyer41f47af2019-11-09 23:24:52 +01001511end;
1512
1513
Jens Geyerd5436f52014-10-03 19:50:38 +02001514{ TFramedTransport.TFactory }
1515
Jens Geyered994552019-11-09 23:24:52 +01001516function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +02001517begin
Jens Geyered994552019-11-09 23:24:52 +01001518 Result := TFramedTransportImpl.Create( aTransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001519end;
1520
1521{ TTcpSocketStreamImpl }
1522
1523procedure TTcpSocketStreamImpl.Close;
1524begin
1525 FTcpClient.Close;
1526end;
1527
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001528{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +01001529constructor TTcpSocketStreamImpl.Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001530begin
1531 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001532 FTcpClient := aTcpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +02001533 FTimeout := aTimeout;
1534end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001535{$ELSE}
Jens Geyered994552019-11-09 23:24:52 +01001536constructor TTcpSocketStreamImpl.Create( const aTcpClient: TSocket; const aTimeout : Longword);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001537begin
1538 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001539 FTcpClient := aTcpClient;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001540 if aTimeout = 0 then
1541 FTcpClient.RecvTimeout := SLEEP_TIME
1542 else
1543 FTcpClient.RecvTimeout := aTimeout;
1544 FTcpClient.SendTimeout := aTimeout;
1545end;
1546{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001547
1548procedure TTcpSocketStreamImpl.Flush;
1549begin
Jens Geyera019cda2019-11-09 23:24:52 +01001550 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +02001551end;
1552
Jens Geyera019cda2019-11-09 23:24:52 +01001553
Jens Geyerd5436f52014-10-03 19:50:38 +02001554function TTcpSocketStreamImpl.IsOpen: Boolean;
1555begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001556{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001557 Result := FTcpClient.Active;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001558{$ELSE}
1559 Result := FTcpClient.IsOpen;
1560{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001561end;
1562
1563procedure TTcpSocketStreamImpl.Open;
1564begin
1565 FTcpClient.Open;
1566end;
1567
1568
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001569{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001570function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1571 TimeOut: Integer; var wsaError : Integer): Integer;
1572var
1573 ReadFds: TFDset;
1574 ReadFdsptr: PFDset;
1575 WriteFds: TFDset;
1576 WriteFdsptr: PFDset;
1577 ExceptFds: TFDset;
1578 ExceptFdsptr: PFDset;
1579 tv: timeval;
1580 Timeptr: PTimeval;
1581 socket : TSocket;
1582begin
1583 if not FTcpClient.Active then begin
1584 wsaError := WSAEINVAL;
1585 Exit( SOCKET_ERROR);
1586 end;
1587
1588 socket := FTcpClient.Handle;
1589
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001590 if Assigned(ReadReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001591 ReadFdsptr := @ReadFds;
1592 FD_ZERO(ReadFds);
1593 FD_SET(socket, ReadFds);
1594 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001595 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001596 ReadFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001597 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001598
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001599 if Assigned(WriteReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001600 WriteFdsptr := @WriteFds;
1601 FD_ZERO(WriteFds);
1602 FD_SET(socket, WriteFds);
1603 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001604 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001605 WriteFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001606 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001607
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001608 if Assigned(ExceptFlag) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001609 ExceptFdsptr := @ExceptFds;
1610 FD_ZERO(ExceptFds);
1611 FD_SET(socket, ExceptFds);
1612 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001613 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001614 ExceptFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001615 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001616
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001617 if TimeOut >= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001618 tv.tv_sec := TimeOut div 1000;
1619 tv.tv_usec := 1000 * (TimeOut mod 1000);
1620 Timeptr := @tv;
1621 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001622 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001623 Timeptr := nil; // wait forever
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001624 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001625
1626 wsaError := 0;
1627 try
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001628 {$IFDEF MSWINDOWS}
1629 {$IFDEF OLD_UNIT_NAMES}
1630 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1631 {$ELSE}
1632 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1633 {$ENDIF}
1634 {$ENDIF}
1635 {$IFDEF LINUX}
1636 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1637 {$ENDIF}
Jens Geyera019cda2019-11-09 23:24:52 +01001638
Jens Geyerd5436f52014-10-03 19:50:38 +02001639 if result = SOCKET_ERROR
1640 then wsaError := WSAGetLastError;
1641
1642 except
1643 result := SOCKET_ERROR;
1644 end;
1645
1646 if Assigned(ReadReady) then
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001647 ReadReady^ := FD_ISSET(socket, ReadFds);
1648
Jens Geyerd5436f52014-10-03 19:50:38 +02001649 if Assigned(WriteReady) then
1650 WriteReady^ := FD_ISSET(socket, WriteFds);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001651
Jens Geyerd5436f52014-10-03 19:50:38 +02001652 if Assigned(ExceptFlag) then
1653 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1654end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001655{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001656
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001657{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001658function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1659 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001660 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001661var bCanRead, bError : Boolean;
1662 retval : Integer;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001663const
1664 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
Jens Geyerd5436f52014-10-03 19:50:38 +02001665begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001666 bytesReady := 0;
1667
Jens Geyerd5436f52014-10-03 19:50:38 +02001668 // The select function returns the total number of socket handles that are ready
1669 // and contained in the fd_set structures, zero if the time limit expired,
1670 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1671 // WSAGetLastError can be used to retrieve a specific error code.
1672 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1673 if retval = SOCKET_ERROR
1674 then Exit( TWaitForData.wfd_Error);
1675 if (retval = 0) or not bCanRead
1676 then Exit( TWaitForData.wfd_Timeout);
1677
1678 // recv() returns the number of bytes received, or -1 if an error occurred.
1679 // The return value will be 0 when the peer has performed an orderly shutdown.
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001680
1681 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
Jens Geyerd5436f52014-10-03 19:50:38 +02001682 if retval <= 0
1683 then Exit( TWaitForData.wfd_Error);
1684
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001685 // at least we have some data
1686 bytesReady := Min( retval, DesiredBytes);
1687 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001688end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001689{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001690
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001691{$IFDEF OLD_SOCKETS}
Jens Geyer17c3ad92017-09-05 20:31:27 +02001692function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001693// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001694var wfd : TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001695 wsaError,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001696 msecs : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001697 nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001698 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001699begin
1700 inherited;
1701
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001702 if FTimeout > 0
1703 then msecs := FTimeout
1704 else msecs := DEFAULT_THRIFT_TIMEOUT;
1705
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001706 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001707 pTmp := pBuf;
1708 Inc( pTmp, offset);
Jens Geyerc140bb92019-11-27 22:18:12 +01001709 while (count > 0) and (result = 0) do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001710
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001711 while TRUE do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001712 wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001713 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001714 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001715 TWaitForData.wfd_HaveData : Break;
1716 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001717 if (FTimeout = 0)
1718 then Exit
Jens Geyera019cda2019-11-09 23:24:52 +01001719 else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001720 end;
1721 else
1722 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001723 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001724 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001725
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001726 // reduce the timeout once we got data
1727 if FTimeout > 0
1728 then msecs := FTimeout div 10
1729 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1730 msecs := Max( msecs, 200);
1731
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001732 ASSERT( nBytes <= count);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001733 nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1734 Inc( pTmp, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001735 Dec( count, nBytes);
1736 Inc( result, nBytes);
1737 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001738end;
1739
1740function TTcpSocketStreamImpl.ToArray: TBytes;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001741// old sockets version
1742var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001743begin
1744 len := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001745 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001746 len := FTcpClient.BytesReceived;
1747 end;
1748
1749 SetLength( Result, len );
1750
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001751 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001752 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1753 end;
1754end;
1755
Jens Geyer17c3ad92017-09-05 20:31:27 +02001756procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001757// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001758var bCanWrite, bError : Boolean;
1759 retval, wsaError : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001760 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001761begin
1762 inherited;
1763
1764 if not FTcpClient.Active
Jens Geyere0e32402016-04-20 21:50:48 +02001765 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerd5436f52014-10-03 19:50:38 +02001766
1767 // The select function returns the total number of socket handles that are ready
1768 // and contained in the fd_set structures, zero if the time limit expired,
1769 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1770 // WSAGetLastError can be used to retrieve a specific error code.
1771 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1772 if retval = SOCKET_ERROR
Jens Geyere0e32402016-04-20 21:50:48 +02001773 then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001774
Jens Geyerd5436f52014-10-03 19:50:38 +02001775 if (retval = 0)
Jens Geyere0e32402016-04-20 21:50:48 +02001776 then raise TTransportExceptionTimedOut.Create('timed out');
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001777
Jens Geyerd5436f52014-10-03 19:50:38 +02001778 if bError or not bCanWrite
Jens Geyere0e32402016-04-20 21:50:48 +02001779 then raise TTransportExceptionUnknown.Create('unknown error');
Jens Geyerd5436f52014-10-03 19:50:38 +02001780
Jens Geyer5089b0a2018-02-01 22:37:18 +01001781 pTmp := pBuf;
1782 Inc( pTmp, offset);
1783 FTcpClient.SendBuf( pTmp^, count);
Jens Geyerd5436f52014-10-03 19:50:38 +02001784end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001785
1786{$ELSE}
1787
Jens Geyer17c3ad92017-09-05 20:31:27 +02001788function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001789// new sockets version
1790var nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001791 pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001792begin
1793 inherited;
1794
1795 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001796 pTmp := pBuf;
1797 Inc( pTmp, offset);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001798 while count > 0 do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001799 nBytes := FTcpClient.Read( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001800 if nBytes = 0 then Exit;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001801 Inc( pTmp, nBytes);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001802 Dec( count, nBytes);
1803 Inc( result, nBytes);
1804 end;
1805end;
1806
1807function TTcpSocketStreamImpl.ToArray: TBytes;
1808// new sockets version
1809var len : Integer;
1810begin
1811 len := 0;
1812 try
1813 if FTcpClient.Peek then
1814 repeat
1815 SetLength(Result, Length(Result) + 1024);
1816 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1817 until len < 1024;
1818 except
1819 on TTransportException do begin { don't allow default exceptions } end;
1820 else raise;
1821 end;
1822 if len > 0 then
1823 SetLength(Result, Length(Result) - 1024 + len);
1824end;
1825
Jens Geyer17c3ad92017-09-05 20:31:27 +02001826procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001827// new sockets version
Jens Geyer5089b0a2018-02-01 22:37:18 +01001828var pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001829begin
1830 inherited;
1831
1832 if not FTcpClient.IsOpen
Kyle Johnsone363a342016-04-22 19:11:16 -05001833 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001834
Jens Geyer5089b0a2018-02-01 22:37:18 +01001835 pTmp := pBuf;
1836 Inc( pTmp, offset);
1837 FTcpClient.Write( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001838end;
1839
Jens Geyer23d67462015-12-19 11:44:57 +01001840{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001841
Jens Geyerd5436f52014-10-03 19:50:38 +02001842
1843end.