blob: 997f4064d48504f63422cd92ef78c361ade6ecf2 [file] [log] [blame]
(*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX, msxml, WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer9f7f11e2016-04-14 21:37:11 +020034 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
Jens Geyer606f1ef2018-04-09 23:09:41 +020042 Thrift.Exception,
Jens Geyerd5436f52014-10-03 19:50:38 +020043 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030044 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020045
46type
// Transport contract: open/close state handling, reading and writing of
// bytes (either through a TBytes array or a raw pointer) and flushing.
ITransport = interface
  ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
  // --- state ---
  function GetIsOpen: Boolean;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean;
  procedure Open;
  procedure Close;
  // --- reading; the pointer overloads additionally receive the buffer size ---
  function Read( var buf: TBytes; off, len: Integer): Integer; overload;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload;
  function ReadAll( var buf: TBytes; off, len: Integer): Integer; overload;
  function ReadAll( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload;
  // --- writing ---
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off, len: Integer); overload;
  procedure Write( const pBuf: Pointer; off, len: Integer); overload;
  procedure Write( const pBuf: Pointer; len: Integer); overload;
  procedure Flush;
end;
64
// Abstract base class for ITransport implementations.
// Descendants must supply GetIsOpen, Open, Close and the pointer-based
// Read/Write primitives; the TBytes overloads and the two-argument pointer
// Write are inline helpers that forward to those primitives.
TTransportImpl = class( TInterfacedObject, ITransport)
protected
  function GetIsOpen: Boolean; virtual; abstract;
  property IsOpen: Boolean read GetIsOpen;
  function Peek: Boolean; virtual;
  procedure Open; virtual; abstract;
  procedure Close; virtual; abstract;
  function Read( var buf: TBytes; off, len: Integer): Integer; overload; inline;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload; virtual; abstract;
  function ReadAll( var buf: TBytes; off, len: Integer): Integer; overload; inline;
  function ReadAll( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload; virtual;
  procedure Write( const buf: TBytes); overload; inline;
  procedure Write( const buf: TBytes; off, len: Integer); overload; inline;
  procedure Write( const pBuf: Pointer; len: Integer); overload; inline;
  procedure Write( const pBuf: Pointer; off, len: Integer); overload; virtual; abstract;
  procedure Flush; virtual;
end;
82
// Base class of the transport exceptions declared in this unit.
// The Create class functions are deprecated factories that hand out one of
// the specialised descendants; Type_ reports the category of an instance
// based on its concrete class.
TTransportException = class( TException)
public
  type
    TExceptionType = (
      Unknown,
      NotOpen,
      AlreadyOpen,
      TimedOut,
      EndOfFile,
      BadArgs,
      Interrupted
    );
private
  function GetType: TExceptionType;
protected
  // Real constructor; reachable by descendants only.
  constructor HiddenCreate( const Msg: string);
public
  class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  property Type_: TExceptionType read GetType;
end;
105
// Intermediate abstract class: offers a non-deprecated constructor to the
// specialised exception types (needed to remove the deprecation warning).
TTransportExceptionSpecialized = class abstract( TTransportException)
public
  constructor Create( const Msg: string);
end;
111
// One concrete exception class per TTransportException.TExceptionType value.
TTransportExceptionUnknown     = class( TTransportExceptionSpecialized);
TTransportExceptionNotOpen     = class( TTransportExceptionSpecialized);
TTransportExceptionAlreadyOpen = class( TTransportExceptionSpecialized);
TTransportExceptionTimedOut    = class( TTransportExceptionSpecialized);
TTransportExceptionEndOfFile   = class( TTransportExceptionSpecialized);
TTransportExceptionBadArgs     = class( TTransportExceptionSpecialized);
TTransportExceptionInterrupted = class( TTransportExceptionSpecialized);
119
// ITransport extended with HTTP specific settings (two timeout values and
// custom request headers) plus an explicit SendRequest.
IHTTPClient = interface( ITransport)
  ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
  procedure SetConnectionTimeout( const Value: Integer);
  function GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function GetReadTimeout: Integer;
  function GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
end;
132
// IHTTPClient implementation built on an IXMLHTTPRequest object.
// Written bytes are collected in FOutputStream and posted to FUri by
// SendRequest (invoked from Flush); the response is then exposed through
// FInputStream.
// NOTE(review): the two timeout values are stored by their setters, but
// CreateRequest does not apply them to the request object.
THTTPClientImpl = class( TTransportImpl, IHTTPClient)
private
  FUri               : string;
  FInputStream       : IThriftStream;
  FOutputStream      : IThriftStream;
  FConnectionTimeout : Integer;
  FReadTimeout       : Integer;
  FCustomHeaders     : IThriftDictionary<string,string>;

  function CreateRequest: IXMLHTTPRequest;
protected
  function GetIsOpen: Boolean; override;
  procedure Open; override;
  procedure Close; override;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  procedure Flush; override;

  procedure SetConnectionTimeout( const Value: Integer);
  function GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function GetReadTimeout: Integer;
  function GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
  constructor Create( const AUri: string);
  destructor Destroy; override;
end;
164
// Server side contract: Listen, Close, and Accept, which returns the
// transport of an accepted client. Accept takes a callback (fnAccepting).
IServerTransport = interface
  ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
  procedure Listen;
  procedure Close;
  function Accept( const fnAccepting: TProc): ITransport;
end;
171
// Abstract base class for IServerTransport implementations; every method
// has to be provided by the descendant.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
end;
178
// Factory that produces a transport for (or from) a given transport.
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( const ATrans: ITransport): ITransport;
end;
183
// Default ITransportFactory: its GetTransport hands back the transport it
// was given. Descendants override GetTransport to wrap it.
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
  function GetTransport( const ATrans: ITransport): ITransport; virtual;
end;
187
// Stream implementation on top of a TCP client socket.
// The socket type depends on OLD_SOCKETS: TCustomIpClient when defined,
// otherwise TSocket.
TTcpSocketStreamImpl = class( TThriftStreamImpl)
{$IFDEF OLD_SOCKETS}
private type
  TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
private
  FTcpClient : TCustomIpClient;
  FTimeout   : Integer;
  function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                   TimeOut: Integer; var wsaError: Integer): Integer;
  function WaitForData( TimeOut: Integer; pBuf: Pointer; DesiredBytes: Integer;
                        var wsaError, bytesReady: Integer): TWaitForData;
{$ELSE}
  // NOTE(review): no visibility specifier precedes this field in this branch.
  FTcpClient : TSocket;
protected const
  SLEEP_TIME = 200;  // presumably milliseconds - TODO confirm at the place of use
{$ENDIF}
protected
  procedure Write( const pBuf: Pointer; offset, count: Integer); override;
  function Read( const pBuf: Pointer; const buflen: Integer; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
{$IFDEF OLD_SOCKETS}
  constructor Create( const ATcpClient: TCustomIpClient; const aTimeout: Integer = 0);
{$ELSE}
  constructor Create( const ATcpClient: TSocket; const aTimeout: Longword = 0);
{$ENDIF}
end;
220
// ITransport that additionally exposes its input and output streams.
IStreamTransport = interface( ITransport)
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream: IThriftStream read GetInputStream;
  property OutputStream: IThriftStream read GetOutputStream;
end;
228
// Transport working on a pair of IThriftStream objects that are handed to
// the constructor: one for input, one for output.
TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
protected
  FInputStream  : IThriftStream;
  FOutputStream : IThriftStream;

  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
public
  property InputStream: IThriftStream read GetInputStream;
  property OutputStream: IThriftStream read GetOutputStream;

  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  constructor Create( const AInputStream: IThriftStream; const AOutputStream: IThriftStream);
  destructor Destroy; override;
end;
250
// Stream that wraps another IThriftStream (FStream) and holds separate
// memory streams for reading and for writing, plus a buffer size setting.
TBufferedStreamImpl = class( TThriftStreamImpl)
private
  FStream      : IThriftStream;
  FBufSize     : Integer;
  FReadBuffer  : TMemoryStream;
  FWriteBuffer : TMemoryStream;
protected
  procedure Write( const pBuf: Pointer; offset, count: Integer); override;
  function Read( const pBuf: Pointer; const buflen: Integer; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function IsOpen: Boolean; override;
  function ToArray: TBytes; override;
public
  constructor Create( const AStream: IThriftStream; ABufSize: Integer);
  destructor Destroy; override;
end;
269
// Server transport based on a listening TCP socket.
// With OLD_SOCKETS a TTcpServer is used and port/client timeout are kept in
// fields; otherwise a TServerSocket is used. FOwnsServer tells the
// destructor whether the server object has to be freed.
TServerSocketImpl = class( TServerTransportImpl)
private
{$IFDEF OLD_SOCKETS}
  FServer        : TTcpServer;
  FPort          : Integer;
  FClientTimeout : Integer;
{$ELSE}
  FServer        : TServerSocket;
{$ENDIF}
  FUseBufferedSocket : Boolean;
  FOwnsServer        : Boolean;
protected
  function Accept( const fnAccepting: TProc): ITransport; override;
public
{$IFDEF OLD_SOCKETS}
  constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
{$ELSE}
  constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
  constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
{$ENDIF}
  destructor Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
295
// Transport that wraps an IStreamTransport (FTransport) and keeps its own
// input and output buffer streams together with a buffer size.
TBufferedTransportImpl = class( TTransportImpl)
private
  FInputBuffer  : IThriftStream;
  FOutputBuffer : IThriftStream;
  FTransport    : IStreamTransport;
  FBufSize      : Integer;

  procedure InitBuffers;
  function GetUnderlyingTransport: ITransport;
protected
  function GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  procedure Open; override;
  procedure Close; override;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  constructor Create( const ATransport: IStreamTransport); overload;
  constructor Create( const ATransport: IStreamTransport; ABufSize: Integer); overload;
  property UnderlyingTransport: ITransport read GetUnderlyingTransport;
  property IsOpen: Boolean read GetIsOpen;
end;
318
// Client socket transport. It is created either around an existing client
// object (optionally taking ownership, see FOwnsClient) or from host and
// port. Socket and timeout types depend on OLD_SOCKETS.
TSocketImpl = class( TStreamTransportImpl)
private
{$IFDEF OLD_SOCKETS}
  FClient     : TCustomIpClient;
{$ELSE}
  FClient     : TSocket;
{$ENDIF}
  FOwnsClient : Boolean;
  FHost       : string;
  FPort       : Integer;
{$IFDEF OLD_SOCKETS}
  FTimeout    : Integer;
{$ELSE}
  FTimeout    : Longword;
{$ENDIF}

  procedure InitSocket;
protected
  function GetIsOpen: Boolean; override;
public
  procedure Open; override;
{$IFDEF OLD_SOCKETS}
  constructor Create( const AClient: TCustomIpClient; aOwnsClient: Boolean; ATimeout: Integer = 0); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
{$ELSE}
  constructor Create( const AClient: TSocket; aOwnsClient: Boolean); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
{$ENDIF}
  destructor Destroy; override;
  procedure Close; override;
{$IFDEF OLD_SOCKETS}
  property TcpClient: TCustomIpClient read FClient;
{$ELSE}
  property TcpClient: TSocket read FClient;
{$ENDIF}
  property Host: string read FHost;
  property Port: Integer read FPort;
end;
357
// Transport that wraps another ITransport (FTransport) and works with
// separate in-memory read and write buffers. The nested TFactory is a
// transport factory for this class.
TFramedTransportImpl = class( TTransportImpl)
private const
  FHeaderSize : Integer = 4;  // presumably the frame header length in bytes - TODO confirm
private class var
  FHeader_Dummy : array of Byte;
protected
  FTransport   : ITransport;
  FWriteBuffer : TMemoryStream;
  FReadBuffer  : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;
public
  type
    TFactory = class( TTransportFactoryImpl)
    public
      function GetTransport( const ATrans: ITransport): ITransport; override;
    end;

  // Without class constructor support the unit-level procedure
  // TFramedTransportImpl_Initialize is declared instead.
  {$IFDEF HAVE_CLASS_CTOR}
  class constructor Create;
  {$ENDIF}

  constructor Create; overload;
  constructor Create( const ATrans: ITransport); overload;
  destructor Destroy; override;

  procedure Open; override;
  function GetIsOpen: Boolean; override;

  procedure Close; override;
  function Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  procedure Flush; override;
end;
393
// Declared only when class constructors are not available (HAVE_CLASS_CTOR
// undefined); TFramedTransportImpl declares a class constructor otherwise.
{$IFNDEF HAVE_CLASS_CTOR}
procedure TFramedTransportImpl_Initialize;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200397
const
  // Default timeout: five seconds, expressed in milliseconds.
  DEFAULT_THRIFT_TIMEOUT = 5 * 1000;
400
401
402implementation
403
404{ TTransportImpl }
405
// Default implementation is intentionally empty; descendants override it.
procedure TTransportImpl.Flush;
begin
end;
410
// Default Peek: simply reports the open state of the transport.
function TTransportImpl.Peek: Boolean;
begin
  // IsOpen is a property reading GetIsOpen, so the getter is called directly
  Result := GetIsOpen;
end;
415
// TBytes convenience overload: forwards to the pointer-based Read, passing
// the array length as buffer size. An empty array yields 0 without reading.
function TTransportImpl.Read( var buf: TBytes; off, len: Integer): Integer;
var
  bufSize : Integer;
begin
  bufSize := Length( buf);
  if bufSize = 0
  then Result := 0
  else Result := Read( @buf[0], bufSize, off, len);
end;
422
// TBytes convenience overload: forwards to the pointer-based ReadAll, passing
// the array length as buffer size. An empty array yields 0 without reading.
function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
var
  bufSize : Integer;
begin
  bufSize := Length( buf);
  if bufSize = 0
  then Result := 0
  else Result := ReadAll( @buf[0], bufSize, off, len);
end;
429
// Writes the complete array; an empty array is ignored.
procedure TTransportImpl.Write( const buf: TBytes);
var
  byteCount : Integer;
begin
  byteCount := Length( buf);
  if byteCount > 0
  then Write( @buf[0], 0, byteCount);
end;
435
// Writes len bytes starting at offset off of the array by forwarding to the
// pointer-based Write; nothing happens when the array is empty.
procedure TTransportImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if Length( buf) <> 0
  then Write( @buf[0], off, len);
end;
441
// Calls Read repeatedly until len bytes have been received and returns the
// number of bytes read. Raises TTransportExceptionNotOpen as soon as a Read
// call delivers no data (result <= 0).
function TTransportImpl.ReadAll( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer;
var
  chunk : Integer;
begin
  Result := 0;
  while Result < len do begin
    // continue behind the bytes already received
    chunk := Read( pBuf, buflen, off + Result, len - Result);
    if chunk <= 0
    then raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
    Inc( Result, chunk);
  end;
end;
453
// Shortcut for the three-argument pointer Write with an offset of zero.
procedure TTransportImpl.Write( const pBuf: Pointer; len: Integer);
begin
  Write( pBuf, 0, len);
end;
458
459{ THTTPClientImpl }
460
// Drops the references to both streams (input first, then output).
procedure THTTPClientImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
466
// Stores the target URI and sets up an empty header dictionary and an empty,
// memory-based output stream.
constructor THTTPClientImpl.Create( const AUri: string);
begin
  inherited Create;
  FUri           := AUri;
  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
  // second argument TRUE as in all other uses of this adapter in the class
  FOutputStream  := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
end;
474
// Builds a synchronous POST request to FUri with the Thrift content type,
// accept and user agent headers, followed by all custom headers.
// The request is prepared only; sending is done by the caller.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  request : IXMLHTTPRequest;
  header  : TPair<string,string>;
begin
  // the co-class to use depends on the compiler version
  {$IF CompilerVersion >= 21.0}
  request := CoXMLHTTP.Create;
  {$ELSE}
  request := CoXMLHTTPRequest.Create;
  {$IFEND}

  request.open('POST', FUri, False, '', '');
  request.setRequestHeader( 'Content-Type', 'application/x-thrift');
  request.setRequestHeader( 'Accept', 'application/x-thrift');
  request.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  for header in FCustomHeaders
  do request.setRequestHeader( header.Key, header.Value);

  Result := request;
end;
494
// Closes the transport (releasing both streams) before the inherited cleanup.
destructor THTTPClientImpl.Destroy;
begin
  Close;
  inherited Destroy;
end;
500
// Sends everything written so far as one HTTP request. Whether or not the
// request succeeds, the output stream is replaced by a fresh, empty one.
procedure THTTPClientImpl.Flush;
begin
  try
    SendRequest;
  finally
    // release the old stream first, then start over with an empty one
    FOutputStream := nil;
    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
  end;
end;
510
// Getter of the ConnectionTimeout property.
function THTTPClientImpl.GetConnectionTimeout: Integer;
begin
  Result := FConnectionTimeout;
end;
515
// Getter of the CustomHeaders property; returns the dictionary itself.
function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
begin
  Result := FCustomHeaders;
end;
520
// This transport always reports itself as open.
function THTTPClientImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
525
// Getter of the ReadTimeout property.
function THTTPClientImpl.GetReadTimeout: Integer;
begin
  Result := FReadTimeout;
end;
530
// Intentionally empty: there is nothing to do when opening this transport.
procedure THTTPClientImpl.Open;
begin
end;
535
// Reads from the response stream of the last request.
// Raises TTransportExceptionNotOpen when there is no response stream yet;
// any exception raised by the stream is converted into a
// TTransportExceptionUnknown carrying the original message.
function THTTPClientImpl.Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer;
begin
  if not Assigned( FInputStream)
  then raise TTransportExceptionNotOpen.Create('No request has been sent');

  try
    Result := FInputStream.Read( pBuf, buflen, off, len);
  except
    on E: Exception do raise TTransportExceptionUnknown.Create(E.Message);
  end;
end;
549
// Posts the content of the output stream as the request body and installs
// the response stream of the request as the new input stream.
procedure THTTPClientImpl.SendRequest;
var
  request     : IXMLHTTPRequest;
  bodyStream  : TMemoryStream;
  payload     : TBytes;
  payloadSize : Integer;
begin
  request := CreateRequest;

  bodyStream := TMemoryStream.Create;
  try
    // copy the pending bytes into a stream that can be handed over via COM
    payload     := FOutputStream.ToArray;
    payloadSize := Length( payload);
    if payloadSize > 0
    then bodyStream.WriteBuffer( payload[0], payloadSize);
    bodyStream.Position := 0;

    // soReference: the adapter does not own bodyStream, it is freed below
    request.send( IUnknown( TStreamAdapter.Create( bodyStream, soReference )));

    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( request.responseStream) as IStream);
  finally
    bodyStream.Free;
  end;
end;
574
// Setter of the ConnectionTimeout property; the value is only stored.
procedure THTTPClientImpl.SetConnectionTimeout( const Value: Integer);
begin
  FConnectionTimeout := Value;
end;
579
// Setter of the ReadTimeout property; the value is only stored.
procedure THTTPClientImpl.SetReadTimeout( const Value: Integer);
begin
  FReadTimeout := Value;
end;
584
// Appends len bytes, starting at offset off of pBuf, to the pending request
// body. The data is sent by Flush / SendRequest.
//
// Raises TTransportExceptionNotOpen when the output stream is missing. Close
// sets FOutputStream to nil while GetIsOpen keeps returning True, so without
// this guard a Write after Close would call a method on a nil interface
// reference and fail with an access violation. The guard mirrors the check
// that Read performs on FInputStream.
procedure THTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot write, transport has been closed');

  FOutputStream.Write( pBuf, off, len);
end;
589
590{ TTransportException }
591
// Derives the exception category from the concrete class of the instance;
// anything that is not one of the listed classes counts as Unknown.
function TTransportException.GetType: TExceptionType;
begin
  Result := TExceptionType.Unknown;

  if Self is TTransportExceptionNotOpen
  then Result := TExceptionType.NotOpen
  else if Self is TTransportExceptionAlreadyOpen
  then Result := TExceptionType.AlreadyOpen
  else if Self is TTransportExceptionTimedOut
  then Result := TExceptionType.TimedOut
  else if Self is TTransportExceptionEndOfFile
  then Result := TExceptionType.EndOfFile
  else if Self is TTransportExceptionBadArgs
  then Result := TExceptionType.BadArgs
  else if Self is TTransportExceptionInterrupted
  then Result := TExceptionType.Interrupted;
end;
602
// Protected constructor that forwards the message to the inherited Create.
constructor TTransportException.HiddenCreate( const Msg: string);
begin
  inherited Create( Msg);
end;
607
// Deprecated factory: same as Create( AType, msg) with an empty message.
class function TTransportException.Create( AType: TExceptionType): TTransportException;
begin
  // the overload called here is deprecated as well, so the warning is
  // switched off around the call
{$WARN SYMBOL_DEPRECATED OFF}
  Result := Create( AType, '');
{$WARN SYMBOL_DEPRECATED DEFAULT}
end;
615
// Deprecated factory: creates the specialised exception class that belongs
// to AType. Unknown and any value not listed produce
// TTransportExceptionUnknown.
class function TTransportException.Create( AType: TExceptionType;
                                           const msg: string): TTransportException;
begin
  if AType = TExceptionType.NotOpen
  then Result := TTransportExceptionNotOpen.Create(msg)
  else if AType = TExceptionType.AlreadyOpen
  then Result := TTransportExceptionAlreadyOpen.Create(msg)
  else if AType = TExceptionType.TimedOut
  then Result := TTransportExceptionTimedOut.Create(msg)
  else if AType = TExceptionType.EndOfFile
  then Result := TTransportExceptionEndOfFile.Create(msg)
  else if AType = TExceptionType.BadArgs
  then Result := TTransportExceptionBadArgs.Create(msg)
  else if AType = TExceptionType.Interrupted
  then Result := TTransportExceptionInterrupted.Create(msg)
  else Result := TTransportExceptionUnknown.Create(msg);
end;
630
// Deprecated factory: a message without a type gives a
// TTransportExceptionUnknown.
class function TTransportException.Create( const msg: string): TTransportException;
begin
  Result := TTransportExceptionUnknown.Create( msg);
end;
635
636{ TTransportExceptionSpecialized }
637
// Public constructor of the specialised exceptions; delegates to the
// protected HiddenCreate of the base class.
constructor TTransportExceptionSpecialized.Create( const Msg: string);
begin
  inherited HiddenCreate( Msg);
end;
642
643{ TTransportFactoryImpl }
644
// Default factory behaviour: the given transport is returned unchanged.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := ATrans;
end;
649
650{ TServerSocket }
651
// Constructor for an externally supplied server object.
// OLD_SOCKETS: the client timeout is remembered in a field.
// Otherwise:   the timeout is applied to the server as both receive and send
//              timeout.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
begin
  inherited Create;
  FServer        := AServer;
  FClientTimeout := AClientTimeout;
end;
{$ELSE}
constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
begin
  inherited Create;
  FServer             := AServer;
  FServer.RecvTimeout := AClientTimeout;
  FServer.SendTimeout := AClientTimeout;
end;
{$ENDIF}
668
// Constructor that creates its own server object for the given port. The
// instance owns that server (FOwnsServer) and frees it in the destructor.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( APort: Integer; AClientTimeout: Integer; AUseBufferedSockets: Boolean);
{$ELSE}
constructor TServerSocketImpl.Create( APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
{$ENDIF}
begin
  inherited Create;

{$IFDEF OLD_SOCKETS}
  FPort          := APort;
  FClientTimeout := AClientTimeout;

  FServer           := TTcpServer.Create( nil);
  FServer.BlockMode := bmBlocking;
  // the type of LocalPort depends on the compiler version
  {$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
  {$ELSE}
  FServer.LocalPort := IntToStr( FPort);
  {$IFEND}
{$ELSE}
  // the client timeout is passed twice (second and third argument)
  FServer := TServerSocket.Create( APort, AClientTimeout, AClientTimeout);
{$ENDIF}

  FUseBufferedSocket := AUseBufferedSockets;
  FOwnsServer        := True;
end;
692
// Frees the server object, but only if this instance owns it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer then begin
    FServer.Free;
    FServer := nil;
  end;

  inherited Destroy;
end;
701
// Accepts one client connection and returns a transport for it: a
// TSocketImpl that owns the client socket, wrapped into a
// TBufferedTransportImpl when buffered sockets were requested.
// fnAccepting, if assigned, is called before the accept call.
// Raises TTransportExceptionNotOpen when there is no server object.
// OLD_SOCKETS: returns nil when the accept call fails; every exception is
//              converted into a TTransportExceptionUnknown.
// Otherwise:   exceptions are passed on unchanged.
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
{$IFDEF OLD_SOCKETS}
  peer : TCustomIpClient;
{$ELSE}
  peer : TSocket;
{$ENDIF}
  socketTransport : IStreamTransport;
begin
  if not Assigned( FServer)
  then raise TTransportExceptionNotOpen.Create('No underlying server socket.');

{$IFDEF OLD_SOCKETS}
  peer := nil;
  try
    peer := TCustomIpClient.Create(nil);

    if Assigned( fnAccepting) then begin
      fnAccepting();
    end;

    if not FServer.Accept( peer) then begin
      peer.Free;
      Result := nil;
      Exit;
    end;

    // kept from the original; relevant if Accept changes the passed reference
    if not Assigned( peer) then begin
      Result := nil;
      Exit;
    end;

    socketTransport := TSocketImpl.Create( peer, TRUE, FClientTimeout);
    peer := nil; // owned by socketTransport from here on

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( socketTransport)
    else Result := socketTransport;

  except
    on E: Exception do begin
      peer.Free;
      raise TTransportExceptionUnknown.Create(E.ToString);
    end;
  end;
{$ELSE}
  if Assigned( fnAccepting) then begin
    fnAccepting();
  end;

  peer := FServer.Accept;
  try
    socketTransport := TSocketImpl.Create( peer, True);
    peer := nil; // owned by socketTransport from here on

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( socketTransport)
    else Result := socketTransport;
  except
    peer.Free;
    raise;
  end;
{$ENDIF}
end;
766
// Starts listening; does nothing when there is no server object.
// OLD_SOCKETS: activates the server and converts any exception into a
//              TTransportExceptionUnknown.
// Otherwise:   calls Listen of the server socket.
procedure TServerSocketImpl.Listen;
begin
  if not Assigned( FServer)
  then Exit;

{$IFDEF OLD_SOCKETS}
  try
    FServer.Active := True;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
    end;
  end;
{$ELSE}
  FServer.Listen;
{$ENDIF}
end;
783
// Shuts down the underlying server socket.
// Silently does nothing when no server socket object exists.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  // any failure while deactivating the socket is re-raised as transport exception
  try
    FServer.Active := False;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
    end;
  end;
{$ELSE}
  FServer.Close;
{$ENDIF}
end;
798
{ TSocketImpl }
800
{$IFDEF OLD_SOCKETS}
// Wraps an already existing client socket object (old sockets variant).
//   AClient     - the socket object used for all communication
//   aOwnsClient - remembered in FOwnsClient; when TRUE, Destroy and Close free the socket object
//   ATimeout    - remembered in FTimeout and passed on to the socket stream
constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var
  sockStream : IThriftStream;
begin
  FOwnsClient := aOwnsClient;
  FTimeout    := ATimeout;
  FClient     := AClient;

  // one single stream object serves as input and as output stream
  sockStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( sockStream, sockStream);
end;
{$ELSE}
// Wraps an already existing client socket object (new sockets variant).
//   AClient     - the socket object used for all communication
//   aOwnsClient - remembered in FOwnsClient; when TRUE, Destroy and Close free the socket object
// The timeout is taken over from the RecvTimeout setting of the socket passed in.
constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
var
  sockStream : IThriftStream;
begin
  FClient     := AClient;
  FTimeout    := AClient.RecvTimeout;
  FOwnsClient := aOwnsClient;

  // one single stream object serves as input and as output stream
  sockStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( sockStream, sockStream);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200822
// Creates a transport for the given host and port.
// The values are stored and InitSocket sets up the socket object and its stream.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
{$ELSE}
constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
{$ENDIF}
begin
  // no streams yet, InitSocket assigns them below
  inherited Create( nil, nil);

  FTimeout := ATimeout;
  FPort    := APort;
  FHost    := AHost;

  InitSocket;
end;
835
// Releases the socket object, but only if this instance owns it.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;

  inherited;
end;
842
// Closes the transport: runs the inherited Close, drops both stream references
// and lets go of the socket object (freed when owned, otherwise only the reference is cleared).
procedure TSocketImpl.Close;
begin
  inherited Close;

  FOutputStream := nil;
  FInputStream  := nil;

  if not FOwnsClient
  then FClient := nil
  else FreeAndNil( FClient);
end;
854
// Tells whether a socket object exists and reports itself as connected/open.
function TSocketImpl.GetIsOpen: Boolean;
begin
  if FClient = nil then begin
    Result := False;
    Exit;
  end;

{$IFDEF OLD_SOCKETS}
  Result := FClient.Connected;
{$ELSE}
  Result := FClient.IsOpen;
{$ENDIF}
end;
863
// Discards the current socket object (freeing it when owned), creates a new one
// owned by this instance and attaches a fresh socket stream for both directions.
procedure TSocketImpl.InitSocket;
var
  sockStream : IThriftStream;
begin
  // get rid of whatever socket object we had before
  if not FOwnsClient
  then FClient := nil
  else FreeAndNil( FClient);

{$IFDEF OLD_SOCKETS}
  FClient := TTcpClient.Create( nil);
{$ELSE}
  FClient := TSocket.Create( FHost, FPort);
{$ENDIF}
  FOwnsClient := True;  // the new object was created here, so it is ours

  sockStream    := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := sockStream;
  FInputStream  := sockStream;
end;
883
// Connects the socket to the configured host and port.
// Raises
//   TTransportExceptionAlreadyOpen - the transport is open already
//   TTransportExceptionNotOpen     - host is empty or port is not positive
procedure TSocketImpl.Open;
var
  sockStream : IThriftStream;
begin
  if IsOpen
  then raise TTransportExceptionAlreadyOpen.Create('Socket already connected');

  if FHost = ''
  then raise TTransportExceptionNotOpen.Create('Cannot open null host');

  if Port <= 0
  then raise TTransportExceptionNotOpen.Create('Cannot open without port');

  // Close drops the socket object, so it may have to be recreated first
  if FClient = nil then begin
    InitSocket;
  end;

{$IFDEF OLD_SOCKETS}
  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;
{$ELSE}
  FClient.Open;
{$ENDIF}

  // attach a fresh stream, shared by both directions
  sockStream    := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FInputStream  := sockStream;
  FOutputStream := sockStream;
end;
912
{ TBufferedStreamImpl }
914
// Flushes pending output, releases the reference to the wrapped stream
// and frees both memory buffers. After that IsOpen returns FALSE.
procedure TBufferedStreamImpl.Close;
begin
  Flush;  // must happen while FStream and FWriteBuffer are still available
  FStream := nil;

  FreeAndNil( FReadBuffer);
  FreeAndNil( FWriteBuffer);
end;
926
// Creates a buffering wrapper around AStream.
//   AStream  - the stream that data is finally read from / written to
//   ABufSize - number of bytes requested per refill of the read buffer, and the
//              write buffer size that must be exceeded to trigger an automatic Flush
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  inherited Create;

  FBufSize := ABufSize;
  FStream  := AStream;

  FWriteBuffer := TMemoryStream.Create;
  FReadBuffer  := TMemoryStream.Create;
end;
935
// Closes the stream (which flushes and frees the buffers) before the instance goes away.
destructor TBufferedStreamImpl.Destroy;
begin
  Close;

  inherited;
end;
941
// Hands everything collected in the write buffer to the wrapped stream and
// empties the write buffer. Does nothing while the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  cbData  : Integer;
begin
  if not IsOpen
  then Exit;

  cbData := FWriteBuffer.Size;
  if cbData > 0 then begin
    // copy the buffered bytes out so they can be passed on as a byte array
    SetLength( pending, cbData);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( pending[0], cbData);
    FStream.Write( pending, 0, cbData);
  end;

  FWriteBuffer.Clear;
end;
958
// TRUE only if both buffers and the wrapped stream exist and the wrapped stream is open.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := False;

  if (FWriteBuffer = nil) or (FReadBuffer = nil) or (FStream = nil)
  then Exit;

  Result := FStream.IsOpen;
end;
966
// Opens the wrapped stream; the call is simply passed through.
procedure TBufferedStreamImpl.Open;
begin
  FStream.Open;
end;
971
// Reads up to count bytes into pBuf, starting at the given offset.
// Data is served from the internal read buffer, which is refilled from the
// wrapped stream in portions of FBufSize bytes whenever it is exhausted.
// Returns the number of bytes delivered; that is less than count when the
// wrapped stream has no more data, and 0 when the stream is not open.
function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  chunk   : Integer;
  fillbuf : TBytes;
  pDest   : PByte;
begin
  inherited;
  Result := 0;

  if not IsOpen
  then Exit;

  while count > 0 do begin

    // read buffer used up? get the next portion from the wrapped stream
    if FReadBuffer.Position >= FReadBuffer.Size then begin
      FReadBuffer.Clear;
      SetLength( fillbuf, FBufSize);
      chunk := FStream.Read( fillbuf, 0, FBufSize);
      if chunk = 0
      then Break; // nothing more to get, prevents looping forever

      FReadBuffer.WriteBuffer( fillbuf[0], chunk);
      FReadBuffer.Position := 0;
    end;

    // pass on as much as is available and still wanted
    if FReadBuffer.Position < FReadBuffer.Size then begin
      chunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      pDest := pBuf;
      Inc( pDest, offset);
      Inc( Result, FReadBuffer.Read( pDest^, chunk));
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
1005
// Returns the complete content of the read buffer as byte array.
// The result is empty when the stream is not open or the buffer holds no data.
// Note that reading the content moves the position of the read buffer.
function TBufferedStreamImpl.ToArray: TBytes;
var
  cbData : Integer;
begin
  if IsOpen
  then cbData := FReadBuffer.Size
  else cbData := 0;

  SetLength( Result, cbData);
  if cbData <= 0
  then Exit;

  FReadBuffer.Position := 0;
  FReadBuffer.Read( Result[0], cbData);
end;
1022
// Appends count bytes, taken from pBuf at the given offset, to the write buffer.
// Once the buffer has grown beyond FBufSize it is flushed to the wrapped stream.
// Nothing is written when count is not positive or the stream is not open.
procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
var
  pSrc : PByte;
begin
  inherited;

  if (count > 0) and IsOpen then begin
    pSrc := pBuf;
    Inc( pSrc, offset);
    FWriteBuffer.Write( pSrc^, count);

    if FWriteBuffer.Size > FBufSize
    then Flush;
  end;
end;
1038
1039{ TStreamTransportImpl }
1040
// Lets go of both stream references.
procedure TStreamTransportImpl.Close;
begin
  FOutputStream := nil;
  FInputStream  := nil;
end;
1046
// Creates a transport on top of the given pair of streams.
//   AInputStream  - stream that Read takes its data from (may be nil)
//   AOutputStream - stream that Write and Flush operate on (may be nil)
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  inherited Create;

  FOutputStream := AOutputStream;
  FInputStream  := AInputStream;
end;
1053
// Drops both stream references before the instance is destroyed.
destructor TStreamTransportImpl.Destroy;
begin
  FOutputStream := nil;
  FInputStream  := nil;

  inherited;
end;
1060
// Flushes the output stream.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if FOutputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
1069
// Getter: returns the current input stream, which may be nil.
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := FInputStream;
end;
1074
// This implementation unconditionally reports the transport as open.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
1079
// Getter: returns the current output stream, which may be nil.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
1084
// Intentionally empty: this implementation has nothing to do on Open.
procedure TStreamTransportImpl.Open;
begin
  // nothing to do
end;
1089
// Reads from the input stream; all arguments are passed through unchanged.
// Returns whatever the input stream returns.
// Raises TTransportExceptionNotOpen when there is no input stream.
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );

  Result := FInputStream.Read( pBuf, buflen, off, len);
end;
1098
// Writes to the output stream; all arguments are passed through unchanged.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );

  FOutputStream.Write( pBuf, off, len);
end;
1107
1108{ TBufferedTransportImpl }
1109
// Convenience constructor: same as the two-argument version with a buffer size of 1024.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
begin
  // deliberately no inherited call here, the delegated constructor takes care of that
  Create( ATransport, 1024);
end;
1115
// Creates a buffering layer on top of ATransport.
//   ATransport - the transport whose input/output streams get wrapped
//   ABufSize   - buffer size passed on to the buffered streams
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
begin
  inherited Create;

  FBufSize   := ABufSize;
  FTransport := ATransport;

  InitBuffers;  // requires FTransport and FBufSize to be set
end;
1123
// Closes the wrapped transport, then drops both buffered streams.
procedure TBufferedTransportImpl.Close;
begin
  FTransport.Close;

  FInputBuffer  := nil;
  FOutputBuffer := nil;
end;
1130
// Flushes the buffered output stream; does nothing if there is none.
procedure TBufferedTransportImpl.Flush;
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Flush;
end;
1137
// The open state is the one reported by the wrapped transport.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1142
// Getter: returns the transport wrapped by this buffering layer.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := FTransport;
end;
1147
// Creates a buffered stream for each stream the wrapped transport currently offers.
// A direction whose stream is nil keeps whatever buffer it had before.
procedure TBufferedTransportImpl.InitBuffers;
var
  inner : IThriftStream;
begin
  inner := FTransport.InputStream;
  if inner <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( inner, FBufSize);

  inner := FTransport.OutputStream;
  if inner <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( inner, FBufSize);
end;
1157
// Opens the wrapped transport and re-creates the buffered streams afterwards.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;

  // the buffers have to be attached to the current substreams of FTransport again
  InitBuffers;
end;
1163
// Reads via the buffered input stream, passing all arguments through.
// Returns 0 when there is no buffered input stream.
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputBuffer = nil
  then Result := 0
  else Result := FInputBuffer.Read( pBuf, buflen, off, len);
end;
1171
// Writes via the buffered output stream, passing all arguments through.
// The data is silently dropped when there is no buffered output stream.
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Write( pBuf, off, len);
end;
1178
1179{ TFramedTransportImpl }
1180
// Prepares FHeader_Dummy: FHeaderSize zero bytes that InitWriteBuffer writes
// as placeholder for the frame header.
{$IFDEF HAVE_CLASS_CTOR}
class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte), 0);
end;
{$ELSE}
// replacement for compilers without class constructors
procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte),
            0);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001195
