blob: c0f3111afa62a31f6d06a0f3b62e345ee840f4fd [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
Nick4f5229e2016-04-14 16:43:22 +030020{$SCOPEDENUMS ON}
21{$IF CompilerVersion >= 23.0}
22 {$LEGACYIFEND ON}
23{$IFEND}
Jens Geyerd5436f52014-10-03 19:50:38 +020024
Jens Geyer23d67462015-12-19 11:44:57 +010025{$IF CompilerVersion < 28.0}
26 {$DEFINE OLD_SOCKETS} // TODO: add socket support for CompilerVersion >= 28.0
27{$IFEND}
28
29
Jens Geyerd5436f52014-10-03 19:50:38 +020030unit Thrift.Transport;
31
32interface
33
34uses
35 Classes,
36 SysUtils,
37 Math,
Jens Geyerd5436f52014-10-03 19:50:38 +020038 Generics.Collections,
Nick4f5229e2016-04-14 16:43:22 +030039{$IF CompilerVersion < 23.0}
40 ActiveX, msxml, WinSock, Sockets,
41{$ELSE}
42 Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
43 {$IF CompilerVersion < 28.0}
44 Web.Win.Sockets,
45 {$ELSE}
46 System.Win.ScktComp,
47 {$IFEND}
48{$IFEND}
Jens Geyerd5436f52014-10-03 19:50:38 +020049 Thrift.Collections,
50 Thrift.Utils,
Nick4f5229e2016-04-14 16:43:22 +030051 Thrift.Stream;
Jens Geyerd5436f52014-10-03 19:50:38 +020052
53type
54 ITransport = interface
55 ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
56 function GetIsOpen: Boolean;
57 property IsOpen: Boolean read GetIsOpen;
58 function Peek: Boolean;
59 procedure Open;
60 procedure Close;
61 function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
62 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
63 procedure Write( const buf: TBytes); overload;
64 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
65 procedure Flush;
66 end;
67
68 TTransportImpl = class( TInterfacedObject, ITransport)
69 protected
70 function GetIsOpen: Boolean; virtual; abstract;
71 property IsOpen: Boolean read GetIsOpen;
72 function Peek: Boolean; virtual;
73 procedure Open(); virtual; abstract;
74 procedure Close(); virtual; abstract;
75 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
76 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
77 procedure Write( const buf: TBytes); overload; virtual;
78 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
79 procedure Flush; virtual;
80 end;
81
82 TTransportException = class( Exception )
83 public
84 type
85 TExceptionType = (
86 Unknown,
87 NotOpen,
88 AlreadyOpen,
89 TimedOut,
90 EndOfFile
91 );
92 private
93 FType : TExceptionType;
94 public
95 constructor Create( AType: TExceptionType); overload;
96 constructor Create( const msg: string); overload;
97 constructor Create( AType: TExceptionType; const msg: string); overload;
98 property Type_: TExceptionType read FType;
99 end;
100
101 IHTTPClient = interface( ITransport )
102 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
103 procedure SetConnectionTimeout(const Value: Integer);
104 function GetConnectionTimeout: Integer;
105 procedure SetReadTimeout(const Value: Integer);
106 function GetReadTimeout: Integer;
107 function GetCustomHeaders: IThriftDictionary<string,string>;
108 procedure SendRequest;
109 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
110 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
111 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
112 end;
113
114 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
115 private
116 FUri : string;
117 FInputStream : IThriftStream;
118 FOutputStream : IThriftStream;
119 FConnectionTimeout : Integer;
120 FReadTimeout : Integer;
121 FCustomHeaders : IThriftDictionary<string,string>;
122
123 function CreateRequest: IXMLHTTPRequest;
124 protected
125 function GetIsOpen: Boolean; override;
126 procedure Open(); override;
127 procedure Close(); override;
128 function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
129 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
130 procedure Flush; override;
131
132 procedure SetConnectionTimeout(const Value: Integer);
133 function GetConnectionTimeout: Integer;
134 procedure SetReadTimeout(const Value: Integer);
135 function GetReadTimeout: Integer;
136 function GetCustomHeaders: IThriftDictionary<string,string>;
137 procedure SendRequest;
138 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
139 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
140 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
141 public
142 constructor Create( const AUri: string);
143 destructor Destroy; override;
144 end;
145
146 IServerTransport = interface
147 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
148 procedure Listen;
149 procedure Close;
150 function Accept( const fnAccepting: TProc): ITransport;
151 end;
152
153 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
154 protected
155 procedure Listen; virtual; abstract;
156 procedure Close; virtual; abstract;
157 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
158 end;
159
160 ITransportFactory = interface
161 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
162 function GetTransport( const ATrans: ITransport): ITransport;
163 end;
164
165 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
166 function GetTransport( const ATrans: ITransport): ITransport; virtual;
167 end;
168
Jens Geyer23d67462015-12-19 11:44:57 +0100169 {$IFDEF OLD_SOCKETS}
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100170 TThriftCustomIpClient = TCustomIpClient;
171 TThriftTcpServer = TTcpServer;
172 TThriftTcpClient = TTcpClient;
173 {$ELSE}
174 // TODO
175 {$ENDIF}
176
177 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200178 TTcpSocketStreamImpl = class( TThriftStreamImpl )
179 private type
180 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
181 private
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100182 FTcpClient : TThriftCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200183 FTimeout : Integer;
184 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
185 TimeOut: Integer; var wsaError : Integer): Integer;
186 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200187 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +0200188 protected
189 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
190 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
191 procedure Open; override;
192 procedure Close; override;
193 procedure Flush; override;
194
195 function IsOpen: Boolean; override;
196 function ToArray: TBytes; override;
197 public
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100198 constructor Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer = 0);
Jens Geyerd5436f52014-10-03 19:50:38 +0200199 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100200 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200201
202 IStreamTransport = interface( ITransport )
203 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
204 function GetInputStream: IThriftStream;
205 function GetOutputStream: IThriftStream;
206 property InputStream : IThriftStream read GetInputStream;
207 property OutputStream : IThriftStream read GetOutputStream;
208 end;
209
210 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
211 protected
212 FInputStream : IThriftStream;
213 FOutputStream : IThriftStream;
214 protected
215 function GetIsOpen: Boolean; override;
216
217 function GetInputStream: IThriftStream;
218 function GetOutputStream: IThriftStream;
219 public
220 property InputStream : IThriftStream read GetInputStream;
221 property OutputStream : IThriftStream read GetOutputStream;
222
223 procedure Open; override;
224 procedure Close; override;
225 procedure Flush; override;
226 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
227 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
228 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
229 destructor Destroy; override;
230 end;
231
232 TBufferedStreamImpl = class( TThriftStreamImpl)
233 private
234 FStream : IThriftStream;
235 FBufSize : Integer;
236 FReadBuffer : TMemoryStream;
237 FWriteBuffer : TMemoryStream;
238 protected
239 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
240 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
241 procedure Open; override;
242 procedure Close; override;
243 procedure Flush; override;
244 function IsOpen: Boolean; override;
245 function ToArray: TBytes; override;
246 public
247 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
248 destructor Destroy; override;
249 end;
250
Jens Geyer23d67462015-12-19 11:44:57 +0100251 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200252 TServerSocketImpl = class( TServerTransportImpl)
253 private
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100254 FServer : TThriftTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200255 FPort : Integer;
256 FClientTimeout : Integer;
257 FUseBufferedSocket : Boolean;
258 FOwnsServer : Boolean;
259 protected
260 function Accept( const fnAccepting: TProc) : ITransport; override;
261 public
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100262 constructor Create( const AServer: TThriftTcpServer; AClientTimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200263 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
264 destructor Destroy; override;
265 procedure Listen; override;
266 procedure Close; override;
267 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100268 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200269
270 TBufferedTransportImpl = class( TTransportImpl )
271 private
272 FInputBuffer : IThriftStream;
273 FOutputBuffer : IThriftStream;
274 FTransport : IStreamTransport;
275 FBufSize : Integer;
276
277 procedure InitBuffers;
278 function GetUnderlyingTransport: ITransport;
279 protected
280 function GetIsOpen: Boolean; override;
281 procedure Flush; override;
282 public
283 procedure Open(); override;
284 procedure Close(); override;
285 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
286 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
287 constructor Create( const ATransport : IStreamTransport ); overload;
288 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
289 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
290 property IsOpen: Boolean read GetIsOpen;
291 end;
292
Jens Geyer23d67462015-12-19 11:44:57 +0100293 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200294 TSocketImpl = class(TStreamTransportImpl)
295 private
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100296 FClient : TThriftCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200297 FOwnsClient : Boolean;
298 FHost : string;
299 FPort : Integer;
300 FTimeout : Integer;
301
302 procedure InitSocket;
303 protected
304 function GetIsOpen: Boolean; override;
305 public
306 procedure Open; override;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100307 constructor Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200308 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
309 destructor Destroy; override;
310 procedure Close; override;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100311 property TcpClient: TThriftCustomIpClient read FClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200312 property Host : string read FHost;
313 property Port: Integer read FPort;
314 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100315 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200316
317 TFramedTransportImpl = class( TTransportImpl)
318 private const
319 FHeaderSize : Integer = 4;
320 private class var
321 FHeader_Dummy : array of Byte;
322 protected
323 FTransport : ITransport;
324 FWriteBuffer : TMemoryStream;
325 FReadBuffer : TMemoryStream;
326
327 procedure InitWriteBuffer;
328 procedure ReadFrame;
329 public
330 type
331 TFactory = class( TTransportFactoryImpl )
332 public
333 function GetTransport( const ATrans: ITransport): ITransport; override;
334 end;
335
336{$IF CompilerVersion >= 21.0}
337 class constructor Create;
338{$IFEND}
339 constructor Create; overload;
340 constructor Create( const ATrans: ITransport); overload;
341 destructor Destroy; override;
342
343 procedure Open(); override;
344 function GetIsOpen: Boolean; override;
345
346 procedure Close(); override;
347 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
348 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
349 procedure Flush; override;
350 end;
351
352{$IF CompilerVersion < 21.0}
353procedure TFramedTransportImpl_Initialize;
354{$IFEND}
355
356const
357 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
358
359
360implementation
361
362{ TTransportImpl }
363
364procedure TTransportImpl.Flush;
365begin
366
367end;
368
369function TTransportImpl.Peek: Boolean;
370begin
371 Result := IsOpen;
372end;
373
374function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
375var
376 got : Integer;
377 ret : Integer;
378begin
379 got := 0;
380 while ( got < len) do
381 begin
382 ret := Read( buf, off + got, len - got);
383 if ( ret <= 0 ) then
384 begin
385 raise TTransportException.Create( 'Cannot read, Remote side has closed' );
386 end;
387 got := got + ret;
388 end;
389 Result := got;
390end;
391
392procedure TTransportImpl.Write( const buf: TBytes);
393begin
394 Self.Write( buf, 0, Length(buf) );
395end;
396
397{ THTTPClientImpl }
398
399procedure THTTPClientImpl.Close;
400begin
401 FInputStream := nil;
402 FOutputStream := nil;
403end;
404
405constructor THTTPClientImpl.Create(const AUri: string);
406begin
407 inherited Create;
408 FUri := AUri;
409 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
410 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
411end;
412
413function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
414var
415 pair : TPair<string,string>;
416begin
417{$IF CompilerVersion >= 21.0}
418 Result := CoXMLHTTP.Create;
419{$ELSE}
420 Result := CoXMLHTTPRequest.Create;
421{$IFEND}
422
423 Result.open('POST', FUri, False, '', '');
424 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
425 Result.setRequestHeader( 'Accept', 'application/x-thrift');
426 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
427
428 for pair in FCustomHeaders do
429 begin
430 Result.setRequestHeader( pair.Key, pair.Value );
431 end;
432end;
433
434destructor THTTPClientImpl.Destroy;
435begin
436 Close;
437 inherited;
438end;
439
440procedure THTTPClientImpl.Flush;
441begin
442 try
443 SendRequest;
444 finally
445 FOutputStream := nil;
446 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
447 end;
448end;
449
450function THTTPClientImpl.GetConnectionTimeout: Integer;
451begin
452 Result := FConnectionTimeout;
453end;
454
455function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
456begin
457 Result := FCustomHeaders;
458end;
459
460function THTTPClientImpl.GetIsOpen: Boolean;
461begin
462 Result := True;
463end;
464
465function THTTPClientImpl.GetReadTimeout: Integer;
466begin
467 Result := FReadTimeout;
468end;
469
470procedure THTTPClientImpl.Open;
471begin
472
473end;
474
475function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
476begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100477 if FInputStream = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200478 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100479 'No request has been sent');
Jens Geyerd5436f52014-10-03 19:50:38 +0200480 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100481
Jens Geyerd5436f52014-10-03 19:50:38 +0200482 try
483 Result := FInputStream.Read( buf, off, len )
484 except
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100485 on E: Exception
486 do raise TTransportException.Create( TTransportException.TExceptionType.Unknown, E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200487 end;
488end;
489
490procedure THTTPClientImpl.SendRequest;
491var
492 xmlhttp : IXMLHTTPRequest;
493 ms : TMemoryStream;
494 a : TBytes;
495 len : Integer;
496begin
497 xmlhttp := CreateRequest;
498
499 ms := TMemoryStream.Create;
500 try
501 a := FOutputStream.ToArray;
502 len := Length(a);
503 if len > 0 then
504 begin
505 ms.WriteBuffer( Pointer(@a[0])^, len);
506 end;
507 ms.Position := 0;
508 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
509 FInputStream := nil;
510 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
511 finally
512 ms.Free;
513 end;
514end;
515
516procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
517begin
518 FConnectionTimeout := Value;
519end;
520
521procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
522begin
523 FReadTimeout := Value
524end;
525
526procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
527begin
528 FOutputStream.Write( buf, off, len);
529end;
530
531{ TTransportException }
532
533constructor TTransportException.Create(AType: TExceptionType);
534begin
535 //no inherited;
536 Create( AType, '' )
537end;
538
539constructor TTransportException.Create(AType: TExceptionType;
540 const msg: string);
541begin
542 inherited Create(msg);
543 FType := AType;
544end;
545
546constructor TTransportException.Create(const msg: string);
547begin
548 inherited Create(msg);
549end;
550
551{ TTransportFactoryImpl }
552
553function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
554begin
555 Result := ATrans;
556end;
557
558{ TServerSocket }
559
Jens Geyer23d67462015-12-19 11:44:57 +0100560{$IFDEF OLD_SOCKETS}
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100561constructor TServerSocketImpl.Create( const AServer: TThriftTcpServer; AClientTimeout: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200562begin
563 inherited Create;
564 FServer := AServer;
565 FClientTimeout := AClientTimeout;
566end;
567
568constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
569begin
570 inherited Create;
571 FPort := APort;
572 FClientTimeout := AClientTimeout;
573 FUseBufferedSocket := AUseBufferedSockets;
574 FOwnsServer := True;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100575 FServer := TThriftTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200576 FServer.BlockMode := bmBlocking;
577{$IF CompilerVersion >= 21.0}
578 FServer.LocalPort := AnsiString( IntToStr( FPort));
579{$ELSE}
580 FServer.LocalPort := IntToStr( FPort);
581{$IFEND}
582end;
583
584destructor TServerSocketImpl.Destroy;
585begin
586 if FOwnsServer then begin
587 FServer.Free;
588 FServer := nil;
589 end;
590 inherited;
591end;
592
593function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
594var
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100595 client : TThriftCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200596 trans : IStreamTransport;
597begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100598 if FServer = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200599 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100600 'No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200601 end;
602
603 client := nil;
604 try
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100605 client := TThriftCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200606
607 if Assigned(fnAccepting)
608 then fnAccepting();
609
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100610 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200611 client.Free;
612 Result := nil;
613 Exit;
614 end;
615
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100616 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200617 Result := nil;
618 Exit;
619 end;
620
621 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
622 client := nil; // trans owns it now
623
624 if FUseBufferedSocket
625 then result := TBufferedTransportImpl.Create( trans)
626 else result := trans;
627
628 except
629 on E: Exception do begin
630 client.Free;
631 raise TTransportException.Create( E.ToString );
632 end;
633 end;
634end;
635
636procedure TServerSocketImpl.Listen;
637begin
638 if FServer <> nil then
639 begin
640 try
641 FServer.Active := True;
642 except
643 on E: Exception do
644 begin
645 raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
646 end;
647 end;
648 end;
649end;
650
651procedure TServerSocketImpl.Close;
652begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100653 if FServer <> nil
654 then try
655 FServer.Active := False;
656 except
657 on E: Exception
658 do raise TTransportException.Create('Error on closing socket : ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200659 end;
660end;
661
662{ TSocket }
663
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100664constructor TSocketImpl.Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
Jens Geyerd5436f52014-10-03 19:50:38 +0200665var stream : IThriftStream;
666begin
667 FClient := AClient;
668 FTimeout := ATimeout;
669 FOwnsClient := aOwnsClient;
670 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
671 inherited Create( stream, stream);
672end;
673
674constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
675begin
676 inherited Create(nil,nil);
677 FHost := AHost;
678 FPort := APort;
679 FTimeout := ATimeout;
680 InitSocket;
681end;
682
683destructor TSocketImpl.Destroy;
684begin
685 if FOwnsClient
686 then FreeAndNil( FClient);
687 inherited;
688end;
689
690procedure TSocketImpl.Close;
691begin
692 inherited Close;
693 if FOwnsClient
694 then FreeAndNil( FClient);
695end;
696
697function TSocketImpl.GetIsOpen: Boolean;
698begin
699 Result := (FClient <> nil) and FClient.Connected;
700end;
701
702procedure TSocketImpl.InitSocket;
703var
704 stream : IThriftStream;
705begin
706 if FOwnsClient
707 then FreeAndNil( FClient)
708 else FClient := nil;
709
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100710 FClient := TThriftTcpClient.Create( nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200711 FOwnsClient := True;
712
713 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
714 FInputStream := stream;
715 FOutputStream := stream;
716end;
717
718procedure TSocketImpl.Open;
719begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100720 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200721 raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100722 'Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +0200723 end;
724
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100725 if FHost = '' then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200726 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100727 'Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +0200728 end;
729
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100730 if Port <= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200731 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100732 'Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +0200733 end;
734
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100735 if FClient = nil
736 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +0200737
738 FClient.RemoteHost := TSocketHost( Host);
739 FClient.RemotePort := TSocketPort( IntToStr( Port));
740 FClient.Connect;
741
742 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
743 FOutputStream := FInputStream;
744end;
Jens Geyer23d67462015-12-19 11:44:57 +0100745{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200746
747{ TBufferedStream }
748
749procedure TBufferedStreamImpl.Close;
750begin
751 Flush;
752 FStream := nil;
753
754 FReadBuffer.Free;
755 FReadBuffer := nil;
756
757 FWriteBuffer.Free;
758 FWriteBuffer := nil;
759end;
760
761constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
762begin
763 inherited Create;
764 FStream := AStream;
765 FBufSize := ABufSize;
766 FReadBuffer := TMemoryStream.Create;
767 FWriteBuffer := TMemoryStream.Create;
768end;
769
770destructor TBufferedStreamImpl.Destroy;
771begin
772 Close;
773 inherited;
774end;
775
776procedure TBufferedStreamImpl.Flush;
777var
778 buf : TBytes;
779 len : Integer;
780begin
781 if IsOpen then
782 begin
783 len := FWriteBuffer.Size;
784 if len > 0 then
785 begin
786 SetLength( buf, len );
787 FWriteBuffer.Position := 0;
788 FWriteBuffer.Read( Pointer(@buf[0])^, len );
789 FStream.Write( buf, 0, len );
790 end;
791 FWriteBuffer.Clear;
792 end;
793end;
794
795function TBufferedStreamImpl.IsOpen: Boolean;
796begin
797 Result := (FWriteBuffer <> nil)
798 and (FReadBuffer <> nil)
799 and (FStream <> nil);
800end;
801
802procedure TBufferedStreamImpl.Open;
803begin
804
805end;
806
807function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
808var
809 nRead : Integer;
810 tempbuf : TBytes;
811begin
812 inherited;
813 Result := 0;
814 if IsOpen then
815 begin
816 while count > 0 do begin
817
818 if FReadBuffer.Position >= FReadBuffer.Size then
819 begin
820 FReadBuffer.Clear;
821 SetLength( tempbuf, FBufSize);
822 nRead := FStream.Read( tempbuf, 0, FBufSize );
823 if nRead = 0 then Break; // avoid infinite loop
824
825 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
826 FReadBuffer.Position := 0;
827 end;
828
829 if FReadBuffer.Position < FReadBuffer.Size then
830 begin
831 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
832 Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));
833 Dec( count, nRead);
834 Inc( offset, nRead);
835 end;
836 end;
837 end;
838end;
839
840function TBufferedStreamImpl.ToArray: TBytes;
841var
842 len : Integer;
843begin
844 len := 0;
845
846 if IsOpen then
847 begin
848 len := FReadBuffer.Size;
849 end;
850
851 SetLength( Result, len);
852
853 if len > 0 then
854 begin
855 FReadBuffer.Position := 0;
856 FReadBuffer.Read( Pointer(@Result[0])^, len );
857 end;
858end;
859
860procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
861begin
862 inherited;
863 if count > 0 then
864 begin
865 if IsOpen then
866 begin
867 FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
868 if FWriteBuffer.Size > FBufSize then
869 begin
870 Flush;
871 end;
872 end;
873 end;
874end;
875
876{ TStreamTransportImpl }
877
878procedure TStreamTransportImpl.Close;
879begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100880 FInputStream := nil;
881 FOutputStream := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +0200882end;
883
884constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
885begin
886 inherited Create;
887 FInputStream := AInputStream;
888 FOutputStream := AOutputStream;
889end;
890
891destructor TStreamTransportImpl.Destroy;
892begin
893 FInputStream := nil;
894 FOutputStream := nil;
895 inherited;
896end;
897
898procedure TStreamTransportImpl.Flush;
899begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100900 if FOutputStream = nil then begin
901 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
902 'Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200903 end;
904
905 FOutputStream.Flush;
906end;
907
908function TStreamTransportImpl.GetInputStream: IThriftStream;
909begin
910 Result := FInputStream;
911end;
912
913function TStreamTransportImpl.GetIsOpen: Boolean;
914begin
915 Result := True;
916end;
917
918function TStreamTransportImpl.GetOutputStream: IThriftStream;
919begin
920 Result := FInputStream;
921end;
922
923procedure TStreamTransportImpl.Open;
924begin
925
926end;
927
928function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
929begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100930 if FInputStream = nil then begin
931 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
932 'Cannot read from null inputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200933 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100934
Jens Geyerd5436f52014-10-03 19:50:38 +0200935 Result := FInputStream.Read( buf, off, len );
936end;
937
938procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
939begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100940 if FOutputStream = nil then begin
941 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
942 'Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200943 end;
944
945 FOutputStream.Write( buf, off, len );
946end;
947
948{ TBufferedTransportImpl }
949
950constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
951begin
952 //no inherited;
953 Create( ATransport, 1024 );
954end;
955
956procedure TBufferedTransportImpl.Close;
957begin
958 FTransport.Close;
959end;
960
961constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
962 ABufSize: Integer);
963begin
964 inherited Create;
965 FTransport := ATransport;
966 FBufSize := ABufSize;
967 InitBuffers;
968end;
969
970procedure TBufferedTransportImpl.Flush;
971begin
972 if FOutputBuffer <> nil then
973 begin
974 FOutputBuffer.Flush;
975 end;
976end;
977
978function TBufferedTransportImpl.GetIsOpen: Boolean;
979begin
980 Result := FTransport.IsOpen;
981end;
982
983function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
984begin
985 Result := FTransport;
986end;
987
988procedure TBufferedTransportImpl.InitBuffers;
989begin
990 if FTransport.InputStream <> nil then
991 begin
992 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
993 end;
994 if FTransport.OutputStream <> nil then
995 begin
996 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
997 end;
998end;
999
1000procedure TBufferedTransportImpl.Open;
1001begin
1002 FTransport.Open
1003end;
1004
1005function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1006begin
1007 Result := 0;
1008 if FInputBuffer <> nil then
1009 begin
1010 Result := FInputBuffer.Read( buf, off, len );
1011 end;
1012end;
1013
1014procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1015begin
1016 if FOutputBuffer <> nil then
1017 begin
1018 FOutputBuffer.Write( buf, off, len );
1019 end;
1020end;
1021
1022{ TFramedTransportImpl }
1023
1024{$IF CompilerVersion < 21.0}
1025procedure TFramedTransportImpl_Initialize;
1026begin
1027 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1028 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1029 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1030end;
1031{$ELSE}
1032class constructor TFramedTransportImpl.Create;
1033begin
1034 SetLength( FHeader_Dummy, FHeaderSize);
1035 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1036end;
1037{$IFEND}
1038
1039constructor TFramedTransportImpl.Create;
1040begin
1041 inherited Create;
1042 InitWriteBuffer;
1043end;
1044
1045procedure TFramedTransportImpl.Close;
1046begin
1047 FTransport.Close;
1048end;
1049
1050constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1051begin
1052 inherited Create;
1053 InitWriteBuffer;
1054 FTransport := ATrans;
1055end;
1056
1057destructor TFramedTransportImpl.Destroy;
1058begin
1059 FWriteBuffer.Free;
1060 FReadBuffer.Free;
1061 inherited;
1062end;
1063
1064procedure TFramedTransportImpl.Flush;
1065var
1066 buf : TBytes;
1067 len : Integer;
1068 data_len : Integer;
1069
1070begin
1071 len := FWriteBuffer.Size;
1072 SetLength( buf, len);
1073 if len > 0 then
1074 begin
1075 System.Move( FWriteBuffer.Memory^, buf[0], len );
1076 end;
1077
1078 data_len := len - FHeaderSize;
Jens Geyer30ed90e2016-03-10 20:12:49 +01001079 if (data_len < 0) then begin
1080 raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1081 'TFramedTransport.Flush: data_len < 0' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001082 end;
1083
1084 InitWriteBuffer;
1085
1086 buf[0] := Byte($FF and (data_len shr 24));
1087 buf[1] := Byte($FF and (data_len shr 16));
1088 buf[2] := Byte($FF and (data_len shr 8));
1089 buf[3] := Byte($FF and data_len);
1090
1091 FTransport.Write( buf, 0, len );
1092 FTransport.Flush;
1093end;
1094
1095function TFramedTransportImpl.GetIsOpen: Boolean;
1096begin
1097 Result := FTransport.IsOpen;
1098end;
1099
1100type
1101 TAccessMemoryStream = class(TMemoryStream)
1102 end;
1103
1104procedure TFramedTransportImpl.InitWriteBuffer;
1105begin
1106 FWriteBuffer.Free;
1107 FWriteBuffer := TMemoryStream.Create;
1108 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1109 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1110end;
1111
1112procedure TFramedTransportImpl.Open;
1113begin
1114 FTransport.Open;
1115end;
1116
1117function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1118var
1119 got : Integer;
1120begin
1121 if FReadBuffer <> nil then
1122 begin
1123 if len > 0
1124 then got := FReadBuffer.Read( Pointer(@buf[off])^, len )
1125 else got := 0;
1126 if got > 0 then
1127 begin
1128 Result := got;
1129 Exit;
1130 end;
1131 end;
1132
1133 ReadFrame;
1134 if len > 0
1135 then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)
1136 else Result := 0;
1137end;
1138
1139procedure TFramedTransportImpl.ReadFrame;
1140var
1141 i32rd : TBytes;
1142 size : Integer;
1143 buff : TBytes;
1144begin
1145 SetLength( i32rd, FHeaderSize );
1146 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1147 size :=
1148 ((i32rd[0] and $FF) shl 24) or
1149 ((i32rd[1] and $FF) shl 16) or
1150 ((i32rd[2] and $FF) shl 8) or
1151 (i32rd[3] and $FF);
1152 SetLength( buff, size );
1153 FTransport.ReadAll( buff, 0, size );
1154 FReadBuffer.Free;
1155 FReadBuffer := TMemoryStream.Create;
1156 FReadBuffer.Write( Pointer(@buff[0])^, size );
1157 FReadBuffer.Position := 0;
1158end;
1159
1160procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1161begin
1162 if len > 0
1163 then FWriteBuffer.Write( Pointer(@buf[off])^, len );
1164end;
1165
1166{ TFramedTransport.TFactory }
1167
1168function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1169begin
1170 Result := TFramedTransportImpl.Create( ATrans );
1171end;
1172
1173{ TTcpSocketStreamImpl }
1174
Jens Geyer23d67462015-12-19 11:44:57 +01001175{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001176procedure TTcpSocketStreamImpl.Close;
1177begin
1178 FTcpClient.Close;
1179end;
1180
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001181constructor TTcpSocketStreamImpl.Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001182begin
1183 inherited Create;
1184 FTcpClient := ATcpClient;
1185 FTimeout := aTimeout;
1186end;
1187
1188procedure TTcpSocketStreamImpl.Flush;
1189begin
1190
1191end;
1192
1193function TTcpSocketStreamImpl.IsOpen: Boolean;
1194begin
1195 Result := FTcpClient.Active;
1196end;
1197
1198procedure TTcpSocketStreamImpl.Open;
1199begin
1200 FTcpClient.Open;
1201end;
1202
1203
1204function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1205 TimeOut: Integer; var wsaError : Integer): Integer;
1206var
1207 ReadFds: TFDset;
1208 ReadFdsptr: PFDset;
1209 WriteFds: TFDset;
1210 WriteFdsptr: PFDset;
1211 ExceptFds: TFDset;
1212 ExceptFdsptr: PFDset;
1213 tv: timeval;
1214 Timeptr: PTimeval;
1215 socket : TSocket;
1216begin
1217 if not FTcpClient.Active then begin
1218 wsaError := WSAEINVAL;
1219 Exit( SOCKET_ERROR);
1220 end;
1221
1222 socket := FTcpClient.Handle;
1223
1224 if Assigned(ReadReady) then
1225 begin
1226 ReadFdsptr := @ReadFds;
1227 FD_ZERO(ReadFds);
1228 FD_SET(socket, ReadFds);
1229 end
1230 else
1231 ReadFdsptr := nil;
1232
1233 if Assigned(WriteReady) then
1234 begin
1235 WriteFdsptr := @WriteFds;
1236 FD_ZERO(WriteFds);
1237 FD_SET(socket, WriteFds);
1238 end
1239 else
1240 WriteFdsptr := nil;
1241
1242 if Assigned(ExceptFlag) then
1243 begin
1244 ExceptFdsptr := @ExceptFds;
1245 FD_ZERO(ExceptFds);
1246 FD_SET(socket, ExceptFds);
1247 end
1248 else
1249 ExceptFdsptr := nil;
1250
1251 if TimeOut >= 0 then
1252 begin
1253 tv.tv_sec := TimeOut div 1000;
1254 tv.tv_usec := 1000 * (TimeOut mod 1000);
1255 Timeptr := @tv;
1256 end
1257 else
1258 Timeptr := nil; // wait forever
1259
1260 wsaError := 0;
1261 try
1262{$IFDEF MSWINDOWS}
Nick4f5229e2016-04-14 16:43:22 +03001263 {$IF CompilerVersion < 23.0}
Jens Geyerd5436f52014-10-03 19:50:38 +02001264 result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
Nick4f5229e2016-04-14 16:43:22 +03001265 {$ELSE}
1266 result := Winapi.WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1267 {$IFEND}
Jens Geyerd5436f52014-10-03 19:50:38 +02001268{$ENDIF}
1269{$IFDEF LINUX}
1270 result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1271{$ENDIF}
1272 if result = SOCKET_ERROR
1273 then wsaError := WSAGetLastError;
1274
1275 except
1276 result := SOCKET_ERROR;
1277 end;
1278
1279 if Assigned(ReadReady) then
1280 ReadReady^ := FD_ISSET(socket, ReadFds);
1281 if Assigned(WriteReady) then
1282 WriteReady^ := FD_ISSET(socket, WriteFds);
1283 if Assigned(ExceptFlag) then
1284 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1285end;
1286
1287function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1288 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001289 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001290var bCanRead, bError : Boolean;
1291 retval : Integer;
1292begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001293 bytesReady := 0;
1294
Jens Geyerd5436f52014-10-03 19:50:38 +02001295 // The select function returns the total number of socket handles that are ready
1296 // and contained in the fd_set structures, zero if the time limit expired,
1297 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1298 // WSAGetLastError can be used to retrieve a specific error code.
1299 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1300 if retval = SOCKET_ERROR
1301 then Exit( TWaitForData.wfd_Error);
1302 if (retval = 0) or not bCanRead
1303 then Exit( TWaitForData.wfd_Timeout);
1304
1305 // recv() returns the number of bytes received, or -1 if an error occurred.
1306 // The return value will be 0 when the peer has performed an orderly shutdown.
Nick4f5229e2016-04-14 16:43:22 +03001307{$IF CompilerVersion < 23.0}
Jens Geyerd5436f52014-10-03 19:50:38 +02001308 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);
Nick4f5229e2016-04-14 16:43:22 +03001309{$ELSE}
1310 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, Winapi.WinSock.MSG_PEEK);
1311{$IFEND}
Jens Geyerd5436f52014-10-03 19:50:38 +02001312 if retval <= 0
1313 then Exit( TWaitForData.wfd_Error);
1314
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001315 // at least we have some data
1316 bytesReady := Min( retval, DesiredBytes);
1317 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001318end;
1319
1320function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
1321var wfd : TWaitForData;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001322 wsaError, nBytes : Integer;
1323 pDest : PByte;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001324 msecs : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001325begin
1326 inherited;
1327
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001328 if FTimeout > 0
1329 then msecs := FTimeout
1330 else msecs := DEFAULT_THRIFT_TIMEOUT;
1331
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001332 result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001333 pDest := Pointer(@buffer[offset]);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001334 while count > 0 do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001335
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001336 while TRUE do begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001337 wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001338 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001339 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001340 TWaitForData.wfd_HaveData : Break;
1341 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001342 if (FTimeout = 0)
1343 then Exit
1344 else begin
1345 raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
1346 SysErrorMessage(Cardinal(wsaError)));
1347
1348 end;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001349 end;
1350 else
1351 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001352 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001353 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001354
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001355 // reduce the timeout once we got data
1356 if FTimeout > 0
1357 then msecs := FTimeout div 10
1358 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1359 msecs := Max( msecs, 200);
1360
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001361 ASSERT( nBytes <= count);
1362 nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);
1363 Inc( pDest, nBytes);
1364 Dec( count, nBytes);
1365 Inc( result, nBytes);
1366 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001367end;
1368
1369function TTcpSocketStreamImpl.ToArray: TBytes;
1370var
1371 len : Integer;
1372begin
1373 len := 0;
1374 if IsOpen then
1375 begin
1376 len := FTcpClient.BytesReceived;
1377 end;
1378
1379 SetLength( Result, len );
1380
1381 if len > 0 then
1382 begin
1383 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1384 end;
1385end;
1386
1387procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
1388var bCanWrite, bError : Boolean;
1389 retval, wsaError : Integer;
1390begin
1391 inherited;
1392
1393 if not FTcpClient.Active
1394 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
1395
1396 // The select function returns the total number of socket handles that are ready
1397 // and contained in the fd_set structures, zero if the time limit expired,
1398 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1399 // WSAGetLastError can be used to retrieve a specific error code.
1400 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1401 if retval = SOCKET_ERROR
1402 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1403 SysErrorMessage(Cardinal(wsaError)));
1404 if (retval = 0)
1405 then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
1406 if bError or not bCanWrite
1407 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
1408
1409 FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
1410end;
Jens Geyer23d67462015-12-19 11:44:57 +01001411{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001412
1413{$IF CompilerVersion < 21.0}
1414initialization
1415begin
1416 TFramedTransportImpl_Initialize;
1417end;
1418{$IFEND}
1419
1420
1421end.