blob: 52b617bb95c4acb9beffff8c51b616c53abf1e5f [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX, msxml, WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer9f7f11e2016-04-14 21:37:11 +020034 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
Jens Geyerbea9bbe2016-04-20 00:02:40 +020036 Web.Win.Sockets,
37 {$ELSE}
38 Thrift.Socket,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020039 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
42 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030043 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020044
45type
46 ITransport = interface
Jens Geyer17c3ad92017-09-05 20:31:27 +020047 ['{DB84961E-8BB3-4532-99E1-A8C7AC2300F7}']
Jens Geyerd5436f52014-10-03 19:50:38 +020048 function GetIsOpen: Boolean;
49 property IsOpen: Boolean read GetIsOpen;
50 function Peek: Boolean;
51 procedure Open;
52 procedure Close;
Jens Geyer17c3ad92017-09-05 20:31:27 +020053 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
54 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
55 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
56 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020057 procedure Write( const buf: TBytes); overload;
58 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
Jens Geyer17c3ad92017-09-05 20:31:27 +020059 procedure Write( const pBuf : Pointer; off, len : Integer); overload;
60 procedure Write( const pBuf : Pointer; len : Integer); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +020061 procedure Flush;
62 end;
63
64 TTransportImpl = class( TInterfacedObject, ITransport)
65 protected
66 function GetIsOpen: Boolean; virtual; abstract;
67 property IsOpen: Boolean read GetIsOpen;
68 function Peek: Boolean; virtual;
69 procedure Open(); virtual; abstract;
70 procedure Close(); virtual; abstract;
Jens Geyer17c3ad92017-09-05 20:31:27 +020071 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
72 function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
73 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
74 function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
75 procedure Write( const buf: TBytes); overload; inline;
76 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
77 procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
78 procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
Jens Geyerd5436f52014-10-03 19:50:38 +020079 procedure Flush; virtual;
80 end;
81
82 TTransportException = class( Exception )
83 public
84 type
85 TExceptionType = (
86 Unknown,
87 NotOpen,
88 AlreadyOpen,
89 TimedOut,
Jens Geyerbea9bbe2016-04-20 00:02:40 +020090 EndOfFile,
91 BadArgs,
92 Interrupted
Jens Geyerd5436f52014-10-03 19:50:38 +020093 );
94 private
Jens Geyere0e32402016-04-20 21:50:48 +020095 function GetType: TExceptionType;
96 protected
97 constructor HiddenCreate(const Msg: string);
Jens Geyerd5436f52014-10-03 19:50:38 +020098 public
Jens Geyere0e32402016-04-20 21:50:48 +020099 class function Create( AType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
100 class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
101 class function Create( AType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
102 property Type_: TExceptionType read GetType;
Jens Geyerd5436f52014-10-03 19:50:38 +0200103 end;
104
Jens Geyere0e32402016-04-20 21:50:48 +0200105 // Needed to remove deprecation warning
106 TTransportExceptionSpecialized = class abstract (TTransportException)
107 public
108 constructor Create(const Msg: string);
109 end;
110
111 TTransportExceptionUnknown = class (TTransportExceptionSpecialized);
112 TTransportExceptionNotOpen = class (TTransportExceptionSpecialized);
113 TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized);
114 TTransportExceptionTimedOut = class (TTransportExceptionSpecialized);
115 TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized);
116 TTransportExceptionBadArgs = class (TTransportExceptionSpecialized);
117 TTransportExceptionInterrupted = class (TTransportExceptionSpecialized);
118
Jens Geyerd5436f52014-10-03 19:50:38 +0200119 IHTTPClient = interface( ITransport )
120 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
121 procedure SetConnectionTimeout(const Value: Integer);
122 function GetConnectionTimeout: Integer;
123 procedure SetReadTimeout(const Value: Integer);
124 function GetReadTimeout: Integer;
125 function GetCustomHeaders: IThriftDictionary<string,string>;
126 procedure SendRequest;
127 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
128 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
129 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
130 end;
131
132 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
133 private
134 FUri : string;
135 FInputStream : IThriftStream;
136 FOutputStream : IThriftStream;
137 FConnectionTimeout : Integer;
138 FReadTimeout : Integer;
139 FCustomHeaders : IThriftDictionary<string,string>;
140
141 function CreateRequest: IXMLHTTPRequest;
142 protected
143 function GetIsOpen: Boolean; override;
144 procedure Open(); override;
145 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200146 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
147 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200148 procedure Flush; override;
149
150 procedure SetConnectionTimeout(const Value: Integer);
151 function GetConnectionTimeout: Integer;
152 procedure SetReadTimeout(const Value: Integer);
153 function GetReadTimeout: Integer;
154 function GetCustomHeaders: IThriftDictionary<string,string>;
155 procedure SendRequest;
156 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
157 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
158 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
159 public
160 constructor Create( const AUri: string);
161 destructor Destroy; override;
162 end;
163
164 IServerTransport = interface
165 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
166 procedure Listen;
167 procedure Close;
168 function Accept( const fnAccepting: TProc): ITransport;
169 end;
170
171 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
172 protected
173 procedure Listen; virtual; abstract;
174 procedure Close; virtual; abstract;
175 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
176 end;
177
178 ITransportFactory = interface
179 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
180 function GetTransport( const ATrans: ITransport): ITransport;
181 end;
182
183 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
184 function GetTransport( const ATrans: ITransport): ITransport; virtual;
185 end;
186
187 TTcpSocketStreamImpl = class( TThriftStreamImpl )
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200188{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200189 private type
190 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
191 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200192 FTcpClient : TCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200193 FTimeout : Integer;
194 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
195 TimeOut: Integer; var wsaError : Integer): Integer;
196 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200197 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200198{$ELSE}
199 FTcpClient: TSocket;
200 protected const
201 SLEEP_TIME = 200;
202{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200203 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200204 procedure Write( const pBuf : Pointer; offset, count: Integer); override;
205 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200206 procedure Open; override;
207 procedure Close; override;
208 procedure Flush; override;
209
210 function IsOpen: Boolean; override;
211 function ToArray: TBytes; override;
212 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200213{$IFDEF OLD_SOCKETS}
214 constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
215{$ELSE}
216 constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
217{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200218 end;
219
220 IStreamTransport = interface( ITransport )
221 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
222 function GetInputStream: IThriftStream;
223 function GetOutputStream: IThriftStream;
224 property InputStream : IThriftStream read GetInputStream;
225 property OutputStream : IThriftStream read GetOutputStream;
226 end;
227
228 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
229 protected
230 FInputStream : IThriftStream;
231 FOutputStream : IThriftStream;
232 protected
233 function GetIsOpen: Boolean; override;
234
235 function GetInputStream: IThriftStream;
236 function GetOutputStream: IThriftStream;
237 public
238 property InputStream : IThriftStream read GetInputStream;
239 property OutputStream : IThriftStream read GetOutputStream;
240
241 procedure Open; override;
242 procedure Close; override;
243 procedure Flush; override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200244 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
245 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200246 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
247 destructor Destroy; override;
248 end;
249
250 TBufferedStreamImpl = class( TThriftStreamImpl)
251 private
252 FStream : IThriftStream;
253 FBufSize : Integer;
254 FReadBuffer : TMemoryStream;
255 FWriteBuffer : TMemoryStream;
256 protected
Jens Geyer17c3ad92017-09-05 20:31:27 +0200257 procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
258 function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200259 procedure Open; override;
260 procedure Close; override;
261 procedure Flush; override;
262 function IsOpen: Boolean; override;
263 function ToArray: TBytes; override;
264 public
265 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
266 destructor Destroy; override;
267 end;
268
269 TServerSocketImpl = class( TServerTransportImpl)
270 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200271{$IFDEF OLD_SOCKETS}
272 FServer : TTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200273 FPort : Integer;
274 FClientTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200275{$ELSE}
276 FServer: TServerSocket;
277{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200278 FUseBufferedSocket : Boolean;
279 FOwnsServer : Boolean;
280 protected
281 function Accept( const fnAccepting: TProc) : ITransport; override;
282 public
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200283{$IFDEF OLD_SOCKETS}
284 constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200285 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200286{$ELSE}
287 constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
288 constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
289{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200290 destructor Destroy; override;
291 procedure Listen; override;
292 procedure Close; override;
293 end;
294
295 TBufferedTransportImpl = class( TTransportImpl )
296 private
297 FInputBuffer : IThriftStream;
298 FOutputBuffer : IThriftStream;
299 FTransport : IStreamTransport;
300 FBufSize : Integer;
301
302 procedure InitBuffers;
303 function GetUnderlyingTransport: ITransport;
304 protected
305 function GetIsOpen: Boolean; override;
306 procedure Flush; override;
307 public
308 procedure Open(); override;
309 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200310 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
311 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200312 constructor Create( const ATransport : IStreamTransport ); overload;
313 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
314 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
315 property IsOpen: Boolean read GetIsOpen;
316 end;
317
318 TSocketImpl = class(TStreamTransportImpl)
319 private
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200320{$IFDEF OLD_SOCKETS}
321 FClient : TCustomIpClient;
322{$ELSE}
323 FClient: TSocket;
324{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200325 FOwnsClient : Boolean;
326 FHost : string;
327 FPort : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200328{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200329 FTimeout : Integer;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200330{$ELSE}
331 FTimeout : Longword;
332{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200333
334 procedure InitSocket;
335 protected
336 function GetIsOpen: Boolean; override;
337 public
338 procedure Open; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200339{$IFDEF OLD_SOCKETS}
340 constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200341 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200342{$ELSE}
343 constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
344 constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
345{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200346 destructor Destroy; override;
347 procedure Close; override;
Jens Geyerbea9bbe2016-04-20 00:02:40 +0200348{$IFDEF OLD_SOCKETS}
349 property TcpClient: TCustomIpClient read FClient;
350{$ELSE}
351 property TcpClient: TSocket read FClient;
352{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200353 property Host : string read FHost;
354 property Port: Integer read FPort;
355 end;
356
357 TFramedTransportImpl = class( TTransportImpl)
358 private const
359 FHeaderSize : Integer = 4;
360 private class var
361 FHeader_Dummy : array of Byte;
362 protected
363 FTransport : ITransport;
364 FWriteBuffer : TMemoryStream;
365 FReadBuffer : TMemoryStream;
366
367 procedure InitWriteBuffer;
368 procedure ReadFrame;
369 public
370 type
371 TFactory = class( TTransportFactoryImpl )
372 public
373 function GetTransport( const ATrans: ITransport): ITransport; override;
374 end;
375
Jens Geyere0e32402016-04-20 21:50:48 +0200376 {$IFDEF HAVE_CLASS_CTOR}
Jens Geyerd5436f52014-10-03 19:50:38 +0200377 class constructor Create;
Jens Geyer9f7f11e2016-04-14 21:37:11 +0200378 {$ENDIF}
Jens Geyere0e32402016-04-20 21:50:48 +0200379
Jens Geyerd5436f52014-10-03 19:50:38 +0200380 constructor Create; overload;
381 constructor Create( const ATrans: ITransport); overload;
382 destructor Destroy; override;
383
384 procedure Open(); override;
385 function GetIsOpen: Boolean; override;
386
387 procedure Close(); override;
Jens Geyer17c3ad92017-09-05 20:31:27 +0200388 function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
389 procedure Write( const pBuf : Pointer; off, len : Integer); override;
Jens Geyerd5436f52014-10-03 19:50:38 +0200390 procedure Flush; override;
391 end;
392
// Declared only for compilers without class constructor support
// (HAVE_CLASS_CTOR undefined).
{$IFNDEF HAVE_CLASS_CTOR}
procedure TFramedTransportImpl_Initialize;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200396
const
  // default timeout in milliseconds (five seconds)
  DEFAULT_THRIFT_TIMEOUT = 5 * 1000;
399
400
401implementation
402
403{ TTransportImpl }
404
// Default Flush: intentionally empty. Descendants that hold pending output
// override this method.
procedure TTransportImpl.Flush;
begin
  // no-op by design
end;
409
// Default Peek: simply reports whether the transport is open.
function TTransportImpl.Peek: Boolean;
begin
  Result := GetIsOpen;  // same getter the IsOpen property reads
end;
414
// TBytes convenience overload: forwards to the pointer-based Read, passing the
// array length as buffer size. An empty array yields 0 without any read.
function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
var
  capacity : Integer;
begin
  capacity := Length(buf);
  if capacity > 0 then begin
    Result := Read( @buf[0], capacity, off, len);
  end
  else begin
    Result := 0;  // buf[0] does not exist, nothing can be read into it
  end;
end;
421
// TBytes convenience overload: forwards to the pointer-based ReadAll, passing
// the array length as buffer size. An empty array yields 0 without any read.
function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
var
  capacity : Integer;
begin
  capacity := Length(buf);
  if capacity > 0 then begin
    Result := ReadAll( @buf[0], capacity, off, len);
  end
  else begin
    Result := 0;  // buf[0] does not exist, nothing can be read into it
  end;
end;
428
// Writes the complete byte array; an empty array is silently ignored.
procedure TTransportImpl.Write( const buf: TBytes);
var
  count : Integer;
begin
  count := Length(buf);
  if count > 0 then begin
    Write( @buf[0], 0, count);
  end;
end;
434
// Writes len bytes of buf starting at offset off by forwarding to the
// pointer-based Write; an empty array is silently ignored.
procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
begin
  if Length(buf) <= 0
  then Exit;  // buf[0] does not exist

  Write( @buf[0], off, len);
end;
440
// Calls Read repeatedly until len bytes have been received and returns the
// number of bytes read. Raises TTransportExceptionNotOpen as soon as a Read
// call delivers no data (a result of zero or less).
function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var
  chunk : Integer;
begin
  Result := 0;
  while Result < len do begin
    // continue right behind the bytes received so far
    chunk := Read( pBuf, buflen, off + Result, len - Result);
    if chunk <= 0
    then raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );

    Inc( Result, chunk);
  end;
end;
452
// Pointer overload without offset: writes len bytes starting at pBuf.
procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
begin
  Write( pBuf, 0, len);  // offset zero
end;
457
458{ THTTPClientImpl }
459
// Drops the references to both streams.
procedure THTTPClientImpl.Close;
begin
  FInputStream  := nil;   // response stream of the last request, if any
  FOutputStream := nil;   // data collected for the next request
end;
465
// Stores the target URI and sets up an empty custom header dictionary and an
// empty, memory-backed output stream.
constructor THTTPClientImpl.Create(const AUri: string);
begin
  inherited Create;

  FUri           := AUri;
  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;

  // second argument TRUE as in every other place this adapter is created here
  FOutputStream  := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
end;
473
// Creates the XMLHTTP request object, opens it as a synchronous POST to FUri
// and applies the standard Thrift headers followed by all custom headers.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  header : TPair<string,string>;
begin
  // the COM helper class has a different name depending on the compiler version
  {$IF CompilerVersion >= 21.0}
  Result := CoXMLHTTP.Create;
  {$ELSE}
  Result := CoXMLHTTPRequest.Create;
  {$IFEND}

  // third argument FALSE = not asynchronous
  Result.open('POST', FUri, False, '', '');

  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
  Result.setRequestHeader( 'Accept', 'application/x-thrift');
  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  // custom headers are applied last
  for header in FCustomHeaders
  do Result.setRequestHeader( header.Key, header.Value );
end;
493
// Closes the transport (releasing both streams) before the inherited cleanup.
destructor THTTPClientImpl.Destroy;
begin
  Close;
  inherited;
end;
499
// Sends the collected output as one request. Whether or not sending succeeds,
// the output stream is afterwards replaced by a fresh, empty one.
procedure THTTPClientImpl.Flush;
begin
  try
    SendRequest;
  finally
    // release the old stream first, then start over with an empty buffer
    FOutputStream := nil;
    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
  end;
end;
509
// Getter of the ConnectionTimeout property.
function THTTPClientImpl.GetConnectionTimeout: Integer;
begin
  Result := Self.FConnectionTimeout;
end;
514
// Getter of the CustomHeaders property; returns the dictionary itself, not a copy.
function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
begin
  Result := Self.FCustomHeaders;
end;
519
// This transport always reports itself as open.
function THTTPClientImpl.GetIsOpen: Boolean;
begin
  Result := TRUE;
end;
524
// Getter of the ReadTimeout property.
function THTTPClientImpl.GetReadTimeout: Integer;
begin
  Result := Self.FReadTimeout;
end;
529
// Open is a no-op for this transport.
procedure THTTPClientImpl.Open;
begin
  // intentionally empty
end;
534
// Reads from the response stream of the last request.
// Raises TTransportExceptionNotOpen when there is no response stream, and
// converts any exception raised while reading into TTransportExceptionUnknown
// carrying the original message.
function THTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if not Assigned( FInputStream)
  then raise TTransportExceptionNotOpen.Create('No request has been sent');

  try
    Result := FInputStream.Read( pBuf, buflen, off, len);
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create(E.Message);
    end;
  end;
end;
548
// Posts everything collected in the output stream as one request and makes
// the response body available through FInputStream.
//
// Raises TTransportExceptionNotOpen when the transport has been closed.
// Close sets FOutputStream to nil, and without this check the ToArray call
// below would dereference a nil interface and fail with an access violation.
procedure THTTPClientImpl.SendRequest;
var
  xmlhttp : IXMLHTTPRequest;
  ms      : TMemoryStream;
  a       : TBytes;
  len     : Integer;
begin
  if FOutputStream = nil then begin
    raise TTransportExceptionNotOpen.Create('Cannot send request, transport has been closed');
  end;

  xmlhttp := CreateRequest;

  ms := TMemoryStream.Create;
  try
    // copy the pending output into a memory stream that can be handed to COM
    a := FOutputStream.ToArray;
    len := Length(a);
    if len > 0 then begin
      ms.WriteBuffer( Pointer(@a[0])^, len);
    end;
    ms.Position := 0;

    // soReference: the adapter does not own ms, it is freed in the finally block below
    xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));

    // replace any previous response stream with the new one
    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
  finally
    ms.Free;
  end;
end;
573
// Setter of the ConnectionTimeout property; only stores the value.
procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
begin
  Self.FConnectionTimeout := Value;
end;
578
// Setter of the ReadTimeout property; only stores the value.
procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
begin
  Self.FReadTimeout := Value;
end;
583
// Appends len bytes starting at offset off of pBuf to the pending request
// data; nothing is sent before Flush or SendRequest is called.
//
// Raises TTransportExceptionNotOpen when the transport has been closed.
// Close sets FOutputStream to nil, and without this check the call below
// would dereference a nil interface. Read performs the same check on
// FInputStream.
procedure THTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputStream = nil then begin
    raise TTransportExceptionNotOpen.Create('Cannot write, transport has been closed');
  end;

  FOutputStream.Write( pBuf, off, len);
end;
588
589{ TTransportException }
590
// Maps the concrete class of this instance to the matching TExceptionType.
// Anything that is none of the specialized classes tested below is reported
// as Unknown.
function TTransportException.GetType: TExceptionType;
begin
  Result := TExceptionType.Unknown;

  if Self is TTransportExceptionNotOpen
  then Result := TExceptionType.NotOpen
  else if Self is TTransportExceptionAlreadyOpen
  then Result := TExceptionType.AlreadyOpen
  else if Self is TTransportExceptionTimedOut
  then Result := TExceptionType.TimedOut
  else if Self is TTransportExceptionEndOfFile
  then Result := TExceptionType.EndOfFile
  else if Self is TTransportExceptionBadArgs
  then Result := TExceptionType.BadArgs
  else if Self is TTransportExceptionInterrupted
  then Result := TExceptionType.Interrupted;
end;
601
// The actual constructor; it just passes the message on to Exception.Create.
constructor TTransportException.HiddenCreate(const Msg: string);
begin
  inherited Create( Msg);
end;
606
// Deprecated factory: creates the exception matching AType with an empty message.
class function TTransportException.Create(AType: TExceptionType): TTransportException;
begin
  // not a constructor, hence no inherited call;
  // the overload called below is deprecated itself, so silence the warning locally
{$WARN SYMBOL_DEPRECATED OFF}
  Result := Create( AType, '');
{$WARN SYMBOL_DEPRECATED DEFAULT}
end;
614
// Deprecated factory: creates the specialized exception class that belongs to
// AType, carrying msg. Every type without a dedicated branch (including
// Unknown) produces a TTransportExceptionUnknown.
class function TTransportException.Create(AType: TExceptionType;
  const msg: string): TTransportException;
begin
  case AType of
    TExceptionType.NotOpen     : Result := TTransportExceptionNotOpen.Create( msg);
    TExceptionType.AlreadyOpen : Result := TTransportExceptionAlreadyOpen.Create( msg);
    TExceptionType.TimedOut    : Result := TTransportExceptionTimedOut.Create( msg);
    TExceptionType.EndOfFile   : Result := TTransportExceptionEndOfFile.Create( msg);
    TExceptionType.BadArgs     : Result := TTransportExceptionBadArgs.Create( msg);
    TExceptionType.Interrupted : Result := TTransportExceptionInterrupted.Create( msg);
  else
    Result := TTransportExceptionUnknown.Create( msg);
  end;
end;
629
// Deprecated factory: a message without a type yields a TTransportExceptionUnknown.
class function TTransportException.Create(const msg: string): TTransportException;
begin
  Result := TTransportExceptionUnknown.Create( msg);
end;
634
635{ TTransportExceptionSpecialized }
636
// Non-deprecated constructor; delegates to the protected HiddenCreate of the
// base class.
constructor TTransportExceptionSpecialized.Create(const Msg: string);
begin
  inherited HiddenCreate( Msg);
end;
641
642{ TTransportFactoryImpl }
643
// Default behaviour: hands back the transport it was given, unchanged.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := ATrans;  // no wrapping at this level
end;
648
{ TServerSocketImpl }
650
// Creates the server transport around an existing server socket.
// FOwnsServer is not set here, so the destructor leaves AServer alone.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
begin
  inherited Create;

  FServer        := AServer;
  FClientTimeout := AClientTimeout;  // handed to accepted client transports
end;
{$ELSE}
constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
begin
  inherited Create;

  FServer := AServer;

  // the same timeout applies to receiving and to sending
  FServer.RecvTimeout := AClientTimeout;
  FServer.SendTimeout := AClientTimeout;
end;
{$ENDIF}
667
// Creates the server transport together with its own server socket for the
// given port. The socket is owned by this instance (FOwnsServer) and is
// therefore freed by the destructor.
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
{$ELSE}
constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
{$ENDIF}
begin
  inherited Create;

{$IFDEF OLD_SOCKETS}
  FPort          := APort;
  FClientTimeout := AClientTimeout;

  FServer := TTcpServer.Create( nil );
  FServer.BlockMode := bmBlocking;

  // LocalPort is assigned via an AnsiString cast on newer compilers
  {$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
  {$ELSE}
  FServer.LocalPort := IntToStr( FPort);
  {$IFEND}
{$ELSE}
  // the client timeout is passed twice, as in the original call
  FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
{$ENDIF}

  FUseBufferedSocket := AUseBufferedSockets;
  FOwnsServer        := True;
end;
691
// Frees the server socket only if this instance owns it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer then begin
    FServer.Free;    // Free tolerates a nil reference
    FServer := nil;
  end;

  inherited;
end;
700
// Waits for a client connection and returns a transport for it.
//
// fnAccepting, if assigned, is called right before the accept call.
// The result is a TSocketImpl owning the client socket, wrapped into a
// TBufferedTransportImpl when FUseBufferedSocket is set. It is nil when no
// client socket was obtained.
//
// Raises TTransportExceptionNotOpen when there is no server socket. With
// OLD_SOCKETS every other exception is converted into
// TTransportExceptionUnknown; otherwise exceptions pass through unchanged.
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
{$IFDEF OLD_SOCKETS}
  client : TCustomIpClient;
{$ELSE}
  client: TSocket;
{$ENDIF}
  trans  : IStreamTransport;
begin
  if FServer = nil then begin
    raise TTransportExceptionNotOpen.Create('No underlying server socket.');
  end;

{$IFDEF OLD_SOCKETS}
  client := nil;
  try
    client := TCustomIpClient.Create(nil);

    if Assigned(fnAccepting)
    then fnAccepting();

    if not FServer.Accept( client) then begin
      client.Free;
      Result := nil;
      Exit;
    end;

    if client = nil then begin
      Result := nil;
      Exit;
    end;

    trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
    client := nil;  // trans owns it now

    if FUseBufferedSocket
    then result := TBufferedTransportImpl.Create( trans)
    else result := trans;

  except
    on E: Exception do begin
      client.Free;
      raise TTransportExceptionUnknown.Create(E.ToString);
    end;
  end;
{$ELSE}
  if Assigned(fnAccepting) then
    fnAccepting();

  client := FServer.Accept;

  // TSocketImpl.Create reads AClient.RecvTimeout, so a nil socket must not be
  // passed on. Returning nil matches the OLD_SOCKETS branch above.
  // NOTE(review): whether TServerSocket.Accept can return nil is not visible
  // in this unit; the check is defensive.
  if client = nil then begin
    Result := nil;
    Exit;
  end;

  try
    trans := TSocketImpl.Create(client, True);
    client := nil;  // trans owns it now

    if FUseBufferedSocket then
      Result := TBufferedTransportImpl.Create(trans)
    else
      Result := trans;
  except
    client.Free;
    raise;
  end;
{$ENDIF}
end;
765
// Puts the server socket into listening state; does nothing without a server
// socket. With OLD_SOCKETS a failure is reported as TTransportExceptionUnknown.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  try
    FServer.Active := True;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
    end;
  end;
{$ELSE}
  FServer.Listen;
{$ENDIF}
end;
782
// Shuts the underlying server socket down.
// Does nothing when no server socket is assigned.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

{$IFDEF OLD_SOCKETS}
  // any failure while deactivating is re-raised as a transport exception
  try
    FServer.Active := False;
  except
    on E: Exception do begin
      raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
    end;
  end;
{$ELSE}
  FServer.Close;
{$ENDIF}
end;
797
798{ TSocket }
799
{$IFDEF OLD_SOCKETS}
// Wraps an already existing client socket.
//   AClient     - the socket to use
//   aOwnsClient - when TRUE, the socket is freed by Close/Destroy
//   ATimeout    - timeout value handed on to the socket stream
constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var sockStream : IThriftStream;
begin
  FOwnsClient := aOwnsClient;
  FTimeout    := ATimeout;
  FClient     := AClient;

  // one stream object serves as both input and output
  sockStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( sockStream, sockStream);
end;
{$ELSE}
// Wraps an already existing client socket.
//   AClient     - the socket to use; its RecvTimeout becomes the timeout of this transport
//   aOwnsClient - when TRUE, the socket is freed by Close/Destroy
constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
var sockStream : IThriftStream;
begin
  FOwnsClient := aOwnsClient;
  FClient     := AClient;
  FTimeout    := AClient.RecvTimeout;  // read before the stream is created

  // one stream object serves as both input and output
  sockStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( sockStream, sockStream);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200821
// Creates a transport for the given host and port.
// The socket object is created via InitSocket, the connection itself is established by Open.
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
{$ELSE}
constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
{$ENDIF}
begin
  inherited Create( nil, nil);  // streams are set up by InitSocket below

  FTimeout := ATimeout;
  FPort    := APort;
  FHost    := AHost;

  InitSocket;
end;
834
// Releases the client socket, but only if this instance owns it.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;

  inherited;
end;
841
// Closes the transport: the inherited Close runs first,
// then the client socket is freed if this instance owns it.
procedure TSocketImpl.Close;
begin
  inherited Close;

  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;
end;
848
// Returns TRUE if a client socket exists and reports itself as connected/open.
function TSocketImpl.GetIsOpen: Boolean;
begin
  if FClient = nil then begin
    Result := False;
    Exit;
  end;

{$IFDEF OLD_SOCKETS}
  Result := FClient.Connected;
{$ELSE}
  Result := FClient.IsOpen;
{$ENDIF}
end;
857
// Discards the current client socket (freeing it only if owned), creates a
// fresh one that is owned by this instance and attaches a new socket stream
// as both input and output stream.
procedure TSocketImpl.InitSocket;
var
  sockStream : IThriftStream;
begin
  if not FOwnsClient
  then FClient := nil            // not ours, just forget about it
  else FreeAndNil( FClient);

{$IFDEF OLD_SOCKETS}
  FClient := TTcpClient.Create( nil);
{$ELSE}
  FClient := TSocket.Create(FHost, FPort);
{$ENDIF}
  FOwnsClient := True;

  sockStream    := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FInputStream  := sockStream;
  FOutputStream := sockStream;
end;
877
// Connects the socket to Host:Port and attaches a new socket stream.
// Raises
//   TTransportExceptionAlreadyOpen - if the socket is already connected
//   TTransportExceptionNotOpen     - if the host is empty or the port is not positive
procedure TSocketImpl.Open;
begin
  if IsOpen
  then raise TTransportExceptionAlreadyOpen.Create('Socket already connected');

  if FHost = ''
  then raise TTransportExceptionNotOpen.Create('Cannot open null host');

  if Port <= 0
  then raise TTransportExceptionNotOpen.Create('Cannot open without port');

  // a preceding Close may have released the socket
  if FClient = nil then begin
    InitSocket;
  end;

{$IFDEF OLD_SOCKETS}
  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;
{$ELSE}
  FClient.Open;
{$ENDIF}

  // one stream object serves as both input and output
  FInputStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
906
907{ TBufferedStream }
908
// Flushes pending output, then drops the wrapped stream and both buffers.
// After this call IsOpen returns FALSE.
procedure TBufferedStreamImpl.Close;
begin
  Flush;  // must happen while stream and buffers still exist

  FStream := nil;
  FreeAndNil( FReadBuffer);
  FreeAndNil( FWriteBuffer);
end;
920
// Creates a buffering wrapper around AStream.
//   AStream  - the stream to read from / write to
//   ABufSize - number of bytes requested per read from AStream;
//              also the write buffer size above which Write flushes
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  inherited Create;

  FBufSize     := ABufSize;
  FStream      := AStream;
  FWriteBuffer := TMemoryStream.Create;
  FReadBuffer  := TMemoryStream.Create;
end;
929
// Closes the stream (which flushes pending output and frees the buffers)
// before the instance goes away.
destructor TBufferedStreamImpl.Destroy;
begin
  Self.Close;
  inherited;
end;
935
// Writes the entire content of the write buffer to the wrapped stream
// and empties the write buffer. Does nothing if the stream is not open.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  cbData  : Integer;
begin
  if not IsOpen
  then Exit;

  cbData := FWriteBuffer.Size;
  if cbData > 0 then begin
    SetLength( pending, cbData);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( Pointer(@pending[0])^, cbData);
    FStream.Write( pending, 0, cbData);
  end;

  FWriteBuffer.Clear;
end;
952
// The stream counts as open as long as the wrapped stream and both buffers exist.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := not ((FWriteBuffer = nil) or (FReadBuffer = nil) or (FStream = nil));
end;
959
// Intentionally empty: this implementation performs no work on Open.
procedure TBufferedStreamImpl.Open;
begin
  { nothing to do }
end;
964
// Reads up to count bytes into pBuf, starting at offset.
// Data is served from the read buffer; whenever the buffer is exhausted it is
// refilled with up to FBufSize bytes from the wrapped stream.
// Returns the number of bytes copied, which is less than count if the
// wrapped stream delivers no more data, and 0 if the stream is not open.
function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  chunk  : Integer;
  copied : Integer;
  refill : TBytes;
begin
  inherited;
  Result := 0;

  if not IsOpen
  then Exit;

  while count > 0 do begin

    // read buffer fully consumed -> fetch the next portion
    if FReadBuffer.Position >= FReadBuffer.Size then begin
      FReadBuffer.Clear;
      SetLength( refill, FBufSize);
      chunk := FStream.Read( refill, 0, FBufSize);
      if chunk = 0
      then Break; // avoid infinite loop

      FReadBuffer.WriteBuffer( Pointer(@refill[0])^, chunk);
      FReadBuffer.Position := 0;
    end;

    // hand out whatever the buffer holds, limited to the remaining request
    if FReadBuffer.Position < FReadBuffer.Size then begin
      chunk  := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      copied := FReadBuffer.Read( PByteArray(pBuf)^[offset], chunk);
      Inc( Result, copied);
      Dec( count, chunk);
      Inc( offset, chunk);
    end;
  end;
end;
995
// Returns a copy of the entire read buffer content.
// The result is empty if the stream is not open or the read buffer is empty.
// Note that the read buffer position is moved by this call.
function TBufferedStreamImpl.ToArray: TBytes;
var cbData : Integer;
begin
  if IsOpen
  then cbData := FReadBuffer.Size
  else cbData := 0;

  SetLength( Result, cbData);
  if cbData <= 0
  then Exit;

  FReadBuffer.Position := 0;
  FReadBuffer.Read( Pointer(@Result[0])^, cbData);
end;
1012
// Appends count bytes of pBuf, starting at offset, to the write buffer.
// The buffer is flushed to the wrapped stream once it has grown beyond FBufSize.
// Nothing is written if count is not positive or the stream is not open.
procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
begin
  inherited;

  if (count <= 0) or (not IsOpen)
  then Exit;

  FWriteBuffer.Write( PByteArray(pBuf)^[offset], count);
  if FWriteBuffer.Size > FBufSize
  then Flush;
end;
1025
1026{ TStreamTransportImpl }
1027
// Releases the references to both streams.
procedure TStreamTransportImpl.Close;
begin
  FOutputStream := nil;
  FInputStream  := nil;
end;
1033
// Creates a transport on top of the given pair of streams.
//   AInputStream  - stream that Read pulls data from
//   AOutputStream - stream that Write and Flush operate on
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  inherited Create;

  FOutputStream := AOutputStream;
  FInputStream  := AInputStream;
end;
1040
// Releases the references to both streams before the instance goes away.
destructor TStreamTransportImpl.Destroy;
begin
  FOutputStream := nil;
  FInputStream  := nil;

  inherited;
end;
1047
// Flushes the output stream.
// Raises TTransportExceptionNotOpen if there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if FOutputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
1056
// Getter: returns the input stream of this transport (nil after Close).
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := Self.FInputStream;
end;
1061
// This implementation unconditionally reports the transport as open;
// the state of the streams is not examined.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := TRUE;
end;
1066
// Getter: returns the output stream of this transport (nil after Close).
// This must be FOutputStream - the constructor accepts distinct input and
// output streams, so handing out the input stream here would send writes of
// consumers (e.g. buffering wrappers) to the wrong stream.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
1071
// Intentionally empty: this implementation performs no work on Open.
procedure TStreamTransportImpl.Open;
begin
  { nothing to do }
end;
1076
// Delegates the read request to the input stream and returns its result.
// Raises TTransportExceptionNotOpen if there is no input stream.
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );

  Result := FInputStream.Read( pBuf, buflen, off, len);
end;
1085
// Delegates the write request to the output stream.
// Raises TTransportExceptionNotOpen if there is no output stream.
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputStream = nil
  then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );

  FOutputStream.Write( pBuf, off, len);
end;
1094
1095{ TBufferedTransportImpl }
1096
// Convenience constructor: buffered transport with a buffer size of 1024 bytes.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
const
  DEFAULT_BUFSIZE = 1024;
begin
  // no inherited call here, the overload below takes care of it
  Create( ATransport, DEFAULT_BUFSIZE);
end;
1102
// Closes the wrapped transport. The buffer objects are left untouched.
procedure TBufferedTransportImpl.Close;
begin
  Self.FTransport.Close;
end;
1107
// Creates a buffering wrapper around ATransport.
//   ATransport - the stream transport to wrap
//   ABufSize   - buffer size handed on to the buffered input/output streams
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
begin
  inherited Create;

  FBufSize   := ABufSize;
  FTransport := ATransport;

  InitBuffers;  // requires FTransport and FBufSize to be set
end;
1115
// Flushes the output buffer, if there is one.
procedure TBufferedTransportImpl.Flush;
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Flush;
end;
1122
// The open state is that of the wrapped transport.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := Self.FTransport.IsOpen;
end;
1127
// Getter: returns the transport wrapped by this instance.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := Self.FTransport;
end;
1132
// Creates a buffered stream for each stream the wrapped transport currently
// provides. A buffer is left as it is when the corresponding stream is nil.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if Assigned( FTransport.InputStream)
  then FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize);

  if Assigned( FTransport.OutputStream)
  then FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize);
end;
1142
// Opens the wrapped transport and re-binds the buffers to its streams.
//
// The buffers created in the constructor wrap the stream objects the transport
// exposed at that time. A transport may replace its streams while opening
// (TSocketImpl.Open creates new socket streams and, after a Close, even a new
// socket object), which would leave the buffers attached to stale streams.
// Hence the buffers are set up again once the transport is open.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;
  InitBuffers;  // match the buffers with the transport's current streams
end;
1147
// Reads through the input buffer and returns the number of bytes read.
// Returns 0 if there is no input buffer.
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  if FInputBuffer = nil
  then Result := 0
  else Result := FInputBuffer.Read( pBuf, buflen, off, len);
end;
1155
// Writes through the output buffer. The data is silently dropped
// if there is no output buffer.
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if FOutputBuffer = nil
  then Exit;

  FOutputBuffer.Write( pBuf, off, len);
end;
1162
1163{ TFramedTransportImpl }
1164
// One-time setup of FHeader_Dummy: FHeaderSize zero bytes, which InitWriteBuffer
// writes as a placeholder for the frame header.
{$IFDEF HAVE_CLASS_CTOR}
class constructor TFramedTransportImpl.Create;
var cbHeader : Integer;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  cbHeader := Length( FHeader_Dummy) * SizeOf( Byte);
  FillChar( FHeader_Dummy[0], cbHeader, 0);
end;
{$ELSE}
procedure TFramedTransportImpl_Initialize;
var cbHeader : Integer;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  cbHeader := Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0], cbHeader, 0);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001179
// Creates a framed transport without an underlying transport.
// Only the write buffer is prepared; FTransport is not assigned here.
constructor TFramedTransportImpl.Create;
begin
  inherited Create;
  Self.InitWriteBuffer;
end;
1185
// Closes the underlying transport.
procedure TFramedTransportImpl.Close;
begin
  Self.FTransport.Close;
end;
1190
// Creates a framed transport on top of ATrans and prepares the write buffer.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;

  FTransport := ATrans;
  InitWriteBuffer;
end;
1197
// Frees the write buffer and the read buffer.
destructor TFramedTransportImpl.Destroy;
begin
  FreeAndNil( FWriteBuffer);
  FreeAndNil( FReadBuffer);

  inherited;
end;
1204
// Sends the buffered data as one frame and flushes the underlying transport.
// The first FHeaderSize bytes of the write buffer are a placeholder which is
// overwritten with the payload length as a big-endian 32 bit value.
// The write buffer is re-initialized before the frame is handed to the transport.
// Raises TTransportExceptionUnknown if the buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame     : TBytes;
  cbFrame   : Integer;
  cbPayload : Integer;
begin
  cbFrame   := FWriteBuffer.Size;
  cbPayload := cbFrame - FHeaderSize;
  if cbPayload < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );

  // take a copy, because the write buffer gets replaced next
  SetLength( frame, cbFrame);
  if cbFrame > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], cbFrame);

  InitWriteBuffer;

  // frame header: payload size, most significant byte first
  frame[0] := Byte( (cbPayload shr 24) and $FF);
  frame[1] := Byte( (cbPayload shr 16) and $FF);
  frame[2] := Byte( (cbPayload shr  8) and $FF);
  frame[3] := Byte(  cbPayload         and $FF);

  FTransport.Write( frame, 0, cbFrame);
  FTransport.Flush;