// Creates a framed transport without an underlying transport;
// only the write buffer is set up.
constructor TFramedTransportImpl.Create;
begin
  inherited Create;

  InitWriteBuffer;
end;
1201
// Closes the underlying transport; the call is simply passed through.
procedure TFramedTransportImpl.Close;
begin
  FTransport.Close;
end;
1206
// Creates a framed transport on top of ATrans and sets up the write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;

  FTransport := ATrans;
  InitWriteBuffer;
end;
1213
// Frees the read and the write buffer.
destructor TFramedTransportImpl.Destroy;
begin
  FReadBuffer.Free;
  FWriteBuffer.Free;

  inherited;
end;
1220
// Sends the content of the write buffer as one frame and flushes the underlying transport.
// The first four bytes of the buffer (the placeholder written by InitWriteBuffer) are
// overwritten with the payload size, most significant byte first.
// Raises TTransportExceptionUnknown when the buffer holds less than FHeaderSize bytes.
procedure TFramedTransportImpl.Flush;
var
  frame   : TBytes;
  cbFrame : Integer;
  cbData  : Integer;
begin
  // take a copy of everything written so far
  cbFrame := FWriteBuffer.Size;
  SetLength( frame, cbFrame);
  if cbFrame > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], cbFrame);

  cbData := cbFrame - FHeaderSize;
  if cbData < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );

  // start over with a fresh buffer before anything is sent
  InitWriteBuffer;

  // payload size, big endian
  frame[0] := Byte( (cbData shr 24) and $FF);
  frame[1] := Byte( (cbData shr 16) and $FF);
  frame[2] := Byte( (cbData shr  8) and $FF);
  frame[3] := Byte(  cbData         and $FF);

  FTransport.Write( frame, 0, cbFrame);
  FTransport.Flush;
