blob: e005d4f901fb3ef4ab0017410210a9131e80ced8 [file] [log] [blame]
(*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *)
Jens Geyerd5436f52014-10-03 19:50:38 +020019unit Thrift.Transport;
20
Jens Geyer9f7f11e2016-04-14 21:37:11 +020021{$I Thrift.Defines.inc}
22{$SCOPEDENUMS ON}
23
Jens Geyerd5436f52014-10-03 19:50:38 +020024interface
25
26uses
27 Classes,
28 SysUtils,
29 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020030 Generics.Collections,
Jens Geyer9f7f11e2016-04-14 21:37:11 +020031 {$IFDEF OLD_UNIT_NAMES}
32 ActiveX, msxml, WinSock, Sockets,
Nick4f5229e2016-04-14 16:43:22 +030033 {$ELSE}
Jens Geyer9f7f11e2016-04-14 21:37:11 +020034 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
35 {$IFDEF OLD_SOCKETS}
36 Web.Win.Sockets,
37 {$ELSE}
38 System.Win.ScktComp,
39 {$ENDIF}
40 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020041 Thrift.Collections,
42 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030043 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020044
45type
// Contract shared by all transports in this unit: an open/close life cycle
// plus byte-array reads and writes addressed by offset and length.
ITransport = interface
  ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
  // --- state ---
  function  GetIsOpen: Boolean;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean;
  // --- life cycle ---
  procedure Open;
  procedure Close;
  // --- input: both return the number of bytes placed into buf ---
  function  Read(var buf: TBytes; off: Integer; len: Integer): Integer;
  function  ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
  // --- output ---
  procedure Write( const buf: TBytes); overload;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
  procedure Flush;
end;
59
// Abstract base for ITransport implementations.
// Descendants must supply GetIsOpen, Open, Close, Read and the three-argument
// Write; Peek, ReadAll, the one-argument Write and Flush have default bodies
// in this unit.
TTransportImpl = class( TInterfacedObject, ITransport)
protected
  function  GetIsOpen: Boolean; virtual; abstract;
  property  IsOpen: Boolean read GetIsOpen;
  function  Peek: Boolean; virtual;
  procedure Open(); virtual; abstract;
  procedure Close(); virtual; abstract;
  function  Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
  function  ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
  procedure Write( const buf: TBytes); overload; virtual;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
  procedure Flush; virtual;
end;
73
// Exception raised by the transports in this unit. Besides the message it
// carries a TExceptionType code, readable through Type_.
TTransportException = class( Exception )
public
  type
    // Failure categories. Unknown is the first member and therefore the
    // value FType holds when a constructor does not set it.
    TExceptionType = (
      Unknown,
      NotOpen,
      AlreadyOpen,
      TimedOut,
      EndOfFile
    );
private
  FType : TExceptionType;
public
  constructor Create( AType: TExceptionType); overload;
  constructor Create( const msg: string); overload;
  constructor Create( AType: TExceptionType; const msg: string); overload;
  property Type_: TExceptionType read FType;
end;
92
// Transport that additionally exposes HTTP-specific settings: two timeout
// values, a dictionary of custom request headers and an explicit SendRequest.
IHTTPClient = interface( ITransport )
  ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
  // accessors backing the properties below
  procedure SetConnectionTimeout(const Value: Integer);
  function  GetConnectionTimeout: Integer;
  procedure SetReadTimeout(const Value: Integer);
  function  GetReadTimeout: Integer;
  function  GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property  ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property  ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property  CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
end;
105
// IHTTPClient implementation on top of the MSXML IXMLHTTPRequest object.
// Written bytes are collected in FOutputStream; SendRequest posts them to
// FUri and exposes the response through FInputStream.
THTTPClientImpl = class( TTransportImpl, IHTTPClient)
private
  FUri               : string;
  FInputStream       : IThriftStream;   // response of the last request, nil before the first one
  FOutputStream      : IThriftStream;   // request body being assembled
  FConnectionTimeout : Integer;
  FReadTimeout       : Integer;
  FCustomHeaders     : IThriftDictionary<string,string>;

  function CreateRequest: IXMLHTTPRequest;
protected
  // ITransport
  function  GetIsOpen: Boolean; override;
  procedure Open(); override;
  procedure Close(); override;
  function  Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  procedure Flush; override;

  // IHTTPClient
  procedure SetConnectionTimeout(const Value: Integer);
  function  GetConnectionTimeout: Integer;
  procedure SetReadTimeout(const Value: Integer);
  function  GetReadTimeout: Integer;
  function  GetCustomHeaders: IThriftDictionary<string,string>;
  procedure SendRequest;
  property  ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
  property  ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
  property  CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
  constructor Create( const AUri: string);
  destructor Destroy; override;
end;
137
// Server-side counterpart of ITransport: listens, hands out one client
// transport per Accept call, and can be closed.
IServerTransport = interface
  ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
  procedure Listen;
  procedure Close;
  // fnAccepting is an optional callback supplied by the caller.
  function  Accept( const fnAccepting: TProc): ITransport;
end;
144
// Abstract base for IServerTransport implementations; every method must be
// provided by the descendant.
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
protected
  procedure Listen; virtual; abstract;
  procedure Close; virtual; abstract;
  function  Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
end;
151
// Factory that maps a given transport to the transport that should actually
// be used (for example a wrapping transport).
ITransportFactory = interface
  ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
  function GetTransport( const ATrans: ITransport): ITransport;
end;
156
// Default factory; its GetTransport (implemented in this unit) returns the
// transport it was given. Descendants override GetTransport to wrap it.
TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
  function GetTransport( const ATrans: ITransport): ITransport; virtual;
end;
160
{$IFDEF OLD_SOCKETS}
// Unit-local names for the socket component classes, so the rest of the
// unit does not refer to the component library types directly.
TThriftCustomIpClient = TCustomIpClient;
TThriftTcpServer      = TTcpServer;
TThriftTcpClient      = TTcpClient;
{$ELSE}
// TODO
{$ENDIF}
168
{$IFDEF OLD_SOCKETS}
// IThriftStream implementation that works on a TThriftCustomIpClient.
// The stream stores the client reference and a timeout value passed to the
// constructor (default 0).
TTcpSocketStreamImpl = class( TThriftStreamImpl )
private type
  // Outcome reported by WaitForData.
  TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