end;
1233
// The open state is that of the underlying transport.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := Self.FTransport.IsOpen;
end;
1238
type
  // Descendant without members of its own; InitWriteBuffer casts to it
  // in order to set TMemoryStream.Capacity.
  TAccessMemoryStream = class(TMemoryStream);
1242
// Replaces the write buffer with a fresh one (initial capacity 1024 bytes)
// that already contains FHeaderSize placeholder bytes for the frame header.
procedure TFramedTransportImpl.InitWriteBuffer;
const
  INITIAL_CAPACITY = 1024;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;

  // the cast gives access to the Capacity property
  TAccessMemoryStream(FWriteBuffer).Capacity := INITIAL_CAPACITY;

  // reserve room for the header, Flush fills in the real value
  FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
end;
1250
// Opens the underlying transport.
procedure TFramedTransportImpl.Open;
begin
  Self.FTransport.Open;
end;
1255
// Reads up to len bytes of frame payload into pBuf, starting at off.
// len is limited to the space available behind off. Data left over from the
// current frame is returned first; only if that yields nothing, the next
// frame is read from the underlying transport.
// Returns the number of bytes copied.
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
  // never write beyond the end of the caller's buffer
  len := Min( len, buflen - off);

  // try the remainder of the current frame first
  if (len > 0) and (FReadBuffer <> nil) then begin
    Result := FReadBuffer.Read( PByteArray(pBuf)^[off], len);
    if Result > 0
    then Exit;
  end;

  // nothing left, get the next frame
  ReadFrame;

  if len <= 0
  then Result := 0
  else Result := FReadBuffer.Read( PByteArray(pBuf)^[off], len);