end;
1249
// The open state is the one reported by the underlying transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1254
type
  // Adds nothing to TMemoryStream. InitWriteBuffer casts to this local descendant
  // in order to set the Capacity property of the stream.
  TAccessMemoryStream = class(TMemoryStream);
1258
// Replaces the write buffer by a new one with a capacity of 1024 bytes that
// already contains FHeaderSize placeholder bytes for the frame header.
procedure TFramedTransportImpl.InitWriteBuffer;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;

  // the cast gives access to the Capacity property
  TAccessMemoryStream(FWriteBuffer).Capacity := 1024;

  // reserve room for the header, Flush fills in the real value
  FWriteBuffer.Write( FHeader_Dummy[0], FHeaderSize);
end;
1266
// Opens the underlying transport; the call is simply passed through.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open;
end;
1271
// Reads up to len bytes of frame payload into pBuf, starting at offset off.
//   pBuf   - destination buffer
//   buflen - total size of the destination buffer
//   off    - offset into pBuf where the data goes
//   len    - number of bytes wanted; limited to the room left behind off
// Data comes from the current frame as long as it has unread bytes. Only when the
// current frame is used up (or none was read yet) the next frame is fetched via ReadFrame.
// Returns the number of bytes delivered, which may be less than len.
// Returns 0 without touching the transport if no bytes are requested or fit into pBuf.
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var pTmp : PByte;
begin
  // never hand out more than the caller's buffer can take
  if len > (buflen-off)
  then len := buflen-off;

  // Nothing to deliver: return right away. Continuing to ReadFrame at this point
  // would replace FReadBuffer and thereby throw away frame data not consumed yet,
  // and it would wait for another frame that nobody asked for.
  if len <= 0 then begin
    Result := 0;
    Exit;
  end;

  pTmp := pBuf;
  Inc( pTmp, off);

  // serve from the current frame first
  if FReadBuffer <> nil then begin
    Result := FReadBuffer.Read( pTmp^, len);
    if Result > 0
    then Exit;
  end;

  // current frame exhausted, get the next one
  ReadFrame;
  Result := FReadBuffer.Read( pTmp^, len);