private
  FTcpClient : TThriftCustomIpClient;
  FTimeout   : Integer;
  function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                   TimeOut: Integer; var wsaError : Integer): Integer;
  function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
                        var wsaError, bytesReady : Integer): TWaitForData;
protected
  procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
  function  Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;

  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
public
  constructor Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer = 0);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200193
// Transport that is backed by a pair of IThriftStream objects and exposes
// them to callers.
IStreamTransport = interface( ITransport )
  ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
  property InputStream  : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;
end;
201
// IStreamTransport implementation that forwards reads to FInputStream and
// writes/flushes to FOutputStream. Both references are supplied to the
// constructor and released by Close and by the destructor.
TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
protected
  FInputStream  : IThriftStream;
  FOutputStream : IThriftStream;
protected
  function GetIsOpen: Boolean; override;

  function GetInputStream: IThriftStream;
  function GetOutputStream: IThriftStream;
public
  property InputStream  : IThriftStream read GetInputStream;
  property OutputStream : IThriftStream read GetOutputStream;

  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
  destructor Destroy; override;
end;
223
// IThriftStream decorator that places an in-memory read buffer and an
// in-memory write buffer in front of another stream (FStream).
// FBufSize is the chunk size requested from FStream when refilling and the
// threshold above which pending writes are flushed.
TBufferedStreamImpl = class( TThriftStreamImpl)
private
  FStream      : IThriftStream;
  FBufSize     : Integer;
  FReadBuffer  : TMemoryStream;
  FWriteBuffer : TMemoryStream;
protected
  procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
  function  Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
  procedure Open; override;
  procedure Close; override;
  procedure Flush; override;
  function  IsOpen: Boolean; override;
  function  ToArray: TBytes; override;
public
  constructor Create( const AStream: IThriftStream; ABufSize: Integer);
  destructor Destroy; override;
end;
242
{$IFDEF OLD_SOCKETS}
// IServerTransport implementation on top of a TThriftTcpServer.
// The server component is either passed in by the caller or created by the
// port-based constructor; FOwnsServer records whether the destructor has to
// free it.
TServerSocketImpl = class( TServerTransportImpl)
private
  FServer            : TThriftTcpServer;
  FPort              : Integer;
  FClientTimeout     : Integer;   // forwarded to every accepted TSocketImpl
  FUseBufferedSocket : Boolean;   // wrap accepted sockets in TBufferedTransportImpl
  FOwnsServer        : Boolean;
protected
  function Accept( const fnAccepting: TProc) : ITransport; override;
public
  constructor Create( const AServer: TThriftTcpServer; AClientTimeout: Integer = 0); overload;
  constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
  destructor Destroy; override;
  procedure Listen; override;
  procedure Close; override;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200261
// Transport decorator around an IStreamTransport. InitBuffers wraps the
// inner transport's streams in TBufferedStreamImpl instances of FBufSize
// bytes; Open, Close and IsOpen are delegated to the inner transport.
TBufferedTransportImpl = class( TTransportImpl )
private
  FInputBuffer  : IThriftStream;
  FOutputBuffer : IThriftStream;
  FTransport    : IStreamTransport;
  FBufSize      : Integer;

  procedure InitBuffers;
  function  GetUnderlyingTransport: ITransport;
protected
  function  GetIsOpen: Boolean; override;
  procedure Flush; override;
public
  procedure Open(); override;
  procedure Close(); override;
  function  Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  constructor Create( const ATransport : IStreamTransport ); overload;
  constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
  property UnderlyingTransport: ITransport read GetUnderlyingTransport;
  property IsOpen: Boolean read GetIsOpen;
end;
284
{$IFDEF OLD_SOCKETS}
// Client socket transport. It either adopts an existing
// TThriftCustomIpClient (ownership decided by aOwnsClient) or creates its
// own TThriftTcpClient for a host/port pair.
TSocketImpl = class(TStreamTransportImpl)
private
  FClient     : TThriftCustomIpClient;
  FOwnsClient : Boolean;   // True: Close and Destroy free FClient
  FHost       : string;
  FPort       : Integer;
  FTimeout    : Integer;

  procedure InitSocket;
protected
  function GetIsOpen: Boolean; override;
public
  procedure Open; override;
  constructor Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
  constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
  destructor Destroy; override;
  procedure Close; override;
  property TcpClient: TThriftCustomIpClient read FClient;
  property Host : string read FHost;
  property Port: Integer read FPort;
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200308
// Transport decorator that keeps separate in-memory read and write buffers
// in front of another transport (FTransport) and reads its input frame by
// frame (see ReadFrame). FHeaderSize is the header size constant (4).
// NOTE(review): the exact frame layout lives in the implementation section,
// which is not part of this declaration - confirm there.
TFramedTransportImpl = class( TTransportImpl)
private const
  FHeaderSize : Integer = 4;
private class var
  FHeader_Dummy : array of Byte;
protected
  FTransport   : ITransport;
  FWriteBuffer : TMemoryStream;
  FReadBuffer  : TMemoryStream;

  procedure InitWriteBuffer;
  procedure ReadFrame;
public
  type
    // Transport factory override that belongs to this transport class.
    TFactory = class( TTransportFactoryImpl )
    public
      function GetTransport( const ATrans: ITransport): ITransport; override;
    end;

  {$IFDEF HAVE_CLASS_CTOR}
  class constructor Create;
  {$ENDIF}

  constructor Create; overload;
  constructor Create( const ATrans: ITransport); overload;
  destructor Destroy; override;

  procedure Open(); override;
  function  GetIsOpen: Boolean; override;

  procedure Close(); override;
  function  Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
  procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
  procedure Flush; override;
end;
344
{$IFNDEF HAVE_CLASS_CTOR}
// Declared only when HAVE_CLASS_CTOR is not defined, i.e. when
// TFramedTransportImpl has no class constructor declaration.
procedure TFramedTransportImpl_Initialize;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200348
const
  // Default timeout: five seconds, expressed in milliseconds.
  DEFAULT_THRIFT_TIMEOUT = 5 * 1000;  // ms