end;
1273
// Reads one complete frame from the underlying transport into FReadBuffer.
// A frame consists of a header of FHeaderSize bytes, holding the payload
// size as a big-endian 32 bit value, followed by the payload itself.
// On return FReadBuffer is positioned at the start of the payload.
// Raises TTransportExceptionUnknown if the header announces a negative size,
// which can only result from corrupted or non-framed input.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  // the size comes from the wire, so validate it before allocating memory
  if size < 0
  then raise TTransportExceptionUnknown.Create('TFramedTransport.ReadFrame: invalid frame size ' + IntToStr(size));

  SetLength( buff, size );
  FTransport.ReadAll( buff, 0, size );

  // FreeAndNil avoids a dangling reference should the allocation below fail
  FreeAndNil( FReadBuffer);
  FReadBuffer := TMemoryStream.Create;
  if Length(buff) > 0
  then FReadBuffer.Write( Pointer(@buff[0])^, size );
  FReadBuffer.Position := 0;
end;
1295
// Appends len bytes of pBuf, starting at off, to the write buffer.
// Nothing is sent to the underlying transport before Flush is called.
procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
  if len <= 0
  then Exit;

  FWriteBuffer.Write( PByteArray(pBuf)^[off], len);
end;
1301
1302{ TFramedTransport.TFactory }
1303
// Factory method: wraps ATrans into a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
var framed : ITransport;
begin
  framed := TFramedTransportImpl.Create( ATrans);
  Result := framed;