end;
1293
// Reads one complete frame from the underlying transport: first the header of
// FHeaderSize bytes, whose first four bytes hold the payload size (most significant
// byte first), then the payload itself. The payload replaces the content of
// FReadBuffer, positioned at its beginning.
procedure TFramedTransportImpl.ReadFrame;
var
  header  : TBytes;
  payload : TBytes;
  cbData  : Integer;
begin
  SetLength( header, FHeaderSize);
  FTransport.ReadAll( header, 0, FHeaderSize);

  // decode the big endian payload size
  cbData := (Integer(header[0]) shl 24)
         or (Integer(header[1]) shl 16)
         or (Integer(header[2]) shl  8)
         or  Integer(header[3]);

  SetLength( payload, cbData);
  FTransport.ReadAll( payload, 0, cbData);

  // the new frame replaces the old read buffer
  FReadBuffer.Free;
  FReadBuffer := TMemoryStream.Create;
  if Length(payload) > 0
  then FReadBuffer.Write( payload[0], cbData);
  FReadBuffer.Position := 0;
end;
1315
// Appends len bytes, taken from pBuf at offset off, to the write buffer.
// Nothing is sent here, that happens in Flush. Calls with len <= 0 are ignored.
procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
var
  pSrc : PByte;
begin
  if len <= 0
  then Exit;

  pSrc := pBuf;
  Inc( pSrc, off);
  FWriteBuffer.Write( pSrc^, len);