351
352
353implementation
354
355{ TTransportImpl }
356
// Default Flush: intentionally empty. Transports that buffer output
// override it.
procedure TTransportImpl.Flush;
begin
  // nothing to do
end;
361
// Default Peek: reports the value of IsOpen.
function TTransportImpl.Peek: Boolean;
begin
  Result := GetIsOpen;
end;
366
// Reads exactly len bytes into buf, starting at index off, by calling Read
// repeatedly. Returns len on success.
// Raises TTransportException as soon as a Read call returns no data.
function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
var
  chunk : Integer;
begin
  Result := 0;
  while Result < len do begin
    chunk := Read( buf, off + Result, len - Result);
    if chunk <= 0
    then raise TTransportException.Create( 'Cannot read, Remote side has closed' );
    Inc( Result, chunk);
  end;
end;
381
// Convenience overload: writes the complete array by delegating to the
// three-argument Write.
procedure TTransportImpl.Write( const buf: TBytes);
begin
  Write( buf, 0, Length(buf));
end;
386
387{ THTTPClientImpl }
388
// Drops the references to the response stream and the pending request body.
procedure THTTPClientImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
394
// Creates a client for the given URI with an empty custom-header dictionary
// and an empty, memory-backed output stream (the adapter is created with
// its second constructor argument set to True).
constructor THTTPClientImpl.Create(const AUri: string);
begin
  inherited Create;
  FUri           := AUri;
  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
  FOutputStream  := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
end;
402
// Builds a synchronous POST request for FUri, sets the fixed Thrift headers
// and then adds every entry of FCustomHeaders.
// The COM class used to create the request depends on the compiler version.
function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
var
  header : TPair<string,string>;
begin
  {$IF CompilerVersion >= 21.0}
  Result := CoXMLHTTP.Create;
  {$ELSE}
  Result := CoXMLHTTPRequest.Create;
  {$IFEND}

  // third argument False = not asynchronous
  Result.open('POST', FUri, False, '', '');
  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
  Result.setRequestHeader( 'Accept', 'application/x-thrift');
  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

  for header in FCustomHeaders
  do Result.setRequestHeader( header.Key, header.Value );
end;
422
// Releases both streams through Close before the inherited destructor runs.
destructor THTTPClientImpl.Destroy;
begin
  Close;
  inherited Destroy;
end;
428
// Sends the bytes collected so far as one HTTP request. Whether or not the
// request succeeds, the old output stream is released and replaced by a
// fresh, empty one.
procedure THTTPClientImpl.Flush;
begin
  try
    SendRequest;
  finally
    // release the old stream first, then start a new request body
    FOutputStream := nil;
    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
  end;
end;
438
// Getter for the ConnectionTimeout property.
function THTTPClientImpl.GetConnectionTimeout: Integer;
begin
  Result := Self.FConnectionTimeout;
end;
443
// Getter for the CustomHeaders property; returns the dictionary itself, so
// entries added by the caller are used by subsequent requests.
function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
begin
  Result := Self.FCustomHeaders;
end;
448
// This transport always reports itself as open.
function THTTPClientImpl.GetIsOpen: Boolean;
begin
  Result := TRUE;
end;
453
// Getter for the ReadTimeout property.
function THTTPClientImpl.GetReadTimeout: Integer;
begin
  Result := Self.FReadTimeout;
end;
458
// Intentionally empty: a request is only created when SendRequest runs.
procedure THTTPClientImpl.Open;
begin
  // nothing to do
end;
463
// Reads up to len bytes of the last response into buf at index off and
// returns the count reported by the response stream.
// Raises TTransportException (NotOpen) when no request has been sent yet;
// any exception raised by the stream is re-raised as TTransportException
// (Unknown) carrying the original message.
function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
begin
  if not Assigned( FInputStream)
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'No request has been sent');

  try
    Result := FInputStream.Read( buf, off, len );
  except
    on E: Exception do begin
      raise TTransportException.Create( TTransportException.TExceptionType.Unknown, E.Message);
    end;
  end;
end;
478
// Posts the content of FOutputStream to the server and makes the response
// available through FInputStream.
procedure THTTPClientImpl.SendRequest;
var
  request : IXMLHTTPRequest;
  body    : TMemoryStream;
  data    : TBytes;
  size    : Integer;
begin
  request := CreateRequest;

  body := TMemoryStream.Create;
  try
    // copy the pending bytes into a stream that can be handed to COM
    data := FOutputStream.ToArray;
    size := Length(data);
    if size > 0
    then body.WriteBuffer( Pointer(@data[0])^, size);
    body.Position := 0;

    // soReference: the adapter does not own body; it is freed below
    request.send( IUnknown( TStreamAdapter.Create( body, soReference )));

    // release the previous response before adopting the new one
    FInputStream := nil;
    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( request.responseStream) as IStream);
  finally
    body.Free;
  end;
end;
503
// Setter for the ConnectionTimeout property; only stores the value.
procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
begin
  Self.FConnectionTimeout := Value;
end;
508
// Setter for the ReadTimeout property; only stores the value.
procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
begin
  Self.FReadTimeout := Value;
end;
513
// Appends len bytes of buf, starting at index off, to the pending request
// body. Nothing is sent before Flush / SendRequest.
// Raises TTransportException (NotOpen) when the output stream has been
// released by Close; previously this case dereferenced a nil interface.
procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
begin
  if FOutputStream = nil then begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'Cannot write to null outputstream' );
  end;

  FOutputStream.Write( buf, off, len);
end;
518
519{ TTransportException }
520
// Creates an exception of the given type with an empty message by
// delegating to the (type, message) overload.
constructor TTransportException.Create(AType: TExceptionType);
begin
  // no inherited call here, the delegate does it
  Create( AType, '');
end;
526
// Creates an exception with both a type code and a message.
constructor TTransportException.Create(AType: TExceptionType; const msg: string);
begin
  inherited Create( msg);
  Self.FType := AType;
end;
533
// Creates an exception with a message only. FType is not assigned here and
// therefore keeps its initial value, the first enum member (Unknown).
constructor TTransportException.Create(const msg: string);
begin
  inherited Create( msg);
