blob: d20eb2fb1d96e81a50ce6d1e896fa7f665572e33 [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX, msxml, WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer9f7f11e2016-04-14 21:37:11 +020034 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
42 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030043 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020044
45type
// Common contract of all transports: open/close lifecycle, raw byte reads
// and writes (array-based and pointer-based overloads), and flushing.
ITransport = interface
  ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
  function  GetIsOpen: Boolean;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean;
  procedure Open;
  procedure Close;
  // Read / ReadAll: "off" and "len" select the target range inside the buffer;
  // the pointer overloads additionally receive the buffer size in "buflen".
  function  Read( var buf: TBytes; off, len: Integer): Integer; overload;
  function  Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload;
  function  ReadAll( var buf: TBytes; off, len: Integer): Integer; overload;
  function  ReadAll( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload;
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off, len: Integer); overload;
  procedure Write( const pBuf: Pointer; off, len: Integer); overload;
  procedure Write( const pBuf: Pointer; len: Integer); overload;
  procedure Flush;
end;
63
// Abstract base class for ITransport implementations.
// Descendants must supply GetIsOpen, Open, Close and the pointer-based
// Read/Write; the TBytes overloads and Write(pBuf, len) are inline helpers.
TTransportImpl = class( TInterfacedObject, ITransport)
protected
  function  GetIsOpen: Boolean; virtual; abstract;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean; virtual;
  procedure Open; virtual; abstract;
  procedure Close; virtual; abstract;
  function  Read( var buf: TBytes; off, len: Integer): Integer; overload; inline;
  function  Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload; virtual; abstract;
  function  ReadAll( var buf: TBytes; off, len: Integer): Integer; overload; inline;
  function  ReadAll( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; overload; virtual;
  procedure Write( const buf: TBytes); overload; inline;
  procedure Write( const buf: TBytes; off, len: Integer); overload; inline;
  procedure Write( const pBuf: Pointer; len: Integer); overload; inline;
  procedure Write( const pBuf: Pointer; off, len: Integer); overload; virtual; abstract;
  procedure Flush; virtual;
end;
81
// Base class of all transport exceptions.
// The public "Create" members are deprecated class functions (factories);
// the real constructor is the protected HiddenCreate, which is reached
// through the specialized descendant classes.
TTransportException = class( Exception)
public
  type
    TExceptionType = (
      Unknown,
      NotOpen,
      AlreadyOpen,
      TimedOut,
      EndOfFile,
      BadArgs,
      Interrupted
    );
private
  function GetType: TExceptionType;
protected
  constructor HiddenCreate( const Msg: string);
public
  class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
  // Exception kind, derived from the runtime class of the instance.
  property Type_: TExceptionType read GetType;
end;
104
// Intermediate abstract class. It offers a regular, non-deprecated
// constructor so that the specialized exception types can be created
// without triggering the deprecation warning of TTransportException.Create.
TTransportExceptionSpecialized = class abstract( TTransportException)
public
  constructor Create( const Msg: string);
end;
110
// One concrete exception class per TTransportException.TExceptionType value.
TTransportExceptionUnknown     = class( TTransportExceptionSpecialized);
TTransportExceptionNotOpen     = class( TTransportExceptionSpecialized);
TTransportExceptionAlreadyOpen = class( TTransportExceptionSpecialized);
TTransportExceptionTimedOut    = class( TTransportExceptionSpecialized);
TTransportExceptionEndOfFile   = class( TTransportExceptionSpecialized);
TTransportExceptionBadArgs     = class( TTransportExceptionSpecialized);
TTransportExceptionInterrupted = class( TTransportExceptionSpecialized);
118
// Transport that additionally exposes HTTP-specific settings
// (timeouts, custom request headers) and an explicit SendRequest.
IHTTPClient = interface( ITransport)
  ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
  procedure SetConnectionTimeout( const Value: Integer);
  function  GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function  GetReadTimeout: Integer;
  function  GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property  ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property  ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property  CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
end;
131
// IHTTPClient implementation built on the MSXML IXMLHTTPRequest object.
// Written bytes are collected in FOutputStream; SendRequest posts them to
// FUri and makes the response available through FInputStream.
THTTPClientImpl = class( TTransportImpl, IHTTPClient)
private
  FUri               : string;
  FInputStream       : IThriftStream;
  FOutputStream      : IThriftStream;
  FConnectionTimeout : Integer;
  FReadTimeout       : Integer;
  FCustomHeaders     : IThriftDictionary<string,string>;

  function CreateRequest: IXMLHTTPRequest;
protected
  function  GetIsOpen: Boolean; override;
  procedure Open; override;
  procedure Close; override;
  function  Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  procedure Flush; override;

  procedure SetConnectionTimeout( const Value: Integer);
  function  GetConnectionTimeout: Integer;
  procedure SetReadTimeout( const Value: Integer);
  function  GetReadTimeout: Integer;
  function  GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property  ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property  ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property  CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
  constructor Create( const AUri: string);
  destructor  Destroy; override;
end;
163
// Server-side endpoint: listens, and hands out one ITransport per Accept call.
IServerTransport = interface
  ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
  procedure Listen;
  procedure Close;
  // fnAccepting is an optional callback supplied by the caller.
  function  Accept( const fnAccepting: TProc): ITransport;
end;
170
// Abstract base class for IServerTransport implementations;
// every method has to be provided by the descendant.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function  Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
end;
177
// Factory that produces a transport from a given base transport.
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( const ATrans: ITransport): ITransport;
end;
182
// Default factory; GetTransport is virtual so descendants can wrap ATrans.
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
  function GetTransport( const ATrans: ITransport): ITransport; virtual;
end;
186
// IThriftStream implementation on top of a TCP client socket.
// The socket type and the helper members depend on OLD_SOCKETS.
TTcpSocketStreamImpl = class( TThriftStreamImpl)
{$IFDEF OLD_SOCKETS}
private type
  TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
private
  FTcpClient : TCustomIpClient;
  FTimeout   : Integer;
  function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                   TimeOut: Integer; var wsaError: Integer): Integer;
  function WaitForData( TimeOut: Integer; pBuf: Pointer; DesiredBytes: Integer;
                        var wsaError, bytesReady: Integer): TWaitForData;
{$ELSE}
  FTcpClient : TSocket;
protected const
  SLEEP_TIME = 200;
{$ENDIF}
protected
  procedure Write( const pBuf: Pointer; offset, count: Integer); override;
  function  Read( const pBuf: Pointer; const buflen: Integer; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
public
{$IFDEF OLD_SOCKETS}
  constructor Create( const ATcpClient: TCustomIpClient; const aTimeout: Integer = 0);
{$ELSE}
  constructor Create( const ATcpClient: TSocket; const aTimeout: Longword = 0);
{$ENDIF}
end;
219
// Transport that exposes its underlying input and output streams.
IStreamTransport = interface( ITransport)
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream: IThriftStream read GetInputStream;
  property OutputStream: IThriftStream read GetOutputStream;
end;
227
// Transport working on a pair of IThriftStream instances
// (one for reading, one for writing).
TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
protected
  FInputStream  : IThriftStream;
  FOutputStream : IThriftStream;
protected
  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
public
  property InputStream: IThriftStream read GetInputStream;
  property OutputStream: IThriftStream read GetOutputStream;

  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  constructor Create( const AInputStream: IThriftStream; const AOutputStream: IThriftStream);
  destructor  Destroy; override;
end;
249
// IThriftStream wrapper around another stream that keeps separate
// in-memory read and write buffers; the buffer size is given at creation.
TBufferedStreamImpl = class( TThriftStreamImpl)
private
  FStream      : IThriftStream;
  FBufSize     : Integer;
  FReadBuffer  : TMemoryStream;
  FWriteBuffer : TMemoryStream;
protected
  procedure Write( const pBuf: Pointer; offset, count: Integer); override;
  function  Read( const pBuf: Pointer; const buflen: Integer; offset, count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
public
  constructor Create( const AStream: IThriftStream; ABufSize: Integer);
  destructor  Destroy; override;
end;
268
// TCP server transport. Wraps either a caller-supplied server socket or
// one created from a port number; FOwnsServer records whether the
// destructor is responsible for freeing FServer.
TServerSocketImpl = class( TServerTransportImpl)
private
{$IFDEF OLD_SOCKETS}
  FServer        : TTcpServer;
  FPort          : Integer;
  FClientTimeout : Integer;
{$ELSE}
  FServer        : TServerSocket;
{$ENDIF}
  FUseBufferedSocket : Boolean;
  FOwnsServer        : Boolean;
protected
  function Accept( const fnAccepting: TProc): ITransport; override;
public
{$IFDEF OLD_SOCKETS}
  constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
{$ELSE}
  constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
  constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
{$ENDIF}
  destructor  Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
294
// Transport that places input/output buffers of FBufSize in front of an
// IStreamTransport, which stays reachable through UnderlyingTransport.
TBufferedTransportImpl = class( TTransportImpl)
private
  FInputBuffer  : IThriftStream;
  FOutputBuffer : IThriftStream;
  FTransport    : IStreamTransport;
  FBufSize      : Integer;

  procedure InitBuffers;
  function  GetUnderlyingTransport: ITransport;
protected
  function  GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  procedure Open; override;
  procedure Close; override;
  function  Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  constructor Create( const ATransport: IStreamTransport); overload;
  constructor Create( const ATransport: IStreamTransport; ABufSize: Integer); overload;
  property UnderlyingTransport: ITransport read GetUnderlyingTransport;
  property IsOpen: Boolean read GetIsOpen;
end;
317
// Client-side TCP socket transport. It is created either around an
// existing client socket or from host/port; FOwnsClient records whether
// this object is responsible for freeing FClient.
TSocketImpl = class( TStreamTransportImpl)
private
{$IFDEF OLD_SOCKETS}
  FClient     : TCustomIpClient;
{$ELSE}
  FClient     : TSocket;
{$ENDIF}
  FOwnsClient : Boolean;
  FHost       : string;
  FPort       : Integer;
{$IFDEF OLD_SOCKETS}
  FTimeout    : Integer;
{$ELSE}
  FTimeout    : Longword;
{$ENDIF}

  procedure InitSocket;
protected
  function GetIsOpen: Boolean; override;
public
  procedure Open; override;
{$IFDEF OLD_SOCKETS}
  constructor Create( const AClient: TCustomIpClient; aOwnsClient: Boolean; ATimeout: Integer = 0); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
{$ELSE}
  constructor Create( const AClient: TSocket; aOwnsClient: Boolean); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
{$ENDIF}
  destructor  Destroy; override;
  procedure Close; override;
{$IFDEF OLD_SOCKETS}
  property TcpClient: TCustomIpClient read FClient;
{$ELSE}
  property TcpClient: TSocket read FClient;
{$ENDIF}
  property Host: string read FHost;
  property Port: Integer read FPort;
end;
356
// Transport that wraps another ITransport and works with separate
// in-memory read and write buffers; FHeaderSize is the size of the
// frame header in bytes.
TFramedTransportImpl = class( TTransportImpl)
private const
  FHeaderSize : Integer = 4;
private class var
  FHeader_Dummy : array of Byte;
protected
  FTransport   : ITransport;
  FWriteBuffer : TMemoryStream;
  FReadBuffer  : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;
public
  type
    // Transport factory whose GetTransport is overridden for this class.
    TFactory = class( TTransportFactoryImpl)
    public
      function GetTransport( const ATrans: ITransport): ITransport; override;
    end;

  {$IFDEF HAVE_CLASS_CTOR}
  class constructor Create;
  {$ENDIF}

  constructor Create; overload;
  constructor Create( const ATrans: ITransport); overload;
  destructor  Destroy; override;

  procedure Open; override;
  function  GetIsOpen: Boolean; override;

  procedure Close; override;
  function  Read( const pBuf: Pointer; const buflen: Integer; off, len: Integer): Integer; override;
  procedure Write( const pBuf: Pointer; off, len: Integer); override;
  procedure Flush; override;
end;
392
// Only declared when the compiler has no class constructors
// (HAVE_CLASS_CTOR undefined); presumably the manual replacement for
// TFramedTransportImpl's class constructor - confirm against the implementation.
{$IFNDEF HAVE_CLASS_CTOR}
procedure TFramedTransportImpl_Initialize;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200396
const
  // Default timeout: five seconds, expressed in milliseconds.
  DEFAULT_THRIFT_TIMEOUT = 5 * 1000;
399
400
401implementation
402
403{ TTransportImpl }
404
// Base implementation of Flush: does nothing. The method is virtual,
// so descendants override it where flushing is meaningful.
procedure TTransportImpl.Flush;
begin
  // intentionally empty
end;
409
// Default Peek: reports the current open state of the transport.
function TTransportImpl.Peek: Boolean;
begin
  // IsOpen is a property that reads GetIsOpen
  Result := GetIsOpen;
end;
414
// Array-based convenience overload: forwards to the pointer-based Read,
// passing the address of the first element and the array length as buffer size.
//   buf - target byte array
//   off - offset into the buffer where received data is stored
//   len - maximum number of bytes to read
// Returns the number of bytes read; 0 when buf is empty.
function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
  // @buf[0] is not valid for an empty dynamic array (range check error with
  // {$R+}, nil pointer otherwise), so an empty buffer is handled up front.
  if Length(buf) > 0
  then Result := Read( @buf[0], Length(buf), off, len)
  else Result := 0;
end;
419
// Array-based convenience overload: forwards to the pointer-based ReadAll,
// passing the address of the first element and the array length as buffer size.
//   buf - target byte array
//   off - offset into the buffer where received data is stored
//   len - number of bytes to read
// Returns the number of bytes read; 0 when buf is empty.
function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
  // @buf[0] is not valid for an empty dynamic array (range check error with
  // {$R+}, nil pointer otherwise), so an empty buffer is handled up front.
  if Length(buf) > 0
  then Result := ReadAll( @buf[0], Length(buf), off, len)
  else Result := 0;
end;
424
// Writes the complete byte array by forwarding to the pointer-based Write
// with offset 0 and the array length. An empty array results in no call.
procedure TTransportImpl.Write( const buf: TBytes);
begin
  // @buf[0] is not valid for an empty dynamic array (range check error with
  // {$R+}, nil pointer otherwise); there is nothing to write in that case.
  if Length(buf) > 0
  then Write( @buf[0], 0, Length(buf));
end;
429
// Writes a section of the byte array by forwarding to the pointer-based Write.
//   buf - source byte array
//   off - offset of the first byte to write
//   len - number of bytes to write
// An empty array results in no call.
procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
begin
  // @buf[0] is not valid for an empty dynamic array (range check error with
  // {$R+}, nil pointer otherwise); there is nothing to write in that case.
  if Length(buf) > 0
  then Write( @buf[0], off, len);
end;
434
// Calls Read repeatedly until "len" bytes have been received.
// Returns the total number of bytes read.
// Raises TTransportExceptionNotOpen as soon as a Read call delivers
// no data (result <= 0).
function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var
  chunk : Integer;
begin
  Result := 0;
  while Result < len do begin
    // continue right behind the bytes received so far
    chunk := Read( pBuf, buflen, off + Result, len - Result);
    if chunk <= 0
    then raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
    Inc( Result, chunk);
  end;
end;
446
// Shortcut for writing "len" bytes from pBuf, starting at offset 0.
procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
begin
  Write( pBuf, 0, len);
end;
451
452{ THTTPClientImpl }
453
// Drops the references to both the response and the request stream.
procedure THTTPClientImpl.Close;
begin
  FInputStream  := nil;   // response data
  FOutputStream := nil;   // pending request data
end;
459
// Creates a client for the given URI, with an empty custom header
// dictionary and an empty memory stream that collects the request data.
constructor THTTPClientImpl.Create(const AUri: string);
begin
  inherited Create;
  FUri           := AUri;
  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
  // second argument True as in the adapter's other uses in this unit
  FOutputStream  := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
end;
467
// Builds a synchronous POST request for FUri, with the Thrift content
// headers followed by all entries of FCustomHeaders.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  header : TPair<string,string>;
begin
  {$IF CompilerVersion >= 21.0}
  Result := CoXMLHTTP.Create;
  {$ELSE}
  Result := CoXMLHTTPRequest.Create;
  {$IFEND}

  // third argument False = not asynchronous
  Result.open('POST', FUri, False, '', '');
  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
  Result.setRequestHeader( 'Accept', 'application/x-thrift');
  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  // custom headers are set last
  for header in FCustomHeaders
  do Result.setRequestHeader( header.Key, header.Value);
end;
487
// Closes the transport (releasing both streams) before the inherited cleanup.
destructor THTTPClientImpl.Destroy;
begin
  Close;
  inherited Destroy;
end;
493
// Sends the collected request data. Whether or not sending succeeds,
// the output stream is replaced by a fresh, empty one afterwards.
procedure THTTPClientImpl.Flush;
begin
  try
    SendRequest;
  finally
    // release the old stream first, then start over with an empty buffer
    FOutputStream := nil;
    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
  end;
end;
503
// Getter of the ConnectionTimeout property.
function THTTPClientImpl.GetConnectionTimeout: Integer;
begin
  Exit( FConnectionTimeout);
end;
508
// Getter of the CustomHeaders property; hands out the dictionary itself,
// so callers modify the headers used for subsequent requests.
function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
begin
  Exit( FCustomHeaders);
end;
513
// The HTTP client always reports itself as open.
function THTTPClientImpl.GetIsOpen: Boolean;
begin
  Exit( True);
end;
518
// Getter of the ReadTimeout property.
function THTTPClientImpl.GetReadTimeout: Integer;
begin
  Exit( FReadTimeout);
end;
523
// Opening is a no-op for the HTTP client.
procedure THTTPClientImpl.Open;
begin
  // intentionally empty
end;
528
// Reads from the response stream of the last request.
// Raises TTransportExceptionNotOpen when no response stream exists yet;
// any exception raised while reading is re-raised as
// TTransportExceptionUnknown carrying the original message.
function THTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputStream = nil
  then raise TTransportExceptionNotOpen.Create('No request has been sent');

  try
    Result := FInputStream.Read( pBuf, buflen, off, len);
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create(E.Message);
    end;
  end;
end;
542
// Posts the content of the output stream and installs the response
// stream of the request as the new input stream.
procedure THTTPClientImpl.SendRequest;
var
  request : IXMLHTTPRequest;
  body    : TMemoryStream;
  payload : TBytes;
  size    : Integer;
begin
  request := CreateRequest;

  body := TMemoryStream.Create;
  try
    // copy the pending request bytes into a temporary memory stream
    payload := FOutputStream.ToArray;
    size    := Length(payload);
    if size > 0
    then body.WriteBuffer( payload[0], size);
    body.Position := 0;

    // soReference: the adapter does not take ownership of "body"
    request.send( IUnknown( TStreamAdapter.Create( body, soReference )));

    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( request.responseStream) as IStream);
  finally
    body.Free;
  end;
end;
567
// Setter of the ConnectionTimeout property; stores the value only.
procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
begin
  Self.FConnectionTimeout := Value;
end;
572
// Setter of the ReadTimeout property; stores the value only.
procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
begin
  Self.FReadTimeout := Value;
end;
577
// Appends the given bytes to the output stream; nothing is transmitted
// before SendRequest is called (directly or via Flush).
procedure THTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  Self.FOutputStream.Write( pBuf, off, len);
end;
582
583{ TTransportException }
584
// Maps the runtime class of the exception to the matching
// TExceptionType value; everything else is reported as Unknown.
function TTransportException.GetType: TExceptionType;
begin
  if Self is TTransportExceptionNotOpen     then Exit( TExceptionType.NotOpen);
  if Self is TTransportExceptionAlreadyOpen then Exit( TExceptionType.AlreadyOpen);
  if Self is TTransportExceptionTimedOut    then Exit( TExceptionType.TimedOut);
  if Self is TTransportExceptionEndOfFile   then Exit( TExceptionType.EndOfFile);
  if Self is TTransportExceptionBadArgs     then Exit( TExceptionType.BadArgs);
  if Self is TTransportExceptionInterrupted then Exit( TExceptionType.Interrupted);
  Result := TExceptionType.Unknown;
end;
595
// The actual constructor; simply passes the message on to Exception.Create.
constructor TTransportException.HiddenCreate(const Msg: string);
begin
  inherited Create( Msg);
end;
600
// Deprecated factory: creates the exception for AType with an empty message.
class function TTransportException.Create(AType: TExceptionType): TTransportException;
begin
  // no inherited call; delegates to the (also deprecated) two-argument overload,
  // hence the deprecation warning is switched off around the call
{$WARN SYMBOL_DEPRECATED OFF}
  Result := Create( AType, '');
{$WARN SYMBOL_DEPRECATED DEFAULT}
end;
608
// Deprecated factory: creates the specialized exception class that
// belongs to AType. Values without a dedicated class (including Unknown)
// produce a TTransportExceptionUnknown.
class function TTransportException.Create(AType: TExceptionType;
  const msg: string): TTransportException;
begin
  if AType = TExceptionType.NotOpen
  then Result := TTransportExceptionNotOpen.Create(msg)
  else if AType = TExceptionType.AlreadyOpen
  then Result := TTransportExceptionAlreadyOpen.Create(msg)
  else if AType = TExceptionType.TimedOut
  then Result := TTransportExceptionTimedOut.Create(msg)
  else if AType = TExceptionType.EndOfFile
  then Result := TTransportExceptionEndOfFile.Create(msg)
  else if AType = TExceptionType.BadArgs
  then Result := TTransportExceptionBadArgs.Create(msg)
  else if AType = TExceptionType.Interrupted
  then Result := TTransportExceptionInterrupted.Create(msg)
  else Result := TTransportExceptionUnknown.Create(msg);
end;
623
// Deprecated factory: a message without a type yields TTransportExceptionUnknown.
class function TTransportException.Create(const msg: string): TTransportException;
begin
  Exit( TTransportExceptionUnknown.Create(Msg));
end;
628
629{ TTransportExceptionSpecialized }
630
// Regular constructor of the specialized exceptions; routes the message
// to the protected HiddenCreate of the base class.
constructor TTransportExceptionSpecialized.Create(const Msg: string);
begin
  inherited HiddenCreate( Msg);
end;
635
636{ TTransportFactoryImpl }
637
// Default factory behavior: the given transport is returned unchanged.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Exit( ATrans);
end;
642
{ TServerSocketImpl }
644
// Wraps an existing server socket supplied by the caller.
// FOwnsServer is not set here and therefore keeps its initial value False,
// so the destructor does not free AServer.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
begin
  inherited Create;
  FServer        := AServer;
  FClientTimeout := AClientTimeout;   // handed to accepted clients later on
end;
{$ELSE}
constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
begin
  inherited Create;
  FServer := AServer;
  // the client timeout is used for both directions
  FServer.RecvTimeout := AClientTimeout;
  FServer.SendTimeout := AClientTimeout;
end;
{$ENDIF}
661
// Creates a server socket for the given port. The server object is
// created here and therefore owned (and later freed) by this instance.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
{$ELSE}
constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
{$ENDIF}
begin
  inherited Create;

{$IFDEF OLD_SOCKETS}
  FPort          := APort;
  FClientTimeout := AClientTimeout;
  FServer        := TTcpServer.Create( nil);
  FServer.BlockMode := bmBlocking;
  {$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
  {$ELSE}
  FServer.LocalPort := IntToStr( FPort);
  {$IFEND}
{$ELSE}
  // the client timeout is passed for both timeout arguments
  FServer := TServerSocket.Create( APort, AClientTimeout, AClientTimeout);
{$ENDIF}

  FUseBufferedSocket := AUseBufferedSockets;
  FOwnsServer        := True;
end;
685
// Frees the server socket, but only if this instance owns it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer then
  begin
    FServer.Free;
    FServer := nil;
  end;

  inherited Destroy;
end;
694
// Accepts the next client connection and returns a transport for it.
// fnAccepting, when assigned, is invoked right before the blocking accept.
// The result is a TSocketImpl that owns the client socket, wrapped into a
// TBufferedTransportImpl when buffered sockets were requested.
// Raises TTransportExceptionNotOpen when there is no server socket.
// OLD_SOCKETS only: returns nil when the accept call fails, and re-raises
// any exception as TTransportExceptionUnknown.
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
{$IFDEF OLD_SOCKETS}
  client : TCustomIpClient;
{$ELSE}
  client : TSocket;
{$ENDIF}
  trans  : IStreamTransport;
begin
  if FServer = nil
  then raise TTransportExceptionNotOpen.Create('No underlying server socket.');

{$IFDEF OLD_SOCKETS}
  client := nil;
  try
    client := TCustomIpClient.Create(nil);

    if Assigned(fnAccepting) then begin
      fnAccepting();
    end;

    if not FServer.Accept( client) then begin
      client.Free;
      Exit( nil);
    end;

    if client = nil
    then Exit( nil);

    trans  := TSocketImpl.Create( client, TRUE, FClientTimeout);
    client := nil; // ownership has moved to trans

    if FUseBufferedSocket then begin
      Result := TBufferedTransportImpl.Create( trans);
    end
    else begin
      Result := trans;
    end;

  except
    on E: Exception do begin
      client.Free;
      raise TTransportExceptionUnknown.Create(E.ToString);
    end;
  end;
{$ELSE}
  if Assigned(fnAccepting) then begin
    fnAccepting();
  end;

  client := FServer.Accept;
  try
    trans  := TSocketImpl.Create( client, True);
    client := nil; // ownership has moved to trans

    if FUseBufferedSocket then begin
      Result := TBufferedTransportImpl.Create( trans);
    end
    else begin
      Result := trans;
    end;
  except
    client.Free;
    raise;
  end;
{$ENDIF}
end;
759
// Starts listening on the server socket; does nothing without a server.
// OLD_SOCKETS only: failures are re-raised as TTransportExceptionUnknown.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  try
    FServer.Active := True;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
    end;
  end;
{$ELSE}
  FServer.Listen;
{$ENDIF}
end;
776
// Stops the server socket. Does nothing when no server socket object exists.
// With OLD_SOCKETS any failure is re-raised as a TTransportExceptionUnknown.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  try
    FServer.Active := False;
  except
    on E: Exception
    do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
  end;
{$ELSE}
  FServer.Close;
{$ENDIF}
end;
791
792{ TSocket }
793
{$IFDEF OLD_SOCKETS}
// Wraps an existing client socket (old sockets flavour).
//   AClient     - the socket to use
//   aOwnsClient - when TRUE, Close/Destroy free AClient
//   ATimeout    - timeout handed to the socket stream
// One socket stream instance serves as both input and output stream.
constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var sockStream : IThriftStream;
begin
  FOwnsClient := aOwnsClient;
  FTimeout    := ATimeout;
  FClient     := AClient;
  sockStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( sockStream, sockStream);
end;
{$ELSE}
// Wraps an existing client socket (new sockets flavour).
//   AClient     - the socket to use; its RecvTimeout becomes this transport's timeout
//   aOwnsClient - when TRUE, Close/Destroy free AClient
// One socket stream instance serves as both input and output stream.
constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
var sockStream : IThriftStream;
begin
  FOwnsClient := aOwnsClient;
  FClient     := AClient;
  FTimeout    := AClient.RecvTimeout;
  sockStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( sockStream, sockStream);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200815
// Creates a transport for the given host and port. The socket object is
// prepared via InitSocket, but no connection is made here; call Open for that.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
{$ELSE}
constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
{$ENDIF}
begin
  inherited Create( nil, nil);   // streams get assigned by InitSocket below
  FHost    := AHost;
  FPort    := APort;
  FTimeout := ATimeout;
  InitSocket;
end;
828
// Frees the client socket if this transport owns it, then runs the inherited destructor.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;
  inherited;
end;
835
// Runs the inherited Close first, then frees the client socket if this transport owns it.
procedure TSocketImpl.Close;
begin
  inherited Close;
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;
end;
842
// TRUE when a client socket object exists and it reports being connected/open.
function TSocketImpl.GetIsOpen: Boolean;
begin
{$IFDEF OLD_SOCKETS}
  Result := Assigned( FClient) and FClient.Connected;
{$ELSE}
  Result := Assigned( FClient) and FClient.IsOpen;
{$ENDIF}
end;
851
// Discards the current client socket (freeing it only if owned), creates a
// fresh one that this transport owns, and points both transport streams at a
// new socket stream bound to it.
procedure TSocketImpl.InitSocket;
var
  sockStream : IThriftStream;
begin
  // get rid of the previous client; only free it if it is ours
  if FOwnsClient
  then FreeAndNil( FClient)
  else FClient := nil;

{$IFDEF OLD_SOCKETS}
  FClient := TTcpClient.Create( nil);
{$ELSE}
  FClient := TSocket.Create( FHost, FPort);
{$ENDIF}
  FOwnsClient := True;

  // input and output share the very same stream instance
  sockStream    := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FInputStream  := sockStream;
  FOutputStream := sockStream;
end;
871
// Connects the socket to Host:Port.
// Raises TTransportExceptionAlreadyOpen if already connected and
// TTransportExceptionNotOpen if host is empty or port is not positive.
// After connecting, both transport streams are replaced by a new socket stream.
procedure TSocketImpl.Open;
begin
  if IsOpen
  then raise TTransportExceptionAlreadyOpen.Create('Socket already connected');

  if FHost = ''
  then raise TTransportExceptionNotOpen.Create('Cannot open null host');

  if Port <= 0
  then raise TTransportExceptionNotOpen.Create('Cannot open without port');

  // a previous Close may have released the client object
  if FClient = nil then begin
    InitSocket;
  end;

{$IFDEF OLD_SOCKETS}
  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;
{$ELSE}
  FClient.Open;
{$ENDIF}

  FInputStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
900
901{ TBufferedStream }
902
// Flushes pending output, then releases the wrapped stream and both memory
// buffers. Afterwards IsOpen returns FALSE.
procedure TBufferedStreamImpl.Close;
begin
  Flush;
  FStream := nil;

  FreeAndNil( FReadBuffer);
  FreeAndNil( FWriteBuffer);
end;
914
// Creates a buffering wrapper around AStream.
//   AStream  - the stream to read from / write to
//   ABufSize - refill size for reads and flush threshold for writes
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  inherited Create;
  FBufSize     := ABufSize;
  FStream      := AStream;
  FWriteBuffer := TMemoryStream.Create;
  FReadBuffer  := TMemoryStream.Create;
end;
923
// Closes the stream (which flushes and frees the buffers) before destruction.
destructor TBufferedStreamImpl.Destroy;
begin
  Close;
  inherited;
end;
929
// Writes everything collected in the write buffer to the wrapped stream and
// empties the buffer. Does nothing when the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  cbData  : Integer;
begin
  if not IsOpen
  then Exit;

  cbData := FWriteBuffer.Size;
  if cbData > 0 then begin
    SetLength( pending, cbData);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( Pointer(@pending[0])^, cbData);
    FStream.Write( pending, 0, cbData);
  end;

  FWriteBuffer.Clear;
end;
946
// The stream counts as open while both buffers and the wrapped stream exist.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := Assigned( FWriteBuffer)
        and Assigned( FReadBuffer)
        and Assigned( FStream);
end;
953
// Intentionally empty: there is nothing to open for the buffering wrapper.
procedure TBufferedStreamImpl.Open;
begin
  // nothing to do
end;
958
// Reads up to count bytes into pBuf starting at offset, serving them from the
// internal read buffer and refilling that buffer from the wrapped stream in
// FBufSize portions as needed.
// Returns the number of bytes delivered; this is less than count when the
// wrapped stream returns 0 bytes, and 0 when the stream is not open.
function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  chunk  : Integer;
  refill : TBytes;
begin
  inherited;
  Result := 0;

  if not IsOpen
  then Exit;

  while count > 0 do begin

    // read buffer exhausted -> fetch the next portion from the wrapped stream
    if FReadBuffer.Position >= FReadBuffer.Size then begin
      FReadBuffer.Clear;
      SetLength( refill, FBufSize);
      chunk := FStream.Read( refill, 0, FBufSize);
      if chunk = 0
      then Break; // avoid infinite loop

      FReadBuffer.WriteBuffer( Pointer(@refill[0])^, chunk);
      FReadBuffer.Position := 0;
    end;

    // hand out as much as is available, but not more than requested
    if FReadBuffer.Position < FReadBuffer.Size then begin
      chunk := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      Inc( Result, FReadBuffer.Read( PByteArray(pBuf)^[offset], chunk));
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
989
// Returns a copy of the complete read buffer content (from its beginning).
// Yields an empty array when the stream is not open. Note that the read
// buffer position is moved by this call.
function TBufferedStreamImpl.ToArray: TBytes;
var cbData : Integer;
begin
  if IsOpen
  then cbData := FReadBuffer.Size
  else cbData := 0;

  SetLength( Result, cbData);
  if cbData <= 0
  then Exit;

  FReadBuffer.Position := 0;
  FReadBuffer.Read( Pointer(@Result[0])^, cbData);
end;
1006
// Appends count bytes from pBuf (starting at offset) to the write buffer and
// flushes to the wrapped stream once the buffer has grown beyond FBufSize.
// Data is silently ignored when count <= 0 or the stream is not open.
procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
begin
  inherited;

  if (count <= 0) or not IsOpen
  then Exit;

  FWriteBuffer.Write( PByteArray(pBuf)^[offset], count);
  if FWriteBuffer.Size > FBufSize
  then Flush;
end;
1019
1020{ TStreamTransportImpl }
1021
// Drops the references to both streams.
procedure TStreamTransportImpl.Close;
begin
  FOutputStream := nil;
  FInputStream  := nil;
end;
1027
// Creates a transport on top of the given input and output streams
// (either may be nil and may be the same instance).
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  inherited Create;
  FOutputStream := AOutputStream;
  FInputStream  := AInputStream;
end;
1034
// Releases both stream references before the inherited destructor runs.
destructor TStreamTransportImpl.Destroy;
begin
  FOutputStream := nil;
  FInputStream  := nil;
  inherited;
end;
1041
// Flushes the output stream.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if not Assigned( FOutputStream)
  then raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
1050
// Getter for the stream this transport reads from (may be nil).
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := FInputStream;
end;
1055
// A plain stream transport always reports itself as open,
// regardless of whether streams are currently assigned.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := True;
end;
1060
// Getter for the stream this transport writes to (may be nil).
// Returns FOutputStream; handing out the input stream here would be wrong
// whenever the transport was created with two distinct streams.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
1065
// Intentionally empty: the streams are supplied at construction time.
procedure TStreamTransportImpl.Open;
begin
  // nothing to do
end;
1070
// Forwards the read request to the input stream and returns its result.
// Raises TTransportExceptionNotOpen when there is no input stream.
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if not Assigned( FInputStream)
  then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );

  Result := FInputStream.Read( pBuf, buflen, off, len);
end;
1079
// Forwards the write request to the output stream.
// Raises TTransportExceptionNotOpen when there is no output stream.
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if not Assigned( FOutputStream)
  then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );

  FOutputStream.Write( pBuf, off, len);
end;
1088
1089{ TBufferedTransportImpl }
1090
// Convenience constructor: buffers ATransport with a buffer size of 1024 bytes.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
begin
  // no inherited call here, the delegated constructor takes care of that
  Create( ATransport, 1024);
end;
1096
// Closes the underlying transport. The buffer objects are left untouched.
procedure TBufferedTransportImpl.Close;
begin
  FTransport.Close;
end;
1101
// Wraps ATransport and sets up buffered streams of ABufSize bytes on top of
// whatever input/output streams the transport currently exposes.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
begin
  inherited Create;
  FBufSize   := ABufSize;
  FTransport := ATransport;
  InitBuffers;
end;
1109
// Flushes the output buffer, if there is one.
procedure TBufferedTransportImpl.Flush;
begin
  if Assigned( FOutputBuffer)
  then FOutputBuffer.Flush;
end;
1116
// The open state is entirely determined by the underlying transport.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1121
// Returns the transport that is wrapped by this buffering layer.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := FTransport;
end;
1126
// Creates a buffered wrapper for each stream the underlying transport exposes.
// A buffer is only (re)created when the corresponding stream is not nil;
// otherwise the existing buffer reference is left as it is.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if FTransport.InputStream <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize);

  if FTransport.OutputStream <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize);