end;
1326
{ TFramedTransportImpl.TFactory }
1328
// Factory method: wraps ATrans into a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := TFramedTransportImpl.Create( ATrans);
end;
1333
1334{ TTcpSocketStreamImpl }
1335
// Closes the socket object; the call is simply passed through.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close;
end;
1340
{$IFDEF OLD_SOCKETS}
// Creates a stream on top of the given socket object (old sockets variant).
// The timeout is only stored here, it gets applied by Read and Write.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;

  FTimeout   := aTimeout;
  FTcpClient := ATcpClient;
end;
{$ELSE}
// Creates a stream on top of the given socket object (new sockets variant).
// The timeout is applied to the socket directly. A timeout of 0 sets the receive
// timeout to SLEEP_TIME, while the send timeout always gets the value passed in.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
begin
  inherited Create;
  FTcpClient := ATcpClient;

  if aTimeout <> 0
  then FTcpClient.RecvTimeout := aTimeout
  else FTcpClient.RecvTimeout := SLEEP_TIME;

  FTcpClient.SendTimeout := aTimeout;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001360
// Intentionally empty: this stream keeps no data that would have to be flushed.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1365
// Reports the state of the socket object:
// its Active property for old sockets, its IsOpen property otherwise.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := {$IFDEF OLD_SOCKETS} FTcpClient.Active {$ELSE} FTcpClient.IsOpen {$ENDIF};
end;
1374
// Opens the socket object; the call is simply passed through.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open;
end;
1379
1380
{$IFDEF OLD_SOCKETS}
// Wrapper around the select() socket call for the socket of FTcpClient.
//   ReadReady, WriteReady, ExceptFlag
//            - optional; each pointer that is assigned makes select() check that
//              condition and receives the outcome for our socket
//   TimeOut  - milliseconds to wait; a negative value waits without time limit
//   wsaError - receives WSAEINVAL if the socket is not active, WSAGetLastError if
//              select() failed, and 0 in any other case
// Returns the result of select(), or SOCKET_ERROR if the socket is not active
// or an exception was raised while calling select().
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                                      TimeOut: Integer; var wsaError : Integer): Integer;
var
  fdsRead, fdsWrite, fdsExcept : TFDset;
  pRead, pWrite, pExcept       : PFDset;
  tvWait  : timeval;
  pWait   : PTimeval;
  hSocket : TSocket;