end;
1308
1309{ TTcpSocketStreamImpl }
1310
// Closes the socket this stream operates on.
procedure TTcpSocketStreamImpl.Close;
begin
  Self.FTcpClient.Close;
end;
1315
{$IFDEF OLD_SOCKETS}
// Creates a stream on top of ATcpClient. The socket is referenced, not owned.
//   aTimeout - timeout value stored for the read and write operations
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;

  FTimeout   := aTimeout;
  FTcpClient := ATcpClient;
end;
{$ELSE}
// Creates a stream on top of ATcpClient and applies the timeouts to the socket.
//   aTimeout - becomes the send timeout as it is; the receive timeout is set
//              to SLEEP_TIME instead when aTimeout is 0
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
var rcvTimeout : Longword;
begin
  inherited Create;
  FTcpClient := ATcpClient;

  if aTimeout <> 0
  then rcvTimeout := aTimeout
  else rcvTimeout := SLEEP_TIME;

  FTcpClient.RecvTimeout := rcvTimeout;
  FTcpClient.SendTimeout := aTimeout;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001335
// Intentionally empty: this implementation performs no work on Flush.
procedure TTcpSocketStreamImpl.Flush;
begin
  { nothing to do }
end;
1340
// Reports the state of the socket: Active for old sockets, IsOpen otherwise.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := Self.FTcpClient.{$IFDEF OLD_SOCKETS}Active{$ELSE}IsOpen{$ENDIF};
end;
1349
// Opens the socket this stream operates on.
procedure TTcpSocketStreamImpl.Open;
begin
  Self.FTcpClient.Open;