end;
1136
// Opens the underlying transport and re-attaches the buffers to it.
// Opening may install new stream objects in the underlying transport
// (TSocketImpl.Open does exactly that), so the buffers created earlier would
// otherwise keep wrapping the previous, stale streams.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;
  InitBuffers;  // make the buffers match the transport's current streams again
end;
1141
// Reads through the input buffer. Returns 0 when no input buffer exists.
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if Assigned( FInputBuffer)
  then Result := FInputBuffer.Read( pBuf, buflen, off, len)
  else Result := 0;
end;
1149
// Writes through the output buffer. The data is dropped when no output buffer exists.
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if Assigned( FOutputBuffer)
  then FOutputBuffer.Write( pBuf, off, len);
end;
1156
1157{ TFramedTransportImpl }
1158
// One-time setup of FHeader_Dummy: an all-zero placeholder of FHeaderSize
// bytes that InitWriteBuffer writes at the start of every new write buffer.
// Compilers with class constructor support run this automatically; otherwise
// the stand-alone procedure has to be called explicitly.
{$IFDEF HAVE_CLASS_CTOR}
class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte), 0);
end;
{$ELSE}
procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte), 0);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001173
// Creates a framed transport without an underlying transport assigned;
// only the write buffer is prepared.
constructor TFramedTransportImpl.Create;
begin
  inherited Create;
  InitWriteBuffer;