end;
538
539{ TTransportFactoryImpl }
540
// Default factory behaviour: hands back the transport unchanged.
function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := ATrans;  // no wrapping
end;
545
546{ TServerSocket }
547
Jens Geyer23d67462015-12-19 11:44:57 +0100548{$IFDEF OLD_SOCKETS}
// Wraps a server component supplied by the caller. FOwnsServer is not set
// here and stays False, so the destructor leaves AServer alone.
constructor TServerSocketImpl.Create( const AServer: TThriftTcpServer; AClientTimeout: Integer);
begin
  inherited Create;
  FClientTimeout := AClientTimeout;
  FServer        := AServer;
end;
555
// Creates and owns a blocking server component bound to APort.
// The LocalPort assignment differs by compiler version: newer compilers get
// an explicit AnsiString conversion.
constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
begin
  inherited Create;

  // remember the settings
  FPort              := APort;
  FClientTimeout     := AClientTimeout;
  FUseBufferedSocket := AUseBufferedSockets;

  // set up our own server component
  FOwnsServer := True;
  FServer     := TThriftTcpServer.Create( nil );
  FServer.BlockMode := bmBlocking;
  {$IF CompilerVersion >= 21.0}
  FServer.LocalPort := AnsiString( IntToStr( FPort));
  {$ELSE}
  FServer.LocalPort := IntToStr( FPort);
  {$IFEND}
end;
571
// Frees the server component, but only when this object created it.
destructor TServerSocketImpl.Destroy;
begin
  if FOwnsServer
  then FreeAndNil( FServer);
  inherited Destroy;
end;
580
// Waits for one incoming connection and returns a transport for it.
//
// fnAccepting, when assigned, is invoked right before the blocking Accept
// call. The result is a TSocketImpl owning the accepted client, wrapped in
// a TBufferedTransportImpl when buffered sockets were requested, or nil
// when the server component reports that Accept failed.
//
// Raises TTransportException (NotOpen) when there is no server component.
// Every other exception is re-raised as TTransportException carrying
// E.ToString.
//
// Ownership: the client object is freed here on every failure path up to
// the point where it is handed to TSocketImpl. The local reference is
// cleared BEFORE the TSocketImpl constructor runs, because that constructor
// stores the client together with the ownership flag first, so that a
// failing constructor already frees the client from its destructor.
// Freeing it again in the except block would be a double free.
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
  client : TThriftCustomIpClient;
  handed : TThriftCustomIpClient;
  trans  : IStreamTransport;
begin
  if FServer = nil then begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                      'No underlying server socket.');
  end;

  client := nil;
  try
    client := TThriftCustomIpClient.Create(nil);

    if Assigned(fnAccepting)
    then fnAccepting();

    if not FServer.Accept( client) then begin
      FreeAndNil( client);   // leave no dangling reference behind
      Result := nil;
      Exit;
    end;

    // Accept takes the client by reference; stay defensive
    if client = nil then begin
      Result := nil;
      Exit;
    end;

    // transfer ownership first, construct afterwards (see header comment)
    handed := client;
    client := nil;
    trans  := TSocketImpl.Create( handed, TRUE, FClientTimeout);

    if FUseBufferedSocket
    then Result := TBufferedTransportImpl.Create( trans)
    else Result := trans;

  except
    on E: Exception do begin
      client.Free;   // no-op once ownership has been transferred
      raise TTransportException.Create( E.ToString );
    end;
  end;
end;
623
// Activates the server component, if there is one. Any failure is re-raised
// as TTransportException with the original message appended.
procedure TServerSocketImpl.Listen;
begin
  if FServer = nil
  then Exit;

  try
    FServer.Active := True;
  except
    on E: Exception do begin
      raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
    end;
  end;
end;
636
// Deactivates the server component, if there is one. Any failure is
// re-raised as TTransportException with the original message appended.
procedure TServerSocketImpl.Close;
begin
  if FServer = nil
  then Exit;

  try
    FServer.Active := False;
  except
    on E: Exception do begin
      raise TTransportException.Create('Error on closing socket : ' + E.Message);
    end;
  end;
end;
647
648{ TSocket }
649
// Wraps an existing client component. One TTcpSocketStreamImpl is created
// for it and used as both input and output stream. aOwnsClient decides
// whether this object frees AClient later on.
// The fields are assigned before the inherited constructor is called.
constructor TSocketImpl.Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var
  socketStream : IThriftStream;
begin
  FClient     := AClient;
  FTimeout    := ATimeout;
  FOwnsClient := aOwnsClient;

  socketStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  inherited Create( socketStream, socketStream);
end;
659
// Prepares a socket for the given host and port. The client component and
// its stream are created by InitSocket; no connection is made before Open.
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
begin
  inherited Create( nil, nil);   // streams are assigned by InitSocket
  FHost    := AHost;
  FPort    := APort;
  FTimeout := ATimeout;
  InitSocket;
end;
668
// Frees the client component when this object owns it, then runs the
// inherited destructor.
destructor TSocketImpl.Destroy;
begin
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;
  inherited Destroy;
end;
675
// Releases the streams through the inherited Close first and afterwards
// frees the client component when this object owns it.
procedure TSocketImpl.Close;
begin
  inherited Close;
  if FOwnsClient then begin
    FreeAndNil( FClient);
  end;
end;
682
// Open means: a client component exists and reports Connected.
function TSocketImpl.GetIsOpen: Boolean;
begin
  Result := Assigned( FClient) and FClient.Connected;
end;
687
// Replaces the current client component by a new, owned TThriftTcpClient
// and points both streams at one new socket stream for it.
// A previous client is freed only when it was owned.
procedure TSocketImpl.InitSocket;
begin
  // get rid of the old client
  if FOwnsClient
  then FreeAndNil( FClient)
  else FClient := nil;

  // create our own one
  FClient     := TThriftTcpClient.Create( nil);
  FOwnsClient := True;

  // one stream instance serves both directions
  FInputStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