end;
1354
1355
{$IFDEF OLD_SOCKETS}
// Calls select() for the handle of the client socket.
//   ReadReady, WriteReady, ExceptFlag
//            - optional; each non-nil pointer requests the corresponding
//              check and receives its outcome
//   TimeOut  - milliseconds to wait; a negative value waits forever
//   wsaError - receives WSAEINVAL if the socket is not active, the value of
//              WSAGetLastError if select() reported SOCKET_ERROR, 0 otherwise
// Returns the result of select(), or SOCKET_ERROR if the socket is not active
// or the call raised an exception.
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                                      TimeOut: Integer; var wsaError : Integer): Integer;
var
  rdSet, wrSet, exSet    : TFDset;
  pRdSet, pWrSet, pExSet : PFDset;
  tmo   : timeval;
  pTmo  : PTimeval;
  hSock : TSocket;
begin
  if not FTcpClient.Active then begin
    wsaError := WSAEINVAL;
    Exit( SOCKET_ERROR);
  end;

  hSock := FTcpClient.Handle;

  // set up only those descriptor sets the caller asked for
  pRdSet := nil;
  if Assigned(ReadReady) then begin
    FD_ZERO(rdSet);
    FD_SET(hSock, rdSet);
    pRdSet := @rdSet;
  end;

  pWrSet := nil;
  if Assigned(WriteReady) then begin
    FD_ZERO(wrSet);
    FD_SET(hSock, wrSet);
    pWrSet := @wrSet;
  end;

  pExSet := nil;
  if Assigned(ExceptFlag) then begin
    FD_ZERO(exSet);
    FD_SET(hSock, exSet);
    pExSet := @exSet;
  end;

  // split milliseconds into seconds and microseconds
  pTmo := nil; // wait forever
  if TimeOut >= 0 then begin
    tmo.tv_sec  := TimeOut div 1000;
    tmo.tv_usec := 1000 * (TimeOut mod 1000);
    pTmo := @tmo;
  end;

  wsaError := 0;
  try
    {$IFDEF MSWINDOWS}
      {$IFDEF OLD_UNIT_NAMES}
      result := WinSock.select( hSock + 1, pRdSet, pWrSet, pExSet, pTmo);
      {$ELSE}
      result := Winapi.WinSock.select( hSock + 1, pRdSet, pWrSet, pExSet, pTmo);
      {$ENDIF}
    {$ENDIF}
    {$IFDEF LINUX}
      result := Libc.select( hSock + 1, pRdSet, pWrSet, pExSet, pTmo);
    {$ENDIF}

    if result = SOCKET_ERROR then begin
      wsaError := WSAGetLastError;
    end;

  except
    result := SOCKET_ERROR;
  end;

  // report the state of each requested set back to the caller
  if Assigned(ReadReady)
  then ReadReady^ := FD_ISSET(hSock, rdSet);

  if Assigned(WriteReady)
  then WriteReady^ := FD_ISSET(hSock, wrSet);

  if Assigned(ExceptFlag)
  then ExceptFlag^ := FD_ISSET(hSock, exSet);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001443
{$IFDEF OLD_SOCKETS}
// Waits until the socket has data to read and peeks at it.
//   TimeOut      - wait time, passed to Select
//   pBuf         - receives the peeked bytes; the data remains in the socket
//   DesiredBytes - maximum number of bytes to peek
//   wsaError     - error code as set by Select
//   bytesReady   - number of bytes peeked, 0 unless data is available
// Returns wfd_HaveData, wfd_Timeout or wfd_Error.
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
                                           DesiredBytes : Integer;
                                           var wsaError, bytesReady : Integer): TWaitForData;