end;
1179
// Closes the underlying transport.
procedure TFramedTransportImpl.Close;
begin
  FTransport.Close;
end;
1184
// Creates a framed transport layered on top of ATrans and prepares the write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;
  FTransport := ATrans;
  InitWriteBuffer;
end;
1191
// Frees the read and write buffers (Free tolerates nil references).
destructor TFramedTransportImpl.Destroy;
begin
  FWriteBuffer.Free;
  FReadBuffer.Free;
  inherited;
end;
1198
// Sends the buffered data as one frame: the first four bytes of the buffer
// (the header placeholder) are overwritten with the payload length in
// big-endian byte order, then everything is written to the underlying
// transport, which is flushed afterwards.
// The write buffer is re-initialized before the data goes out.
// Raises TTransportExceptionUnknown if the buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame   : TBytes;
  total   : Integer;
  payload : Integer;
begin
  // take a snapshot of the write buffer
  total := FWriteBuffer.Size;
  SetLength( frame, total);
  if total > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], total);

  payload := total - FHeaderSize;
  if payload < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );

  InitWriteBuffer;

  // patch the frame header: payload size, most significant byte first
  frame[0] := Byte($FF and (payload shr 24));
  frame[1] := Byte($FF and (payload shr 16));
  frame[2] := Byte($FF and (payload shr 8));
  frame[3] := Byte($FF and payload);

  FTransport.Write( frame, 0, total);
  FTransport.Flush;