begin
  if not FTcpClient.Active then begin
    wsaError := WSAEINVAL;
    Result   := SOCKET_ERROR;
    Exit;
  end;

  hSocket := FTcpClient.Handle;

  // set up only those descriptor sets the caller is interested in
  pRead := nil;
  if Assigned(ReadReady) then begin
    FD_ZERO( fdsRead);
    FD_SET( hSocket, fdsRead);
    pRead := @fdsRead;
  end;

  pWrite := nil;
  if Assigned(WriteReady) then begin
    FD_ZERO( fdsWrite);
    FD_SET( hSocket, fdsWrite);
    pWrite := @fdsWrite;
  end;

  pExcept := nil;
  if Assigned(ExceptFlag) then begin
    FD_ZERO( fdsExcept);
    FD_SET( hSocket, fdsExcept);
    pExcept := @fdsExcept;
  end;

  // nil means: wait forever
  pWait := nil;
  if TimeOut >= 0 then begin
    tvWait.tv_sec  := TimeOut div 1000;
    tvWait.tv_usec := 1000 * (TimeOut mod 1000);
    pWait := @tvWait;
  end;

  wsaError := 0;
  try
    {$IFDEF MSWINDOWS}
      {$IFDEF OLD_UNIT_NAMES}
      Result := WinSock.select( hSocket + 1, pRead, pWrite, pExcept, pWait);
      {$ELSE}
      Result := Winapi.WinSock.select( hSocket + 1, pRead, pWrite, pExcept, pWait);
      {$ENDIF}
    {$ENDIF}
    {$IFDEF LINUX}
      Result := Libc.select( hSocket + 1, pRead, pWrite, pExcept, pWait);
    {$ENDIF}

    if Result = SOCKET_ERROR
    then wsaError := WSAGetLastError;

  except
    Result := SOCKET_ERROR;
  end;

  // report the individual outcomes back to the caller
  if Assigned(ReadReady)
  then ReadReady^ := FD_ISSET( hSocket, fdsRead);

  if Assigned(WriteReady)
  then WriteReady^ := FD_ISSET( hSocket, fdsWrite);

  if Assigned(ExceptFlag)
  then ExceptFlag^ := FD_ISSET( hSocket, fdsExcept);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001468
{$IFDEF OLD_SOCKETS}
// Waits until the socket has data to read and peeks at it, without removing it from the socket.
//   TimeOut      - time to wait, passed on to Select
//   pBuf         - buffer receiving the peeked bytes
//   DesiredBytes - maximum number of bytes to peek at
//   wsaError     - receives the socket error code if select() or recv() failed, otherwise 0
//   bytesReady   - receives the number of bytes available (at most DesiredBytes), 0 unless data is there
// Returns
//   wfd_HaveData - at least one byte is available
//   wfd_Timeout  - no readable data showed up in time
//   wfd_Error    - select() or recv() failed, or the peer has shut down the connection
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
                                           DesiredBytes : Integer;
                                           var wsaError, bytesReady : Integer): TWaitForData;