const
  MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
var
  canRead, hasError : Boolean;
  rc : Integer;
begin
  bytesReady := 0;

  // select() yields the number of ready socket handles contained in the
  // fd_set structures, zero if the time limit expired, or SOCKET_ERROR on
  // failure, in which case WSAGetLastError provides the specific error code.
  rc := Self.Select( @canRead, nil, @hasError, TimeOut, wsaError);
  if rc = SOCKET_ERROR then begin
    Exit( TWaitForData.wfd_Error);
  end;
  if (rc = 0) or (not canRead) then begin
    Exit( TWaitForData.wfd_Timeout);
  end;

  // recv() yields the number of bytes received, or -1 on failure.
  // A value of 0 means the peer has performed an orderly shutdown.
  rc := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
  if rc <= 0 then begin
    Exit( TWaitForData.wfd_Error);
  end;

  // at least we have some data
  bytesReady := Min( rc, DesiredBytes);
  Result := TWaitForData.wfd_HaveData;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001477
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001478{$IFDEF OLD_SOCKETS}
// Old sockets version.
// Reads up to count bytes from the socket into pBuf, starting at offset.
// The first wait uses FTimeout (DEFAULT_THRIFT_TIMEOUT if FTimeout is 0);
// once data has arrived, a tenth of that value is used, but no less than 200.
// Returns the number of bytes read. That is less than count if waiting or
// receiving fails, or if a wait times out while FTimeout is 0.
// Raises TTransportExceptionTimedOut if a wait times out and FTimeout is not 0.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var wfd : TWaitForData;
    wsaError,
    msecs : Integer;
    nBytes : Integer;
    pDest : PByte;