end;
1227
// The open state is entirely determined by the underlying transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1232
type
  // Empty descendant, used by InitWriteBuffer as a cast target to set the
  // Capacity of a TMemoryStream.
  TAccessMemoryStream = class(TMemoryStream)
  end;
1236
// Replaces the write buffer with a fresh memory stream of 1024 bytes initial
// capacity that already contains the zeroed header placeholder.
procedure TFramedTransportImpl.InitWriteBuffer;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;

  // Capacity is reached via the helper descendant class
  TAccessMemoryStream(FWriteBuffer).Capacity := 1024;

  // reserve room for the frame header, Flush fills in the real size
  FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
end;
1244
// Opens the underlying transport.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open;
end;
1249
// Reads up to len bytes of frame payload into pBuf at position off.
// The request is clipped to the space left in the buffer (buflen-off).
// Data remaining from the current frame is served first; only when nothing
// is left, the next frame is fetched from the underlying transport.
// Note that a request for zero bytes (after clipping) still fetches a frame
// and returns 0.
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  len := Min( len, buflen - off);

  if len <= 0 then begin
    ReadFrame;
    Result := 0;
    Exit;
  end;

  // anything left from the current frame?
  if FReadBuffer <> nil then begin
    Result := FReadBuffer.Read( PByteArray(pBuf)^[off], len);
    if Result > 0
    then Exit;
  end;

  // current frame is used up, get the next one
  ReadFrame;
  Result := FReadBuffer.Read( PByteArray(pBuf)^[off], len);
