blob: 0774d1b09fa1704e85bceda39da9d86452a19cd6 [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX, msxml, WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer9f7f11e2016-04-14 21:37:11 +020034 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
42 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030043 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020044
45type
46 ITransport = interface
Jens Geyer17c3ad92017-09-05 20:31:27 +020047 ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
Jens Geyerd5436f52014-10-03 19:50:38 +020048 function GetIsOpen: Boolean;
49 property IsOpen: Boolean read GetIsOpen;
50 function Peek: Boolean;
51 procedure Open;
52 procedure Close;
Jens Geyer17c3ad92017-09-05 20:31:27 +020053 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
54 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
55 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
56 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020057 procedure Write( const buf: TBytes); overload;
58 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
Jens Geyer17c3ad92017-09-05 20:31:27 +020059 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
60 procedure Write( const pBuf : Pointer; len : Integer); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020061 procedure Flush;
62 end;
63
64 TTransportImpl = class( TInterfacedObject, ITransport)
65 protected
66 function GetIsOpen: Boolean; virtual; abstract;
67 property IsOpen: Boolean read GetIsOpen;
68 function Peek: Boolean; virtual;
69 procedure Open(); virtual; abstract;
70 procedure Close(); virtual; abstract;
Jens Geyer17c3ad92017-09-05 20:31:27 +020071 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
72 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
73 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
74 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
75 procedure Write( const buf: TBytes); overload; inline;
76 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
77 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
78 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020079 procedure Flush; virtual;
80 end;
81
82 TTransportException = class( Exception )
83 public
84 type
85 TExceptionType = (
86 Unknown,
87 NotOpen,
88 AlreadyOpen,
89 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +020090 EndOfFile,
91 BadArgs,
92 Interrupted
Jens Geyerd5436f52014-10-03 19:50:38 +020093 );
94 private
Jens Geyere0e32402016-04-20 21:50:48 +020095 function GetType: TExceptionType;
96 protected
97 constructor HiddenCreate(const Msg: string);
Jens Geyerd5436f52014-10-03 19:50:38 +020098 public
Jens Geyere0e32402016-04-20 21:50:48 +020099 class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
100 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
101 class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
102 property Type_: TExceptionType read GetType;
Jens Geyerd5436f52014-10-03 19:50:38 +0200103 end;
104
Jens Geyere0e32402016-04-20 21:50:48 +0200105 // Needed to remove deprecation warning
106 TTransportExceptionSpecialized = class abstract (TTransportException)
107 public
108 constructor Create(const Msg: string);
109 end;
110
111 TTransportExceptionUnknown = class (TTransportExceptionSpecialized);
112 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized);
113 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized);
114 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized);
115 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized);
116 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized);
117 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized);
118
Jens Geyerd5436f52014-10-03 19:50:38 +0200119 IHTTPClient = interface( ITransport )
120 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
121 procedure SetConnectionTimeout(const Value: Integer);
122 function GetConnectionTimeout: Integer;
123 procedure SetReadTimeout(const Value: Integer);
124 function GetReadTimeout: Integer;
125 function GetCustomHeaders: IThriftDictionary<string,string>;
126 procedure SendRequest;
127 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
128 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
129 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
130 end;
131
132 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
133 private
134 FUri : string;
135 FInputStream : IThriftStream;
136 FOutputStream : IThriftStream;
137 FConnectionTimeout : Integer;
138 FReadTimeout : Integer;
139 FCustomHeaders : IThriftDictionary<string,string>;
140
141 function CreateRequest: IXMLHTTPRequest;
142 protected
143 function GetIsOpen: Boolean; override;
144 procedure Open(); override;
145 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200146 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
147 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200148 procedure Flush; override;
149
150 procedure SetConnectionTimeout(const Value: Integer);
151 function GetConnectionTimeout: Integer;
152 procedure SetReadTimeout(const Value: Integer);
153 function GetReadTimeout: Integer;
154 function GetCustomHeaders: IThriftDictionary<string,string>;
155 procedure SendRequest;
156 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
157 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
158 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
159 public
160 constructor Create( const AUri: string);
161 destructor Destroy; override;
162 end;
163
164 IServerTransport = interface
165 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
166 procedure Listen;
167 procedure Close;
168 function Accept( const fnAccepting: TProc): ITransport;
169 end;
170
171 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
172 protected
173 procedure Listen; virtual; abstract;
174 procedure Close; virtual; abstract;
175 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
176 end;
177
178 ITransportFactory = interface
179 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
180 function GetTransport( const ATrans: ITransport): ITransport;
181 end;
182
183 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
184 function GetTransport( const ATrans: ITransport): ITransport; virtual;
185 end;
186
187 TTcpSocketStreamImpl = class( TThriftStreamImpl )
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200188{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200189 private type
190 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
191 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200192 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200193 FTimeout : Integer;
194 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
195 TimeOut: Integer; var wsaError : Integer): Integer;
196 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200197 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200198{$ELSE}
199 FTcpClient: TSocket;
200 protected const
201 SLEEP_TIME = 200;
202{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200203 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200204 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
205 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200206 procedure Open; override;
207 procedure Close; override;
208 procedure Flush; override;
209
210 function IsOpen: Boolean; override;
211 function ToArray: TBytes; override;
212 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200213{$IFDEF OLD_SOCKETS}
214 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
215{$ELSE}
216 constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
217{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200218 end;
219
220 IStreamTransport = interface( ITransport )
221 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
222 function GetInputStream: IThriftStream;
223 function GetOutputStream: IThriftStream;
224 property InputStream : IThriftStream read GetInputStream;
225 property OutputStream : IThriftStream read GetOutputStream;
226 end;
227
228 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
229 protected
230 FInputStream : IThriftStream;
231 FOutputStream : IThriftStream;
232 protected
233 function GetIsOpen: Boolean; override;
234
235 function GetInputStream: IThriftStream;
236 function GetOutputStream: IThriftStream;
237 public
238 property InputStream : IThriftStream read GetInputStream;
239 property OutputStream : IThriftStream read GetOutputStream;
240
241 procedure Open; override;
242 procedure Close; override;
243 procedure Flush; override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200244 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
245 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200246 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
247 destructor Destroy; override;
248 end;
249
250 TBufferedStreamImpl = class( TThriftStreamImpl)
251 private
252 FStream : IThriftStream;
253 FBufSize : Integer;
254 FReadBuffer : TMemoryStream;
255 FWriteBuffer : TMemoryStream;
256 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200257 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
258 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200259 procedure Open; override;
260 procedure Close; override;
261 procedure Flush; override;
262 function IsOpen: Boolean; override;
263 function ToArray: TBytes; override;
264 public
265 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
266 destructor Destroy; override;
267 end;
268
269 TServerSocketImpl = class( TServerTransportImpl)
270 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200271{$IFDEF OLD_SOCKETS}
272 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200273 FPort : Integer;
274 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200275{$ELSE}
276 FServer: TServerSocket;
277{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200278 FUseBufferedSocket : Boolean;
279 FOwnsServer : Boolean;
280 protected
281 function Accept( const fnAccepting: TProc) : ITransport; override;
282 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200283{$IFDEF OLD_SOCKETS}
284 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200285 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200286{$ELSE}
287 constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
288 constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
289{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200290 destructor Destroy; override;
291 procedure Listen; override;
292 procedure Close; override;
293 end;
294
295 TBufferedTransportImpl = class( TTransportImpl )
296 private
297 FInputBuffer : IThriftStream;
298 FOutputBuffer : IThriftStream;
299 FTransport : IStreamTransport;
300 FBufSize : Integer;
301
302 procedure InitBuffers;
303 function GetUnderlyingTransport: ITransport;
304 protected
305 function GetIsOpen: Boolean; override;
306 procedure Flush; override;
307 public
308 procedure Open(); override;
309 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200310 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
311 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200312 constructor Create( const ATransport : IStreamTransport ); overload;
313 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
314 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
315 property IsOpen: Boolean read GetIsOpen;
316 end;
317
318 TSocketImpl = class(TStreamTransportImpl)
319 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200320{$IFDEF OLD_SOCKETS}
321 FClient : TCustomIpClient;
322{$ELSE}
323 FClient: TSocket;
324{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200325 FOwnsClient : Boolean;
326 FHost : string;
327 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200328{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200329 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200330{$ELSE}
331 FTimeout : Longword;
332{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200333
334 procedure InitSocket;
335 protected
336 function GetIsOpen: Boolean; override;
337 public
338 procedure Open; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200339{$IFDEF OLD_SOCKETS}
340 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200341 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200342{$ELSE}
343 constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
344 constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
345{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200346 destructor Destroy; override;
347 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200348{$IFDEF OLD_SOCKETS}
349 property TcpClient: TCustomIpClient read FClient;
350{$ELSE}
351 property TcpClient: TSocket read FClient;
352{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200353 property Host : string read FHost;
354 property Port: Integer read FPort;
355 end;
356
357 TFramedTransportImpl = class( TTransportImpl)
358 private const
359 FHeaderSize : Integer = 4;
360 private class var
361 FHeader_Dummy : array of Byte;
362 protected
363 FTransport : ITransport;
364 FWriteBuffer : TMemoryStream;
365 FReadBuffer : TMemoryStream;
366
367 procedure InitWriteBuffer;
368 procedure ReadFrame;
369 public
370 type
371 TFactory = class( TTransportFactoryImpl )
372 public
373 function GetTransport( const ATrans: ITransport): ITransport; override;
374 end;
375
Jens Geyere0e32402016-04-20 21:50:48 +0200376 {$IFDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200377 class constructor Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200378 {$ENDIF}
Jens Geyere0e32402016-04-20 21:50:48 +0200379
Jens Geyerd5436f52014-10-03 19:50:38 +0200380 constructor Create; overload;
381 constructor Create( const ATrans: ITransport); overload;
382 destructor Destroy; override;
383
384 procedure Open(); override;
385 function GetIsOpen: Boolean; override;
386
387 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200388 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
389 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200390 procedure Flush; override;
391 end;
392
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200393{$IFNDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200394procedure TFramedTransportImpl_Initialize;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200395{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200396
397const
398 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
399
400
401implementation
402
403{ TTransportImpl }
404
405procedure TTransportImpl.Flush;
406begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200407 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200408end;
409
410function TTransportImpl.Peek: Boolean;
411begin
412 Result := IsOpen;
413end;
414
Jens Geyer17c3ad92017-09-05 20:31:27 +0200415function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200416begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200417 if Length(buf) > 0
418 then result := Read( @buf[0], Length(buf), off, len)
419 else result := 0;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200420end;
421
422function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
423begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200424 if Length(buf) > 0
425 then result := ReadAll( @buf[0], Length(buf), off, len)
426 else result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +0200427end;
428
429procedure TTransportImpl.Write( const buf: TBytes);
430begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200431 if Length(buf) > 0
432 then Write( @buf[0], 0, Length(buf));
Jens Geyer17c3ad92017-09-05 20:31:27 +0200433end;
434
435procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
436begin
Jens Geyera76e6c72017-09-08 21:03:30 +0200437 if Length(buf) > 0
438 then Write( @buf[0], off, len);
Jens Geyer17c3ad92017-09-05 20:31:27 +0200439end;
440
441function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
442var ret : Integer;
443begin
444 result := 0;
445 while result < len do begin
446 ret := Read( pBuf, buflen, off + result, len - result);
447 if ret > 0
448 then Inc( result, ret)
449 else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
450 end;
451end;
452
453procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
454begin
455 Self.Write( pBuf, 0, len);
Jens Geyerd5436f52014-10-03 19:50:38 +0200456end;
457
458{ THTTPClientImpl }
459
460procedure THTTPClientImpl.Close;
461begin
462 FInputStream := nil;
463 FOutputStream := nil;
464end;
465
466constructor THTTPClientImpl.Create(const AUri: string);
467begin
468 inherited Create;
469 FUri := AUri;
470 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
471 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
472end;
473
474function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
475var
476 pair : TPair<string,string>;
477begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200478 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200479 Result := CoXMLHTTP.Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200480 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200481 Result := CoXMLHTTPRequest.Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200482 {$IFEND}
Jens Geyerd5436f52014-10-03 19:50:38 +0200483
484 Result.open('POST', FUri, False, '', '');
485 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
486 Result.setRequestHeader( 'Accept', 'application/x-thrift');
487 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
488
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200489 for pair in FCustomHeaders do begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200490 Result.setRequestHeader( pair.Key, pair.Value );
491 end;
492end;
493
494destructor THTTPClientImpl.Destroy;
495begin
496 Close;
497 inherited;
498end;
499
500procedure THTTPClientImpl.Flush;
501begin
502 try
503 SendRequest;
504 finally
505 FOutputStream := nil;
506 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
507 end;
508end;
509
510function THTTPClientImpl.GetConnectionTimeout: Integer;
511begin
512 Result := FConnectionTimeout;
513end;
514
515function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
516begin
517 Result := FCustomHeaders;
518end;
519
520function THTTPClientImpl.GetIsOpen: Boolean;
521begin
522 Result := True;
523end;
524
525function THTTPClientImpl.GetReadTimeout: Integer;
526begin
527 Result := FReadTimeout;
528end;
529
530procedure THTTPClientImpl.Open;
531begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200532 // nothing to do
Jens Geyerd5436f52014-10-03 19:50:38 +0200533end;
534
Jens Geyer17c3ad92017-09-05 20:31:27 +0200535function THTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200536begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100537 if FInputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200538 raise TTransportExceptionNotOpen.Create('No request has been sent');
Jens Geyerd5436f52014-10-03 19:50:38 +0200539 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100540
Jens Geyerd5436f52014-10-03 19:50:38 +0200541 try
Jens Geyer17c3ad92017-09-05 20:31:27 +0200542 Result := FInputStream.Read( pBuf, buflen, off, len)
Jens Geyerd5436f52014-10-03 19:50:38 +0200543 except
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100544 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200545 do raise TTransportExceptionUnknown.Create(E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200546 end;
547end;
548
549procedure THTTPClientImpl.SendRequest;
550var
551 xmlhttp : IXMLHTTPRequest;
552 ms : TMemoryStream;
553 a : TBytes;
554 len : Integer;
555begin
556 xmlhttp := CreateRequest;
557
558 ms := TMemoryStream.Create;
559 try
560 a := FOutputStream.ToArray;
561 len := Length(a);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200562 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200563 ms.WriteBuffer( Pointer(@a[0])^, len);
564 end;
565 ms.Position := 0;
566 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
567 FInputStream := nil;
568 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
569 finally
570 ms.Free;
571 end;
572end;
573
574procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
575begin
576 FConnectionTimeout := Value;
577end;
578
579procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
580begin
581 FReadTimeout := Value
582end;
583
Jens Geyer17c3ad92017-09-05 20:31:27 +0200584procedure THTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200585begin
Jens Geyer17c3ad92017-09-05 20:31:27 +0200586 FOutputStream.Write( pBuf, off, len);
Jens Geyerd5436f52014-10-03 19:50:38 +0200587end;
588
589{ TTransportException }
590
Jens Geyere0e32402016-04-20 21:50:48 +0200591function TTransportException.GetType: TExceptionType;
592begin
593 if Self is TTransportExceptionNotOpen then Result := TExceptionType.NotOpen
594 else if Self is TTransportExceptionAlreadyOpen then Result := TExceptionType.AlreadyOpen
595 else if Self is TTransportExceptionTimedOut then Result := TExceptionType.TimedOut
596 else if Self is TTransportExceptionEndOfFile then Result := TExceptionType.EndOfFile
597 else if Self is TTransportExceptionBadArgs then Result := TExceptionType.BadArgs
598 else if Self is TTransportExceptionInterrupted then Result := TExceptionType.Interrupted
599 else Result := TExceptionType.Unknown;
600end;
601
602constructor TTransportException.HiddenCreate(const Msg: string);
603begin
604 inherited Create(Msg);
605end;
606
607class function TTransportException.Create(AType: TExceptionType): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200608begin
609 //no inherited;
Jens Geyere0e32402016-04-20 21:50:48 +0200610{$WARN SYMBOL_DEPRECATED OFF}
611 Result := Create(AType, '')
612{$WARN SYMBOL_DEPRECATED DEFAULT}
Jens Geyerd5436f52014-10-03 19:50:38 +0200613end;
614
Jens Geyere0e32402016-04-20 21:50:48 +0200615class function TTransportException.Create(AType: TExceptionType;
616 const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200617begin
Jens Geyere0e32402016-04-20 21:50:48 +0200618 case AType of
619 TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
620 TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
621 TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
622 TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
623 TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
624 TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
625 else
626 Result := TTransportExceptionUnknown.Create(msg);
627 end;
Jens Geyerd5436f52014-10-03 19:50:38 +0200628end;
629
Jens Geyere0e32402016-04-20 21:50:48 +0200630class function TTransportException.Create(const msg: string): TTransportException;
Jens Geyerd5436f52014-10-03 19:50:38 +0200631begin
Jens Geyere0e32402016-04-20 21:50:48 +0200632 Result := TTransportExceptionUnknown.Create(Msg);
633end;
634
635{ TTransportExceptionSpecialized }
636
637constructor TTransportExceptionSpecialized.Create(const Msg: string);
638begin
639 inherited HiddenCreate(Msg);
Jens Geyerd5436f52014-10-03 19:50:38 +0200640end;
641
642{ TTransportFactoryImpl }
643
644function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
645begin
646 Result := ATrans;
647end;
648
649{ TServerSocket }
650
Jens Geyer23d67462015-12-19 11:44:57 +0100651{$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200652constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200653begin
654 inherited Create;
655 FServer := AServer;
656 FClientTimeout := AClientTimeout;
657end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200658{$ELSE}
659constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
Jens Geyerd5436f52014-10-03 19:50:38 +0200660begin
661 inherited Create;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200662 FServer := AServer;
663 FServer.RecvTimeout := AClientTimeout;
664 FServer.SendTimeout := AClientTimeout;
665end;
666{$ENDIF}
667
668{$IFDEF OLD_SOCKETS}
669constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
670{$ELSE}
671constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
672{$ENDIF}
673begin
674 inherited Create;
675{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200676 FPort := APort;
677 FClientTimeout := AClientTimeout;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200678 FServer := TTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200679 FServer.BlockMode := bmBlocking;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200680 {$IF CompilerVersion >= 21.0}
Jens Geyerd5436f52014-10-03 19:50:38 +0200681 FServer.LocalPort := AnsiString( IntToStr( FPort));
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200682 {$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +0200683 FServer.LocalPort := IntToStr( FPort);
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200684 {$IFEND}
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200685{$ELSE}
686 FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
687{$ENDIF}
688 FUseBufferedSocket := AUseBufferedSockets;
689 FOwnsServer := True;
Jens Geyerd5436f52014-10-03 19:50:38 +0200690end;
691
692destructor TServerSocketImpl.Destroy;
693begin
694 if FOwnsServer then begin
695 FServer.Free;
696 FServer := nil;
697 end;
698 inherited;
699end;
700
701function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
702var
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200703{$IFDEF OLD_SOCKETS}
704 client : TCustomIpClient;
705{$ELSE}
706 client: TSocket;
707{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200708 trans : IStreamTransport;
709begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100710 if FServer = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200711 raise TTransportExceptionNotOpen.Create('No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200712 end;
713
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200714{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200715 client := nil;
716 try
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200717 client := TCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200718
719 if Assigned(fnAccepting)
720 then fnAccepting();
721
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100722 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200723 client.Free;
724 Result := nil;
725 Exit;
726 end;
727
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100728 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200729 Result := nil;
730 Exit;
731 end;
732
733 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
734 client := nil; // trans owns it now
735
736 if FUseBufferedSocket
737 then result := TBufferedTransportImpl.Create( trans)
738 else result := trans;
739
740 except
741 on E: Exception do begin
742 client.Free;
Jens Geyere0e32402016-04-20 21:50:48 +0200743 raise TTransportExceptionUnknown.Create(E.ToString);
Jens Geyerd5436f52014-10-03 19:50:38 +0200744 end;
745 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200746{$ELSE}
747 if Assigned(fnAccepting) then
748 fnAccepting();
749
750 client := FServer.Accept;
751 try
752 trans := TSocketImpl.Create(client, True);
753 client := nil;
754
755 if FUseBufferedSocket then
756 Result := TBufferedTransportImpl.Create(trans)
757 else
758 Result := trans;
759 except
760 client.Free;
761 raise;
762 end;
763{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200764end;
765
766procedure TServerSocketImpl.Listen;
767begin
768 if FServer <> nil then
769 begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200770{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200771 try
772 FServer.Active := True;
773 except
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200774 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200775 do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200776 end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200777{$ELSE}
778 FServer.Listen;
779{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200780 end;
781end;
782
783procedure TServerSocketImpl.Close;
784begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200785 if FServer <> nil then
786{$IFDEF OLD_SOCKETS}
787 try
788 FServer.Active := False;
789 except
790 on E: Exception
Jens Geyere0e32402016-04-20 21:50:48 +0200791 do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200792 end;
793{$ELSE}
794 FServer.Close;
795{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200796end;
797
798{ TSocket }
799
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200800{$IFDEF OLD_SOCKETS}
801constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
Jens Geyerd5436f52014-10-03 19:50:38 +0200802var stream : IThriftStream;
803begin
804 FClient := AClient;
805 FTimeout := ATimeout;
806 FOwnsClient := aOwnsClient;
807 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
808 inherited Create( stream, stream);
809end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200810{$ELSE}
811constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
812var stream : IThriftStream;
813begin
814 FClient := AClient;
815 FTimeout := AClient.RecvTimeout;
816 FOwnsClient := aOwnsClient;
817 stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
818 inherited Create(stream, stream);
819end;
820{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200821
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200822{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200823constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200824{$ELSE}
825constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
826{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200827begin
828 inherited Create(nil,nil);
829 FHost := AHost;
830 FPort := APort;
831 FTimeout := ATimeout;
832 InitSocket;
833end;
834
835destructor TSocketImpl.Destroy;
836begin
837 if FOwnsClient
838 then FreeAndNil( FClient);
839 inherited;
840end;
841
842procedure TSocketImpl.Close;
843begin
844 inherited Close;
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200845
846 FInputStream := nil;
847 FOutputStream := nil;
848
Jens Geyerd5436f52014-10-03 19:50:38 +0200849 if FOwnsClient
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200850 then FreeAndNil( FClient)
851 else FClient := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +0200852end;
853
854function TSocketImpl.GetIsOpen: Boolean;
855begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200856{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200857 Result := (FClient <> nil) and FClient.Connected;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200858{$ELSE}
859 Result := (FClient <> nil) and FClient.IsOpen
860{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200861end;
862
863procedure TSocketImpl.InitSocket;
864var
865 stream : IThriftStream;
866begin
867 if FOwnsClient
868 then FreeAndNil( FClient)
869 else FClient := nil;
870
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200871{$IFDEF OLD_SOCKETS}
872 FClient := TTcpClient.Create( nil);
873{$ELSE}
874 FClient := TSocket.Create(FHost, FPort);
875{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200876 FOwnsClient := True;
877
878 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
879 FInputStream := stream;
880 FOutputStream := stream;
881end;
882
883procedure TSocketImpl.Open;
884begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100885 if IsOpen then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200886 raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +0200887 end;
888
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100889 if FHost = '' then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200890 raise TTransportExceptionNotOpen.Create('Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +0200891 end;
892
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100893 if Port <= 0 then begin
Jens Geyere0e32402016-04-20 21:50:48 +0200894 raise TTransportExceptionNotOpen.Create('Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +0200895 end;
896
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100897 if FClient = nil
898 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +0200899
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200900{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200901 FClient.RemoteHost := TSocketHost( Host);
902 FClient.RemotePort := TSocketPort( IntToStr( Port));
903 FClient.Connect;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200904{$ELSE}
905 FClient.Open;
906{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200907
908 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
909 FOutputStream := FInputStream;
910end;
911
912{ TBufferedStream }
913
914procedure TBufferedStreamImpl.Close;
915begin
916 Flush;
917 FStream := nil;
918
919 FReadBuffer.Free;
920 FReadBuffer := nil;
921
922 FWriteBuffer.Free;
923 FWriteBuffer := nil;
924end;
925
926constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
927begin
928 inherited Create;
929 FStream := AStream;
930 FBufSize := ABufSize;
931 FReadBuffer := TMemoryStream.Create;
932 FWriteBuffer := TMemoryStream.Create;
933end;
934
935destructor TBufferedStreamImpl.Destroy;
936begin
937 Close;
938 inherited;
939end;
940
941procedure TBufferedStreamImpl.Flush;
942var
943 buf : TBytes;
944 len : Integer;
945begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200946 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200947 len := FWriteBuffer.Size;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200948 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200949 SetLength( buf, len );
950 FWriteBuffer.Position := 0;
951 FWriteBuffer.Read( Pointer(@buf[0])^, len );
952 FStream.Write( buf, 0, len );
953 end;
954 FWriteBuffer.Clear;
955 end;
956end;
957
958function TBufferedStreamImpl.IsOpen: Boolean;
959begin
960 Result := (FWriteBuffer <> nil)
961 and (FReadBuffer <> nil)
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200962 and (FStream <> nil)
963 and FStream.IsOpen;
Jens Geyerd5436f52014-10-03 19:50:38 +0200964end;
965
966procedure TBufferedStreamImpl.Open;
967begin
Jens Geyer3c0edfa2018-04-02 13:57:55 +0200968 FStream.Open;
Jens Geyerd5436f52014-10-03 19:50:38 +0200969end;
970
Jens Geyer17c3ad92017-09-05 20:31:27 +0200971function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200972var
973 nRead : Integer;
974 tempbuf : TBytes;
Jens Geyer5089b0a2018-02-01 22:37:18 +0100975 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +0200976begin
977 inherited;
978 Result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +0100979
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200980 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200981 while count > 0 do begin
982
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200983 if FReadBuffer.Position >= FReadBuffer.Size then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200984 FReadBuffer.Clear;
985 SetLength( tempbuf, FBufSize);
986 nRead := FStream.Read( tempbuf, 0, FBufSize );
987 if nRead = 0 then Break; // avoid infinite loop
988
989 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
990 FReadBuffer.Position := 0;
991 end;
992
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200993 if FReadBuffer.Position < FReadBuffer.Size then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +0100994 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
995 pTmp := pBuf;
996 Inc( pTmp, offset);
997 Inc( Result, FReadBuffer.Read( pTmp^, nRead));
Jens Geyerd5436f52014-10-03 19:50:38 +0200998 Dec( count, nRead);
999 Inc( offset, nRead);
1000 end;
1001 end;
1002 end;
1003end;
1004
1005function TBufferedStreamImpl.ToArray: TBytes;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001006var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001007begin
1008 len := 0;
1009
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001010 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001011 len := FReadBuffer.Size;
1012 end;
1013
1014 SetLength( Result, len);
1015
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001016 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001017 FReadBuffer.Position := 0;
1018 FReadBuffer.Read( Pointer(@Result[0])^, len );
1019 end;
1020end;
1021
Jens Geyer17c3ad92017-09-05 20:31:27 +02001022procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001023var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001024begin
1025 inherited;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001026 if count > 0 then begin
1027 if IsOpen then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001028 pTmp := pBuf;
1029 Inc( pTmp, offset);
1030 FWriteBuffer.Write( pTmp^, count );
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001031 if FWriteBuffer.Size > FBufSize then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001032 Flush;
1033 end;
1034 end;
1035 end;
1036end;
1037
1038{ TStreamTransportImpl }
1039
1040procedure TStreamTransportImpl.Close;
1041begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001042 FInputStream := nil;
1043 FOutputStream := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +02001044end;
1045
1046constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
1047begin
1048 inherited Create;
1049 FInputStream := AInputStream;
1050 FOutputStream := AOutputStream;
1051end;
1052
1053destructor TStreamTransportImpl.Destroy;
1054begin
1055 FInputStream := nil;
1056 FOutputStream := nil;
1057 inherited;
1058end;
1059
1060procedure TStreamTransportImpl.Flush;
1061begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001062 if FOutputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001063 raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001064 end;
1065
1066 FOutputStream.Flush;
1067end;
1068
1069function TStreamTransportImpl.GetInputStream: IThriftStream;
1070begin
1071 Result := FInputStream;
1072end;
1073
1074function TStreamTransportImpl.GetIsOpen: Boolean;
1075begin
1076 Result := True;
1077end;
1078
1079function TStreamTransportImpl.GetOutputStream: IThriftStream;
1080begin
Jens Geyer02fbe0e2018-03-19 17:35:44 +01001081 Result := FOutputStream;
Jens Geyerd5436f52014-10-03 19:50:38 +02001082end;
1083
1084procedure TStreamTransportImpl.Open;
1085begin
1086
1087end;
1088
Jens Geyer17c3ad92017-09-05 20:31:27 +02001089function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001090begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001091 if FInputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001092 raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001093 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001094
Jens Geyer17c3ad92017-09-05 20:31:27 +02001095 Result := FInputStream.Read( pBuf,buflen, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001096end;
1097
Jens Geyer17c3ad92017-09-05 20:31:27 +02001098procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001099begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001100 if FOutputStream = nil then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001101 raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001102 end;
1103
Jens Geyer17c3ad92017-09-05 20:31:27 +02001104 FOutputStream.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001105end;
1106
1107{ TBufferedTransportImpl }
1108
1109constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
1110begin
1111 //no inherited;
1112 Create( ATransport, 1024 );
1113end;
1114
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001115constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001116begin
1117 inherited Create;
1118 FTransport := ATransport;
1119 FBufSize := ABufSize;
1120 InitBuffers;
1121end;
1122
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001123procedure TBufferedTransportImpl.Close;
1124begin
1125 FTransport.Close;
1126 FInputBuffer := nil;
1127 FOutputBuffer := nil;
1128end;
1129
Jens Geyerd5436f52014-10-03 19:50:38 +02001130procedure TBufferedTransportImpl.Flush;
1131begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001132 if FOutputBuffer <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001133 FOutputBuffer.Flush;
1134 end;
1135end;
1136
1137function TBufferedTransportImpl.GetIsOpen: Boolean;
1138begin
1139 Result := FTransport.IsOpen;
1140end;
1141
1142function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
1143begin
1144 Result := FTransport;
1145end;
1146
1147procedure TBufferedTransportImpl.InitBuffers;
1148begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001149 if FTransport.InputStream <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001150 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
1151 end;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001152 if FTransport.OutputStream <> nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001153 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
1154 end;
1155end;
1156
1157procedure TBufferedTransportImpl.Open;
1158begin
1159 FTransport.Open
Jens Geyer3c0edfa2018-04-02 13:57:55 +02001160 InitBuffers; // we need to get the buffers to match FTransport substreams again
Jens Geyerd5436f52014-10-03 19:50:38 +02001161end;
1162
Jens Geyer17c3ad92017-09-05 20:31:27 +02001163function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001164begin
1165 Result := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001166 if FInputBuffer <> nil then begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001167 Result := FInputBuffer.Read( pBuf,buflen, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001168 end;
1169end;
1170
Jens Geyer17c3ad92017-09-05 20:31:27 +02001171procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001172begin
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001173 if FOutputBuffer <> nil then begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001174 FOutputBuffer.Write( pBuf, off, len );
Jens Geyerd5436f52014-10-03 19:50:38 +02001175 end;
1176end;
1177
1178{ TFramedTransportImpl }
1179
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001180{$IFDEF HAVE_CLASS_CTOR}
1181class constructor TFramedTransportImpl.Create;
1182begin
1183 SetLength( FHeader_Dummy, FHeaderSize);
1184 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1185end;
1186{$ELSE}
Jens Geyerd5436f52014-10-03 19:50:38 +02001187procedure TFramedTransportImpl_Initialize;
1188begin
1189 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1190 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1191 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1192end;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001193{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001194
1195constructor TFramedTransportImpl.Create;
1196begin
1197 inherited Create;
1198 InitWriteBuffer;
1199end;
1200
1201procedure TFramedTransportImpl.Close;
1202begin
1203 FTransport.Close;
1204end;
1205
1206constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1207begin
1208 inherited Create;
1209 InitWriteBuffer;
1210 FTransport := ATrans;
1211end;
1212
1213destructor TFramedTransportImpl.Destroy;
1214begin
1215 FWriteBuffer.Free;
1216 FReadBuffer.Free;
1217 inherited;
1218end;
1219
1220procedure TFramedTransportImpl.Flush;
1221var
1222 buf : TBytes;
1223 len : Integer;
1224 data_len : Integer;
1225
1226begin
1227 len := FWriteBuffer.Size;
1228 SetLength( buf, len);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001229 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001230 System.Move( FWriteBuffer.Memory^, buf[0], len );
1231 end;
1232
1233 data_len := len - FHeaderSize;
Jens Geyer30ed90e2016-03-10 20:12:49 +01001234 if (data_len < 0) then begin
Jens Geyere0e32402016-04-20 21:50:48 +02001235 raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001236 end;
1237
1238 InitWriteBuffer;
1239
1240 buf[0] := Byte($FF and (data_len shr 24));
1241 buf[1] := Byte($FF and (data_len shr 16));
1242 buf[2] := Byte($FF and (data_len shr 8));
1243 buf[3] := Byte($FF and data_len);
1244
1245 FTransport.Write( buf, 0, len );
1246 FTransport.Flush;
1247end;
1248
1249function TFramedTransportImpl.GetIsOpen: Boolean;
1250begin
1251 Result := FTransport.IsOpen;
1252end;
1253
1254type
1255 TAccessMemoryStream = class(TMemoryStream)
1256 end;
1257
1258procedure TFramedTransportImpl.InitWriteBuffer;
1259begin
1260 FWriteBuffer.Free;
1261 FWriteBuffer := TMemoryStream.Create;
1262 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1263 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1264end;
1265
1266procedure TFramedTransportImpl.Open;
1267begin
1268 FTransport.Open;
1269end;
1270
Jens Geyer17c3ad92017-09-05 20:31:27 +02001271function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001272var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001273begin
Jens Geyer17c3ad92017-09-05 20:31:27 +02001274 if len > (buflen-off)
1275 then len := buflen-off;
1276
Jens Geyer5089b0a2018-02-01 22:37:18 +01001277 pTmp := pBuf;
1278 Inc( pTmp, off);
1279
Jens Geyer17c3ad92017-09-05 20:31:27 +02001280 if (FReadBuffer <> nil) and (len > 0) then begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001281 result := FReadBuffer.Read( pTmp^, len);
Jens Geyer17c3ad92017-09-05 20:31:27 +02001282 if result > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001283 Exit;
1284 end;
1285 end;
1286
1287 ReadFrame;
1288 if len > 0
Jens Geyer5089b0a2018-02-01 22:37:18 +01001289 then Result := FReadBuffer.Read( pTmp^, len)
Jens Geyerd5436f52014-10-03 19:50:38 +02001290 else Result := 0;
1291end;
1292
1293procedure TFramedTransportImpl.ReadFrame;
1294var
1295 i32rd : TBytes;
1296 size : Integer;
1297 buff : TBytes;
1298begin
1299 SetLength( i32rd, FHeaderSize );
1300 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1301 size :=
1302 ((i32rd[0] and $FF) shl 24) or
1303 ((i32rd[1] and $FF) shl 16) or
1304 ((i32rd[2] and $FF) shl 8) or
1305 (i32rd[3] and $FF);
1306 SetLength( buff, size );
1307 FTransport.ReadAll( buff, 0, size );
1308 FReadBuffer.Free;
1309 FReadBuffer := TMemoryStream.Create;
Jens Geyera76e6c72017-09-08 21:03:30 +02001310 if Length(buff) > 0
1311 then FReadBuffer.Write( Pointer(@buff[0])^, size );
Jens Geyerd5436f52014-10-03 19:50:38 +02001312 FReadBuffer.Position := 0;
1313end;
1314
Jens Geyer17c3ad92017-09-05 20:31:27 +02001315procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001316var pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001317begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001318 if len > 0 then begin
1319 pTmp := pBuf;
1320 Inc( pTmp, off);
1321
1322 FWriteBuffer.Write( pTmp^, len );
1323 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001324end;
1325
1326{ TFramedTransport.TFactory }
1327
1328function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1329begin
1330 Result := TFramedTransportImpl.Create( ATrans );
1331end;
1332
1333{ TTcpSocketStreamImpl }
1334
1335procedure TTcpSocketStreamImpl.Close;
1336begin
1337 FTcpClient.Close;
1338end;
1339
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001340{$IFDEF OLD_SOCKETS}
1341constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001342begin
1343 inherited Create;
1344 FTcpClient := ATcpClient;
1345 FTimeout := aTimeout;
1346end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001347{$ELSE}
1348constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
1349begin
1350 inherited Create;
1351 FTcpClient := ATcpClient;
1352 if aTimeout = 0 then
1353 FTcpClient.RecvTimeout := SLEEP_TIME
1354 else
1355 FTcpClient.RecvTimeout := aTimeout;
1356 FTcpClient.SendTimeout := aTimeout;
1357end;
1358{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001359
1360procedure TTcpSocketStreamImpl.Flush;
1361begin
1362
1363end;
1364
1365function TTcpSocketStreamImpl.IsOpen: Boolean;
1366begin
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001367{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001368 Result := FTcpClient.Active;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001369{$ELSE}
1370 Result := FTcpClient.IsOpen;
1371{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001372end;
1373
1374procedure TTcpSocketStreamImpl.Open;
1375begin
1376 FTcpClient.Open;
1377end;
1378
1379
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001380{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001381function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1382 TimeOut: Integer; var wsaError : Integer): Integer;
1383var
1384 ReadFds: TFDset;
1385 ReadFdsptr: PFDset;
1386 WriteFds: TFDset;
1387 WriteFdsptr: PFDset;
1388 ExceptFds: TFDset;
1389 ExceptFdsptr: PFDset;
1390 tv: timeval;
1391 Timeptr: PTimeval;
1392 socket : TSocket;
1393begin
1394 if not FTcpClient.Active then begin
1395 wsaError := WSAEINVAL;
1396 Exit( SOCKET_ERROR);
1397 end;
1398
1399 socket := FTcpClient.Handle;
1400
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001401 if Assigned(ReadReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001402 ReadFdsptr := @ReadFds;
1403 FD_ZERO(ReadFds);
1404 FD_SET(socket, ReadFds);
1405 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001406 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001407 ReadFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001408 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001409
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001410 if Assigned(WriteReady) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001411 WriteFdsptr := @WriteFds;
1412 FD_ZERO(WriteFds);
1413 FD_SET(socket, WriteFds);
1414 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001415 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001416 WriteFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001417 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001418
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001419 if Assigned(ExceptFlag) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001420 ExceptFdsptr := @ExceptFds;
1421 FD_ZERO(ExceptFds);
1422 FD_SET(socket, ExceptFds);
1423 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001424 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001425 ExceptFdsptr := nil;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001426 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001427
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001428 if TimeOut >= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001429 tv.tv_sec := TimeOut div 1000;
1430 tv.tv_usec := 1000 * (TimeOut mod 1000);
1431 Timeptr := @tv;
1432 end
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001433 else begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001434 Timeptr := nil; // wait forever
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001435 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001436
1437 wsaError := 0;
1438 try
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001439 {$IFDEF MSWINDOWS}
1440 {$IFDEF OLD_UNIT_NAMES}
1441 result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1442 {$ELSE}
1443 result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1444 {$ENDIF}
1445 {$ENDIF}
1446 {$IFDEF LINUX}
1447 result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1448 {$ENDIF}
1449
Jens Geyerd5436f52014-10-03 19:50:38 +02001450 if result = SOCKET_ERROR
1451 then wsaError := WSAGetLastError;
1452
1453 except
1454 result := SOCKET_ERROR;
1455 end;
1456
1457 if Assigned(ReadReady) then
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001458 ReadReady^ := FD_ISSET(socket, ReadFds);
1459
Jens Geyerd5436f52014-10-03 19:50:38 +02001460 if Assigned(WriteReady) then
1461 WriteReady^ := FD_ISSET(socket, WriteFds);
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001462
Jens Geyerd5436f52014-10-03 19:50:38 +02001463 if Assigned(ExceptFlag) then
1464 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1465end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001466{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001467
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001468{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001469function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1470 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001471 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001472var bCanRead, bError : Boolean;
1473 retval : Integer;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001474const
1475 MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
Jens Geyerd5436f52014-10-03 19:50:38 +02001476begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001477 bytesReady := 0;
1478
Jens Geyerd5436f52014-10-03 19:50:38 +02001479 // The select function returns the total number of socket handles that are ready
1480 // and contained in the fd_set structures, zero if the time limit expired,
1481 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1482 // WSAGetLastError can be used to retrieve a specific error code.
1483 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1484 if retval = SOCKET_ERROR
1485 then Exit( TWaitForData.wfd_Error);
1486 if (retval = 0) or not bCanRead
1487 then Exit( TWaitForData.wfd_Timeout);
1488
1489 // recv() returns the number of bytes received, or -1 if an error occurred.
1490 // The return value will be 0 when the peer has performed an orderly shutdown.
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001491
1492 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
Jens Geyerd5436f52014-10-03 19:50:38 +02001493 if retval <= 0
1494 then Exit( TWaitForData.wfd_Error);
1495
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001496 // at least we have some data
1497 bytesReady := Min( retval, DesiredBytes);
1498 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001499end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001500{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001501
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001502{$IFDEF OLD_SOCKETS}
Jens Geyer17c3ad92017-09-05 20:31:27 +02001503function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001504// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001505var wfd : TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001506 wsaError,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001507 msecs : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001508 nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001509 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001510begin
1511 inherited;
1512
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001513 if FTimeout > 0
1514 then msecs := FTimeout
1515 else msecs := DEFAULT_THRIFT_TIMEOUT;
1516
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001517 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001518 pTmp := pBuf;
1519 Inc( pTmp, offset);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001520 while count > 0 do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001521
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001522 while TRUE do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001523 wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001524 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001525 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001526 TWaitForData.wfd_HaveData : Break;
1527 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001528 if (FTimeout = 0)
1529 then Exit
1530 else begin
Jens Geyere0e32402016-04-20 21:50:48 +02001531 raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001532
1533 end;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001534 end;
1535 else
1536 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001537 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001538 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001539
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001540 // reduce the timeout once we got data
1541 if FTimeout > 0
1542 then msecs := FTimeout div 10
1543 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1544 msecs := Max( msecs, 200);
1545
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001546 ASSERT( nBytes <= count);
Jens Geyer5089b0a2018-02-01 22:37:18 +01001547 nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
1548 Inc( pTmp, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001549 Dec( count, nBytes);
1550 Inc( result, nBytes);
1551 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001552end;
1553
1554function TTcpSocketStreamImpl.ToArray: TBytes;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001555// old sockets version
1556var len : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001557begin
1558 len := 0;
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001559 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001560 len := FTcpClient.BytesReceived;
1561 end;
1562
1563 SetLength( Result, len );
1564
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001565 if len > 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001566 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1567 end;
1568end;
1569
Jens Geyer17c3ad92017-09-05 20:31:27 +02001570procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001571// old sockets version
Jens Geyerd5436f52014-10-03 19:50:38 +02001572var bCanWrite, bError : Boolean;
1573 retval, wsaError : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001574 pTmp : PByte;
Jens Geyerd5436f52014-10-03 19:50:38 +02001575begin
1576 inherited;
1577
1578 if not FTcpClient.Active
Jens Geyere0e32402016-04-20 21:50:48 +02001579 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerd5436f52014-10-03 19:50:38 +02001580
1581 // The select function returns the total number of socket handles that are ready
1582 // and contained in the fd_set structures, zero if the time limit expired,
1583 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1584 // WSAGetLastError can be used to retrieve a specific error code.
1585 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1586 if retval = SOCKET_ERROR
Jens Geyere0e32402016-04-20 21:50:48 +02001587 then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001588
Jens Geyerd5436f52014-10-03 19:50:38 +02001589 if (retval = 0)
Jens Geyere0e32402016-04-20 21:50:48 +02001590 then raise TTransportExceptionTimedOut.Create('timed out');
Jens Geyer9f7f11e2016-04-14 21:37:11 +02001591
Jens Geyerd5436f52014-10-03 19:50:38 +02001592 if bError or not bCanWrite
Jens Geyere0e32402016-04-20 21:50:48 +02001593 then raise TTransportExceptionUnknown.Create('unknown error');
Jens Geyerd5436f52014-10-03 19:50:38 +02001594
Jens Geyer5089b0a2018-02-01 22:37:18 +01001595 pTmp := pBuf;
1596 Inc( pTmp, offset);
1597 FTcpClient.SendBuf( pTmp^, count);
Jens Geyerd5436f52014-10-03 19:50:38 +02001598end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001599
1600{$ELSE}
1601
Jens Geyer17c3ad92017-09-05 20:31:27 +02001602function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001603// new sockets version
1604var nBytes : Integer;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001605 pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001606begin
1607 inherited;
1608
1609 result := 0;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001610 pTmp := pBuf;
1611 Inc( pTmp, offset);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001612 while count > 0 do begin
Jens Geyer5089b0a2018-02-01 22:37:18 +01001613 nBytes := FTcpClient.Read( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001614 if nBytes = 0 then Exit;
Jens Geyer5089b0a2018-02-01 22:37:18 +01001615 Inc( pTmp, nBytes);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001616 Dec( count, nBytes);
1617 Inc( result, nBytes);
1618 end;
1619end;
1620
1621function TTcpSocketStreamImpl.ToArray: TBytes;
1622// new sockets version
1623var len : Integer;
1624begin
1625 len := 0;
1626 try
1627 if FTcpClient.Peek then
1628 repeat
1629 SetLength(Result, Length(Result) + 1024);
1630 len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
1631 until len < 1024;
1632 except
1633 on TTransportException do begin { don't allow default exceptions } end;
1634 else raise;
1635 end;
1636 if len > 0 then
1637 SetLength(Result, Length(Result) - 1024 + len);
1638end;
1639
Jens Geyer17c3ad92017-09-05 20:31:27 +02001640procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001641// new sockets version
Jens Geyer5089b0a2018-02-01 22:37:18 +01001642var pTmp : PByte;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001643begin
1644 inherited;
1645
1646 if not FTcpClient.IsOpen
Kyle Johnsone363a342016-04-22 19:11:16 -05001647 then raise TTransportExceptionNotOpen.Create('not open');
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001648
Jens Geyer5089b0a2018-02-01 22:37:18 +01001649 pTmp := pBuf;
1650 Inc( pTmp, offset);
1651 FTcpClient.Write( pTmp^, count);
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001652end;
1653
Jens Geyer23d67462015-12-19 11:44:57 +01001654{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001655
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001656
Jens Geyerd5436f52014-10-03 19:50:38 +02001657{$IF CompilerVersion < 21.0}
1658initialization
1659begin
1660 TFramedTransportImpl_Initialize;
1661end;
1662{$IFEND}
1663
1664
1665end.