703
// Connects to Host:Port and installs a new socket stream for both
// directions.
// Raises TTransportException:
//   AlreadyOpen - the socket is already connected
//   NotOpen     - the host is empty or the port is not positive
// A missing client component is created through InitSocket.
procedure TSocketImpl.Open;
begin
  if IsOpen
  then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
                                         'Socket already connected');

  if FHost = ''
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'Cannot open null host');

  if Port <= 0
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'Cannot open without port');

  if not Assigned( FClient)
  then InitSocket;

  FClient.RemoteHost := TSocketHost( Host);
  FClient.RemotePort := TSocketPort( IntToStr( Port));
  FClient.Connect;

  FInputStream  := TTcpSocketStreamImpl.Create( FClient, FTimeout);
  FOutputStream := FInputStream;
end;
Jens Geyer23d67462015-12-19 11:44:57 +0100731{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200732
733{ TBufferedStream }
734
// Flushes pending writes to the wrapped stream and then releases the
// wrapped stream and both memory buffers. Afterwards IsOpen returns False.
//
// The release happens in a finally block: an exception raised by Flush
// still propagates to the caller, but it no longer leaks the two
// TMemoryStream instances or leaves the object half-open. This matters in
// particular because the destructor calls Close.
procedure TBufferedStreamImpl.Close;
begin
  try
    Flush;
  finally
    FStream := nil;
    FreeAndNil( FReadBuffer);
    FreeAndNil( FWriteBuffer);
  end;
end;
746
// Wraps AStream and allocates the two (initially empty) memory buffers.
// ABufSize is stored as the refill size / flush threshold.
constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
begin
  inherited Create;
  FBufSize     := ABufSize;
  FStream      := AStream;
  FWriteBuffer := TMemoryStream.Create;
  FReadBuffer  := TMemoryStream.Create;
end;
755
// Closes the stream (flush + release of the buffers) before destruction.
destructor TBufferedStreamImpl.Destroy;
begin
  Close;
  inherited Destroy;
end;
761
// Writes everything collected in the write buffer to the wrapped stream in
// a single call and empties the write buffer. Does nothing when closed.
procedure TBufferedStreamImpl.Flush;
var
  pending : TBytes;
  size    : Integer;
begin
  if not IsOpen
  then Exit;

  size := FWriteBuffer.Size;
  if size > 0 then begin
    // copy the buffered bytes into an array the stream can take
    SetLength( pending, size);
    FWriteBuffer.Position := 0;
    FWriteBuffer.Read( Pointer(@pending[0])^, size);
    FStream.Write( pending, 0, size);
  end;

  FWriteBuffer.Clear;
end;
778
// Open means: both buffers and the wrapped stream are still assigned.
function TBufferedStreamImpl.IsOpen: Boolean;
begin
  Result := Assigned( FWriteBuffer)
        and Assigned( FReadBuffer)
        and Assigned( FStream);
end;
785
// Intentionally empty: the stream is usable right after construction.
procedure TBufferedStreamImpl.Open;
begin
  // nothing to do
end;
790
// Copies up to count bytes into buffer, starting at index offset, and
// returns the number of bytes copied. Data is served from the read buffer;
// whenever the buffer is used up, a chunk of up to FBufSize bytes is
// requested from the wrapped stream. The loop ends early when the wrapped
// stream delivers no data. A closed stream yields 0.
function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
var
  chunk  : TBytes;
  nBytes : Integer;
begin
  inherited;
  Result := 0;

  if not IsOpen
  then Exit;

  while count > 0 do begin

    // read buffer exhausted -> refill from the wrapped stream
    if FReadBuffer.Position >= FReadBuffer.Size then begin
      FReadBuffer.Clear;
      SetLength( chunk, FBufSize);
      nBytes := FStream.Read( chunk, 0, FBufSize );
      if nBytes = 0 then Break; // avoid infinite loop

      FReadBuffer.WriteBuffer( Pointer(@chunk[0])^, nBytes );
      FReadBuffer.Position := 0;
    end;

    // hand out as much as is available and still wanted
    if FReadBuffer.Position < FReadBuffer.Size then begin
      nBytes := Min( FReadBuffer.Size - FReadBuffer.Position, count);
      Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nBytes));
      Dec( count, nBytes);
      Inc( offset, nBytes);
    end;
  end;
end;
821
// Returns a copy of the complete content of the read buffer, or an empty
// array when the stream is closed or the buffer is empty.
//
// The read position of the buffer is saved and restored around the copy.
// Without that, the copy leaves the position at the end of the buffer and
// the next Read call treats the buffer as exhausted, clears it and
// silently drops every byte that had not been consumed yet.
function TBufferedStreamImpl.ToArray: TBytes;
var
  len      : Integer;
  savedPos : Int64;
begin
  if IsOpen
  then len := FReadBuffer.Size
  else len := 0;

  SetLength( Result, len);

  if len > 0 then begin
    savedPos := FReadBuffer.Position;
    try
      FReadBuffer.Position := 0;
      FReadBuffer.Read( Pointer(@Result[0])^, len );
    finally
      FReadBuffer.Position := savedPos;
    end;
  end;
end;
838
// Appends count bytes of buffer, starting at index offset, to the write
// buffer and flushes once the buffer has grown beyond FBufSize.
// Empty writes and writes to a closed stream are ignored.
procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
begin
  inherited;

  if (count <= 0) or (not IsOpen)
  then Exit;

  FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
  if FWriteBuffer.Size > FBufSize
  then Flush;
end;
851
852{ TStreamTransportImpl }
853
// Drops the references to both streams.
procedure TStreamTransportImpl.Close;
begin
  FInputStream  := nil;
  FOutputStream := nil;
end;
859
// Stores the stream to read from and the stream to write to. Either may be
// nil; Read, Write and Flush raise NotOpen for a missing stream.
constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
begin
  inherited Create;
  FOutputStream := AOutputStream;
  FInputStream  := AInputStream;
end;
866
// Releases both stream references explicitly before destruction.
destructor TStreamTransportImpl.Destroy;
begin
  FInputStream  := nil;
  FOutputStream := nil;
  inherited Destroy;
