blob: d02f0a32dbc91d607734b64f3d439b3d5afd7639 [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX, msxml, WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer9f7f11e2016-04-14 21:37:11 +020034 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
42 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030043 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020044
45type
46 ITransport = interface
Jens Geyer17c3ad92017-09-05 20:31:27 +020047 ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
Jens Geyerd5436f52014-10-03 19:50:38 +020048 function GetIsOpen: Boolean;
49 property IsOpen: Boolean read GetIsOpen;
50 function Peek: Boolean;
51 procedure Open;
52 procedure Close;
Jens Geyer17c3ad92017-09-05 20:31:27 +020053 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
54 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
55 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
56 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020057 procedure Write( const buf: TBytes); overload;
58 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
Jens Geyer17c3ad92017-09-05 20:31:27 +020059 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
60 procedure Write( const pBuf : Pointer; len : Integer); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020061 procedure Flush;
62 end;
63
64 TTransportImpl = class( TInterfacedObject, ITransport)
65 protected
66 function GetIsOpen: Boolean; virtual; abstract;
67 property IsOpen: Boolean read GetIsOpen;
68 function Peek: Boolean; virtual;
69 procedure Open(); virtual; abstract;
70 procedure Close(); virtual; abstract;
Jens Geyer17c3ad92017-09-05 20:31:27 +020071 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
72 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
73 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
74 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
75 procedure Write( const buf: TBytes); overload; inline;
76 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
77 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
78 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020079 procedure Flush; virtual;
80 end;
81
82 TTransportException = class( Exception )
83 public
84 type
85 TExceptionType = (
86 Unknown,
87 NotOpen,
88 AlreadyOpen,
89 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +020090 EndOfFile,
91 BadArgs,
92 Interrupted
Jens Geyerd5436f52014-10-03 19:50:38 +020093 );
94 private
Jens Geyere0e32402016-04-20 21:50:48 +020095 function GetType: TExceptionType;
96 protected
97 constructor HiddenCreate(const Msg: string);
Jens Geyerd5436f52014-10-03 19:50:38 +020098 public
Jens Geyere0e32402016-04-20 21:50:48 +020099 class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
100 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
101 class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
102 property Type_: TExceptionType read GetType;
Jens Geyerd5436f52014-10-03 19:50:38 +0200103 end;
104
Jens Geyere0e32402016-04-20 21:50:48 +0200105 // Needed to remove deprecation warning
106 TTransportExceptionSpecialized = class abstract (TTransportException)
107 public
108 constructor Create(const Msg: string);
109 end;
110
111 TTransportExceptionUnknown = class (TTransportExceptionSpecialized);
112 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized);
113 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized);
114 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized);
115 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized);
116 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized);
117 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized);
118
Jens Geyerd5436f52014-10-03 19:50:38 +0200119 IHTTPClient = interface( ITransport )
120 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
121 procedure SetConnectionTimeout(const Value: Integer);
122 function GetConnectionTimeout: Integer;
123 procedure SetReadTimeout(const Value: Integer);
124 function GetReadTimeout: Integer;
125 function GetCustomHeaders: IThriftDictionary<string,string>;
126 procedure SendRequest;
127 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
128 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
129 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
130 end;
131
132 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
133 private
134 FUri : string;
135 FInputStream : IThriftStream;
136 FOutputStream : IThriftStream;
137 FConnectionTimeout : Integer;
138 FReadTimeout : Integer;
139 FCustomHeaders : IThriftDictionary<string,string>;
140
141 function CreateRequest: IXMLHTTPRequest;
142 protected
143 function GetIsOpen: Boolean; override;
144 procedure Open(); override;
145 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200146 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
147 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200148 procedure Flush; override;
149
150 procedure SetConnectionTimeout(const Value: Integer);
151 function GetConnectionTimeout: Integer;
152 procedure SetReadTimeout(const Value: Integer);
153 function GetReadTimeout: Integer;
154 function GetCustomHeaders: IThriftDictionary<string,string>;
155 procedure SendRequest;
156 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
157 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
158 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
159 public
160 constructor Create( const AUri: string);
161 destructor Destroy; override;
162 end;
163
164 IServerTransport = interface
165 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
166 procedure Listen;
167 procedure Close;
168 function Accept( const fnAccepting: TProc): ITransport;
169 end;
170
171 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
172 protected
173 procedure Listen; virtual; abstract;
174 procedure Close; virtual; abstract;
175 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
176 end;
177
178 ITransportFactory = interface
179 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
180 function GetTransport( const ATrans: ITransport): ITransport;
181 end;
182
183 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
184 function GetTransport( const ATrans: ITransport): ITransport; virtual;
185 end;
186
187 TTcpSocketStreamImpl = class( TThriftStreamImpl )
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200188{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200189 private type
190 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
191 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200192 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200193 FTimeout : Integer;
194 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
195 TimeOut: Integer; var wsaError : Integer): Integer;
196 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200197 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200198{$ELSE}
199 FTcpClient: TSocket;
200 protected const
201 SLEEP_TIME = 200;
202{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200203 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200204 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
205 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200206 procedure Open; override;
207 procedure Close; override;
208 procedure Flush; override;
209
210 function IsOpen: Boolean; override;
211 function ToArray: TBytes; override;
212 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200213{$IFDEF OLD_SOCKETS}
214 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
215{$ELSE}
216 constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
217{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200218 end;
219
220 IStreamTransport = interface( ITransport )
221 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
222 function GetInputStream: IThriftStream;
223 function GetOutputStream: IThriftStream;
224 property InputStream : IThriftStream read GetInputStream;
225 property OutputStream : IThriftStream read GetOutputStream;
226 end;
227
228 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
229 protected
230 FInputStream : IThriftStream;
231 FOutputStream : IThriftStream;
232 protected
233 function GetIsOpen: Boolean; override;
234
235 function GetInputStream: IThriftStream;
236 function GetOutputStream: IThriftStream;
237 public
238 property InputStream : IThriftStream read GetInputStream;
239 property OutputStream : IThriftStream read GetOutputStream;
240
241 procedure Open; override;
242 procedure Close; override;
243 procedure Flush; override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200244 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
245 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200246 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
247 destructor Destroy; override;
248 end;
249
250 TBufferedStreamImpl = class( TThriftStreamImpl)
251 private
252 FStream : IThriftStream;
253 FBufSize : Integer;
254 FReadBuffer : TMemoryStream;
255 FWriteBuffer : TMemoryStream;
256 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200257 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
258 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200259 procedure Open; override;
260 procedure Close; override;
261 procedure Flush; override;
262 function IsOpen: Boolean; override;
263 function ToArray: TBytes; override;
264 public
265 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
266 destructor Destroy; override;
267 end;
268
269 TServerSocketImpl = class( TServerTransportImpl)
270 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200271{$IFDEF OLD_SOCKETS}
272 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200273 FPort : Integer;
274 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200275{$ELSE}
276 FServer: TServerSocket;
277{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200278 FUseBufferedSocket : Boolean;
279 FOwnsServer : Boolean;
280 protected
281 function Accept( const fnAccepting: TProc) : ITransport; override;
282 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200283{$IFDEF OLD_SOCKETS}
284 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200285 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200286{$ELSE}
287 constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
288 constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
289{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200290 destructor Destroy; override;
291 procedure Listen; override;
292 procedure Close; override;
293 end;
294
295 TBufferedTransportImpl = class( TTransportImpl )
296 private
297 FInputBuffer : IThriftStream;
298 FOutputBuffer : IThriftStream;
299 FTransport : IStreamTransport;
300 FBufSize : Integer;
301
302 procedure InitBuffers;
303 function GetUnderlyingTransport: ITransport;
304 protected
305 function GetIsOpen: Boolean; override;
306 procedure Flush; override;
307 public
308 procedure Open(); override;
309 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200310 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
311 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200312 constructor Create( const ATransport : IStreamTransport ); overload;
313 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
314 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
315 property IsOpen: Boolean read GetIsOpen;
316 end;
317
318 TSocketImpl = class(TStreamTransportImpl)
319 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200320{$IFDEF OLD_SOCKETS}
321 FClient : TCustomIpClient;
322{$ELSE}
323 FClient: TSocket;
324{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200325 FOwnsClient : Boolean;
326 FHost : string;
327 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200328{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200329 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200330{$ELSE}
331 FTimeout : Longword;
332{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200333
334 procedure InitSocket;
335 protected
336 function GetIsOpen: Boolean; override;
337 public
338 procedure Open; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200339{$IFDEF OLD_SOCKETS}
340 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200341 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200342{$ELSE}
343 constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
344 constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
345{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200346 destructor Destroy; override;
347 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200348{$IFDEF OLD_SOCKETS}
349 property TcpClient: TCustomIpClient read FClient;
350{$ELSE}
351 property TcpClient: TSocket read FClient;
352{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200353 property Host : string read FHost;
354 property Port: Integer read FPort;
355 end;
356
357 TFramedTransportImpl = class( TTransportImpl)
358 private const
359 FHeaderSize : Integer = 4;
360 private class var
361 FHeader_Dummy : array of Byte;
362 protected
363 FTransport : ITransport;
364 FWriteBuffer : TMemoryStream;
365 FReadBuffer : TMemoryStream;
366
367 procedure InitWriteBuffer;
368 procedure ReadFrame;
369 public
370 type
371 TFactory = class( TTransportFactoryImpl )
372 public
373 function GetTransport( const ATrans: ITransport): ITransport; override;
374 end;
375
Jens Geyere0e32402016-04-20 21:50:48 +0200376 {$IFDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200377 class constructor Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200378 {$ENDIF}
Jens Geyere0e32402016-04-20 21:50:48 +0200379
Jens Geyerd5436f52014-10-03 19:50:38 +0200380 constructor Create; overload;
381 constructor Create( const ATrans: ITransport); overload;
382 destructor Destroy; override;
383
384 procedure Open(); override;
385 function GetIsOpen: Boolean; override;
386
387 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200388 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
389 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200390 procedure Flush; override;
391 end;
392
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200393{$IFNDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200394procedure TFramedTransportImpl_Initialize;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200395{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200396
397const
398 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
399
400
401implementation
402
403{ TTransportImpl }
404
405procedure TTransportImpl.Flush;
406begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200407 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200408end;
409
410function TTransportImpl.Peek: Boolean;
411begin
412 Result := IsOpen;
413end;
414
Jens Geyer17c3ad92017-09-05 20:31:27 +0200415function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200416begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200417 if Length(buf) > 0
418 then result := Read( @buf[0], Length(buf), off, len)
419 else result := 0;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200420end;
421
422function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
423begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200424 if Length(buf) > 0
425 then result := ReadAll( @buf[0], Length(buf), off, len)
426 else result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +0200427end;
428
429procedure TTransportImpl.Write( const buf: TBytes);
430begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200431 if Length(buf) > 0
432 then Write( @buf[0], 0, Length(buf));
Jens Geyer17c3ad92017-09-05 20:31:27 +0200433end;
434
435procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
436begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200437 if Length(buf) > 0
438 then Write( @buf[0], off, len);
Jens Geyer17c3ad92017-09-05 20:31:27 +0200439end;
440
441function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
442var ret : Integer;
443begin
444 result := 0;
445 while result < len do begin
446 ret := Read( pBuf, buflen, off + result, len - result);
447 if ret > 0
448 then Inc( result, ret)
449 else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
450 end;
451end;
452
453procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
454begin
455 Self.Write( pBuf, 0, len);
Jens Geyerd5436f52014-10-03 19:50:38 +0200456end;
457
458{ THTTPClientImpl }
459
460procedure THTTPClientImpl.Close;
461begin
462 FInputStream := nil;
463 FOutputStream := nil;
464end;
465
466constructor THTTPClientImpl.Create(const AUri: string);
467begin
468 inherited Create;
469 FUri := AUri;
470 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
471 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
472end;
473
474function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
475var
476 pair : TPair<string,string>;
477begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200478 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200479 Result := CoXMLHTTP.Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200480 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200481 Result := CoXMLHTTPRequest.Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200482 {$IFEND}
Jens Geyerd5436f52014-10-03 19:50:38 +0200483
484 Result.open('POST', FUri, False, '', '');
485 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
486 Result.setRequestHeader( 'Accept', 'application/x-thrift');
487 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
488
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200489 for pair in FCustomHeaders do begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200490 Result.setRequestHeader( pair.Key, pair.Value );
491 end;
492end;
493
494destructor THTTPClientImpl.Destroy;
495begin
496 Close;
497 inherited;
498end;
499
500procedure THTTPClientImpl.Flush;
501begin
502 try
503 SendRequest;
504 finally
505 FOutputStream := nil;
506 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
507 end;
508end;
509
510function THTTPClientImpl.GetConnectionTimeout: Integer;
511begin
512 Result := FConnectionTimeout;
513end;
514
515function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
516begin
517 Result := FCustomHeaders;
518end;
519
520function THTTPClientImpl.GetIsOpen: Boolean;
521begin
522 Result := True;
523end;
524
525function THTTPClientImpl.GetReadTimeout: Integer;
526begin
527 Result := FReadTimeout;
528end;
529
530procedure THTTPClientImpl.Open;
531begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200532 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200533end;
534
Jens Geyer17c3ad92017-09-05 20:31:27 +0200535function THTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200536begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100537 if FInputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200538 raise TTransportExceptionNotOpen.Create('No request has been sent');
Jens Geyerd5436f52014-10-03 19:50:38 +0200539 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100540
Jens Geyerd5436f52014-10-03 19:50:38 +0200541 try
Jens Geyer17c3ad92017-09-05 20:31:27 +0200542 Result := FInputStream.Read( pBuf, buflen, off, len)
Jens Geyerd5436f52014-10-03 19:50:38 +0200543 except
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100544 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200545 do raise TTransportExceptionUnknown.Create(E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200546 end;
547end;
548
549procedure THTTPClientImpl.SendRequest;
550var
551 xmlhttp : IXMLHTTPRequest;
552 ms : TMemoryStream;
553 a : TBytes;
554 len : Integer;
555begin
556 xmlhttp := CreateRequest;
557
558 ms := TMemoryStream.Create;
559 try
560 a := FOutputStream.ToArray;
561 len := Length(a);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200562 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200563 ms.WriteBuffer( Pointer(@a[0])^, len);
564 end;
565 ms.Position := 0;
566 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
567 FInputStream := nil;
568 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
569 finally
570 ms.Free;
571 end;
572end;
573
574procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
575begin
576 FConnectionTimeout := Value;
577end;
578
579procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
580begin
581 FReadTimeout := Value
582end;
583
Jens Geyer17c3ad92017-09-05 20:31:27 +0200584procedure THTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200585begin
Jens Geyer17c3ad92017-09-05 20:31:27 +0200586 FOutputStream.Write( pBuf, off, len);
Jens Geyerd5436f52014-10-03 19:50:38 +0200587end;
588
589{ TTransportException }
590
Jens Geyere0e32402016-04-20 21:50:48 +0200591function TTransportException.GetType: TExceptionType;
592begin
593 if Self is TTransportExceptionNotOpen then Result := TExceptionType.NotOpen
594 else if Self is TTransportExceptionAlreadyOpen then Result := TExceptionType.AlreadyOpen
595 else if Self is TTransportExceptionTimedOut then Result := TExceptionType.TimedOut
596 else if Self is TTransportExceptionEndOfFile then Result := TExceptionType.EndOfFile
597 else if Self is TTransportExceptionBadArgs then Result := TExceptionType.BadArgs
598 else if Self is TTransportExceptionInterrupted then Result := TExceptionType.Interrupted
599 else Result := TExceptionType.Unknown;
600end;
601
602constructor TTransportException.HiddenCreate(const Msg: string);
603begin
604 inherited Create(Msg);
605end;
606
607class function TTransportException.Create(AType: TExceptionType): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200608begin
609 //no inherited;
Jens Geyere0e32402016-04-20 21:50:48 +0200610{$WARN SYMBOL_DEPRECATED OFF}
611 Result := Create(AType, '')
612{$WARN SYMBOL_DEPRECATED DEFAULT}
Jens Geyerd5436f52014-10-03 19:50:38 +0200613end;
614
Jens Geyere0e32402016-04-20 21:50:48 +0200615class function TTransportException.Create(AType: TExceptionType;
616 const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200617begin
Jens Geyere0e32402016-04-20 21:50:48 +0200618 case AType of
619 TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
620 TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
621 TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
622 TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
623 TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
624 TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
625 else
626 Result := TTransportExceptionUnknown.Create(msg);
627 end;
Jens Geyerd5436f52014-10-03 19:50:38 +0200628end;
629
Jens Geyere0e32402016-04-20 21:50:48 +0200630class function TTransportException.Create(const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200631begin
Jens Geyere0e32402016-04-20 21:50:48 +0200632 Result := TTransportExceptionUnknown.Create(Msg);
633end;
634
635{ TTransportExceptionSpecialized }
636
637constructor TTransportExceptionSpecialized.Create(const Msg: string);
638begin
639 inherited HiddenCreate(Msg);
Jens Geyerd5436f52014-10-03 19:50:38 +0200640end;
641
642{ TTransportFactoryImpl }
643
644function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
645begin
646 Result := ATrans;
647end;
648
649{ TServerSocket }
650
Jens Geyer23d67462015-12-19 11:44:57 +0100651{$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200652constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200653begin
654 inherited Create;
655 FServer := AServer;
656 FClientTimeout := AClientTimeout;
657end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200658{$ELSE}
659constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
Jens Geyerd5436f52014-10-03 19:50:38 +0200660begin
661 inherited Create;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200662 FServer := AServer;
663 FServer.RecvTimeout := AClientTimeout;
664 FServer.SendTimeout := AClientTimeout;
665end;
666{$ENDIF}
667
668{$IFDEF OLD_SOCKETS}
669constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
670{$ELSE}
671constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
672{$ENDIF}
673begin
674 inherited Create;
675{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200676 FPort := APort;
677 FClientTimeout := AClientTimeout;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200678 FServer := TTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200679 FServer.BlockMode := bmBlocking;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200680 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200681 FServer.LocalPort := AnsiString( IntToStr( FPort));
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200682 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200683 FServer.LocalPort := IntToStr( FPort);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200684 {$IFEND}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200685{$ELSE}
686 FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
687{$ENDIF}
688 FUseBufferedSocket := AUseBufferedSockets;
689 FOwnsServer := True;
Jens Geyerd5436f52014-10-03 19:50:38 +0200690end;
691
692destructor TServerSocketImpl.Destroy;
693begin
694 if FOwnsServer then begin
695 FServer.Free;
696 FServer := nil;
697 end;
698 inherited;
699end;
700
701function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
702var
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200703{$IFDEF OLD_SOCKETS}
704 client : TCustomIpClient;
705{$ELSE}
706 client: TSocket;
707{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200708 trans : IStreamTransport;
709begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100710 if FServer = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200711 raise TTransportExceptionNotOpen.Create('No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200712 end;
713
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200714{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200715 client := nil;
716 try
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200717 client := TCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200718
719 if Assigned(fnAccepting)
720 then fnAccepting();
721
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100722 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200723 client.Free;
724 Result := nil;
725 Exit;
726 end;
727
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100728 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200729 Result := nil;
730 Exit;
731 end;
732
733 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
734 client := nil; // trans owns it now
735
736 if FUseBufferedSocket
737 then result := TBufferedTransportImpl.Create( trans)
738 else result := trans;
739
740 except
741 on E: Exception do begin
742 client.Free;
Jens Geyere0e32402016-04-20 21:50:48 +0200743 raise TTransportExceptionUnknown.Create(E.ToString);
Jens Geyerd5436f52014-10-03 19:50:38 +0200744 end;
745 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200746{$ELSE}
747 if Assigned(fnAccepting) then
748 fnAccepting();
749
750 client := FServer.Accept;
751 try
752 trans := TSocketImpl.Create(client, True);
753 client := nil;
754
755 if FUseBufferedSocket then
756 Result := TBufferedTransportImpl.Create(trans)
757 else
758 Result := trans;
759 except
760 client.Free;
761 raise;
762 end;
763{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200764end;
765
766procedure TServerSocketImpl.Listen;
767begin
768 if FServer <> nil then
769 begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200770{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200771 try
772 FServer.Active := True;
773 except
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200774 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200775 do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200776 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200777{$ELSE}
778 FServer.Listen;
779{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200780 end;
781end;
782
783procedure TServerSocketImpl.Close;
784begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200785 if FServer <> nil then
786{$IFDEF OLD_SOCKETS}
787 try
788 FServer.Active := False;
789 except
790 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200791 do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200792 end;
793{$ELSE}
794 FServer.Close;
795{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200796end;
797
798{ TSocket }
799
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200800{$IFDEF OLD_SOCKETS}
801constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
Jens Geyerd5436f52014-10-03 19:50:38 +0200802var stream : IThriftStream;
803begin
804 FClient := AClient;
805 FTimeout := ATimeout;
806 FOwnsClient := aOwnsClient;
807 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
808 inherited Create( stream, stream);
809end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200810{$ELSE}
811constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
812var stream : IThriftStream;
813begin
814 FClient := AClient;
815 FTimeout := AClient.RecvTimeout;
816 FOwnsClient := aOwnsClient;
817 stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
818 inherited Create(stream, stream);
819end;
820{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200821
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200822{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200823constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200824{$ELSE}
825constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
826{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200827begin
828 inherited Create(nil,nil);
829 FHost := AHost;
830 FPort := APort;
831 FTimeout := ATimeout;
832 InitSocket;
833end;
834
835destructor TSocketImpl.Destroy;
836begin
837 if FOwnsClient
838 then FreeAndNil( FClient);
839 inherited;
840end;
841
842procedure TSocketImpl.Close;
843begin
844 inherited Close;
845 if FOwnsClient
846 then FreeAndNil( FClient);
847end;
848
849function TSocketImpl.GetIsOpen: Boolean;
850begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200851{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200852 Result := (FClient <> nil) and FClient.Connected;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200853{$ELSE}
854 Result := (FClient <> nil) and FClient.IsOpen
855{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200856end;
857
858procedure TSocketImpl.InitSocket;
859var
860 stream : IThriftStream;
861begin
862 if FOwnsClient
863 then FreeAndNil( FClient)
864 else FClient := nil;
865
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200866{$IFDEF OLD_SOCKETS}
867 FClient := TTcpClient.Create( nil);
868{$ELSE}
869 FClient := TSocket.Create(FHost, FPort);
870{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200871 FOwnsClient := True;
872
873 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
874 FInputStream := stream;
875 FOutputStream := stream;
876end;
877
878procedure TSocketImpl.Open;
879begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100880 if IsOpen then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200881 raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +0200882 end;
883
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100884 if FHost = '' then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200885 raise TTransportExceptionNotOpen.Create('Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +0200886 end;
887
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100888 if Port <= 0 then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200889 raise TTransportExceptionNotOpen.Create('Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +0200890 end;
891
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100892 if FClient = nil
893 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +0200894
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200895{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200896 FClient.RemoteHost := TSocketHost( Host);
897 FClient.RemotePort := TSocketPort( IntToStr( Port));
898 FClient.Connect;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200899{$ELSE}
900 FClient.Open;
901{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200902
903 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
904 FOutputStream := FInputStream;
905end;
906
907{ TBufferedStream }
908
909procedure TBufferedStreamImpl.Close;
910begin
911 Flush;
912 FStream := nil;
913
914 FReadBuffer.Free;
915 FReadBuffer := nil;
916
917 FWriteBuffer.Free;
918 FWriteBuffer := nil;
919end;
920
921constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
922begin
923 inherited Create;
924 FStream := AStream;
925 FBufSize := ABufSize;
926 FReadBuffer := TMemoryStream.Create;
927 FWriteBuffer := TMemoryStream.Create;
928end;
929
930destructor TBufferedStreamImpl.Destroy;
931begin
932 Close;
933 inherited;
934end;
935
936procedure TBufferedStreamImpl.Flush;
937var
938 buf : TBytes;
939 len : Integer;
940begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200941 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200942 len := FWriteBuffer.Size;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200943 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200944 SetLength( buf, len );
945 FWriteBuffer.Position := 0;
946 FWriteBuffer.Read( Pointer(@buf[0])^, len );
947 FStream.Write( buf, 0, len );
948 end;
949 FWriteBuffer.Clear;
950 end;
951end;
952
953function TBufferedStreamImpl.IsOpen: Boolean;
954begin
955 Result := (FWriteBuffer <> nil)
956 and (FReadBuffer <> nil)
957 and (FStream <> nil);
958end;
959
960procedure TBufferedStreamImpl.Open;
961begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200962 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200963end;
964
Jens Geyer17c3ad92017-09-05 20:31:27 +0200965function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200966var
967 nRead : Integer;
968 tempbuf : TBytes;
Jens Geyer5089b0a2018-02-01 22:37:18 +0100969 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +0200970begin
971 inherited;
972 Result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +0100973
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200974 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200975 while count > 0 do begin
976
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200977 if FReadBuffer.Position >= FReadBuffer.Size then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200978 FReadBuffer.Clear;
979 SetLength( tempbuf, FBufSize);
980 nRead := FStream.Read( tempbuf, 0, FBufSize );
981 if nRead = 0 then Break; // avoid infinite loop
982
983 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
984 FReadBuffer.Position := 0;
985 end;
986
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200987 if FReadBuffer.Position < FReadBuffer.Size then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +0100988 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
989 pTmp := pBuf;
990 Inc( pTmp, offset);
991 Inc( Result, FReadBuffer.Read( pTmp^, nRead));
Jens Geyerd5436f52014-10-03 19:50:38 +0200992 Dec( count, nRead);
993 Inc( offset, nRead);
994 end;
995 end;
996 end;
997end;
998
999function TBufferedStreamImpl.ToArray: TBytes;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001000var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001001begin
1002 len := 0;
1003
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001004 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001005 len := FReadBuffer.Size;
1006 end;
1007
1008 SetLength( Result, len);
1009
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001010 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001011 FReadBuffer.Position := 0;
1012 FReadBuffer.Read( Pointer(@Result[0])^, len );
1013 end;
1014end;
1015
Jens Geyer17c3ad92017-09-05 20:31:27 +02001016procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001017var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001018begin
1019 inherited;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001020 if count > 0 then begin
1021 if IsOpen then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001022 pTmp := pBuf;
1023 Inc( pTmp, offset);
1024 FWriteBuffer.Write( pTmp^, count );
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001025 if FWriteBuffer.Size > FBufSize then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001026 Flush;
1027 end;
1028 end;
1029 end;
1030end;
1031
1032{ TStreamTransportImpl }
1033
1034procedure TStreamTransportImpl.Close;
1035begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001036 FInputStream := nil;
1037 FOutputStream := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001038end;
1039
1040constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
1041begin
1042 inherited Create;
1043 FInputStream := AInputStream;
1044 FOutputStream := AOutputStream;
1045end;
1046
1047destructor TStreamTransportImpl.Destroy;
1048begin
1049 FInputStream := nil;
1050 FOutputStream := nil;
1051 inherited;
1052end;
1053
1054procedure TStreamTransportImpl.Flush;
1055begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001056 if FOutputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001057 raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001058 end;
1059
1060 FOutputStream.Flush;
1061end;
1062
1063function TStreamTransportImpl.GetInputStream: IThriftStream;
1064begin
1065 Result := FInputStream;
1066end;
1067
1068function TStreamTransportImpl.GetIsOpen: Boolean;
1069begin
1070 Result := True;
1071end;
1072
1073function TStreamTransportImpl.GetOutputStream: IThriftStream;
1074begin
1075 Result := FInputStream;
1076end;
1077
1078procedure TStreamTransportImpl.Open;
1079begin
1080
1081end;
1082
Jens Geyer17c3ad92017-09-05 20:31:27 +02001083function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001084begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001085 if FInputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001086 raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001087 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001088
Jens Geyer17c3ad92017-09-05 20:31:27 +02001089 Result := FInputStream.Read( pBuf,buflen, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001090end;
1091
Jens Geyer17c3ad92017-09-05 20:31:27 +02001092procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001093begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001094 if FOutputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001095 raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001096 end;
1097
Jens Geyer17c3ad92017-09-05 20:31:27 +02001098 FOutputStream.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001099end;
1100
1101{ TBufferedTransportImpl }
1102
1103constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
1104begin
1105 //no inherited;
1106 Create( ATransport, 1024 );
1107end;
1108
1109procedure TBufferedTransportImpl.Close;
1110begin
1111 FTransport.Close;
1112end;
1113
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001114constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001115begin
1116 inherited Create;
1117 FTransport := ATransport;
1118 FBufSize := ABufSize;
1119 InitBuffers;
1120end;
1121
1122procedure TBufferedTransportImpl.Flush;
1123begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001124 if FOutputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001125 FOutputBuffer.Flush;
1126 end;
1127end;
1128
1129function TBufferedTransportImpl.GetIsOpen: Boolean;
1130begin
1131 Result := FTransport.IsOpen;
1132end;
1133
1134function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
1135begin
1136 Result := FTransport;
1137end;
1138
1139procedure TBufferedTransportImpl.InitBuffers;
1140begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001141 if FTransport.InputStream <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001142 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
1143 end;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001144 if FTransport.OutputStream <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001145 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
1146 end;
1147end;
1148
1149procedure TBufferedTransportImpl.Open;
1150begin
1151 FTransport.Open
1152end;
1153
Jens Geyer17c3ad92017-09-05 20:31:27 +02001154function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001155begin
1156 Result := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001157 if FInputBuffer <> nil then begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001158 Result := FInputBuffer.Read( pBuf,buflen, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001159 end;
1160end;
1161
Jens Geyer17c3ad92017-09-05 20:31:27 +02001162procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001163begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001164 if FOutputBuffer <> nil then begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001165 FOutputBuffer.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001166 end;
1167end;
1168
1169{ TFramedTransportImpl }
1170
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001171{$IFDEF HAVE_CLASS_CTOR}
1172class constructor TFramedTransportImpl.Create;
1173begin
1174 SetLength( FHeader_Dummy, FHeaderSize);
1175 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1176end;
1177{$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +02001178procedure TFramedTransportImpl_Initialize;
1179begin
1180 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1181 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1182 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1183end;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001184{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001185
1186constructor TFramedTransportImpl.Create;
1187begin
1188 inherited Create;
1189 InitWriteBuffer;
1190end;
1191
1192procedure TFramedTransportImpl.Close;
1193begin
1194 FTransport.Close;
1195end;
1196
1197constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1198begin
1199 inherited Create;
1200 InitWriteBuffer;
1201 FTransport := ATrans;
1202end;
1203
1204destructor TFramedTransportImpl.Destroy;
1205begin
1206 FWriteBuffer.Free;
1207 FReadBuffer.Free;
1208 inherited;
1209end;
1210
1211procedure TFramedTransportImpl.Flush;
1212var
1213 buf : TBytes;
1214 len : Integer;
1215 data_len : Integer;
1216
1217begin
1218 len := FWriteBuffer.Size;
1219 SetLength( buf, len);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001220 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001221 System.Move( FWriteBuffer.Memory^, buf[0], len );
1222 end;
1223
1224 data_len := len - FHeaderSize;
Jens Geyer30ed90e2016-03-10 20:12:49 +01001225 if (data_len < 0) then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001226 raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001227 end;
1228
1229 InitWriteBuffer;
1230
1231 buf[0] := Byte($FF and (data_len shr 24));
1232 buf[1] := Byte($FF and (data_len shr 16));
1233 buf[2] := Byte($FF and (data_len shr 8));
1234 buf[3] := Byte($FF and data_len);
1235
1236 FTransport.Write( buf, 0, len );
1237 FTransport.Flush;
1238end;
1239
1240function TFramedTransportImpl.GetIsOpen: Boolean;
1241begin
1242 Result := FTransport.IsOpen;
1243end;
1244
1245type
1246 TAccessMemoryStream = class(TMemoryStream)
1247 end;
1248
1249procedure TFramedTransportImpl.InitWriteBuffer;
1250begin
1251 FWriteBuffer.Free;
1252 FWriteBuffer := TMemoryStream.Create;
1253 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1254 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1255end;
1256
1257procedure TFramedTransportImpl.Open;
1258begin
1259 FTransport.Open;
1260end;
1261
Jens Geyer17c3ad92017-09-05 20:31:27 +02001262function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001263var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001264begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001265 if len > (buflen-off)
1266 then len := buflen-off;
1267
Jens Geyer5089b0a2018-02-01 22:37:18 +01001268 pTmp := pBuf;
1269 Inc( pTmp, off);
1270
Jens Geyer17c3ad92017-09-05 20:31:27 +02001271 if (FReadBuffer <> nil) and (len > 0) then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001272 result := FReadBuffer.Read( pTmp^, len);
Jens Geyer17c3ad92017-09-05 20:31:27 +02001273 if result > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001274 Exit;
1275 end;
1276 end;
1277
1278 ReadFrame;
1279 if len > 0
Jens Geyer5089b0a2018-02-01 22:37:18 +01001280 then Result := FReadBuffer.Read( pTmp^, len)
Jens Geyerd5436f52014-10-03 19:50:38 +02001281 else Result := 0;
1282end;
1283
1284procedure TFramedTransportImpl.ReadFrame;
1285var
1286 i32rd : TBytes;
1287 size : Integer;
1288 buff : TBytes;
1289begin
1290 SetLength( i32rd, FHeaderSize );
1291 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1292 size :=
1293 ((i32rd[0] and $FF) shl 24) or
1294 ((i32rd[1] and $FF) shl 16) or
1295 ((i32rd[2] and $FF) shl 8) or
1296 (i32rd[3] and $FF);
1297 SetLength( buff, size );
1298 FTransport.ReadAll( buff, 0, size );
1299 FReadBuffer.Free;
1300 FReadBuffer := TMemoryStream.Create;
Jens Geyera76e6c72017-09-08 21:03:30 +02001301 if Length(buff) > 0
1302 then FReadBuffer.Write( Pointer(@buff[0])^, size );
Jens Geyerd5436f52014-10-03 19:50:38 +02001303 FReadBuffer.Position := 0;
1304end;
1305
Jens Geyer17c3ad92017-09-05 20:31:27 +02001306procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001307var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001308begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001309 if len > 0 then begin
1310 pTmp := pBuf;
1311 Inc( pTmp, off);
1312
1313 FWriteBuffer.Write( pTmp^, len );
1314 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001315end;
1316
1317{ TFramedTransport.TFactory }
1318
1319function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1320begin
1321 Result := TFramedTransportImpl.Create( ATrans );
1322end;
1323
1324{ TTcpSocketStreamImpl }
1325
1326procedure TTcpSocketStreamImpl.Close;
1327begin
1328 FTcpClient.Close;
1329end;
1330
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001331{$IFDEF OLD_SOCKETS}
1332constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001333begin
1334 inherited Create;
1335 FTcpClient := ATcpClient;
1336 FTimeout := aTimeout;
1337end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001338{$ELSE}
1339constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
1340begin
1341 inherited Create;
1342 FTcpClient := ATcpClient;
1343 if aTimeout = 0 then
1344 FTcpClient.RecvTimeout := SLEEP_TIME
1345 else
1346 FTcpClient.RecvTimeout := aTimeout;
1347 FTcpClient.SendTimeout := aTimeout;
1348end;
1349{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001350
1351procedure TTcpSocketStreamImpl.Flush;
1352begin
1353
1354end;
1355
1356function TTcpSocketStreamImpl.IsOpen: Boolean;
1357begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001358{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001359 Result := FTcpClient.Active;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001360{$ELSE}
1361 Result := FTcpClient.IsOpen;
1362{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001363end;
1364
1365procedure TTcpSocketStreamImpl.Open;
1366begin
1367 FTcpClient.Open;
1368end;
1369
1370
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001371{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001372function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1373 TimeOut: Integer; var wsaError : Integer): Integer;
1374var
1375 ReadFds: TFDset;
1376 ReadFdsptr: PFDset;
1377 WriteFds: TFDset;
1378 WriteFdsptr: PFDset;
1379 ExceptFds: TFDset;
1380 ExceptFdsptr: PFDset;
1381 tv: timeval;
1382 Timeptr: PTimeval;
1383 socket : TSocket;
1384begin
1385 if not FTcpClient.Active then begin
1386 wsaError := WSAEINVAL;
1387 Exit( SOCKET_ERROR);
1388 end;
1389
1390 socket := FTcpClient.Handle;
1391
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001392 if Assigned(ReadReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001393 ReadFdsptr := @ReadFds;
1394 FD_ZERO(ReadFds);
1395 FD_SET(socket, ReadFds);
1396 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001397 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001398 ReadFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001399 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001400
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001401 if Assigned(WriteReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001402 WriteFdsptr := @WriteFds;
1403 FD_ZERO(WriteFds);
1404 FD_SET(socket, WriteFds);
1405 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001406 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001407 WriteFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001408 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001409
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001410 if Assigned(ExceptFlag) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001411 ExceptFdsptr := @ExceptFds;
1412 FD_ZERO(ExceptFds);
1413 FD_SET(socket, ExceptFds);
1414 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001415 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001416 ExceptFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001417 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001418
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001419 if TimeOut >= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001420 tv.tv_sec := TimeOut div 1000;
1421 tv.tv_usec := 1000 * (TimeOut mod 1000);
1422 Timeptr := @tv;
1423 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001424 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001425 Timeptr := nil; // wait forever
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001426 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001427
1428 wsaError := 0;
1429 try
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001430 {$IFDEF MSWINDOWS}
1431 {$IFDEF OLD_UNIT_NAMES}
1432 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1433 {$ELSE}
1434 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1435 {$ENDIF}
1436 {$ENDIF}
1437 {$IFDEF LINUX}
1438 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1439 {$ENDIF}
1440
Jens Geyerd5436f52014-10-03 19:50:38 +02001441 if result = SOCKET_ERROR
1442 then wsaError := WSAGetLastError;
1443
1444 except
1445 result := SOCKET_ERROR;
1446 end;
1447
1448 if Assigned(ReadReady) then
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001449 ReadReady^ := FD_ISSET(socket, ReadFds);
1450
Jens Geyerd5436f52014-10-03 19:50:38 +02001451 if Assigned(WriteReady) then
1452 WriteReady^ := FD_ISSET(socket, WriteFds);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001453
Jens Geyerd5436f52014-10-03 19:50:38 +02001454 if Assigned(ExceptFlag) then
1455 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1456end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001457{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001458
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001459{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001460function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1461 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001462 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001463var bCanRead, bError : Boolean;
1464 retval : Integer;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001465const
1466 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
Jens Geyerd5436f52014-10-03 19:50:38 +02001467begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001468 bytesReady := 0;
1469
Jens Geyerd5436f52014-10-03 19:50:38 +02001470 // The select function returns the total number of socket handles that are ready
1471 // and contained in the fd_set structures, zero if the time limit expired,
1472 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1473 // WSAGetLastError can be used to retrieve a specific error code.
1474 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1475 if retval = SOCKET_ERROR
1476 then Exit( TWaitForData.wfd_Error);
1477 if (retval = 0) or not bCanRead
1478 then Exit( TWaitForData.wfd_Timeout);
1479
1480 // recv() returns the number of bytes received, or -1 if an error occurred.
1481 // The return value will be 0 when the peer has performed an orderly shutdown.
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001482
1483 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
Jens Geyerd5436f52014-10-03 19:50:38 +02001484 if retval <= 0
1485 then Exit( TWaitForData.wfd_Error);
1486
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001487 // at least we have some data
1488 bytesReady := Min( retval, DesiredBytes);
1489 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001490end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001491{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001492
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001493{$IFDEF OLD_SOCKETS}
Jens Geyer17c3ad92017-09-05 20:31:27 +02001494function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001495// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001496var wfd : TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001497 wsaError,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001498 msecs : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001499 nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001500 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001501begin
1502 inherited;
1503
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001504 if FTimeout > 0
1505 then msecs := FTimeout
1506 else msecs := DEFAULT_THRIFT_TIMEOUT;
1507
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001508 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001509 pTmp := pBuf;
1510 Inc( pTmp, offset);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001511 while count > 0 do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001512
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001513 while TRUE do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001514 wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001515 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001516 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001517 TWaitForData.wfd_HaveData : Break;
1518 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001519 if (FTimeout = 0)
1520 then Exit
1521 else begin
Jens Geyere0e32402016-04-20 21:50:48 +02001522 raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001523
1524 end;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001525 end;
1526 else
1527 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001528 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001529 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001530
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001531 // reduce the timeout once we got data
1532 if FTimeout > 0
1533 then msecs := FTimeout div 10
1534 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1535 msecs := Max( msecs, 200);
1536
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001537 ASSERT( nBytes <= count);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001538 nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1539 Inc( pTmp, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001540 Dec( count, nBytes);
1541 Inc( result, nBytes);
1542 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001543end;
1544
1545function TTcpSocketStreamImpl.ToArray: TBytes;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001546// old sockets version
1547var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001548begin
1549 len := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001550 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001551 len := FTcpClient.BytesReceived;
1552 end;
1553
1554 SetLength( Result, len );
1555
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001556 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001557 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1558 end;
1559end;
1560
Jens Geyer17c3ad92017-09-05 20:31:27 +02001561procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001562// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001563var bCanWrite, bError : Boolean;
1564 retval, wsaError : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001565 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001566begin
1567 inherited;
1568
1569 if not FTcpClient.Active
Jens Geyere0e32402016-04-20 21:50:48 +02001570 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerd5436f52014-10-03 19:50:38 +02001571
1572 // The select function returns the total number of socket handles that are ready
1573 // and contained in the fd_set structures, zero if the time limit expired,
1574 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1575 // WSAGetLastError can be used to retrieve a specific error code.
1576 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1577 if retval = SOCKET_ERROR
Jens Geyere0e32402016-04-20 21:50:48 +02001578 then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001579
Jens Geyerd5436f52014-10-03 19:50:38 +02001580 if (retval = 0)
Jens Geyere0e32402016-04-20 21:50:48 +02001581 then raise TTransportExceptionTimedOut.Create('timed out');
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001582
Jens Geyerd5436f52014-10-03 19:50:38 +02001583 if bError or not bCanWrite
Jens Geyere0e32402016-04-20 21:50:48 +02001584 then raise TTransportExceptionUnknown.Create('unknown error');
Jens Geyerd5436f52014-10-03 19:50:38 +02001585
Jens Geyer5089b0a2018-02-01 22:37:18 +01001586 pTmp := pBuf;
1587 Inc( pTmp, offset);
1588 FTcpClient.SendBuf( pTmp^, count);
Jens Geyerd5436f52014-10-03 19:50:38 +02001589end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001590
1591{$ELSE}
1592
Jens Geyer17c3ad92017-09-05 20:31:27 +02001593function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001594// new sockets version
1595var nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001596 pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001597begin
1598 inherited;
1599
1600 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001601 pTmp := pBuf;
1602 Inc( pTmp, offset);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001603 while count > 0 do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001604 nBytes := FTcpClient.Read( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001605 if nBytes = 0 then Exit;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001606 Inc( pTmp, nBytes);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001607 Dec( count, nBytes);
1608 Inc( result, nBytes);
1609 end;
1610end;
1611
1612function TTcpSocketStreamImpl.ToArray: TBytes;
1613// new sockets version
1614var len : Integer;
1615begin
1616 len := 0;
1617 try
1618 if FTcpClient.Peek then
1619 repeat
1620 SetLength(Result, Length(Result) + 1024);
1621 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1622 until len < 1024;
1623 except
1624 on TTransportException do begin { don't allow default exceptions } end;
1625 else raise;
1626 end;
1627 if len > 0 then
1628 SetLength(Result, Length(Result) - 1024 + len);
1629end;
1630
Jens Geyer17c3ad92017-09-05 20:31:27 +02001631procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001632// new sockets version
Jens Geyer5089b0a2018-02-01 22:37:18 +01001633var pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001634begin
1635 inherited;
1636
1637 if not FTcpClient.IsOpen
Kyle Johnsone363a342016-04-22 19:11:16 -05001638 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001639
Jens Geyer5089b0a2018-02-01 22:37:18 +01001640 pTmp := pBuf;
1641 Inc( pTmp, offset);
1642 FTcpClient.Write( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001643end;
1644
Jens Geyer23d67462015-12-19 11:44:57 +01001645{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001646
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001647
Jens Geyerd5436f52014-10-03 19:50:38 +02001648{$IF CompilerVersion < 21.0}
1649initialization
1650begin
1651 TFramedTransportImpl_Initialize;
1652end;
1653{$IFEND}
1654
1655
1656end.