end;
1267
// Reads one complete frame from the underlying transport into a fresh read
// buffer: first the 4-byte big-endian size header, then that many payload bytes.
// Raises TTransportExceptionUnknown when the header announces a negative size
// (corrupted or non-framed input). An empty frame results in an empty read
// buffer; the payload array is never indexed in that case.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  // a set high bit yields a negative Integer - never a valid frame size
  if size < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.ReadFrame: negative frame size ('+IntToStr(size)+')');

  SetLength( buff, size );
  if size > 0
  then FTransport.ReadAll( buff, 0, size );

  FReadBuffer.Free;
  FReadBuffer := TMemoryStream.Create;
  if size > 0   // buff[0] does not exist for an empty frame
  then FReadBuffer.Write( Pointer(@buff[0])^, size );
  FReadBuffer.Position := 0;
end;
1288
// Appends len bytes from pBuf (starting at off) to the write buffer.
// Nothing reaches the underlying transport before Flush is called.
procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if len <= 0
  then Exit;

  FWriteBuffer.Write( PByteArray(pBuf)^[off], len);
end;
1294
1295{ TFramedTransport.TFactory }
1296
// Factory method: wraps ATrans into a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := TFramedTransportImpl.Create( ATrans);
end;
1301
1302{ TTcpSocketStreamImpl }
1303
// Closes the socket this stream operates on.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close;
end;
1308
{$IFDEF OLD_SOCKETS}
// Creates a stream on top of ATcpClient (old sockets flavour) and remembers
// the timeout for later read/write operations. The socket is not owned.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;
  FTimeout   := aTimeout;
  FTcpClient := ATcpClient;