end;
873
// Flushes the output stream.
// Raises TTransportException (NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Flush;
begin
  if not Assigned( FOutputStream)
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'Cannot flush null outputstream' );

  FOutputStream.Flush;
end;
883
// Getter for the InputStream property.
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
  Result := Self.FInputStream;
end;
888
// A plain stream transport always reports itself as open; descendants
// override this where a real connection state exists.
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
  Result := TRUE;
end;
893
// Getter for the OutputStream property.
// Returns the stream that Write and Flush operate on. The previous
// implementation returned FInputStream here, which handed callers the wrong
// stream whenever input and output are different objects.
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
  Result := FOutputStream;
end;
898
// Intentionally empty: the streams are supplied by the constructor.
procedure TStreamTransportImpl.Open;
begin
  // nothing to do
end;
903
// Reads up to len bytes from the input stream into buf at index off and
// returns the count reported by the stream.
// Raises TTransportException (NotOpen) when there is no input stream.
function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
  if not Assigned( FInputStream)
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'Cannot read from null inputstream' );

  Result := FInputStream.Read( buf, off, len );
end;
913
// Writes len bytes of buf, starting at index off, to the output stream.
// Raises TTransportException (NotOpen) when there is no output stream.
procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  if not Assigned( FOutputStream)
  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
                                         'Cannot write to null outputstream' );

  FOutputStream.Write( buf, off, len );
end;
923
924{ TBufferedTransportImpl }
925
// Convenience constructor: wraps ATransport using a buffer size of 1024.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
begin
  // no inherited call here, the two-argument constructor takes care of it
  Create( ATransport, 1024 );
end;
931
// Closes the wrapped transport.
procedure TBufferedTransportImpl.Close;
begin
  FTransport.Close;
end;
936
// Wraps ATransport and sets up buffered streams of ABufSize around
// whichever of its input/output streams are present.
constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport; ABufSize: Integer);
begin
  inherited Create;
  FTransport := ATransport;
  FBufSize   := ABufSize;
  InitBuffers;
end;
944
// Flushes the output buffer; silently does nothing when there is none.
procedure TBufferedTransportImpl.Flush;
begin
  if FOutputBuffer <> nil
  then FOutputBuffer.Flush;
end;
951
// The buffered transport is open exactly when the wrapped transport is.
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
956
// Exposes the wrapped transport.
function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
begin
  Result := FTransport;
end;
961
// Creates a buffered wrapper of FBufSize around each stream the wrapped
// transport provides. A missing stream leaves the matching buffer untouched.
procedure TBufferedTransportImpl.InitBuffers;
begin
  if FTransport.InputStream <> nil
  then FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );

  if FTransport.OutputStream <> nil
  then FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
end;
971
// Opens the wrapped transport.
procedure TBufferedTransportImpl.Open;
begin
  FTransport.Open;
end;
976
// Reads through the input buffer and returns its result.
// Returns 0 when no input buffer exists.
function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
  if FInputBuffer <> nil
  then Result := FInputBuffer.Read( buf, off, len )
  else Result := 0;
end;
984
// Writes through the output buffer; silently does nothing when there is none.
procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  if FOutputBuffer <> nil
  then FOutputBuffer.Write( buf, off, len );
end;
991
992{ TFramedTransportImpl }
993
// One-time setup of the shared, zero-filled header placeholder that every
// fresh write buffer starts with. Compilers with class constructor support
// do this in the class constructor, others in a plain procedure.
{$IFDEF HAVE_CLASS_CTOR}
class constructor TFramedTransportImpl.Create;
begin
  SetLength( FHeader_Dummy, FHeaderSize);
  FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte), 0);
end;
{$ELSE}
procedure TFramedTransportImpl_Initialize;
begin
  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
            Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte),
            0);
end;
{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001008
// Creates a framed transport without an underlying transport;
// only the write buffer is prepared.
constructor TFramedTransportImpl.Create;
begin
  inherited Create;
  InitWriteBuffer;
end;
1014
// Closes the underlying transport.
procedure TFramedTransportImpl.Close;
begin
  FTransport.Close;
end;
1019
// Creates a framed transport layered on top of ATrans.
constructor TFramedTransportImpl.Create( const ATrans: ITransport);
begin
  inherited Create;
  InitWriteBuffer;
  FTransport := ATrans;
end;
1026
// Frees the write and read memory streams owned by this instance.
destructor TFramedTransportImpl.Destroy;
begin
  FWriteBuffer.Free;
  FReadBuffer.Free;
  inherited;
end;
1033
// Sends everything collected in the write buffer as one frame.
// The buffer begins with FHeaderSize placeholder bytes; the first four of
// them are overwritten with the payload length (most significant byte
// first) before the frame is handed to the underlying transport.
// Raises TTransportException when the buffer is shorter than the header.
procedure TFramedTransportImpl.Flush;
var
  frame      : TBytes;
  frameLen   : Integer;
  payloadLen : Integer;
begin
  // snapshot the buffered bytes
  frameLen := FWriteBuffer.Size;
  SetLength( frame, frameLen);
  if frameLen > 0
  then System.Move( FWriteBuffer.Memory^, frame[0], frameLen);

  payloadLen := frameLen - FHeaderSize;
  if payloadLen < 0
  then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
                                         'TFramedTransport.Flush: data_len < 0' );

  // start a fresh write buffer before anything is sent
  InitWriteBuffer;

  // fill in the length header
  frame[0] := Byte( $FF and (payloadLen shr 24));
  frame[1] := Byte( $FF and (payloadLen shr 16));
  frame[2] := Byte( $FF and (payloadLen shr 8));
  frame[3] := Byte( $FF and payloadLen);

  FTransport.Write( frame, 0, frameLen);
  FTransport.Flush;
end;
1063
// The framed transport is open exactly when the underlying transport is.
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
  Result := FTransport.IsOpen;
end;
1068
type
  // Empty descendant, used only as a typecast target in InitWriteBuffer
  // to set the Capacity of a TMemoryStream.
  TAccessMemoryStream = class( TMemoryStream)
  end;
