blob: c0d6712d7077b2306dc32c6de7d747d1ad5a9197 [file] [log] [blame]
Jens Geyerd5436f52014-10-03 19:50:38 +02001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 {$SCOPEDENUMS ON}
21
Jens Geyer23d67462015-12-19 11:44:57 +010022{$IF CompilerVersion < 28.0}
23 {$DEFINE OLD_SOCKETS} // TODO: add socket support for CompilerVersion >= 28.0
24{$IFEND}
25
26
Jens Geyerd5436f52014-10-03 19:50:38 +020027unit Thrift.Transport;
28
29interface
30
31uses
32 Classes,
33 SysUtils,
34 Math,
Jens Geyer23d67462015-12-19 11:44:57 +010035 WinSock,
36 {$IFDEF OLD_SOCKETS}
37 Sockets,
38 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +020039 Generics.Collections,
40 Thrift.Collections,
41 Thrift.Utils,
42 Thrift.Stream,
43 ActiveX,
44 msxml;
45
46type
47 ITransport = interface
48 ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
49 function GetIsOpen: Boolean;
50 property IsOpen: Boolean read GetIsOpen;
51 function Peek: Boolean;
52 procedure Open;
53 procedure Close;
54 function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
55 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
56 procedure Write( const buf: TBytes); overload;
57 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
58 procedure Flush;
59 end;
60
61 TTransportImpl = class( TInterfacedObject, ITransport)
62 protected
63 function GetIsOpen: Boolean; virtual; abstract;
64 property IsOpen: Boolean read GetIsOpen;
65 function Peek: Boolean; virtual;
66 procedure Open(); virtual; abstract;
67 procedure Close(); virtual; abstract;
68 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
69 function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
70 procedure Write( const buf: TBytes); overload; virtual;
71 procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
72 procedure Flush; virtual;
73 end;
74
75 TTransportException = class( Exception )
76 public
77 type
78 TExceptionType = (
79 Unknown,
80 NotOpen,
81 AlreadyOpen,
82 TimedOut,
83 EndOfFile
84 );
85 private
86 FType : TExceptionType;
87 public
88 constructor Create( AType: TExceptionType); overload;
89 constructor Create( const msg: string); overload;
90 constructor Create( AType: TExceptionType; const msg: string); overload;
91 property Type_: TExceptionType read FType;
92 end;
93
94 IHTTPClient = interface( ITransport )
95 ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
96 procedure SetConnectionTimeout(const Value: Integer);
97 function GetConnectionTimeout: Integer;
98 procedure SetReadTimeout(const Value: Integer);
99 function GetReadTimeout: Integer;
100 function GetCustomHeaders: IThriftDictionary<string,string>;
101 procedure SendRequest;
102 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
103 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
104 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
105 end;
106
107 THTTPClientImpl = class( TTransportImpl, IHTTPClient)
108 private
109 FUri : string;
110 FInputStream : IThriftStream;
111 FOutputStream : IThriftStream;
112 FConnectionTimeout : Integer;
113 FReadTimeout : Integer;
114 FCustomHeaders : IThriftDictionary<string,string>;
115
116 function CreateRequest: IXMLHTTPRequest;
117 protected
118 function GetIsOpen: Boolean; override;
119 procedure Open(); override;
120 procedure Close(); override;
121 function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
122 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
123 procedure Flush; override;
124
125 procedure SetConnectionTimeout(const Value: Integer);
126 function GetConnectionTimeout: Integer;
127 procedure SetReadTimeout(const Value: Integer);
128 function GetReadTimeout: Integer;
129 function GetCustomHeaders: IThriftDictionary<string,string>;
130 procedure SendRequest;
131 property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
132 property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
133 property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
134 public
135 constructor Create( const AUri: string);
136 destructor Destroy; override;
137 end;
138
139 IServerTransport = interface
140 ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
141 procedure Listen;
142 procedure Close;
143 function Accept( const fnAccepting: TProc): ITransport;
144 end;
145
146 TServerTransportImpl = class( TInterfacedObject, IServerTransport)
147 protected
148 procedure Listen; virtual; abstract;
149 procedure Close; virtual; abstract;
150 function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
151 end;
152
153 ITransportFactory = interface
154 ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
155 function GetTransport( const ATrans: ITransport): ITransport;
156 end;
157
158 TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
159 function GetTransport( const ATrans: ITransport): ITransport; virtual;
160 end;
161
Jens Geyer23d67462015-12-19 11:44:57 +0100162 {$IFDEF OLD_SOCKETS}
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100163 TThriftCustomIpClient = TCustomIpClient;
164 TThriftTcpServer = TTcpServer;
165 TThriftTcpClient = TTcpClient;
166 {$ELSE}
167 // TODO
168 {$ENDIF}
169
170 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200171 TTcpSocketStreamImpl = class( TThriftStreamImpl )
172 private type
173 TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
174 private
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100175 FTcpClient : TThriftCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200176 FTimeout : Integer;
177 function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
178 TimeOut: Integer; var wsaError : Integer): Integer;
179 function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +0200180 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +0200181 protected
182 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
183 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
184 procedure Open; override;
185 procedure Close; override;
186 procedure Flush; override;
187
188 function IsOpen: Boolean; override;
189 function ToArray: TBytes; override;
190 public
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100191 constructor Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer = 0);
Jens Geyerd5436f52014-10-03 19:50:38 +0200192 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100193 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200194
195 IStreamTransport = interface( ITransport )
196 ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
197 function GetInputStream: IThriftStream;
198 function GetOutputStream: IThriftStream;
199 property InputStream : IThriftStream read GetInputStream;
200 property OutputStream : IThriftStream read GetOutputStream;
201 end;
202
203 TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
204 protected
205 FInputStream : IThriftStream;
206 FOutputStream : IThriftStream;
207 protected
208 function GetIsOpen: Boolean; override;
209
210 function GetInputStream: IThriftStream;
211 function GetOutputStream: IThriftStream;
212 public
213 property InputStream : IThriftStream read GetInputStream;
214 property OutputStream : IThriftStream read GetOutputStream;
215
216 procedure Open; override;
217 procedure Close; override;
218 procedure Flush; override;
219 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
220 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
221 constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
222 destructor Destroy; override;
223 end;
224
225 TBufferedStreamImpl = class( TThriftStreamImpl)
226 private
227 FStream : IThriftStream;
228 FBufSize : Integer;
229 FReadBuffer : TMemoryStream;
230 FWriteBuffer : TMemoryStream;
231 protected
232 procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
233 function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
234 procedure Open; override;
235 procedure Close; override;
236 procedure Flush; override;
237 function IsOpen: Boolean; override;
238 function ToArray: TBytes; override;
239 public
240 constructor Create( const AStream: IThriftStream; ABufSize: Integer);
241 destructor Destroy; override;
242 end;
243
Jens Geyer23d67462015-12-19 11:44:57 +0100244 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200245 TServerSocketImpl = class( TServerTransportImpl)
246 private
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100247 FServer : TThriftTcpServer;
Jens Geyerd5436f52014-10-03 19:50:38 +0200248 FPort : Integer;
249 FClientTimeout : Integer;
250 FUseBufferedSocket : Boolean;
251 FOwnsServer : Boolean;
252 protected
253 function Accept( const fnAccepting: TProc) : ITransport; override;
254 public
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100255 constructor Create( const AServer: TThriftTcpServer; AClientTimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200256 constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
257 destructor Destroy; override;
258 procedure Listen; override;
259 procedure Close; override;
260 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100261 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200262
263 TBufferedTransportImpl = class( TTransportImpl )
264 private
265 FInputBuffer : IThriftStream;
266 FOutputBuffer : IThriftStream;
267 FTransport : IStreamTransport;
268 FBufSize : Integer;
269
270 procedure InitBuffers;
271 function GetUnderlyingTransport: ITransport;
272 protected
273 function GetIsOpen: Boolean; override;
274 procedure Flush; override;
275 public
276 procedure Open(); override;
277 procedure Close(); override;
278 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
279 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
280 constructor Create( const ATransport : IStreamTransport ); overload;
281 constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
282 property UnderlyingTransport: ITransport read GetUnderlyingTransport;
283 property IsOpen: Boolean read GetIsOpen;
284 end;
285
Jens Geyer23d67462015-12-19 11:44:57 +0100286 {$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +0200287 TSocketImpl = class(TStreamTransportImpl)
288 private
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100289 FClient : TThriftCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200290 FOwnsClient : Boolean;
291 FHost : string;
292 FPort : Integer;
293 FTimeout : Integer;
294
295 procedure InitSocket;
296 protected
297 function GetIsOpen: Boolean; override;
298 public
299 procedure Open; override;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100300 constructor Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
Jens Geyerd5436f52014-10-03 19:50:38 +0200301 constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
302 destructor Destroy; override;
303 procedure Close; override;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100304 property TcpClient: TThriftCustomIpClient read FClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200305 property Host : string read FHost;
306 property Port: Integer read FPort;
307 end;
Jens Geyer23d67462015-12-19 11:44:57 +0100308 {$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200309
310 TFramedTransportImpl = class( TTransportImpl)
311 private const
312 FHeaderSize : Integer = 4;
313 private class var
314 FHeader_Dummy : array of Byte;
315 protected
316 FTransport : ITransport;
317 FWriteBuffer : TMemoryStream;
318 FReadBuffer : TMemoryStream;
319
320 procedure InitWriteBuffer;
321 procedure ReadFrame;
322 public
323 type
324 TFactory = class( TTransportFactoryImpl )
325 public
326 function GetTransport( const ATrans: ITransport): ITransport; override;
327 end;
328
329{$IF CompilerVersion >= 21.0}
330 class constructor Create;
331{$IFEND}
332 constructor Create; overload;
333 constructor Create( const ATrans: ITransport); overload;
334 destructor Destroy; override;
335
336 procedure Open(); override;
337 function GetIsOpen: Boolean; override;
338
339 procedure Close(); override;
340 function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
341 procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
342 procedure Flush; override;
343 end;
344
345{$IF CompilerVersion < 21.0}
346procedure TFramedTransportImpl_Initialize;
347{$IFEND}
348
349const
350 DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
351
352
353implementation
354
355{ TTransportImpl }
356
357procedure TTransportImpl.Flush;
358begin
359
360end;
361
362function TTransportImpl.Peek: Boolean;
363begin
364 Result := IsOpen;
365end;
366
367function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
368var
369 got : Integer;
370 ret : Integer;
371begin
372 got := 0;
373 while ( got < len) do
374 begin
375 ret := Read( buf, off + got, len - got);
376 if ( ret <= 0 ) then
377 begin
378 raise TTransportException.Create( 'Cannot read, Remote side has closed' );
379 end;
380 got := got + ret;
381 end;
382 Result := got;
383end;
384
385procedure TTransportImpl.Write( const buf: TBytes);
386begin
387 Self.Write( buf, 0, Length(buf) );
388end;
389
390{ THTTPClientImpl }
391
392procedure THTTPClientImpl.Close;
393begin
394 FInputStream := nil;
395 FOutputStream := nil;
396end;
397
398constructor THTTPClientImpl.Create(const AUri: string);
399begin
400 inherited Create;
401 FUri := AUri;
402 FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
403 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
404end;
405
406function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
407var
408 pair : TPair<string,string>;
409begin
410{$IF CompilerVersion >= 21.0}
411 Result := CoXMLHTTP.Create;
412{$ELSE}
413 Result := CoXMLHTTPRequest.Create;
414{$IFEND}
415
416 Result.open('POST', FUri, False, '', '');
417 Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
418 Result.setRequestHeader( 'Accept', 'application/x-thrift');
419 Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
420
421 for pair in FCustomHeaders do
422 begin
423 Result.setRequestHeader( pair.Key, pair.Value );
424 end;
425end;
426
427destructor THTTPClientImpl.Destroy;
428begin
429 Close;
430 inherited;
431end;
432
433procedure THTTPClientImpl.Flush;
434begin
435 try
436 SendRequest;
437 finally
438 FOutputStream := nil;
439 FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
440 end;
441end;
442
443function THTTPClientImpl.GetConnectionTimeout: Integer;
444begin
445 Result := FConnectionTimeout;
446end;
447
448function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
449begin
450 Result := FCustomHeaders;
451end;
452
453function THTTPClientImpl.GetIsOpen: Boolean;
454begin
455 Result := True;
456end;
457
458function THTTPClientImpl.GetReadTimeout: Integer;
459begin
460 Result := FReadTimeout;
461end;
462
463procedure THTTPClientImpl.Open;
464begin
465
466end;
467
468function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
469begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100470 if FInputStream = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200471 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100472 'No request has been sent');
Jens Geyerd5436f52014-10-03 19:50:38 +0200473 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100474
Jens Geyerd5436f52014-10-03 19:50:38 +0200475 try
476 Result := FInputStream.Read( buf, off, len )
477 except
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100478 on E: Exception
479 do raise TTransportException.Create( TTransportException.TExceptionType.Unknown, E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200480 end;
481end;
482
483procedure THTTPClientImpl.SendRequest;
484var
485 xmlhttp : IXMLHTTPRequest;
486 ms : TMemoryStream;
487 a : TBytes;
488 len : Integer;
489begin
490 xmlhttp := CreateRequest;
491
492 ms := TMemoryStream.Create;
493 try
494 a := FOutputStream.ToArray;
495 len := Length(a);
496 if len > 0 then
497 begin
498 ms.WriteBuffer( Pointer(@a[0])^, len);
499 end;
500 ms.Position := 0;
501 xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
502 FInputStream := nil;
503 FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
504 finally
505 ms.Free;
506 end;
507end;
508
509procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
510begin
511 FConnectionTimeout := Value;
512end;
513
514procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
515begin
516 FReadTimeout := Value
517end;
518
519procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
520begin
521 FOutputStream.Write( buf, off, len);
522end;
523
524{ TTransportException }
525
526constructor TTransportException.Create(AType: TExceptionType);
527begin
528 //no inherited;
529 Create( AType, '' )
530end;
531
532constructor TTransportException.Create(AType: TExceptionType;
533 const msg: string);
534begin
535 inherited Create(msg);
536 FType := AType;
537end;
538
539constructor TTransportException.Create(const msg: string);
540begin
541 inherited Create(msg);
542end;
543
544{ TTransportFactoryImpl }
545
546function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
547begin
548 Result := ATrans;
549end;
550
551{ TServerSocket }
552
Jens Geyer23d67462015-12-19 11:44:57 +0100553{$IFDEF OLD_SOCKETS}
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100554constructor TServerSocketImpl.Create( const AServer: TThriftTcpServer; AClientTimeout: Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +0200555begin
556 inherited Create;
557 FServer := AServer;
558 FClientTimeout := AClientTimeout;
559end;
560
561constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
562begin
563 inherited Create;
564 FPort := APort;
565 FClientTimeout := AClientTimeout;
566 FUseBufferedSocket := AUseBufferedSockets;
567 FOwnsServer := True;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100568 FServer := TThriftTcpServer.Create( nil );
Jens Geyerd5436f52014-10-03 19:50:38 +0200569 FServer.BlockMode := bmBlocking;
570{$IF CompilerVersion >= 21.0}
571 FServer.LocalPort := AnsiString( IntToStr( FPort));
572{$ELSE}
573 FServer.LocalPort := IntToStr( FPort);
574{$IFEND}
575end;
576
577destructor TServerSocketImpl.Destroy;
578begin
579 if FOwnsServer then begin
580 FServer.Free;
581 FServer := nil;
582 end;
583 inherited;
584end;
585
586function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
587var
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100588 client : TThriftCustomIpClient;
Jens Geyerd5436f52014-10-03 19:50:38 +0200589 trans : IStreamTransport;
590begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100591 if FServer = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200592 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100593 'No underlying server socket.');
Jens Geyerd5436f52014-10-03 19:50:38 +0200594 end;
595
596 client := nil;
597 try
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100598 client := TThriftCustomIpClient.Create(nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200599
600 if Assigned(fnAccepting)
601 then fnAccepting();
602
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100603 if not FServer.Accept( client) then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200604 client.Free;
605 Result := nil;
606 Exit;
607 end;
608
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100609 if client = nil then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200610 Result := nil;
611 Exit;
612 end;
613
614 trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
615 client := nil; // trans owns it now
616
617 if FUseBufferedSocket
618 then result := TBufferedTransportImpl.Create( trans)
619 else result := trans;
620
621 except
622 on E: Exception do begin
623 client.Free;
624 raise TTransportException.Create( E.ToString );
625 end;
626 end;
627end;
628
629procedure TServerSocketImpl.Listen;
630begin
631 if FServer <> nil then
632 begin
633 try
634 FServer.Active := True;
635 except
636 on E: Exception do
637 begin
638 raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
639 end;
640 end;
641 end;
642end;
643
644procedure TServerSocketImpl.Close;
645begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100646 if FServer <> nil
647 then try
648 FServer.Active := False;
649 except
650 on E: Exception
651 do raise TTransportException.Create('Error on closing socket : ' + E.Message);
Jens Geyerd5436f52014-10-03 19:50:38 +0200652 end;
653end;
654
655{ TSocket }
656
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100657constructor TSocketImpl.Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
Jens Geyerd5436f52014-10-03 19:50:38 +0200658var stream : IThriftStream;
659begin
660 FClient := AClient;
661 FTimeout := ATimeout;
662 FOwnsClient := aOwnsClient;
663 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
664 inherited Create( stream, stream);
665end;
666
667constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
668begin
669 inherited Create(nil,nil);
670 FHost := AHost;
671 FPort := APort;
672 FTimeout := ATimeout;
673 InitSocket;
674end;
675
676destructor TSocketImpl.Destroy;
677begin
678 if FOwnsClient
679 then FreeAndNil( FClient);
680 inherited;
681end;
682
683procedure TSocketImpl.Close;
684begin
685 inherited Close;
686 if FOwnsClient
687 then FreeAndNil( FClient);
688end;
689
690function TSocketImpl.GetIsOpen: Boolean;
691begin
692 Result := (FClient <> nil) and FClient.Connected;
693end;
694
695procedure TSocketImpl.InitSocket;
696var
697 stream : IThriftStream;
698begin
699 if FOwnsClient
700 then FreeAndNil( FClient)
701 else FClient := nil;
702
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100703 FClient := TThriftTcpClient.Create( nil);
Jens Geyerd5436f52014-10-03 19:50:38 +0200704 FOwnsClient := True;
705
706 stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
707 FInputStream := stream;
708 FOutputStream := stream;
709end;
710
711procedure TSocketImpl.Open;
712begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100713 if IsOpen then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200714 raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100715 'Socket already connected');
Jens Geyerd5436f52014-10-03 19:50:38 +0200716 end;
717
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100718 if FHost = '' then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200719 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100720 'Cannot open null host');
Jens Geyerd5436f52014-10-03 19:50:38 +0200721 end;
722
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100723 if Port <= 0 then begin
Jens Geyerd5436f52014-10-03 19:50:38 +0200724 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100725 'Cannot open without port');
Jens Geyerd5436f52014-10-03 19:50:38 +0200726 end;
727
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100728 if FClient = nil
729 then InitSocket;
Jens Geyerd5436f52014-10-03 19:50:38 +0200730
731 FClient.RemoteHost := TSocketHost( Host);
732 FClient.RemotePort := TSocketPort( IntToStr( Port));
733 FClient.Connect;
734
735 FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
736 FOutputStream := FInputStream;
737end;
Jens Geyer23d67462015-12-19 11:44:57 +0100738{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +0200739
740{ TBufferedStream }
741
742procedure TBufferedStreamImpl.Close;
743begin
744 Flush;
745 FStream := nil;
746
747 FReadBuffer.Free;
748 FReadBuffer := nil;
749
750 FWriteBuffer.Free;
751 FWriteBuffer := nil;
752end;
753
754constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
755begin
756 inherited Create;
757 FStream := AStream;
758 FBufSize := ABufSize;
759 FReadBuffer := TMemoryStream.Create;
760 FWriteBuffer := TMemoryStream.Create;
761end;
762
763destructor TBufferedStreamImpl.Destroy;
764begin
765 Close;
766 inherited;
767end;
768
769procedure TBufferedStreamImpl.Flush;
770var
771 buf : TBytes;
772 len : Integer;
773begin
774 if IsOpen then
775 begin
776 len := FWriteBuffer.Size;
777 if len > 0 then
778 begin
779 SetLength( buf, len );
780 FWriteBuffer.Position := 0;
781 FWriteBuffer.Read( Pointer(@buf[0])^, len );
782 FStream.Write( buf, 0, len );
783 end;
784 FWriteBuffer.Clear;
785 end;
786end;
787
788function TBufferedStreamImpl.IsOpen: Boolean;
789begin
790 Result := (FWriteBuffer <> nil)
791 and (FReadBuffer <> nil)
792 and (FStream <> nil);
793end;
794
795procedure TBufferedStreamImpl.Open;
796begin
797
798end;
799
800function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
801var
802 nRead : Integer;
803 tempbuf : TBytes;
804begin
805 inherited;
806 Result := 0;
807 if IsOpen then
808 begin
809 while count > 0 do begin
810
811 if FReadBuffer.Position >= FReadBuffer.Size then
812 begin
813 FReadBuffer.Clear;
814 SetLength( tempbuf, FBufSize);
815 nRead := FStream.Read( tempbuf, 0, FBufSize );
816 if nRead = 0 then Break; // avoid infinite loop
817
818 FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
819 FReadBuffer.Position := 0;
820 end;
821
822 if FReadBuffer.Position < FReadBuffer.Size then
823 begin
824 nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
825 Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));
826 Dec( count, nRead);
827 Inc( offset, nRead);
828 end;
829 end;
830 end;
831end;
832
833function TBufferedStreamImpl.ToArray: TBytes;
834var
835 len : Integer;
836begin
837 len := 0;
838
839 if IsOpen then
840 begin
841 len := FReadBuffer.Size;
842 end;
843
844 SetLength( Result, len);
845
846 if len > 0 then
847 begin
848 FReadBuffer.Position := 0;
849 FReadBuffer.Read( Pointer(@Result[0])^, len );
850 end;
851end;
852
853procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
854begin
855 inherited;
856 if count > 0 then
857 begin
858 if IsOpen then
859 begin
860 FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
861 if FWriteBuffer.Size > FBufSize then
862 begin
863 Flush;
864 end;
865 end;
866 end;
867end;
868
869{ TStreamTransportImpl }
870
871procedure TStreamTransportImpl.Close;
872begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100873 FInputStream := nil;
874 FOutputStream := nil;
Jens Geyerd5436f52014-10-03 19:50:38 +0200875end;
876
877constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
878begin
879 inherited Create;
880 FInputStream := AInputStream;
881 FOutputStream := AOutputStream;
882end;
883
884destructor TStreamTransportImpl.Destroy;
885begin
886 FInputStream := nil;
887 FOutputStream := nil;
888 inherited;
889end;
890
891procedure TStreamTransportImpl.Flush;
892begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100893 if FOutputStream = nil then begin
894 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
895 'Cannot flush null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200896 end;
897
898 FOutputStream.Flush;
899end;
900
901function TStreamTransportImpl.GetInputStream: IThriftStream;
902begin
903 Result := FInputStream;
904end;
905
906function TStreamTransportImpl.GetIsOpen: Boolean;
907begin
908 Result := True;
909end;
910
911function TStreamTransportImpl.GetOutputStream: IThriftStream;
912begin
913 Result := FInputStream;
914end;
915
916procedure TStreamTransportImpl.Open;
917begin
918
919end;
920
921function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
922begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100923 if FInputStream = nil then begin
924 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
925 'Cannot read from null inputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200926 end;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100927
Jens Geyerd5436f52014-10-03 19:50:38 +0200928 Result := FInputStream.Read( buf, off, len );
929end;
930
931procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
932begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +0100933 if FOutputStream = nil then begin
934 raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
935 'Cannot write to null outputstream' );
Jens Geyerd5436f52014-10-03 19:50:38 +0200936 end;
937
938 FOutputStream.Write( buf, off, len );
939end;
940
941{ TBufferedTransportImpl }
942
943constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
944begin
945 //no inherited;
946 Create( ATransport, 1024 );
947end;
948
949procedure TBufferedTransportImpl.Close;
950begin
951 FTransport.Close;
952end;
953
954constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
955 ABufSize: Integer);
956begin
957 inherited Create;
958 FTransport := ATransport;
959 FBufSize := ABufSize;
960 InitBuffers;
961end;
962
963procedure TBufferedTransportImpl.Flush;
964begin
965 if FOutputBuffer <> nil then
966 begin
967 FOutputBuffer.Flush;
968 end;
969end;
970
971function TBufferedTransportImpl.GetIsOpen: Boolean;
972begin
973 Result := FTransport.IsOpen;
974end;
975
976function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
977begin
978 Result := FTransport;
979end;
980
981procedure TBufferedTransportImpl.InitBuffers;
982begin
983 if FTransport.InputStream <> nil then
984 begin
985 FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
986 end;
987 if FTransport.OutputStream <> nil then
988 begin
989 FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
990 end;
991end;
992
993procedure TBufferedTransportImpl.Open;
994begin
995 FTransport.Open
996end;
997
998function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
999begin
1000 Result := 0;
1001 if FInputBuffer <> nil then
1002 begin
1003 Result := FInputBuffer.Read( buf, off, len );
1004 end;
1005end;
1006
1007procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1008begin
1009 if FOutputBuffer <> nil then
1010 begin
1011 FOutputBuffer.Write( buf, off, len );
1012 end;
1013end;
1014
1015{ TFramedTransportImpl }
1016
1017{$IF CompilerVersion < 21.0}
1018procedure TFramedTransportImpl_Initialize;
1019begin
1020 SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
1021 FillChar( TFramedTransportImpl.FHeader_Dummy[0],
1022 Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
1023end;
1024{$ELSE}
1025class constructor TFramedTransportImpl.Create;
1026begin
1027 SetLength( FHeader_Dummy, FHeaderSize);
1028 FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
1029end;
1030{$IFEND}
1031
1032constructor TFramedTransportImpl.Create;
1033begin
1034 inherited Create;
1035 InitWriteBuffer;
1036end;
1037
1038procedure TFramedTransportImpl.Close;
1039begin
1040 FTransport.Close;
1041end;
1042
1043constructor TFramedTransportImpl.Create( const ATrans: ITransport);
1044begin
1045 inherited Create;
1046 InitWriteBuffer;
1047 FTransport := ATrans;
1048end;
1049
1050destructor TFramedTransportImpl.Destroy;
1051begin
1052 FWriteBuffer.Free;
1053 FReadBuffer.Free;
1054 inherited;
1055end;
1056
1057procedure TFramedTransportImpl.Flush;
1058var
1059 buf : TBytes;
1060 len : Integer;
1061 data_len : Integer;
1062
1063begin
1064 len := FWriteBuffer.Size;
1065 SetLength( buf, len);
1066 if len > 0 then
1067 begin
1068 System.Move( FWriteBuffer.Memory^, buf[0], len );
1069 end;
1070
1071 data_len := len - FHeaderSize;
Jens Geyer30ed90e2016-03-10 20:12:49 +01001072 if (data_len < 0) then begin
1073 raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1074 'TFramedTransport.Flush: data_len < 0' );
Jens Geyerd5436f52014-10-03 19:50:38 +02001075 end;
1076
1077 InitWriteBuffer;
1078
1079 buf[0] := Byte($FF and (data_len shr 24));
1080 buf[1] := Byte($FF and (data_len shr 16));
1081 buf[2] := Byte($FF and (data_len shr 8));
1082 buf[3] := Byte($FF and data_len);
1083
1084 FTransport.Write( buf, 0, len );
1085 FTransport.Flush;
1086end;
1087
1088function TFramedTransportImpl.GetIsOpen: Boolean;
1089begin
1090 Result := FTransport.IsOpen;
1091end;
1092
1093type
1094 TAccessMemoryStream = class(TMemoryStream)
1095 end;
1096
1097procedure TFramedTransportImpl.InitWriteBuffer;
1098begin
1099 FWriteBuffer.Free;
1100 FWriteBuffer := TMemoryStream.Create;
1101 TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
1102 FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
1103end;
1104
1105procedure TFramedTransportImpl.Open;
1106begin
1107 FTransport.Open;
1108end;
1109
1110function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
1111var
1112 got : Integer;
1113begin
1114 if FReadBuffer <> nil then
1115 begin
1116 if len > 0
1117 then got := FReadBuffer.Read( Pointer(@buf[off])^, len )
1118 else got := 0;
1119 if got > 0 then
1120 begin
1121 Result := got;
1122 Exit;
1123 end;
1124 end;
1125
1126 ReadFrame;
1127 if len > 0
1128 then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)
1129 else Result := 0;
1130end;
1131
1132procedure TFramedTransportImpl.ReadFrame;
1133var
1134 i32rd : TBytes;
1135 size : Integer;
1136 buff : TBytes;
1137begin
1138 SetLength( i32rd, FHeaderSize );
1139 FTransport.ReadAll( i32rd, 0, FHeaderSize);
1140 size :=
1141 ((i32rd[0] and $FF) shl 24) or
1142 ((i32rd[1] and $FF) shl 16) or
1143 ((i32rd[2] and $FF) shl 8) or
1144 (i32rd[3] and $FF);
1145 SetLength( buff, size );
1146 FTransport.ReadAll( buff, 0, size );
1147 FReadBuffer.Free;
1148 FReadBuffer := TMemoryStream.Create;
1149 FReadBuffer.Write( Pointer(@buff[0])^, size );
1150 FReadBuffer.Position := 0;
1151end;
1152
1153procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
1154begin
1155 if len > 0
1156 then FWriteBuffer.Write( Pointer(@buf[off])^, len );
1157end;
1158
1159{ TFramedTransport.TFactory }
1160
1161function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
1162begin
1163 Result := TFramedTransportImpl.Create( ATrans );
1164end;
1165
1166{ TTcpSocketStreamImpl }
1167
Jens Geyer23d67462015-12-19 11:44:57 +01001168{$IFDEF OLD_SOCKETS}
Jens Geyerd5436f52014-10-03 19:50:38 +02001169procedure TTcpSocketStreamImpl.Close;
1170begin
1171 FTcpClient.Close;
1172end;
1173
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001174constructor TTcpSocketStreamImpl.Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer);
Jens Geyerd5436f52014-10-03 19:50:38 +02001175begin
1176 inherited Create;
1177 FTcpClient := ATcpClient;
1178 FTimeout := aTimeout;
1179end;
1180
1181procedure TTcpSocketStreamImpl.Flush;
1182begin
1183
1184end;
1185
1186function TTcpSocketStreamImpl.IsOpen: Boolean;
1187begin
1188 Result := FTcpClient.Active;
1189end;
1190
1191procedure TTcpSocketStreamImpl.Open;
1192begin
1193 FTcpClient.Open;
1194end;
1195
1196
1197function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
1198 TimeOut: Integer; var wsaError : Integer): Integer;
1199var
1200 ReadFds: TFDset;
1201 ReadFdsptr: PFDset;
1202 WriteFds: TFDset;
1203 WriteFdsptr: PFDset;
1204 ExceptFds: TFDset;
1205 ExceptFdsptr: PFDset;
1206 tv: timeval;
1207 Timeptr: PTimeval;
1208 socket : TSocket;
1209begin
1210 if not FTcpClient.Active then begin
1211 wsaError := WSAEINVAL;
1212 Exit( SOCKET_ERROR);
1213 end;
1214
1215 socket := FTcpClient.Handle;
1216
1217 if Assigned(ReadReady) then
1218 begin
1219 ReadFdsptr := @ReadFds;
1220 FD_ZERO(ReadFds);
1221 FD_SET(socket, ReadFds);
1222 end
1223 else
1224 ReadFdsptr := nil;
1225
1226 if Assigned(WriteReady) then
1227 begin
1228 WriteFdsptr := @WriteFds;
1229 FD_ZERO(WriteFds);
1230 FD_SET(socket, WriteFds);
1231 end
1232 else
1233 WriteFdsptr := nil;
1234
1235 if Assigned(ExceptFlag) then
1236 begin
1237 ExceptFdsptr := @ExceptFds;
1238 FD_ZERO(ExceptFds);
1239 FD_SET(socket, ExceptFds);
1240 end
1241 else
1242 ExceptFdsptr := nil;
1243
1244 if TimeOut >= 0 then
1245 begin
1246 tv.tv_sec := TimeOut div 1000;
1247 tv.tv_usec := 1000 * (TimeOut mod 1000);
1248 Timeptr := @tv;
1249 end
1250 else
1251 Timeptr := nil; // wait forever
1252
1253 wsaError := 0;
1254 try
1255{$IFDEF MSWINDOWS}
1256 result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1257{$ENDIF}
1258{$IFDEF LINUX}
1259 result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
1260{$ENDIF}
1261 if result = SOCKET_ERROR
1262 then wsaError := WSAGetLastError;
1263
1264 except
1265 result := SOCKET_ERROR;
1266 end;
1267
1268 if Assigned(ReadReady) then
1269 ReadReady^ := FD_ISSET(socket, ReadFds);
1270 if Assigned(WriteReady) then
1271 WriteReady^ := FD_ISSET(socket, WriteFds);
1272 if Assigned(ExceptFlag) then
1273 ExceptFlag^ := FD_ISSET(socket, ExceptFds);
1274end;
1275
1276function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
1277 DesiredBytes : Integer;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001278 var wsaError, bytesReady : Integer): TWaitForData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001279var bCanRead, bError : Boolean;
1280 retval : Integer;
1281begin
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001282 bytesReady := 0;
1283
Jens Geyerd5436f52014-10-03 19:50:38 +02001284 // The select function returns the total number of socket handles that are ready
1285 // and contained in the fd_set structures, zero if the time limit expired,
1286 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1287 // WSAGetLastError can be used to retrieve a specific error code.
1288 retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
1289 if retval = SOCKET_ERROR
1290 then Exit( TWaitForData.wfd_Error);
1291 if (retval = 0) or not bCanRead
1292 then Exit( TWaitForData.wfd_Timeout);
1293
1294 // recv() returns the number of bytes received, or -1 if an error occurred.
1295 // The return value will be 0 when the peer has performed an orderly shutdown.
1296 retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);
1297 if retval <= 0
1298 then Exit( TWaitForData.wfd_Error);
1299
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001300 // at least we have some data
1301 bytesReady := Min( retval, DesiredBytes);
1302 result := TWaitForData.wfd_HaveData;
Jens Geyerd5436f52014-10-03 19:50:38 +02001303end;
1304
1305function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
1306var wfd : TWaitForData;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001307 wsaError, nBytes : Integer;
1308 pDest : PByte;
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001309 msecs : Integer;
Jens Geyerd5436f52014-10-03 19:50:38 +02001310begin
1311 inherited;
1312
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001313 if FTimeout > 0
1314 then msecs := FTimeout
1315 else msecs := DEFAULT_THRIFT_TIMEOUT;
1316
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001317 result := 0;
Jens Geyerd5436f52014-10-03 19:50:38 +02001318 pDest := Pointer(@buffer[offset]);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001319 while count > 0 do begin
Jens Geyerd5436f52014-10-03 19:50:38 +02001320
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001321 while TRUE do begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001322 wfd := WaitForData( msecs, pDest, count, wsaError, nBytes);
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001323 case wfd of
Jens Geyer65b17462016-03-09 00:07:46 +01001324 TWaitForData.wfd_Error : Exit;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001325 TWaitForData.wfd_HaveData : Break;
1326 TWaitForData.wfd_Timeout : begin
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001327 if (FTimeout = 0)
1328 then Exit
1329 else begin
1330 raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
1331 SysErrorMessage(Cardinal(wsaError)));
1332
1333 end;
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001334 end;
1335 else
1336 ASSERT( FALSE);
Jens Geyerd5436f52014-10-03 19:50:38 +02001337 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001338 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001339
Jens Geyer6f6aa8a2016-03-10 19:47:12 +01001340 // reduce the timeout once we got data
1341 if FTimeout > 0
1342 then msecs := FTimeout div 10
1343 else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
1344 msecs := Max( msecs, 200);
1345
Jens Geyerbcb17bc2015-07-17 23:11:14 +02001346 ASSERT( nBytes <= count);
1347 nBytes := FTcpClient.ReceiveBuf( pDest^, nBytes);
1348 Inc( pDest, nBytes);
1349 Dec( count, nBytes);
1350 Inc( result, nBytes);
1351 end;
Jens Geyerd5436f52014-10-03 19:50:38 +02001352end;
1353
1354function TTcpSocketStreamImpl.ToArray: TBytes;
1355var
1356 len : Integer;
1357begin
1358 len := 0;
1359 if IsOpen then
1360 begin
1361 len := FTcpClient.BytesReceived;
1362 end;
1363
1364 SetLength( Result, len );
1365
1366 if len > 0 then
1367 begin
1368 FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
1369 end;
1370end;
1371
1372procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
1373var bCanWrite, bError : Boolean;
1374 retval, wsaError : Integer;
1375begin
1376 inherited;
1377
1378 if not FTcpClient.Active
1379 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
1380
1381 // The select function returns the total number of socket handles that are ready
1382 // and contained in the fd_set structures, zero if the time limit expired,
1383 // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
1384 // WSAGetLastError can be used to retrieve a specific error code.
1385 retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
1386 if retval = SOCKET_ERROR
1387 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
1388 SysErrorMessage(Cardinal(wsaError)));
1389 if (retval = 0)
1390 then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
1391 if bError or not bCanWrite
1392 then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
1393
1394 FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
1395end;
Jens Geyer23d67462015-12-19 11:44:57 +01001396{$ENDIF}
Jens Geyerd5436f52014-10-03 19:50:38 +02001397
1398{$IF CompilerVersion < 21.0}
1399initialization
1400begin
1401 TFramedTransportImpl_Initialize;
1402end;
1403{$IFEND}
1404
1405
1406end.