end;
{$ELSE}
// Creates a stream on top of ATcpClient (new sockets flavour) and applies the
// timeout to the socket: the send timeout is set to aTimeout as given, the
// receive timeout uses SLEEP_TIME instead when aTimeout is 0.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
begin
  inherited Create;
  FTcpClient := ATcpClient;

  if aTimeout = 0
  then FTcpClient.RecvTimeout := SLEEP_TIME
  else FTcpClient.RecvTimeout := aTimeout;

  FTcpClient.SendTimeout := aTimeout;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001328
// Intentionally empty: this stream keeps no buffer of its own.
procedure TTcpSocketStreamImpl.Flush;
begin
  // nothing to do
end;
1333
// Reports the state of the socket: Active (old sockets) or IsOpen (new sockets).
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
{$IFDEF OLD_SOCKETS}
  Result := FTcpClient.Active;
{$ELSE}
  Result := FTcpClient.IsOpen;
{$ENDIF}
end;
1342
// Opens the socket this stream operates on.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open;
end;
1347
1348
{$IFDEF OLD_SOCKETS}
// Thin wrapper around the socket select() call for this stream's socket.
//   ReadReady, WriteReady, ExceptFlag
//              - optional out flags; each non-nil pointer makes the socket
//                part of the corresponding fd set and receives the result
//   TimeOut    - milliseconds to wait; a negative value waits forever
//   wsaError   - receives WSAEINVAL if the socket is not active, the
//                WSAGetLastError value if select() fails, otherwise 0
// Returns the select() result, or SOCKET_ERROR if the socket is not active
// or an exception occurred during the call.
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                                      TimeOut: Integer; var wsaError : Integer): Integer;
var
  ReadFds, WriteFds, ExceptFds    : TFDset;
  pReadFds, pWriteFds, pExceptFds : PFDset;
  tv       : timeval;
  pTimeout : PTimeval;
  socket   : TSocket;
