blob: 6a69d93380f2ac1da4d7e6e26eb04d398c8be53f [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
Jens Geyer02230912019-04-03 01:12:51 +020032 WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer02230912019-04-03 01:12:51 +020034 Winapi.WinSock,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020035 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyera019cda2019-11-09 23:24:52 +010041 Thrift.Configuration,
Jens Geyerd5436f52014-10-03 19:50:38 +020042 Thrift.Collections,
Jens Geyer606f1ef2018-04-09 23:09:41 +020043 Thrift.Exception,
Jens Geyerd5436f52014-10-03 19:50:38 +020044 Thrift.Utils,
Jens Geyer02230912019-04-03 01:12:51 +020045 Thrift.WinHTTP,
Nick4f5229e2016-04-14 16:43:22 +030046 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020047
Jens Geyer41f47af2019-11-09 23:24:52 +010048const
49 DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; // 100 MB
50 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
51
Jens Geyerd5436f52014-10-03 19:50:38 +020052type
Jens Geyera019cda2019-11-09 23:24:52 +010053 IStreamTransport = interface;
Jens Geyer41f47af2019-11-09 23:24:52 +010054
Jens Geyerd5436f52014-10-03 19:50:38 +020055 ITransport = interface
Jens Geyera019cda2019-11-09 23:24:52 +010056 ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
Jens Geyerd5436f52014-10-03 19:50:38 +020057 function GetIsOpen: Boolean;
58 property IsOpen: Boolean read GetIsOpen;
59 function Peek: Boolean;
60 procedure Open;
61 procedure Close;
Jens Geyer41f47af2019-11-09 23:24:52 +010062
Jens Geyer17c3ad92017-09-05 20:31:27 +020063 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
64 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
65 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
66 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020067 procedure Write( const buf: TBytes); overload;
68 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
Jens Geyer17c3ad92017-09-05 20:31:27 +020069 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
70 procedure Write( const pBuf : Pointer; len : Integer); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020071 procedure Flush;
Jens Geyer41f47af2019-11-09 23:24:52 +010072
Jens Geyera019cda2019-11-09 23:24:52 +010073 function Configuration : IThriftConfiguration;
74 function MaxMessageSize : Integer;
75 procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
76 procedure CheckReadBytesAvailable( const numBytes : Int64);
77 procedure UpdateKnownMessageSize( const size : Int64);
Jens Geyerd5436f52014-10-03 19:50:38 +020078 end;
79
Jens Geyera019cda2019-11-09 23:24:52 +010080 TTransportBase = class abstract( TInterfacedObject)
Jens Geyerfad7fd32019-11-09 23:24:52 +010081 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +020082 function GetIsOpen: Boolean; virtual; abstract;
83 property IsOpen: Boolean read GetIsOpen;
84 function Peek: Boolean; virtual;
85 procedure Open(); virtual; abstract;
86 procedure Close(); virtual; abstract;
Jens Geyer41f47af2019-11-09 23:24:52 +010087
Jens Geyer17c3ad92017-09-05 20:31:27 +020088 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
89 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
90 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
91 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
92 procedure Write( const buf: TBytes); overload; inline;
93 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
94 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
95 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020096 procedure Flush; virtual;
Jens Geyer41f47af2019-11-09 23:24:52 +010097
Jens Geyera019cda2019-11-09 23:24:52 +010098 function Configuration : IThriftConfiguration; virtual; abstract;
99 procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
100 end;
Jens Geyer41f47af2019-11-09 23:24:52 +0100101
Jens Geyera019cda2019-11-09 23:24:52 +0100102 // base class for all endpoint transports, e.g. sockets, pipes or HTTP
103 TEndpointTransportBase = class abstract( TTransportBase, ITransport)
104 strict private
105 FRemainingMessageSize : Int64;
106 FKnownMessageSize : Int64;
107 FConfiguration : IThriftConfiguration;
108 strict protected
109 function Configuration : IThriftConfiguration; override;
110 function MaxMessageSize : Integer;
111 property RemainingMessageSize : Int64 read FRemainingMessageSize;
112 property KnownMessageSize : Int64 read FKnownMessageSize;
Jens Geyer6762cad2020-10-30 17:15:18 +0100113 procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
Jens Geyera019cda2019-11-09 23:24:52 +0100114 procedure UpdateKnownMessageSize(const size : Int64); override;
115 procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
116 procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
Jens Geyer41f47af2019-11-09 23:24:52 +0100117 public
Jens Geyera019cda2019-11-09 23:24:52 +0100118 constructor Create( const aConfig : IThriftConfiguration); reintroduce;
119 end;
120
121 // base class for all layered transports, e.g. framed
122 TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
123 strict private
124 FTransport : T;
125 strict protected
126 property InnerTransport : T read FTransport;
127 function GetUnderlyingTransport: ITransport;
128 function Configuration : IThriftConfiguration; override;
129 procedure UpdateKnownMessageSize( const size : Int64); override;
130 function MaxMessageSize : Integer; inline;
131 procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
132 procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
133 public
134 constructor Create( const aTransport: T); reintroduce;
135 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200136 end;
137
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100138 TTransportException = class abstract( TException)
Jens Geyerd5436f52014-10-03 19:50:38 +0200139 public
140 type
141 TExceptionType = (
142 Unknown,
143 NotOpen,
144 AlreadyOpen,
145 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200146 EndOfFile,
147 BadArgs,
Jens Geyer2646bd62019-11-09 23:24:52 +0100148 Interrupted,
149 CorruptedData
Jens Geyerd5436f52014-10-03 19:50:38 +0200150 );
Jens Geyerfad7fd32019-11-09 23:24:52 +0100151 strict protected
Jens Geyere0e32402016-04-20 21:50:48 +0200152 constructor HiddenCreate(const Msg: string);
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100153 class function GetType: TExceptionType; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +0200154 public
Jens Geyer41f47af2019-11-09 23:24:52 +0100155 class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyere0e32402016-04-20 21:50:48 +0200156 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyer41f47af2019-11-09 23:24:52 +0100157 class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
Jens Geyere0e32402016-04-20 21:50:48 +0200158 property Type_: TExceptionType read GetType;
Jens Geyerd5436f52014-10-03 19:50:38 +0200159 end;
160
Jens Geyere0e32402016-04-20 21:50:48 +0200161 // Needed to remove deprecation warning
162 TTransportExceptionSpecialized = class abstract (TTransportException)
163 public
164 constructor Create(const Msg: string);
165 end;
166
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100167 TTransportExceptionUnknown = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100168 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100169 class function GetType: TTransportException.TExceptionType; override;
170 end;
171
172 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100173 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100174 class function GetType: TTransportException.TExceptionType; override;
175 end;
176
177 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100178 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100179 class function GetType: TTransportException.TExceptionType; override;
180 end;
181
182 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100183 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100184 class function GetType: TTransportException.TExceptionType; override;
185 end;
186
187 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100188 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100189 class function GetType: TTransportException.TExceptionType; override;
190 end;
191
192 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100193 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100194 class function GetType: TTransportException.TExceptionType; override;
195 end;
196
197 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100198 strict protected
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100199 class function GetType: TTransportException.TExceptionType; override;
200 end;
Jens Geyere0e32402016-04-20 21:50:48 +0200201
Jens Geyer2646bd62019-11-09 23:24:52 +0100202 TTransportExceptionCorruptedData = class (TTransportExceptionSpecialized)
203 protected
204 class function GetType: TTransportException.TExceptionType; override;
205 end;
206
Jens Geyer47f63172019-06-06 22:42:58 +0200207 TSecureProtocol = (
208 SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
209 TLS_1_1, TLS_1_2 // secure (as of today)
210 );
211
212 TSecureProtocols = set of TSecureProtocol;
213
Jens Geyerd5436f52014-10-03 19:50:38 +0200214 IHTTPClient = interface( ITransport )
Jens Geyer47f63172019-06-06 22:42:58 +0200215 ['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
Jens Geyer20e727e2018-06-22 22:39:57 +0200216 procedure SetDnsResolveTimeout(const Value: Integer);
217 function GetDnsResolveTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200218 procedure SetConnectionTimeout(const Value: Integer);
219 function GetConnectionTimeout: Integer;
Jens Geyer20e727e2018-06-22 22:39:57 +0200220 procedure SetSendTimeout(const Value: Integer);
221 function GetSendTimeout: Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200222 procedure SetReadTimeout(const Value: Integer);
223 function GetReadTimeout: Integer;
224 function GetCustomHeaders: IThriftDictionary<string,string>;
225 procedure SendRequest;
Jens Geyer47f63172019-06-06 22:42:58 +0200226 function GetSecureProtocols : TSecureProtocols;
227 procedure SetSecureProtocols( const value : TSecureProtocols);
Jens Geyer20e727e2018-06-22 22:39:57 +0200228
229 property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200230 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
Jens Geyer20e727e2018-06-22 22:39:57 +0200231 property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200232 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
233 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
Jens Geyer47f63172019-06-06 22:42:58 +0200234 property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
Jens Geyerd5436f52014-10-03 19:50:38 +0200235 end;
236
Jens Geyerd5436f52014-10-03 19:50:38 +0200237 IServerTransport = interface
Jens Geyera019cda2019-11-09 23:24:52 +0100238 ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
Jens Geyerd5436f52014-10-03 19:50:38 +0200239 procedure Listen;
240 procedure Close;
241 function Accept( const fnAccepting: TProc): ITransport;
Jens Geyera019cda2019-11-09 23:24:52 +0100242 function Configuration : IThriftConfiguration;
Jens Geyerd5436f52014-10-03 19:50:38 +0200243 end;
244
245 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
Jens Geyera019cda2019-11-09 23:24:52 +0100246 strict private
247 FConfig : IThriftConfiguration;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100248 strict protected
Jens Geyera019cda2019-11-09 23:24:52 +0100249 function Configuration : IThriftConfiguration;
Jens Geyerd5436f52014-10-03 19:50:38 +0200250 procedure Listen; virtual; abstract;
251 procedure Close; virtual; abstract;
Jens Geyera019cda2019-11-09 23:24:52 +0100252 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
253 public
254 constructor Create( const aConfig : IThriftConfiguration);
Jens Geyerd5436f52014-10-03 19:50:38 +0200255 end;
256
257 ITransportFactory = interface
258 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
Jens Geyer41f47af2019-11-09 23:24:52 +0100259 function GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200260 end;
261
Jens Geyera019cda2019-11-09 23:24:52 +0100262 TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
263 strict protected
Jens Geyered994552019-11-09 23:24:52 +0100264 function GetTransport( const aTransport: ITransport): ITransport; virtual;
Jens Geyerd5436f52014-10-03 19:50:38 +0200265 end;
266
Jens Geyera019cda2019-11-09 23:24:52 +0100267
268 TTcpSocketStreamImpl = class( TThriftStreamImpl)
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200269{$IFDEF OLD_SOCKETS}
Jens Geyerfad7fd32019-11-09 23:24:52 +0100270 strict private type
Jens Geyerd5436f52014-10-03 19:50:38 +0200271 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
Jens Geyerfad7fd32019-11-09 23:24:52 +0100272 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200273 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200274 FTimeout : Integer;
275 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
276 TimeOut: Integer; var wsaError : Integer): Integer;
277 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200278 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200279{$ELSE}
280 FTcpClient: TSocket;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100281 strict protected const
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200282 SLEEP_TIME = 200;
283{$ENDIF}
Jens Geyerfad7fd32019-11-09 23:24:52 +0100284 strict protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200285 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
286 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200287 procedure Open; override;
288 procedure Close; override;
289 procedure Flush; override;
290
291 function IsOpen: Boolean; override;
292 function ToArray: TBytes; override;
293 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200294{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100295 constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200296{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100297 constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200298{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200299 end;
300
301 IStreamTransport = interface( ITransport )
302 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
303 function GetInputStream: IThriftStream;
304 function GetOutputStream: IThriftStream;
305 property InputStream : IThriftStream read GetInputStream;
306 property OutputStream : IThriftStream read GetOutputStream;
307 end;
308
Jens Geyera019cda2019-11-09 23:24:52 +0100309 TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100310 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200311 FInputStream : IThriftStream;
312 FOutputStream : IThriftStream;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100313 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200314 function GetIsOpen: Boolean; override;
315
316 function GetInputStream: IThriftStream;
317 function GetOutputStream: IThriftStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200318
Jens Geyer41f47af2019-11-09 23:24:52 +0100319 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200320 procedure Open; override;
321 procedure Close; override;
322 procedure Flush; override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200323 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
324 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyered994552019-11-09 23:24:52 +0100325 public
Jens Geyera019cda2019-11-09 23:24:52 +0100326 constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
Jens Geyerd5436f52014-10-03 19:50:38 +0200327 destructor Destroy; override;
Jens Geyered994552019-11-09 23:24:52 +0100328
329 property InputStream : IThriftStream read GetInputStream;
330 property OutputStream : IThriftStream read GetOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200331 end;
332
333 TBufferedStreamImpl = class( TThriftStreamImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100334 strict private
Jens Geyerd5436f52014-10-03 19:50:38 +0200335 FStream : IThriftStream;
336 FBufSize : Integer;
337 FReadBuffer : TMemoryStream;
338 FWriteBuffer : TMemoryStream;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100339 strict protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200340 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
341 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200342 procedure Open; override;
343 procedure Close; override;
344 procedure Flush; override;
345 function IsOpen: Boolean; override;
346 function ToArray: TBytes; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100347 function Size : Int64; override;
348 function Position : Int64; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200349 public
Jens Geyered994552019-11-09 23:24:52 +0100350 constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200351 destructor Destroy; override;
352 end;
353
354 TServerSocketImpl = class( TServerTransportImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100355 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200356{$IFDEF OLD_SOCKETS}
357 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200358 FPort : Integer;
359 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200360{$ELSE}
361 FServer: TServerSocket;
362{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200363 FUseBufferedSocket : Boolean;
364 FOwnsServer : Boolean;
Jens Geyer41f47af2019-11-09 23:24:52 +0100365
Jens Geyerfad7fd32019-11-09 23:24:52 +0100366 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200367 function Accept( const fnAccepting: TProc) : ITransport; override;
Jens Geyer41f47af2019-11-09 23:24:52 +0100368
Jens Geyerd5436f52014-10-03 19:50:38 +0200369 public
Jens Geyera019cda2019-11-09 23:24:52 +0100370 {$IFDEF OLD_SOCKETS}
371 constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
372 constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
373 {$ELSE}
374 constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
375 constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
376 {$ENDIF}
377
Jens Geyerd5436f52014-10-03 19:50:38 +0200378 destructor Destroy; override;
379 procedure Listen; override;
380 procedure Close; override;
381 end;
382
Jens Geyera019cda2019-11-09 23:24:52 +0100383 TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100384 strict private
Jens Geyerd5436f52014-10-03 19:50:38 +0200385 FInputBuffer : IThriftStream;
386 FOutputBuffer : IThriftStream;
Jens Geyerd5436f52014-10-03 19:50:38 +0200387 FBufSize : Integer;
388
389 procedure InitBuffers;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100390 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200391 function GetIsOpen: Boolean; override;
392 procedure Flush; override;
393 public
Jens Geyered994552019-11-09 23:24:52 +0100394 type
395 TFactory = class( TTransportFactoryImpl )
396 public
397 function GetTransport( const aTransport: ITransport): ITransport; override;
398 end;
399
400 constructor Create( const aTransport : IStreamTransport; const aBufSize: Integer = 1024);
Jens Geyerd5436f52014-10-03 19:50:38 +0200401 procedure Open(); override;
402 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200403 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
404 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100405 procedure CheckReadBytesAvailable( const value : Int64); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200406 property IsOpen: Boolean read GetIsOpen;
407 end;
408
409 TSocketImpl = class(TStreamTransportImpl)
Jens Geyerfad7fd32019-11-09 23:24:52 +0100410 strict private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200411{$IFDEF OLD_SOCKETS}
412 FClient : TCustomIpClient;
413{$ELSE}
414 FClient: TSocket;
415{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200416 FOwnsClient : Boolean;
417 FHost : string;
418 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200419{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200420 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200421{$ELSE}
422 FTimeout : Longword;
423{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200424
425 procedure InitSocket;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100426 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200427 function GetIsOpen: Boolean; override;
428 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200429{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100430 constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
431 constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200432{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100433 constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
434 constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200435{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200436 destructor Destroy; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100437
438 procedure Open; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200439 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200440{$IFDEF OLD_SOCKETS}
441 property TcpClient: TCustomIpClient read FClient;
442{$ELSE}
443 property TcpClient: TSocket read FClient;
444{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200445 property Host : string read FHost;
446 property Port: Integer read FPort;
447 end;
448
Jens Geyera019cda2019-11-09 23:24:52 +0100449 TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
Jens Geyer2646bd62019-11-09 23:24:52 +0100450 strict protected type
451 TFramedHeader = Int32;
Jens Geyerfad7fd32019-11-09 23:24:52 +0100452 strict protected
Jens Geyerd5436f52014-10-03 19:50:38 +0200453 FWriteBuffer : TMemoryStream;
454 FReadBuffer : TMemoryStream;
455
456 procedure InitWriteBuffer;
457 procedure ReadFrame;
Jens Geyerd5436f52014-10-03 19:50:38 +0200458
459 procedure Open(); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100460 function GetIsOpen: Boolean; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200461
462 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200463 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
464 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyera019cda2019-11-09 23:24:52 +0100465 procedure CheckReadBytesAvailable( const value : Int64); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200466 procedure Flush; override;
Jens Geyera019cda2019-11-09 23:24:52 +0100467
Jens Geyered994552019-11-09 23:24:52 +0100468 public
469 type
470 TFactory = class( TTransportFactoryImpl )
471 public
472 function GetTransport( const aTransport: ITransport): ITransport; override;
473 end;
474
475 constructor Create( const aTransport: ITransport); overload;
476 destructor Destroy; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200477 end;
478
Jens Geyerd5436f52014-10-03 19:50:38 +0200479
480const
Jens Geyer47f63172019-06-06 22:42:58 +0200481 DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
482
Jens Geyerd5436f52014-10-03 19:50:38 +0200483implementation
484
Jens Geyered994552019-11-09 23:24:52 +0100485
Jens Geyera019cda2019-11-09 23:24:52 +0100486{ TTransportBase }
Jens Geyer41f47af2019-11-09 23:24:52 +0100487
Jens Geyera019cda2019-11-09 23:24:52 +0100488procedure TTransportBase.Flush;
Jens Geyerd5436f52014-10-03 19:50:38 +0200489begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200490 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200491end;
492
Jens Geyera019cda2019-11-09 23:24:52 +0100493function TTransportBase.Peek: Boolean;
Jens Geyerd5436f52014-10-03 19:50:38 +0200494begin
495 Result := IsOpen;
496end;
497
Jens Geyera019cda2019-11-09 23:24:52 +0100498function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200499begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200500 if Length(buf) > 0
501 then result := Read( @buf[0], Length(buf), off, len)
502 else result := 0;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200503end;
504
Jens Geyera019cda2019-11-09 23:24:52 +0100505function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200506begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200507 if Length(buf) > 0
508 then result := ReadAll( @buf[0], Length(buf), off, len)
509 else result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +0200510end;
511
Jens Geyera019cda2019-11-09 23:24:52 +0100512function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200513var ret : Integer;
514begin
515 result := 0;
516 while result < len do begin
517 ret := Read( pBuf, buflen, off + result, len - result);
518 if ret > 0
519 then Inc( result, ret)
520 else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
521 end;
522end;
523
Jens Geyera019cda2019-11-09 23:24:52 +0100524procedure TTransportBase.Write( const buf: TBytes);
Jens Geyered994552019-11-09 23:24:52 +0100525begin
526 if Length(buf) > 0
527 then Write( @buf[0], 0, Length(buf));
528end;
529
Jens Geyera019cda2019-11-09 23:24:52 +0100530procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
Jens Geyered994552019-11-09 23:24:52 +0100531begin
532 if Length(buf) > 0
533 then Write( @buf[0], off, len);
534end;
535
Jens Geyera019cda2019-11-09 23:24:52 +0100536procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
Jens Geyer17c3ad92017-09-05 20:31:27 +0200537begin
538 Self.Write( pBuf, 0, len);
Jens Geyerd5436f52014-10-03 19:50:38 +0200539end;
540
Jens Geyered994552019-11-09 23:24:52 +0100541
Jens Geyera019cda2019-11-09 23:24:52 +0100542{ TEndpointTransportBase }
543
544constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
Jens Geyer41f47af2019-11-09 23:24:52 +0100545begin
Jens Geyera019cda2019-11-09 23:24:52 +0100546 inherited Create;
547
548 if aConfig <> nil
549 then FConfiguration := aConfig
550 else FConfiguration := TThriftConfigurationImpl.Create;
551
552 ResetConsumedMessageSize;
Jens Geyer41f47af2019-11-09 23:24:52 +0100553end;
554
555
Jens Geyera019cda2019-11-09 23:24:52 +0100556function TEndpointTransportBase.Configuration : IThriftConfiguration;
Jens Geyer41f47af2019-11-09 23:24:52 +0100557begin
Jens Geyera019cda2019-11-09 23:24:52 +0100558 result := FConfiguration;
Jens Geyer41f47af2019-11-09 23:24:52 +0100559end;
560
561
Jens Geyera019cda2019-11-09 23:24:52 +0100562function TEndpointTransportBase.MaxMessageSize : Integer;
563begin
564 ASSERT( Configuration <> nil);
565 result := Configuration.MaxMessageSize;
566end;
567
568
569procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
570// Resets RemainingMessageSize to the configured maximum
571begin
572 // full reset
573 if newSize < 0 then begin
574 FKnownMessageSize := MaxMessageSize;
575 FRemainingMessageSize := MaxMessageSize;
576 Exit;
577 end;
578
579 // update only: message size can shrink, but not grow
580 ASSERT( KnownMessageSize <= MaxMessageSize);
581 if newSize > KnownMessageSize
Jens Geyerb0123182020-02-12 12:16:19 +0100582 then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
Jens Geyera019cda2019-11-09 23:24:52 +0100583
584 FKnownMessageSize := newSize;
585 FRemainingMessageSize := newSize;
586end;
587
588
589procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
590// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
591// Will throw if we already consumed too many bytes.
592var consumed : Int64;
593begin
594 consumed := KnownMessageSize - RemainingMessageSize;
595 ResetConsumedMessageSize(size);
596 CountConsumedMessageBytes(consumed);
597end;
598
599
600procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
601// Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
602begin
603 if RemainingMessageSize < numBytes
604 then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
605end;
606
607
608procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
609// Consumes numBytes from the RemainingMessageSize.
610begin
611 if (RemainingMessageSize >= numBytes)
612 then Dec( FRemainingMessageSize, numBytes)
613 else begin
614 FRemainingMessageSize := 0;
615 raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
616 end;
617end;
618
619{ TLayeredTransportBase }
620
621constructor TLayeredTransportBase<T>.Create( const aTransport: T);
622begin
623 inherited Create;
624 FTransport := aTransport;
625end;
626
627function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
628begin
629 result := InnerTransport;
630end;
631
632function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
633begin
634 result := InnerTransport.Configuration;
635end;
636
637procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
638begin
639 InnerTransport.UpdateKnownMessageSize( size);
640end;
641
642
643function TLayeredTransportBase<T>.MaxMessageSize : Integer;
644begin
645 result := InnerTransport.MaxMessageSize;
646end;
647
648
649procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
650begin
651 InnerTransport.ResetConsumedMessageSize( knownSize);
652end;
653
654
655procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
656begin
657 InnerTransport.CheckReadBytesAvailable( numBytes);
658end;
659
660
661
Jens Geyerd5436f52014-10-03 19:50:38 +0200662{ TTransportException }
663
Jens Geyere0e32402016-04-20 21:50:48 +0200664constructor TTransportException.HiddenCreate(const Msg: string);
665begin
666 inherited Create(Msg);
667end;
668
Jens Geyered994552019-11-09 23:24:52 +0100669class function TTransportException.Create(aType: TExceptionType): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200670begin
671 //no inherited;
Jens Geyere0e32402016-04-20 21:50:48 +0200672{$WARN SYMBOL_DEPRECATED OFF}
Jens Geyered994552019-11-09 23:24:52 +0100673 Result := Create(aType, '')
Jens Geyere0e32402016-04-20 21:50:48 +0200674{$WARN SYMBOL_DEPRECATED DEFAULT}
Jens Geyerd5436f52014-10-03 19:50:38 +0200675end;
676
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100677class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200678begin
Jens Geyered994552019-11-09 23:24:52 +0100679 case aType of
Jens Geyere0e32402016-04-20 21:50:48 +0200680 TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
681 TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
682 TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
683 TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
684 TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
685 TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
686 else
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100687 ASSERT( TExceptionType.Unknown = aType);
Jens Geyere0e32402016-04-20 21:50:48 +0200688 Result := TTransportExceptionUnknown.Create(msg);
689 end;
Jens Geyerd5436f52014-10-03 19:50:38 +0200690end;
691
Jens Geyere0e32402016-04-20 21:50:48 +0200692class function TTransportException.Create(const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200693begin
Jens Geyere0e32402016-04-20 21:50:48 +0200694 Result := TTransportExceptionUnknown.Create(Msg);
695end;
696
697{ TTransportExceptionSpecialized }
698
699constructor TTransportExceptionSpecialized.Create(const Msg: string);
700begin
701 inherited HiddenCreate(Msg);
Jens Geyerd5436f52014-10-03 19:50:38 +0200702end;
703
Jens Geyer9f11c1e2019-11-09 19:39:20 +0100704{ specialized TTransportExceptions }
705
706class function TTransportExceptionUnknown.GetType: TTransportException.TExceptionType;
707begin
708 result := TExceptionType.Unknown;
709end;
710
711class function TTransportExceptionNotOpen.GetType: TTransportException.TExceptionType;
712begin
713 result := TExceptionType.NotOpen;
714end;
715
716class function TTransportExceptionAlreadyOpen.GetType: TTransportException.TExceptionType;
717begin
718 result := TExceptionType.AlreadyOpen;
719end;
720
721class function TTransportExceptionTimedOut.GetType: TTransportException.TExceptionType;
722begin
723 result := TExceptionType.TimedOut;
724end;
725
726class function TTransportExceptionEndOfFile.GetType: TTransportException.TExceptionType;
727begin
728 result := TExceptionType.EndOfFile;
729end;
730
731class function TTransportExceptionBadArgs.GetType: TTransportException.TExceptionType;
732begin
733 result := TExceptionType.BadArgs;
734end;
735
736class function TTransportExceptionInterrupted.GetType: TTransportException.TExceptionType;
737begin
738 result := TExceptionType.Interrupted;
739end;
740
Jens Geyer2646bd62019-11-09 23:24:52 +0100741class function TTransportExceptionCorruptedData.GetType: TTransportException.TExceptionType;
742begin
743 result := TExceptionType.CorruptedData;
744end;
745
Jens Geyerd5436f52014-10-03 19:50:38 +0200746{ TTransportFactoryImpl }
747
Jens Geyered994552019-11-09 23:24:52 +0100748function TTransportFactoryImpl.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200749begin
Jens Geyered994552019-11-09 23:24:52 +0100750 Result := aTransport;
Jens Geyerd5436f52014-10-03 19:50:38 +0200751end;
752
Jens Geyera019cda2019-11-09 23:24:52 +0100753
754{ TServerTransportImpl }
755
756constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
757begin
758 inherited Create;
759 if aConfig <> nil
760 then FConfig := aConfig
761 else FConfig := TThriftConfigurationImpl.Create;
762end;
763
764function TServerTransportImpl.Configuration : IThriftConfiguration;
765begin
766 result := FConfig;
767end;
768
Jens Geyerd5436f52014-10-03 19:50:38 +0200769{ TServerSocket }
770
Jens Geyer23d67462015-12-19 11:44:57 +0100771{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100772constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200773{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100774constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100775{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200776begin
Jens Geyera019cda2019-11-09 23:24:52 +0100777 inherited Create( aConfig);
Jens Geyered994552019-11-09 23:24:52 +0100778 FServer := aServer;
Jens Geyera019cda2019-11-09 23:24:52 +0100779
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200780
781{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +0100782 FClientTimeout := aClientTimeout;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200783{$ELSE}
Jens Geyered994552019-11-09 23:24:52 +0100784 FServer.RecvTimeout := aClientTimeout;
785 FServer.SendTimeout := aClientTimeout;
786{$ENDIF}
787end;
788
789
790{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100791constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100792{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100793constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200794{$ENDIF}
795begin
Jens Geyera019cda2019-11-09 23:24:52 +0100796 inherited Create( aConfig);
Jens Geyer41f47af2019-11-09 23:24:52 +0100797
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200798{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +0100799 FPort := aPort;
800 FClientTimeout := aClientTimeout;
801
802 FOwnsServer := True;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200803 FServer := TTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200804 FServer.BlockMode := bmBlocking;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200805 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200806 FServer.LocalPort := AnsiString( IntToStr( FPort));
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200807 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200808 FServer.LocalPort := IntToStr( FPort);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200809 {$IFEND}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200810{$ELSE}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200811 FOwnsServer := True;
Jens Geyered994552019-11-09 23:24:52 +0100812 FServer := TServerSocket.Create(aPort, aClientTimeout, aClientTimeout);
813{$ENDIF}
814
815 FUseBufferedSocket := aUseBufferedSockets;
Jens Geyerd5436f52014-10-03 19:50:38 +0200816end;
817
818destructor TServerSocketImpl.Destroy;
819begin
820 if FOwnsServer then begin
821 FServer.Free;
822 FServer := nil;
823 end;
824 inherited;
825end;
826
827function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
828var
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200829{$IFDEF OLD_SOCKETS}
830 client : TCustomIpClient;
831{$ELSE}
832 client: TSocket;
833{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200834 trans : IStreamTransport;
835begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100836 if FServer = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200837 raise TTransportExceptionNotOpen.Create('No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200838 end;
839
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200840{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200841 client := nil;
842 try
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200843 client := TCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200844
845 if Assigned(fnAccepting)
846 then fnAccepting();
847
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100848 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200849 client.Free;
850 Result := nil;
851 Exit;
852 end;
853
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100854 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200855 Result := nil;
856 Exit;
857 end;
858
Jens Geyera019cda2019-11-09 23:24:52 +0100859 trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
Jens Geyerd5436f52014-10-03 19:50:38 +0200860 client := nil; // trans owns it now
861
862 if FUseBufferedSocket
863 then result := TBufferedTransportImpl.Create( trans)
864 else result := trans;
865
866 except
867 on E: Exception do begin
868 client.Free;
Jens Geyere0e32402016-04-20 21:50:48 +0200869 raise TTransportExceptionUnknown.Create(E.ToString);
Jens Geyerd5436f52014-10-03 19:50:38 +0200870 end;
871 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200872{$ELSE}
873 if Assigned(fnAccepting) then
874 fnAccepting();
875
876 client := FServer.Accept;
877 try
Jens Geyera019cda2019-11-09 23:24:52 +0100878 trans := TSocketImpl.Create(client, TRUE, Configuration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200879 client := nil;
880
881 if FUseBufferedSocket then
882 Result := TBufferedTransportImpl.Create(trans)
883 else
884 Result := trans;
885 except
886 client.Free;
887 raise;
888 end;
889{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200890end;
891
892procedure TServerSocketImpl.Listen;
893begin
894 if FServer <> nil then
895 begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200896{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200897 try
898 FServer.Active := True;
899 except
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200900 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200901 do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200902 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200903{$ELSE}
904 FServer.Listen;
905{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200906 end;
907end;
908
909procedure TServerSocketImpl.Close;
910begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200911 if FServer <> nil then
912{$IFDEF OLD_SOCKETS}
913 try
914 FServer.Active := False;
915 except
916 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200917 do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200918 end;
919{$ELSE}
920 FServer.Close;
921{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200922end;
923
924{ TSocket }
925
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200926{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100927constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100928{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100929constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
Jens Geyered994552019-11-09 23:24:52 +0100930{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200931var stream : IThriftStream;
932begin
Jens Geyered994552019-11-09 23:24:52 +0100933 FClient := aClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200934 FOwnsClient := aOwnsClient;
Jens Geyered994552019-11-09 23:24:52 +0100935
936{$IFDEF OLD_SOCKETS}
937 FTimeout := aTimeout;
938{$ELSE}
939 FTimeout := aClient.RecvTimeout;
940{$ENDIF}
941
Jens Geyerd5436f52014-10-03 19:50:38 +0200942 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
Jens Geyera019cda2019-11-09 23:24:52 +0100943 inherited Create( stream, stream, aConfig);
Jens Geyerd5436f52014-10-03 19:50:38 +0200944end;
945
Jens Geyera019cda2019-11-09 23:24:52 +0100946
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200947{$IFDEF OLD_SOCKETS}
Jens Geyera019cda2019-11-09 23:24:52 +0100948constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200949{$ELSE}
Jens Geyera019cda2019-11-09 23:24:52 +0100950constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200951{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200952begin
Jens Geyera019cda2019-11-09 23:24:52 +0100953 inherited Create(nil,nil, aConfig);
Jens Geyered994552019-11-09 23:24:52 +0100954 FHost := aHost;
955 FPort := aPort;
956 FTimeout := aTimeout;
Jens Geyerd5436f52014-10-03 19:50:38 +0200957 InitSocket;
958end;
959
960destructor TSocketImpl.Destroy;
961begin
962 if FOwnsClient
963 then FreeAndNil( FClient);
964 inherited;
965end;
966
967procedure TSocketImpl.Close;
968begin
969 inherited Close;
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200970
971 FInputStream := nil;
972 FOutputStream := nil;
973
Jens Geyerd5436f52014-10-03 19:50:38 +0200974 if FOwnsClient
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200975 then FreeAndNil( FClient)
976 else FClient := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +0200977end;
978
979function TSocketImpl.GetIsOpen: Boolean;
980begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200981{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200982 Result := (FClient <> nil) and FClient.Connected;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200983{$ELSE}
984 Result := (FClient <> nil) and FClient.IsOpen
985{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200986end;
987
988procedure TSocketImpl.InitSocket;
989var
990 stream : IThriftStream;
991begin
992 if FOwnsClient
993 then FreeAndNil( FClient)
994 else FClient := nil;
995
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200996{$IFDEF OLD_SOCKETS}
997 FClient := TTcpClient.Create( nil);
998{$ELSE}
999 FClient := TSocket.Create(FHost, FPort);
1000{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001001 FOwnsClient := True;
1002
1003 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
1004 FInputStream := stream;
1005 FOutputStream := stream;
1006end;
1007
1008procedure TSocketImpl.Open;
1009begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001010 if IsOpen then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001011 raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +02001012 end;
1013
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001014 if FHost = '' then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001015 raise TTransportExceptionNotOpen.Create('Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +02001016 end;
1017
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001018 if Port <= 0 then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001019 raise TTransportExceptionNotOpen.Create('Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +02001020 end;
1021
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001022 if FClient = nil
1023 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +02001024
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001025{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001026 FClient.RemoteHost := TSocketHost( Host);
1027 FClient.RemotePort := TSocketPort( IntToStr( Port));
1028 FClient.Connect;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001029{$ELSE}
1030 FClient.Open;
1031{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001032
1033 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
1034 FOutputStream := FInputStream;
1035end;
1036
1037{ TBufferedStream }
1038
1039procedure TBufferedStreamImpl.Close;
1040begin
1041 Flush;
1042 FStream := nil;
1043
1044 FReadBuffer.Free;
1045 FReadBuffer := nil;
1046
1047 FWriteBuffer.Free;
1048 FWriteBuffer := nil;
1049end;
1050
Jens Geyered994552019-11-09 23:24:52 +01001051constructor TBufferedStreamImpl.Create( const aStream: IThriftStream; const aBufSize : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001052begin
1053 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001054 FStream := aStream;
1055 FBufSize := aBufSize;
Jens Geyerd5436f52014-10-03 19:50:38 +02001056 FReadBuffer := TMemoryStream.Create;
1057 FWriteBuffer := TMemoryStream.Create;
1058end;
1059
1060destructor TBufferedStreamImpl.Destroy;
1061begin
1062 Close;
1063 inherited;
1064end;
1065
1066procedure TBufferedStreamImpl.Flush;
1067var
1068 buf : TBytes;
1069 len : Integer;
1070begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001071 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001072 len := FWriteBuffer.Size;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001073 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001074 SetLength( buf, len );
1075 FWriteBuffer.Position := 0;
1076 FWriteBuffer.Read( Pointer(@buf[0])^, len );
1077 FStream.Write( buf, 0, len );
1078 end;
1079 FWriteBuffer.Clear;
1080 end;
1081end;
1082
1083function TBufferedStreamImpl.IsOpen: Boolean;
1084begin
1085 Result := (FWriteBuffer <> nil)
1086 and (FReadBuffer <> nil)
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001087 and (FStream <> nil)
1088 and FStream.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001089end;
1090
1091procedure TBufferedStreamImpl.Open;
1092begin
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001093 FStream.Open;
Jens Geyerd5436f52014-10-03 19:50:38 +02001094end;
1095
Jens Geyer17c3ad92017-09-05 20:31:27 +02001096function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001097var
1098 nRead : Integer;
1099 tempbuf : TBytes;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001100 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001101begin
1102 inherited;
1103 Result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001104
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001105 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001106 while count > 0 do begin
1107
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001108 if FReadBuffer.Position >= FReadBuffer.Size then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001109 FReadBuffer.Clear;
1110 SetLength( tempbuf, FBufSize);
1111 nRead := FStream.Read( tempbuf, 0, FBufSize );
1112 if nRead = 0 then Break; // avoid infinite loop
1113
1114 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
1115 FReadBuffer.Position := 0;
1116 end;
1117
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001118 if FReadBuffer.Position < FReadBuffer.Size then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001119 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
1120 pTmp := pBuf;
1121 Inc( pTmp, offset);
1122 Inc( Result, FReadBuffer.Read( pTmp^, nRead));
Jens Geyerd5436f52014-10-03 19:50:38 +02001123 Dec( count, nRead);
1124 Inc( offset, nRead);
1125 end;
1126 end;
1127 end;
1128end;
1129
Jens Geyered994552019-11-09 23:24:52 +01001130
Jens Geyerd5436f52014-10-03 19:50:38 +02001131function TBufferedStreamImpl.ToArray: TBytes;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001132var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001133begin
Jens Geyera019cda2019-11-09 23:24:52 +01001134 if IsOpen
1135 then len := FReadBuffer.Size
1136 else len := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001137
1138 SetLength( Result, len);
1139
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001140 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001141 FReadBuffer.Position := 0;
1142 FReadBuffer.Read( Pointer(@Result[0])^, len );
1143 end;
1144end;
1145
Jens Geyer17c3ad92017-09-05 20:31:27 +02001146procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001147var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001148begin
1149 inherited;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001150 if count > 0 then begin
1151 if IsOpen then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001152 pTmp := pBuf;
1153 Inc( pTmp, offset);
1154 FWriteBuffer.Write( pTmp^, count );
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001155 if FWriteBuffer.Size > FBufSize then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001156 Flush;
1157 end;
1158 end;
1159 end;
1160end;
1161
Jens Geyera019cda2019-11-09 23:24:52 +01001162
1163function TBufferedStreamImpl.Size : Int64;
1164begin
1165 result := FReadBuffer.Size;
1166end;
1167
1168
1169function TBufferedStreamImpl.Position : Int64;
1170begin
1171 result := FReadBuffer.Position;
1172end;
1173
1174
Jens Geyerd5436f52014-10-03 19:50:38 +02001175{ TStreamTransportImpl }
1176
Jens Geyera019cda2019-11-09 23:24:52 +01001177constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
Jens Geyerd5436f52014-10-03 19:50:38 +02001178begin
Jens Geyera019cda2019-11-09 23:24:52 +01001179 inherited Create( aConfig);
Jens Geyered994552019-11-09 23:24:52 +01001180 FInputStream := aInputStream;
1181 FOutputStream := aOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +02001182end;
1183
1184destructor TStreamTransportImpl.Destroy;
1185begin
1186 FInputStream := nil;
1187 FOutputStream := nil;
1188 inherited;
1189end;
1190
Jens Geyer20e727e2018-06-22 22:39:57 +02001191procedure TStreamTransportImpl.Close;
1192begin
1193 FInputStream := nil;
1194 FOutputStream := nil;
1195end;
1196
Jens Geyerd5436f52014-10-03 19:50:38 +02001197procedure TStreamTransportImpl.Flush;
1198begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001199 if FOutputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001200 raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001201 end;
1202
1203 FOutputStream.Flush;
1204end;
1205
1206function TStreamTransportImpl.GetInputStream: IThriftStream;
1207begin
1208 Result := FInputStream;
1209end;
1210
1211function TStreamTransportImpl.GetIsOpen: Boolean;
1212begin
1213 Result := True;
1214end;
1215
1216function TStreamTransportImpl.GetOutputStream: IThriftStream;
1217begin
Jens Geyer02fbe0e2018-03-19 17:35:44 +01001218 Result := FOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +02001219end;
1220
1221procedure TStreamTransportImpl.Open;
1222begin
Jens Geyer2646bd62019-11-09 23:24:52 +01001223 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +02001224end;
1225
Jens Geyer17c3ad92017-09-05 20:31:27 +02001226function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001227begin
Jens Geyered994552019-11-09 23:24:52 +01001228 if FInputStream = nil
1229 then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001230
Jens Geyer17c3ad92017-09-05 20:31:27 +02001231 Result := FInputStream.Read( pBuf,buflen, off, len );
Jens Geyera019cda2019-11-09 23:24:52 +01001232 CountConsumedMessageBytes( result);
Jens Geyerd5436f52014-10-03 19:50:38 +02001233end;
1234
Jens Geyer17c3ad92017-09-05 20:31:27 +02001235procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001236begin
Jens Geyered994552019-11-09 23:24:52 +01001237 if FOutputStream = nil
1238 then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001239
Jens Geyer17c3ad92017-09-05 20:31:27 +02001240 FOutputStream.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001241end;
1242
1243{ TBufferedTransportImpl }
1244
Jens Geyered994552019-11-09 23:24:52 +01001245constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001246begin
Jens Geyered994552019-11-09 23:24:52 +01001247 ASSERT( aTransport <> nil);
Jens Geyera019cda2019-11-09 23:24:52 +01001248 inherited Create( aTransport);
Jens Geyered994552019-11-09 23:24:52 +01001249 FBufSize := aBufSize;
Jens Geyerd5436f52014-10-03 19:50:38 +02001250 InitBuffers;
1251end;
1252
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001253procedure TBufferedTransportImpl.Close;
1254begin
Jens Geyera019cda2019-11-09 23:24:52 +01001255 InnerTransport.Close;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001256 FInputBuffer := nil;
Jens Geyered994552019-11-09 23:24:52 +01001257 FOutputBuffer := nil;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001258end;
1259
Jens Geyerd5436f52014-10-03 19:50:38 +02001260procedure TBufferedTransportImpl.Flush;
1261begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001262 if FOutputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001263 FOutputBuffer.Flush;
1264 end;
1265end;
1266
1267function TBufferedTransportImpl.GetIsOpen: Boolean;
1268begin
Jens Geyera019cda2019-11-09 23:24:52 +01001269 Result := InnerTransport.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001270end;
1271
1272procedure TBufferedTransportImpl.InitBuffers;
1273begin
Jens Geyera019cda2019-11-09 23:24:52 +01001274 if InnerTransport.InputStream <> nil then begin
1275 FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
Jens Geyerd5436f52014-10-03 19:50:38 +02001276 end;
Jens Geyera019cda2019-11-09 23:24:52 +01001277 if InnerTransport.OutputStream <> nil then begin
1278 FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
Jens Geyerd5436f52014-10-03 19:50:38 +02001279 end;
1280end;
1281
1282procedure TBufferedTransportImpl.Open;
1283begin
Jens Geyera019cda2019-11-09 23:24:52 +01001284 InnerTransport.Open;
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001285 InitBuffers; // we need to get the buffers to match FTransport substreams again
Jens Geyerd5436f52014-10-03 19:50:38 +02001286end;
1287
Jens Geyer17c3ad92017-09-05 20:31:27 +02001288function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001289begin
Jens Geyered994552019-11-09 23:24:52 +01001290 if FInputBuffer <> nil
Jens Geyera019cda2019-11-09 23:24:52 +01001291 then Result := FInputBuffer.Read( pBuf,buflen, off, len )
Jens Geyered994552019-11-09 23:24:52 +01001292 else Result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001293end;
1294
Jens Geyer17c3ad92017-09-05 20:31:27 +02001295procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001296begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001297 if FOutputBuffer <> nil then begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001298 FOutputBuffer.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001299 end;
1300end;
1301
Jens Geyera019cda2019-11-09 23:24:52 +01001302procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1303var buffered, need : Int64;
Jens Geyer41f47af2019-11-09 23:24:52 +01001304begin
1305 need := value;
1306
1307 // buffered bytes
Jens Geyera019cda2019-11-09 23:24:52 +01001308 buffered := FInputBuffer.Size - FInputBuffer.Position;
1309 if buffered < need
1310 then InnerTransport.CheckReadBytesAvailable( need - buffered);
Jens Geyer41f47af2019-11-09 23:24:52 +01001311end;
1312
Jens Geyera019cda2019-11-09 23:24:52 +01001313
Jens Geyered994552019-11-09 23:24:52 +01001314{ TBufferedTransportImpl.TFactory }
Jens Geyerd5436f52014-10-03 19:50:38 +02001315
Jens Geyered994552019-11-09 23:24:52 +01001316function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +02001317begin
Jens Geyered994552019-11-09 23:24:52 +01001318 Result := TFramedTransportImpl.Create( aTransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001319end;
1320
Jens Geyered994552019-11-09 23:24:52 +01001321
1322{ TFramedTransportImpl }
1323
1324constructor TFramedTransportImpl.Create( const aTransport: ITransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001325begin
Jens Geyered994552019-11-09 23:24:52 +01001326 ASSERT( aTransport <> nil);
Jens Geyera019cda2019-11-09 23:24:52 +01001327 inherited Create( aTransport);
Jens Geyer2646bd62019-11-09 23:24:52 +01001328
Jens Geyerd5436f52014-10-03 19:50:38 +02001329 InitWriteBuffer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001330end;
1331
1332destructor TFramedTransportImpl.Destroy;
1333begin
1334 FWriteBuffer.Free;
Jens Geyera019cda2019-11-09 23:24:52 +01001335 FWriteBuffer := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001336 FReadBuffer.Free;
Jens Geyera019cda2019-11-09 23:24:52 +01001337 FReadBuffer := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001338 inherited;
1339end;
1340
Jens Geyer2646bd62019-11-09 23:24:52 +01001341procedure TFramedTransportImpl.Close;
1342begin
Jens Geyera019cda2019-11-09 23:24:52 +01001343 InnerTransport.Close;
Jens Geyer2646bd62019-11-09 23:24:52 +01001344end;
1345
Jens Geyerd5436f52014-10-03 19:50:38 +02001346procedure TFramedTransportImpl.Flush;
1347var
1348 buf : TBytes;
1349 len : Integer;
Jens Geyera019cda2019-11-09 23:24:52 +01001350 data_len : Int64;
Jens Geyerd5436f52014-10-03 19:50:38 +02001351begin
Jens Geyer528a0f02019-11-18 20:17:03 +01001352 if not IsOpen
1353 then raise TTransportExceptionNotOpen.Create('not open');
1354
Jens Geyerd5436f52014-10-03 19:50:38 +02001355 len := FWriteBuffer.Size;
1356 SetLength( buf, len);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001357 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001358 System.Move( FWriteBuffer.Memory^, buf[0], len );
1359 end;
1360
Jens Geyer2646bd62019-11-09 23:24:52 +01001361 data_len := len - SizeOf(TFramedHeader);
Jens Geyera019cda2019-11-09 23:24:52 +01001362 if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
1363 then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
1364 else UpdateKnownMessageSize( len);
Jens Geyerd5436f52014-10-03 19:50:38 +02001365
1366 InitWriteBuffer;
1367
1368 buf[0] := Byte($FF and (data_len shr 24));
1369 buf[1] := Byte($FF and (data_len shr 16));
1370 buf[2] := Byte($FF and (data_len shr 8));
1371 buf[3] := Byte($FF and data_len);
1372
Jens Geyera019cda2019-11-09 23:24:52 +01001373 InnerTransport.Write( buf, 0, len );
1374 InnerTransport.Flush;
Jens Geyerd5436f52014-10-03 19:50:38 +02001375end;
1376
1377function TFramedTransportImpl.GetIsOpen: Boolean;
1378begin
Jens Geyera019cda2019-11-09 23:24:52 +01001379 Result := InnerTransport.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +02001380end;
1381
1382type
1383 TAccessMemoryStream = class(TMemoryStream)
1384 end;
1385
1386procedure TFramedTransportImpl.InitWriteBuffer;
Jens Geyer2646bd62019-11-09 23:24:52 +01001387const DUMMY_HEADER : TFramedHeader = 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001388begin
Jens Geyer528a0f02019-11-18 20:17:03 +01001389 FreeAndNil( FWriteBuffer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001390 FWriteBuffer := TMemoryStream.Create;
1391 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
Jens Geyer2646bd62019-11-09 23:24:52 +01001392 FWriteBuffer.Write( DUMMY_HEADER, SizeOf(DUMMY_HEADER));
Jens Geyerd5436f52014-10-03 19:50:38 +02001393end;
1394
1395procedure TFramedTransportImpl.Open;
1396begin
Jens Geyera019cda2019-11-09 23:24:52 +01001397 InnerTransport.Open;
Jens Geyerd5436f52014-10-03 19:50:38 +02001398end;
1399
Jens Geyer17c3ad92017-09-05 20:31:27 +02001400function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001401var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001402begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001403 if len > (buflen-off)
1404 then len := buflen-off;
1405
Jens Geyer5089b0a2018-02-01 22:37:18 +01001406 pTmp := pBuf;
1407 Inc( pTmp, off);
1408
Jens Geyer17c3ad92017-09-05 20:31:27 +02001409 if (FReadBuffer <> nil) and (len > 0) then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001410 result := FReadBuffer.Read( pTmp^, len);
Jens Geyered994552019-11-09 23:24:52 +01001411 if result > 0 then Exit;
Jens Geyerd5436f52014-10-03 19:50:38 +02001412 end;
1413
1414 ReadFrame;
1415 if len > 0
Jens Geyer5089b0a2018-02-01 22:37:18 +01001416 then Result := FReadBuffer.Read( pTmp^, len)
Jens Geyerd5436f52014-10-03 19:50:38 +02001417 else Result := 0;
1418end;
1419
1420procedure TFramedTransportImpl.ReadFrame;
1421var
Jens Geyer2646bd62019-11-09 23:24:52 +01001422 i32rd : packed array[0..SizeOf(TFramedHeader)-1] of Byte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001423 size : Integer;
1424 buff : TBytes;
1425begin
Jens Geyera019cda2019-11-09 23:24:52 +01001426 InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
Jens Geyerd5436f52014-10-03 19:50:38 +02001427 size :=
1428 ((i32rd[0] and $FF) shl 24) or
1429 ((i32rd[1] and $FF) shl 16) or
1430 ((i32rd[2] and $FF) shl 8) or
1431 (i32rd[3] and $FF);
Jens Geyer2646bd62019-11-09 23:24:52 +01001432
1433 if size < 0 then begin
1434 Close();
1435 raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
1436 end;
1437
Jens Geyera019cda2019-11-09 23:24:52 +01001438 if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
Jens Geyer2646bd62019-11-09 23:24:52 +01001439 Close();
Jens Geyera019cda2019-11-09 23:24:52 +01001440 raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
Jens Geyer2646bd62019-11-09 23:24:52 +01001441 end;
1442
Jens Geyera019cda2019-11-09 23:24:52 +01001443 UpdateKnownMessageSize(size + SizeOf(size));
1444
Jens Geyerd5436f52014-10-03 19:50:38 +02001445 SetLength( buff, size );
Jens Geyera019cda2019-11-09 23:24:52 +01001446 InnerTransport.ReadAll( buff, 0, size );
Jens Geyered994552019-11-09 23:24:52 +01001447
1448 FreeAndNil( FReadBuffer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001449 FReadBuffer := TMemoryStream.Create;
Jens Geyera76e6c72017-09-08 21:03:30 +02001450 if Length(buff) > 0
1451 then FReadBuffer.Write( Pointer(@buff[0])^, size );
Jens Geyerd5436f52014-10-03 19:50:38 +02001452 FReadBuffer.Position := 0;
1453end;
1454
Jens Geyer17c3ad92017-09-05 20:31:27 +02001455procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001456var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001457begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001458 if len > 0 then begin
1459 pTmp := pBuf;
1460 Inc( pTmp, off);
1461
1462 FWriteBuffer.Write( pTmp^, len );
1463 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001464end;
1465
Jens Geyered994552019-11-09 23:24:52 +01001466
Jens Geyera019cda2019-11-09 23:24:52 +01001467procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
1468var buffered, need : Int64;
Jens Geyer41f47af2019-11-09 23:24:52 +01001469begin
Jens Geyera019cda2019-11-09 23:24:52 +01001470 need := value;
Jens Geyer41f47af2019-11-09 23:24:52 +01001471
Jens Geyera019cda2019-11-09 23:24:52 +01001472 // buffered bytes
1473 buffered := FReadBuffer.Size - FReadBuffer.Position;
1474 if buffered < need
1475 then InnerTransport.CheckReadBytesAvailable( need - buffered);
Jens Geyer41f47af2019-11-09 23:24:52 +01001476end;
1477
1478
Jens Geyerd5436f52014-10-03 19:50:38 +02001479{ TFramedTransport.TFactory }
1480
Jens Geyered994552019-11-09 23:24:52 +01001481function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
Jens Geyerd5436f52014-10-03 19:50:38 +02001482begin
Jens Geyered994552019-11-09 23:24:52 +01001483 Result := TFramedTransportImpl.Create( aTransport);
Jens Geyerd5436f52014-10-03 19:50:38 +02001484end;
1485
1486{ TTcpSocketStreamImpl }
1487
1488procedure TTcpSocketStreamImpl.Close;
1489begin
1490 FTcpClient.Close;
1491end;
1492
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001493{$IFDEF OLD_SOCKETS}
Jens Geyered994552019-11-09 23:24:52 +01001494constructor TTcpSocketStreamImpl.Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001495begin
1496 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001497 FTcpClient := aTcpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +02001498 FTimeout := aTimeout;
1499end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001500{$ELSE}
Jens Geyered994552019-11-09 23:24:52 +01001501constructor TTcpSocketStreamImpl.Create( const aTcpClient: TSocket; const aTimeout : Longword);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001502begin
1503 inherited Create;
Jens Geyered994552019-11-09 23:24:52 +01001504 FTcpClient := aTcpClient;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001505 if aTimeout = 0 then
1506 FTcpClient.RecvTimeout := SLEEP_TIME
1507 else
1508 FTcpClient.RecvTimeout := aTimeout;
1509 FTcpClient.SendTimeout := aTimeout;
1510end;
1511{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001512
1513procedure TTcpSocketStreamImpl.Flush;
1514begin
Jens Geyera019cda2019-11-09 23:24:52 +01001515 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +02001516end;
1517
Jens Geyera019cda2019-11-09 23:24:52 +01001518
Jens Geyerd5436f52014-10-03 19:50:38 +02001519function TTcpSocketStreamImpl.IsOpen: Boolean;
1520begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001521{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001522 Result := FTcpClient.Active;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001523{$ELSE}
1524 Result := FTcpClient.IsOpen;
1525{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001526end;
1527
1528procedure TTcpSocketStreamImpl.Open;
1529begin
1530 FTcpClient.Open;
1531end;
1532
1533
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001534{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001535function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1536 TimeOut: Integer; var wsaError : Integer): Integer;
1537var
1538 ReadFds: TFDset;
1539 ReadFdsptr: PFDset;
1540 WriteFds: TFDset;
1541 WriteFdsptr: PFDset;
1542 ExceptFds: TFDset;
1543 ExceptFdsptr: PFDset;
1544 tv: timeval;
1545 Timeptr: PTimeval;
1546 socket : TSocket;
1547begin
1548 if not FTcpClient.Active then begin
1549 wsaError := WSAEINVAL;
1550 Exit( SOCKET_ERROR);
1551 end;
1552
1553 socket := FTcpClient.Handle;
1554
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001555 if Assigned(ReadReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001556 ReadFdsptr := @ReadFds;
1557 FD_ZERO(ReadFds);
1558 FD_SET(socket, ReadFds);
1559 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001560 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001561 ReadFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001562 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001563
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001564 if Assigned(WriteReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001565 WriteFdsptr := @WriteFds;
1566 FD_ZERO(WriteFds);
1567 FD_SET(socket, WriteFds);
1568 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001569 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001570 WriteFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001571 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001572
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001573 if Assigned(ExceptFlag) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001574 ExceptFdsptr := @ExceptFds;
1575 FD_ZERO(ExceptFds);
1576 FD_SET(socket, ExceptFds);
1577 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001578 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001579 ExceptFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001580 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001581
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001582 if TimeOut >= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001583 tv.tv_sec := TimeOut div 1000;
1584 tv.tv_usec := 1000 * (TimeOut mod 1000);
1585 Timeptr := @tv;
1586 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001587 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001588 Timeptr := nil; // wait forever
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001589 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001590
1591 wsaError := 0;
1592 try
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001593 {$IFDEF MSWINDOWS}
1594 {$IFDEF OLD_UNIT_NAMES}
1595 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1596 {$ELSE}
1597 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1598 {$ENDIF}
1599 {$ENDIF}
1600 {$IFDEF LINUX}
1601 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1602 {$ENDIF}
Jens Geyera019cda2019-11-09 23:24:52 +01001603
Jens Geyerd5436f52014-10-03 19:50:38 +02001604 if result = SOCKET_ERROR
1605 then wsaError := WSAGetLastError;
1606
1607 except
1608 result := SOCKET_ERROR;
1609 end;
1610
1611 if Assigned(ReadReady) then
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001612 ReadReady^ := FD_ISSET(socket, ReadFds);
1613
Jens Geyerd5436f52014-10-03 19:50:38 +02001614 if Assigned(WriteReady) then
1615 WriteReady^ := FD_ISSET(socket, WriteFds);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001616
Jens Geyerd5436f52014-10-03 19:50:38 +02001617 if Assigned(ExceptFlag) then
1618 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1619end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001620{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001621
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001622{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001623function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1624 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001625 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001626var bCanRead, bError : Boolean;
1627 retval : Integer;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001628const
1629 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
Jens Geyerd5436f52014-10-03 19:50:38 +02001630begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001631 bytesReady := 0;
1632
Jens Geyerd5436f52014-10-03 19:50:38 +02001633 // The select function returns the total number of socket handles that are ready
1634 // and contained in the fd_set structures, zero if the time limit expired,
1635 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1636 // WSAGetLastError can be used to retrieve a specific error code.
1637 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1638 if retval = SOCKET_ERROR
1639 then Exit( TWaitForData.wfd_Error);
1640 if (retval = 0) or not bCanRead
1641 then Exit( TWaitForData.wfd_Timeout);
1642
1643 // recv() returns the number of bytes received, or -1 if an error occurred.
1644 // The return value will be 0 when the peer has performed an orderly shutdown.
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001645
1646 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
Jens Geyerd5436f52014-10-03 19:50:38 +02001647 if retval <= 0
1648 then Exit( TWaitForData.wfd_Error);
1649
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001650 // at least we have some data
1651 bytesReady := Min( retval, DesiredBytes);
1652 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001653end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001654{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001655
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001656{$IFDEF OLD_SOCKETS}
Jens Geyer17c3ad92017-09-05 20:31:27 +02001657function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001658// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001659var wfd : TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001660 wsaError,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001661 msecs : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001662 nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001663 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001664begin
1665 inherited;
1666
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001667 if FTimeout > 0
1668 then msecs := FTimeout
1669 else msecs := DEFAULT_THRIFT_TIMEOUT;
1670
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001671 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001672 pTmp := pBuf;
1673 Inc( pTmp, offset);
Jens Geyerc140bb92019-11-27 22:18:12 +01001674 while (count > 0) and (result = 0) do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001675
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001676 while TRUE do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001677 wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001678 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001679 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001680 TWaitForData.wfd_HaveData : Break;
1681 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001682 if (FTimeout = 0)
1683 then Exit
Jens Geyera019cda2019-11-09 23:24:52 +01001684 else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001685 end;
1686 else
1687 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001688 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001689 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001690
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001691 // reduce the timeout once we got data
1692 if FTimeout > 0
1693 then msecs := FTimeout div 10
1694 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1695 msecs := Max( msecs, 200);
1696
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001697 ASSERT( nBytes <= count);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001698 nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1699 Inc( pTmp, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001700 Dec( count, nBytes);
1701 Inc( result, nBytes);
1702 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001703end;
1704
1705function TTcpSocketStreamImpl.ToArray: TBytes;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001706// old sockets version
1707var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001708begin
1709 len := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001710 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001711 len := FTcpClient.BytesReceived;
1712 end;
1713
1714 SetLength( Result, len );
1715
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001716 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001717 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1718 end;
1719end;
1720
Jens Geyer17c3ad92017-09-05 20:31:27 +02001721procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001722// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001723var bCanWrite, bError : Boolean;
1724 retval, wsaError : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001725 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001726begin
1727 inherited;
1728
1729 if not FTcpClient.Active
Jens Geyere0e32402016-04-20 21:50:48 +02001730 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerd5436f52014-10-03 19:50:38 +02001731
1732 // The select function returns the total number of socket handles that are ready
1733 // and contained in the fd_set structures, zero if the time limit expired,
1734 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1735 // WSAGetLastError can be used to retrieve a specific error code.
1736 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1737 if retval = SOCKET_ERROR
Jens Geyere0e32402016-04-20 21:50:48 +02001738 then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001739
Jens Geyerd5436f52014-10-03 19:50:38 +02001740 if (retval = 0)
Jens Geyere0e32402016-04-20 21:50:48 +02001741 then raise TTransportExceptionTimedOut.Create('timed out');
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001742
Jens Geyerd5436f52014-10-03 19:50:38 +02001743 if bError or not bCanWrite
Jens Geyere0e32402016-04-20 21:50:48 +02001744 then raise TTransportExceptionUnknown.Create('unknown error');
Jens Geyerd5436f52014-10-03 19:50:38 +02001745
Jens Geyer5089b0a2018-02-01 22:37:18 +01001746 pTmp := pBuf;
1747 Inc( pTmp, offset);
1748 FTcpClient.SendBuf( pTmp^, count);
Jens Geyerd5436f52014-10-03 19:50:38 +02001749end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001750
1751{$ELSE}
1752
Jens Geyer17c3ad92017-09-05 20:31:27 +02001753function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001754// new sockets version
1755var nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001756 pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001757begin
1758 inherited;
1759
1760 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001761 pTmp := pBuf;
1762 Inc( pTmp, offset);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001763 while count > 0 do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001764 nBytes := FTcpClient.Read( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001765 if nBytes = 0 then Exit;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001766 Inc( pTmp, nBytes);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001767 Dec( count, nBytes);
1768 Inc( result, nBytes);
1769 end;
1770end;
1771
1772function TTcpSocketStreamImpl.ToArray: TBytes;
1773// new sockets version
1774var len : Integer;
1775begin
1776 len := 0;
1777 try
1778 if FTcpClient.Peek then
1779 repeat
1780 SetLength(Result, Length(Result) + 1024);
1781 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1782 until len < 1024;
1783 except
1784 on TTransportException do begin { don't allow default exceptions } end;
1785 else raise;
1786 end;
1787 if len > 0 then
1788 SetLength(Result, Length(Result) - 1024 + len);
1789end;
1790
Jens Geyer17c3ad92017-09-05 20:31:27 +02001791procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001792// new sockets version
Jens Geyer5089b0a2018-02-01 22:37:18 +01001793var pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001794begin
1795 inherited;
1796
1797 if not FTcpClient.IsOpen
Kyle Johnsone363a342016-04-22 19:11:16 -05001798 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001799
Jens Geyer5089b0a2018-02-01 22:37:18 +01001800 pTmp := pBuf;
1801 Inc( pTmp, offset);
1802 FTcpClient.Write( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001803end;
1804
Jens Geyer23d67462015-12-19 11:44:57 +01001805{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001806
Jens Geyerd5436f52014-10-03 19:50:38 +02001807
1808end.