begin
  inherited;

  if FTimeout > 0
  then msecs := FTimeout
  else msecs := DEFAULT_THRIFT_TIMEOUT;

  result := 0;
  pDest := @(PByteArray(pBuf)^[offset]);
  while count > 0 do begin

    // wait until data is available; Break leaves this inner loop only
    while TRUE do begin
      wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
      case wfd of
        TWaitForData.wfd_Error    :  Exit;
        TWaitForData.wfd_HaveData :  Break;
        TWaitForData.wfd_Timeout  :  begin
          if (FTimeout = 0)
          then Exit
          else begin
            raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
          end;
        end;
      else
        ASSERT( FALSE);
      end;
    end;

    // reduce the timeout once we got data
    if FTimeout > 0
    then msecs := FTimeout div 10
    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
    msecs := Max( msecs, 200);

    ASSERT( nBytes <= count);
    nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);

    // A failed receive (negative value) must not be applied to the counters:
    // it would move pDest backwards and enlarge count. Zero bytes means no
    // progress is possible. Either way return what has been read so far.
    if nBytes <= 0
    then Exit;

    Inc( pDest, nBytes);
    Dec( count, nBytes);
    Inc( result, nBytes);
  end;
end;
1528
// Old sockets version.
// Sizes the result according to the BytesReceived property of the socket
// (0 if the stream is not open) and receives that many bytes into it.
function TTcpSocketStreamImpl.ToArray: TBytes;
var cbData : Integer;
begin
  if IsOpen
  then cbData := FTcpClient.BytesReceived
  else cbData := 0;

  SetLength( Result, cbData);
  if cbData <= 0
  then Exit;

  FTcpClient.ReceiveBuf( Pointer(@Result[0])^, cbData);