begin
  if not FTcpClient.Active then begin
    wsaError := WSAEINVAL;
    Exit( SOCKET_ERROR);
  end;

  socket := FTcpClient.Handle;

  // only the sets the caller is interested in are passed to select()
  pReadFds := nil;
  if Assigned(ReadReady) then begin
    FD_ZERO(ReadFds);
    FD_SET(socket, ReadFds);
    pReadFds := @ReadFds;
  end;

  pWriteFds := nil;
  if Assigned(WriteReady) then begin
    FD_ZERO(WriteFds);
    FD_SET(socket, WriteFds);
    pWriteFds := @WriteFds;
  end;

  pExceptFds := nil;
  if Assigned(ExceptFlag) then begin
    FD_ZERO(ExceptFds);
    FD_SET(socket, ExceptFds);
    pExceptFds := @ExceptFds;
  end;

  // milliseconds -> seconds + microseconds
  pTimeout := nil; // wait forever
  if TimeOut >= 0 then begin
    tv.tv_sec  := TimeOut div 1000;
    tv.tv_usec := 1000 * (TimeOut mod 1000);
    pTimeout   := @tv;
  end;

  wsaError := 0;
  try
    {$IFDEF MSWINDOWS}
      {$IFDEF OLD_UNIT_NAMES}
      result := WinSock.select( socket + 1, pReadFds, pWriteFds, pExceptFds, pTimeout);
      {$ELSE}
      result := Winapi.WinSock.select( socket + 1, pReadFds, pWriteFds, pExceptFds, pTimeout);
      {$ENDIF}
    {$ENDIF}
    {$IFDEF LINUX}
      result := Libc.select( socket + 1, pReadFds, pWriteFds, pExceptFds, pTimeout);
    {$ENDIF}

    if result = SOCKET_ERROR
    then wsaError := WSAGetLastError;

  except
    result := SOCKET_ERROR;
  end;

  // report back which conditions are signalled
  if Assigned(ReadReady)
  then ReadReady^ := FD_ISSET(socket, ReadFds);

  if Assigned(WriteReady)
  then WriteReady^ := FD_ISSET(socket, WriteFds);

  if Assigned(ExceptFlag)
  then ExceptFlag^ := FD_ISSET(socket, ExceptFds);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001436
{$IFDEF OLD_SOCKETS}
// Waits until the socket has data to read and peeks at it (the data stays in
// the socket's receive queue).
//   TimeOut      - milliseconds to wait, passed on to Select
//   pBuf         - destination for the peeked bytes
//   DesiredBytes - maximum number of bytes to peek
//   wsaError     - error code as set by Select
//   bytesReady   - number of bytes available, 0 unless the result is wfd_HaveData
// Returns wfd_Error if select or recv fails (recv returning 0 means the peer
// has shut down), wfd_Timeout if nothing became readable, else wfd_HaveData.
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
                                           DesiredBytes : Integer;
                                           var wsaError, bytesReady : Integer): TWaitForData;