1072
// Replaces the write buffer with a new memory stream, sets its capacity
// to 1024 and writes the placeholder header so that payload bytes land
// behind it.
procedure TFramedTransportImpl.InitWriteBuffer;
begin
  FWriteBuffer.Free;
  FWriteBuffer := TMemoryStream.Create;
  TAccessMemoryStream( FWriteBuffer).Capacity := 1024;
  FWriteBuffer.Write( Pointer( @FHeader_Dummy[0])^, FHeaderSize);
end;
1080
// Opens the underlying transport.
procedure TFramedTransportImpl.Open;
begin
  FTransport.Open;
end;
1085
// Copies up to len bytes of frame payload into buf at index off.
// Data left over from the current frame is served first; once it is
// exhausted (or when no frame has been read yet) the next frame is
// fetched via ReadFrame. Returns the number of bytes copied.
function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
begin
  // try the frame we already have
  if FReadBuffer <> nil then begin
    if len > 0
    then Result := FReadBuffer.Read( Pointer( @buf[off])^, len)
    else Result := 0;

    if Result > 0
    then Exit;
  end;

  // nothing delivered so far: pull in the next frame and read from it
  ReadFrame;
  if len > 0
  then Result := FReadBuffer.Read( Pointer( @buf[off])^, len)
  else Result := 0;
end;
1106
// Reads one complete frame from the underlying transport into FReadBuffer.
// A frame consists of FHeaderSize header bytes, whose first four bytes hold
// the payload length (most significant byte first), followed by the payload.
// Raises TTransportException when the header decodes to a negative length.
procedure TFramedTransportImpl.ReadFrame;
var
  i32rd : TBytes;
  size  : Integer;
  buff  : TBytes;
begin
  SetLength( i32rd, FHeaderSize );
  FTransport.ReadAll( i32rd, 0, FHeaderSize);
  size :=
    ((i32rd[0] and $FF) shl 24) or
    ((i32rd[1] and $FF) shl 16) or
    ((i32rd[2] and $FF) shl 8) or
     (i32rd[3] and $FF);

  // A set high bit yields a negative Integer. Reject it here with a proper
  // transport error instead of letting SetLength fail on a negative length.
  if size < 0 then begin
    raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
                                      'TFramedTransport.ReadFrame: negative frame size ('+IntToStr(size)+')' );
  end;

  SetLength( buff, size );
  FTransport.ReadAll( buff, 0, size );

  FReadBuffer.Free;
  FReadBuffer := TMemoryStream.Create;
  // buff[0] does not exist for an empty frame, so only copy real payload
  if size > 0
  then FReadBuffer.Write( Pointer(@buff[0])^, size );
  FReadBuffer.Position := 0;
end;
1127
// Appends len bytes of buf, starting at index off, to the pending frame.
// Nothing is sent until Flush is called. Non-positive lengths are ignored.
procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
begin
  if len <= 0
  then Exit;

  FWriteBuffer.Write( Pointer( @buf[off])^, len);
end;
1133
1134{ TFramedTransport.TFactory }
1135
// Factory method: wraps ATrans into a new framed transport.
function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
begin
  Result := TFramedTransportImpl.Create( ATrans);
end;
1140
1141{ TTcpSocketStreamImpl }
1142
Jens Geyer23d67462015-12-19 11:44:57 +01001143{$IFDEF OLD_SOCKETS}
// Closes the TCP client connection.
procedure TTcpSocketStreamImpl.Close;
begin
  FTcpClient.Close;
end;
1148
// Wraps an existing TCP client as a Thrift stream.
// aTimeout is stored as FTimeout; the read and write methods interpret it
// as milliseconds.
constructor TTcpSocketStreamImpl.Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer);
begin
  inherited Create;
  FTcpClient := ATcpClient;
  FTimeout   := aTimeout;
end;
1155
// Intentionally empty: nothing is done on flush for the socket stream.
procedure TTcpSocketStreamImpl.Flush;
begin
  // no-op
end;
1160
// The stream counts as open while the TCP client is active.
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
  Result := FTcpClient.Active;
end;
1165
// Opens the TCP client connection.
procedure TTcpSocketStreamImpl.Open;
begin
  FTcpClient.Open;
end;
1170
1171
// Thin wrapper around select() for the client socket.
//   ReadReady, WriteReady, ExceptFlag
//             optional out flags; pass nil for conditions that are
//             of no interest
//   TimeOut   milliseconds to wait, negative waits forever
//   wsaError  receives WSAEINVAL when the client is not active, the
//             WSAGetLastError value when select() fails, 0 otherwise
// Returns the result of select(), or SOCKET_ERROR when the client is not
// active or the call raised an exception.
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
                                      TimeOut: Integer; var wsaError : Integer): Integer;
var
  rdSet, wrSet, exSet    : TFDset;
  pRdSet, pWrSet, pExSet : PFDset;
  waitTime  : timeval;
  pWaitTime : PTimeval;
  hSocket   : TSocket;
begin
  if not FTcpClient.Active then begin
    wsaError := WSAEINVAL;
    Exit( SOCKET_ERROR);
  end;

  hSocket := FTcpClient.Handle;

  // only sets the caller asked about are handed to select()
  pRdSet := nil;
  if Assigned( ReadReady) then begin
    FD_ZERO( rdSet);
    FD_SET( hSocket, rdSet);
    pRdSet := @rdSet;
  end;

  pWrSet := nil;
  if Assigned( WriteReady) then begin
    FD_ZERO( wrSet);
    FD_SET( hSocket, wrSet);
    pWrSet := @wrSet;
  end;

  pExSet := nil;
  if Assigned( ExceptFlag) then begin
    FD_ZERO( exSet);
    FD_SET( hSocket, exSet);
    pExSet := @exSet;
  end;

  // nil timeout pointer = wait forever
  pWaitTime := nil;
  if TimeOut >= 0 then begin
    waitTime.tv_sec  := TimeOut div 1000;
    waitTime.tv_usec := 1000 * (TimeOut mod 1000);
    pWaitTime := @waitTime;
  end;

  wsaError := 0;
  try
    {$IFDEF MSWINDOWS}
      {$IFDEF OLD_UNIT_NAMES}
      result := WinSock.select( hSocket + 1, pRdSet, pWrSet, pExSet, pWaitTime);
      {$ELSE}
      result := Winapi.WinSock.select( hSocket + 1, pRdSet, pWrSet, pExSet, pWaitTime);
      {$ENDIF}
    {$ENDIF}
    {$IFDEF LINUX}
      result := Libc.select( hSocket + 1, pRdSet, pWrSet, pExSet, pWaitTime);
    {$ENDIF}

    if result = SOCKET_ERROR
    then wsaError := WSAGetLastError;

  except
    result := SOCKET_ERROR;
  end;

  // report the state of each requested condition
  if Assigned( ReadReady)
  then ReadReady^ := FD_ISSET( hSocket, rdSet);

  if Assigned( WriteReady)
  then WriteReady^ := FD_ISSET( hSocket, wrSet);

  if Assigned( ExceptFlag)
  then ExceptFlag^ := FD_ISSET( hSocket, exSet);