end;
1544
// Old sockets version.
// Waits (up to FTimeout) until the socket is writable, then sends count
// bytes of pBuf, starting at offset.
// Raises
//   TTransportExceptionNotOpen  - if the socket is not active
//   TTransportExceptionTimedOut - if the socket did not become writable in time
//   TTransportExceptionUnknown  - if Select failed or reported an error state
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
var canWrite, hasError : Boolean;
    rc, wsaError : Integer;
begin
  inherited;

  if not FTcpClient.Active then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  // select() yields the number of ready socket handles contained in the
  // fd_set structures, zero if the time limit expired, or SOCKET_ERROR on
  // failure, in which case WSAGetLastError provides the specific error code.
  rc := Self.Select( nil, @canWrite, @hasError, FTimeOut, wsaError);

  if rc = SOCKET_ERROR then begin
    raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
  end;

  if rc = 0 then begin
    raise TTransportExceptionTimedOut.Create('timed out');
  end;

  if hasError or (not canWrite) then begin
    raise TTransportExceptionUnknown.Create('unknown error');
  end;

  FTcpClient.SendBuf( PByteArray(pBuf)^[offset], count);
end;
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001571
1572{$ELSE}
1573
// New sockets version.
// Reads from the socket into pBuf, starting at offset, until count bytes
// have been received or the socket delivers no more data.
// Returns the number of bytes read.
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
  chunk  : Integer;
  pWrite : PByte;
begin
  inherited;

  Result := 0;
  pWrite := @(PByteArray(pBuf)^[offset]);

  while count > 0 do begin
    chunk := FTcpClient.Read( pWrite^, count);
    if chunk = 0
    then Break;  // no more data

    Inc( Result, chunk);
    Inc( pWrite, chunk);
    Dec( count, chunk);
  end;
end;
1591
// New sockets version.
// Reads whatever the socket currently delivers, in portions of up to 1024
// bytes, until a portion comes back short. The result holds exactly the
// bytes received; it is empty if Peek reports no data.
// A TTransportException raised while reading ends the operation silently,
// the data received up to that point is returned. Other exceptions propagate.
function TTcpSocketStreamImpl.ToArray: TBytes;
const
  CHUNK_SIZE = 1024;
var
  len   : Integer;
  total : Integer;
begin
  // a managed function result is not guaranteed to be empty on entry
  SetLength( Result, 0);
  total := 0;

  try
    if FTcpClient.Peek then
      repeat
        SetLength( Result, total + CHUNK_SIZE);
        len := FTcpClient.Read( Result[total], CHUNK_SIZE);
        if len > 0
        then Inc( total, len);
      until len < CHUNK_SIZE;
  except
    on TTransportException do begin { don't allow default exceptions } end;
    else raise;
  end;

  // cut off the unused tail, also after a zero-length read or an exception
  SetLength( Result, total);
end;
1610
// New sockets version.
// Sends count bytes of pBuf, starting at offset, through the socket.
// Raises TTransportExceptionNotOpen if the socket is not open.
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
begin
  inherited;

  if not FTcpClient.IsOpen then begin
    raise TTransportExceptionNotOpen.Create('not open');
  end;

  FTcpClient.Write( PByteArray(pBuf)^[offset], count);
end;
1621
Jens Geyer23d67462015-12-19 11:44:57 +01001622{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001623
Jens Geyerbea9bbe2016-04-20 00:02:40 +02001624
{$IF CompilerVersion < 21.0}
// With compiler versions below 21.0 the header dummy is set up at unit
// initialization by calling the stand-alone procedure.
// NOTE(review): presumably these are the compilers for which HAVE_CLASS_CTOR
// is not defined - confirm against Thrift.Defines.inc
initialization
  TFramedTransportImpl_Initialize;
{$IFEND}
1631
1632
1633end.