const
  MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
var
  bCanRead, bError : Boolean;
  rc : Integer;
begin
  bytesReady := 0;

  // select() gives the number of ready sockets, zero if the time limit
  // expired, or SOCKET_ERROR if an error occurred
  rc := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
  if rc = SOCKET_ERROR then begin
    Exit( TWaitForData.wfd_Error);
  end;
  if (rc = 0) or not bCanRead then begin
    Exit( TWaitForData.wfd_Timeout);
  end;

  // recv() gives the number of bytes received, -1 on error,
  // or 0 when the peer has performed an orderly shutdown
  rc := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
  if rc <= 0 then begin
    Exit( TWaitForData.wfd_Error);
  end;

  // at least we have some data
  bytesReady := Min( rc, DesiredBytes);
  result := TWaitForData.wfd_HaveData;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001470
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001471{$IFDEF OLD_SOCKETS}
// Old sockets version.
// Reads up to count bytes from the socket into pBuf starting at offset and
// returns the number of bytes actually read.
// The first wait uses FTimeout (or DEFAULT_THRIFT_TIMEOUT if FTimeout is 0);
// once data has arrived, follow-up waits use a tenth of that, at least 200 ms.
// Reading stops early, returning what was read so far, when waiting for data
// reports an error, when it times out with FTimeout = 0, or when the receive
// call itself delivers nothing or fails.
// Raises TTransportExceptionTimedOut on timeout if FTimeout is non-zero.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var wfd : TWaitForData;
    wsaError,
    msecs : Integer;
    nBytes : Integer;
    pDest : PByte;
begin
  inherited;

  if FTimeout > 0
  then msecs := FTimeout
  else msecs := DEFAULT_THRIFT_TIMEOUT;

  result := 0;
  pDest := @(PByteArray(pBuf)^[offset]);
  while count > 0 do begin

    while TRUE do begin
      wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
      case wfd of
        TWaitForData.wfd_Error    :  Exit;
        TWaitForData.wfd_HaveData :  Break;
        TWaitForData.wfd_Timeout  :  begin
          if (FTimeout = 0)
          then Exit
          else begin
            raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));

          end;
        end;
      else
        ASSERT( FALSE);
      end;
    end;

    // reduce the timeout once we got data
    if FTimeout > 0
    then msecs := FTimeout div 10
    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
    msecs := Max( msecs, 200);

    ASSERT( nBytes <= count);
    nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);

    // A failed or empty receive must not be fed into the pointer arithmetic
    // below: a negative value would move pDest backwards and enlarge count.
    if nBytes <= 0
    then Exit;

    Inc( pDest, nBytes);
    Dec( count, nBytes);
    Inc( result, nBytes);
  end;
end;
1521
// Old sockets version.
// Sizes the result by the socket's BytesReceived value (0 if the stream is
// not open) and fills it with a single ReceiveBuf call.
function TTcpSocketStreamImpl.ToArray: TBytes;
var cbData : Integer;
begin
  if IsOpen
  then cbData := FTcpClient.BytesReceived
  else cbData := 0;

  SetLength( Result, cbData);

  if cbData > 0
  then FTcpClient.ReceiveBuf( Pointer(@Result[0])^, cbData);
end;
1537
// Old sockets version.
// Waits (up to FTimeout) until the socket is writable, then sends count bytes
// from pBuf starting at offset.
// Raises TTransportExceptionNotOpen if the socket is not active,
// TTransportExceptionTimedOut if it does not become writable in time and
// TTransportExceptionUnknown on select errors or an exceptional socket state.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
var
  bCanWrite, bError : Boolean;
  rc, wsaError : Integer;
begin
  inherited;

  if not FTcpClient.Active then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  // select() gives the number of ready sockets, zero if the time limit
  // expired, or SOCKET_ERROR if an error occurred (details in wsaError)
  rc := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);

  if rc = SOCKET_ERROR then begin
    raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
  end;

  if rc = 0 then begin
    raise TTransportExceptionTimedOut.Create('timed out');
  end;

  if bError or not bCanWrite then begin
    raise TTransportExceptionUnknown.Create('unknown error');
  end;

  FTcpClient.SendBuf( PByteArray(pBuf)^[offset], count);
end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001564
1565{$ELSE}
1566
// New sockets version.
// Keeps reading from the socket until count bytes have arrived or a read
// delivers zero bytes. Returns the number of bytes stored in pBuf, starting
// at offset.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  got     : Integer;
  pCursor : PByte;
begin
  inherited;

  result  := 0;
  pCursor := @(PByteArray(pBuf)^[offset]);

  while count > 0 do begin
    got := FTcpClient.Read( pCursor^, count);
    if got = 0
    then Break;

    Inc( result, got);
    Inc( pCursor, got);
    Dec( count, got);
  end;
end;
1584
// New sockets version.
// If the socket has data pending (Peek), reads it in portions of 1024 bytes
// until a read delivers less than a full portion, and returns exactly the
// bytes received. A TTransportException during reading ends the operation
// quietly with what has been received so far; other exceptions propagate.
function TTcpSocketStreamImpl.ToArray: TBytes;
const
  CHUNK_SIZE = 1024;
var
  len   : Integer;
  total : Integer;   // number of valid bytes in Result
begin
  // Results of managed types are not guaranteed to start out empty,
  // so never rely on Length(Result) before assigning it.
  SetLength( Result, 0);
  total := 0;

  try
    if FTcpClient.Peek then
      repeat
        SetLength( Result, total + CHUNK_SIZE);
        len := FTcpClient.Read( Result[total], CHUNK_SIZE);
        if len > 0
        then Inc( total, len);
      until len < CHUNK_SIZE;
  except
    on TTransportException do begin { don't allow default exceptions } end;
    else raise;
  end;

  // always cut off the unused tail, also if the last read gave nothing or failed
  SetLength( Result, total);
end;
1603
// New sockets version.
// Sends count bytes from pBuf, starting at offset, through the socket.
// Raises TTransportExceptionNotOpen if the socket is not open.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
begin
  inherited;

  if not FTcpClient.IsOpen then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  FTcpClient.Write( PByteArray(pBuf)^[offset], count);
end;
1614
Jens Geyer23d67462015-12-19 11:44:57 +01001615{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001616
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001617
// Compilers older than version 21.0 get an explicit unit initialization
// that sets up the framed transport's header placeholder.
{$IF CompilerVersion < 21.0}
initialization
begin
  TFramedTransportImpl_Initialize;
end;
{$IFEND}
1624
1625
1626end.