var bCanRead, bError : Boolean;
    retval : Integer;
const
  MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
begin
  bytesReady := 0;

  // The select function returns the total number of socket handles that are ready
  // and contained in the fd_set structures, zero if the time limit expired,
  // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
  // WSAGetLastError can be used to retrieve a specific error code.
  retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
  if retval = SOCKET_ERROR
  then Exit( TWaitForData.wfd_Error);
  if (retval = 0) or not bCanRead
  then Exit( TWaitForData.wfd_Timeout);

  // recv() returns the number of bytes received, or -1 if an error occurred.
  // The return value will be 0 when the peer has performed an orderly shutdown.
  retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
  if retval <= 0 then begin
    // Select has reset wsaError to 0 above. Report the real reason of a failed recv(),
    // otherwise the caller gets wfd_Error together with an error code of 0.
    // An orderly shutdown (retval = 0) is not a socket error, wsaError remains 0 then.
    if retval < 0
    then wsaError := WSAGetLastError;
    Exit( TWaitForData.wfd_Error);
  end;

  // at least we have some data
  bytesReady := Min( retval, DesiredBytes);
  result := TWaitForData.wfd_HaveData;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001502
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001503{$IFDEF OLD_SOCKETS}
// Reads up to count bytes from the socket into pBuf, starting at the given offset.
// Waits for data using FTimeout (DEFAULT_THRIFT_TIMEOUT if FTimeout is not set).
// As soon as the first data arrived, a shorter timeout is used for the rest.
// Returns the number of bytes actually read. That is less than count if a socket
// error occurs, if the connection gets closed, or - with FTimeout = 0 - on timeout.
// Raises TTransportExceptionTimedOut on timeout if FTimeout is non-zero.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
// old sockets version
var wfd : TWaitForData;
    wsaError,
    msecs : Integer;
    nBytes : Integer;
    pTmp : PByte;