end;
1257
// Waits up to TimeOut milliseconds for incoming data and peeks at it.
//   pBuf          receives the peeked bytes (they stay in the socket queue)
//   DesiredBytes  maximum number of bytes to peek
//   wsaError      error code as reported by Select
//   bytesReady    number of bytes available, 0 unless data was found
// Returns wfd_Error, wfd_Timeout or wfd_HaveData.
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
                                           DesiredBytes : Integer;
                                           var wsaError, bytesReady : Integer): TWaitForData;
const
  MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
var
  selResult, peeked : Integer;
  bCanRead, bError  : Boolean;
begin
  bytesReady := 0;

  // Select yields the number of ready handles, zero once the time limit
  // expired, or SOCKET_ERROR (details in wsaError) when something failed.
  selResult := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
  if selResult = SOCKET_ERROR then begin
    result := TWaitForData.wfd_Error;
    Exit;
  end;
  if (selResult = 0) or not bCanRead then begin
    result := TWaitForData.wfd_Timeout;
    Exit;
  end;

  // recv() yields the number of bytes received, -1 on error,
  // and 0 after an orderly shutdown by the peer.
  peeked := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
  if peeked <= 0 then begin
    result := TWaitForData.wfd_Error;
    Exit;
  end;

  // there is at least some data
  bytesReady := Min( peeked, DesiredBytes);
  result := TWaitForData.wfd_HaveData;
end;
1289
// Reads up to count bytes from the socket into buffer, starting at offset,
// and returns the number of bytes actually read.
// The first wait uses FTimeout (DEFAULT_THRIFT_TIMEOUT when FTimeout is not
// positive); once data has arrived, later waits use a tenth of that, but at
// least 200 ms. The loop ends early, returning what was read so far, on a
// socket error, or on a timeout when FTimeout = 0.
// Raises TTransportException (TimedOut) on a timeout when FTimeout <> 0.
function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
var wfd : TWaitForData;
    wsaError, nBytes : Integer;
    pDest : PByte;
    msecs : Integer;
begin
  inherited;

  result := 0;

  // nothing requested: bail out before buffer[offset] is touched, which
  // would be out of range for an empty buffer
  if count <= 0
  then Exit;

  if FTimeout > 0
  then msecs := FTimeout
  else msecs := DEFAULT_THRIFT_TIMEOUT;

  pDest := Pointer(@buffer[offset]);
  while count > 0 do begin

    while TRUE do begin
      wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
      case wfd of
        TWaitForData.wfd_Error    :  Exit;
        TWaitForData.wfd_HaveData :  Break;
        TWaitForData.wfd_Timeout  :  begin
          if (FTimeout = 0)
          then Exit
          else begin
            raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
                                              SysErrorMessage(Cardinal(wsaError)));

          end;
        end;
      else
        ASSERT( FALSE);
      end;
    end;

    // reduce the timeout once we got data
    if FTimeout > 0
    then msecs := FTimeout div 10
    else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
    msecs := Max( msecs, 200);

    ASSERT( nBytes <= count);
    nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);

    // A failed or empty receive must not be added to the pointer and the
    // counters: a negative value would move pDest backwards and grow count,
    // zero would make the loop spin. Treat it like wfd_Error above.
    if nBytes <= 0
    then Exit;

    Inc( pDest, nBytes);
    Dec( count, nBytes);
    Inc( result, nBytes);
  end;
end;
1338
// Returns a byte array whose length is the client's BytesReceived value
// (0 when the stream is not open), filled by a single ReceiveBuf call.
function TTcpSocketStreamImpl.ToArray: TBytes;
var
  nBytes : Integer;
begin
  if IsOpen
  then nBytes := FTcpClient.BytesReceived
  else nBytes := 0;

  SetLength( Result, nBytes);

  if nBytes > 0
  then FTcpClient.ReceiveBuf( Pointer( @Result[0])^, nBytes);
end;
1354
// Sends count bytes of buffer, starting at offset, once the socket
// reports that it is writable.
// Raises TTransportException:
//   NotOpen   the client is not active
//   TimedOut  the socket did not become writable within FTimeOut
//   Unknown   select failed, or the socket signalled an error condition
procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
var
  selResult, wsaError : Integer;
  bCanWrite, bError   : Boolean;
begin
  inherited;

  if not FTcpClient.Active then begin
    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
  end;

  // Select yields the number of ready handles, zero once the time limit
  // expired, or SOCKET_ERROR (details in wsaError) when something failed.
  selResult := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);

  if selResult = SOCKET_ERROR then begin
    raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
                                      SysErrorMessage(Cardinal(wsaError)));
  end;

  if selResult = 0 then begin
    raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
  end;

  if bError or not bCanWrite then begin
    raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
  end;

  FTcpClient.SendBuf( Pointer( @buffer[offset])^, count);
end;
Jens Geyer23d67462015-12-19 11:44:57 +01001381{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001382
{$IF CompilerVersion < 21.0}
// These compilers get the header placeholder set up at unit start-up.
// NOTE(review): presumably HAVE_CLASS_CTOR is undefined whenever this
// condition holds, otherwise the procedure below does not exist - confirm
// against Thrift.Defines.inc.
initialization
  TFramedTransportImpl_Initialize;
{$IFEND}
1389
1390
1391end.