begin
  inherited;

  if FTimeout > 0
  then msecs := FTimeout
  else msecs := DEFAULT_THRIFT_TIMEOUT;

  result := 0;
  pTmp := pBuf;
  Inc( pTmp, offset);
  while count > 0 do begin

    // wait until data is available
    while TRUE do begin
      wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
      case wfd of
        TWaitForData.wfd_Error    :  Exit;
        TWaitForData.wfd_HaveData :  Break;
        TWaitForData.wfd_Timeout  :  begin
          if (FTimeout = 0)
          then Exit
          else begin
            // A timeout is not a socket error, so the error code is still 0 here and
            // the message would read like "operation completed successfully".
            // Use the timeout error code to get a message that matches the exception.
            if wsaError = 0
            then wsaError := WSAETIMEDOUT;
            raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
          end;
        end;
      else
        ASSERT( FALSE);
      end;
    end;

    // reduce the timeout once we got data
    if FTimeout > 0
    then msecs := FTimeout div 10
    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
    msecs := Max( msecs, 200);

    ASSERT( nBytes <= count);
    nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);

    // ReceiveBuf may fail or return nothing even after a successful peek.
    // A negative value must never be added to pointer, count and result,
    // so stop here and return what has been read so far.
    if nBytes <= 0
    then Exit;

    Inc( pTmp, nBytes);
    Dec( count, nBytes);
    Inc( result, nBytes);
  end;
end;
1554
// Returns a byte array sized after FTcpClient.BytesReceived and filled via ReceiveBuf.
// The array is empty if the stream is not open.
// NOTE(review): the array size is taken from BytesReceived and the return value of
// ReceiveBuf is not evaluated - confirm that BytesReceived really is the number of
// bytes that can be read at this point.
function TTcpSocketStreamImpl.ToArray: TBytes;
// old sockets version
var
  cbData : Integer;
begin
  if IsOpen
  then cbData := FTcpClient.BytesReceived
  else cbData := 0;

  SetLength( Result, cbData);
  if cbData <= 0
  then Exit;

  FTcpClient.ReceiveBuf( Result[0], cbData);
end;
1570
// Sends count bytes, taken from pBuf at the given offset, through the socket.
// Before sending, Select is used with FTimeout to check that the socket is writable.
// Raises
//   TTransportExceptionNotOpen  - the socket is not active
//   TTransportExceptionTimedOut - the socket did not become writable in time
//   TTransportExceptionUnknown  - select failed or signalled an error condition
// The return value of SendBuf is not evaluated.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
// old sockets version
var
  bWritable, bExcept  : Boolean;
  selResult, wsaError : Integer;
  pSrc : PByte;
begin
  inherited;

  if not FTcpClient.Active
  then raise TTransportExceptionNotOpen.Create('not open');

  // select() yields the number of ready socket handles, zero if the time limit
  // expired, or SOCKET_ERROR on failure; in the latter case wsaError holds
  // the code obtained from WSAGetLastError.
  selResult := Self.Select( nil, @bWritable, @bExcept, FTimeOut, wsaError);

  if selResult = SOCKET_ERROR
  then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));

  if selResult = 0
  then raise TTransportExceptionTimedOut.Create('timed out');

  if bExcept or not bWritable
  then raise TTransportExceptionUnknown.Create('unknown error');

  pSrc := pBuf;
  Inc( pSrc, offset);
  FTcpClient.SendBuf( pSrc^, count);
end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001600
1601{$ELSE}
1602
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
// new sockets version
//
// Stores up to count bytes from FTcpClient at pBuf+offset. Reading continues
// until count bytes have arrived or the client delivers zero bytes.
// Returns the total number of bytes stored.
var chunk : Integer;
    pDest : PByte;
begin
  inherited;

  // position the write cursor at the requested offset
  pDest := pBuf;
  Inc( pDest, offset);

  result := 0;
  while count > 0 do begin
    chunk := FTcpClient.Read( pDest^, count);
    if chunk = 0 then Break;  // nothing delivered: return what we have so far

    Inc( result, chunk);
    Inc( pDest, chunk);
    Dec( count, chunk);
  end;
end;
1621
function TTcpSocketStreamImpl.ToArray: TBytes;
// new sockets version
//
// Drains the data currently obtainable from FTcpClient and returns it.
// Data is read in fixed-size chunks as long as a read fills the whole chunk.
// The result always has exactly the length of the bytes really read:
// it is empty if Peek reports no data, and it holds the bytes read so far if
// a TTransportException interrupts reading. Other exceptions are passed on.
const
  CHUNK_SIZE = 1024;
var
  nRead, nTotal : Integer;
begin
  // A dynamic array function result is not guaranteed to be empty on entry,
  // so it must not be used before it has been set explicitly.
  SetLength( Result, 0);
  nTotal := 0;
  try
    if FTcpClient.Peek then
      repeat
        SetLength( Result, nTotal + CHUNK_SIZE);
        nRead := FTcpClient.Read( Result[nTotal], CHUNK_SIZE);
        if nRead > 0 then Inc( nTotal, nRead);
      until nRead < CHUNK_SIZE;
  except
    on TTransportException do begin { best effort: keep what was read so far } end;
    else raise;
  end;

  // cut off the unused part of the last chunk, including the case where the
  // last read returned nothing or was interrupted
  SetLength( Result, nTotal);
end;
1640
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
// new sockets version
//
// Writes count bytes, starting offset bytes behind pBuf, to FTcpClient.
// Raises TTransportExceptionNotOpen if the client is not open.
var pData : PByte;
begin
  inherited;

  if FTcpClient.IsOpen then begin
    // advance to the first byte to be written
    pData := pBuf;
    Inc( pData, offset);
    FTcpClient.Write( pData^, count);
  end
  else begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;
end;
1654
Jens Geyer23d67462015-12-19 11:44:57 +01001655{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001656
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001657
{$IF CompilerVersion < 21.0}
// Compilers with CompilerVersion below 21.0 run the TFramedTransportImpl
// initializer from the unit initialization section.
initialization
  TFramedTransportImpl_Initialize;
{$IFEND}
1664
1665
1666end.