THRIFT-3487 Full support for newer Delphi versions
Client: Delphi
Patch: Kyle Johnson
Slight refactoring plus some IFDEFs and the missing ASF header added by Jens Geyer
diff --git a/lib/delphi/src/Thrift.Socket.pas b/lib/delphi/src/Thrift.Socket.pas
new file mode 100644
index 0000000..26df9b0
--- /dev/null
+++ b/lib/delphi/src/Thrift.Socket.pas
@@ -0,0 +1,1617 @@
+unit Thrift.Socket;
+{$I-} // prevent annoying errors with default log delegate and no console
+ Winapi.Windows, Winapi.Winsock2;
+ AI_PASSIVE = $00000001; // Socket address will be used in bind() call
+ AI_CANONNAME = $00000002; // Return canonical name in first ai_canonname
+ AI_NUMERICHOST = $00000004; // Nodename must be a numeric address string
+ AI_NUMERICSERV = $00000008; // Servicename must be a numeric port number
+ AI_ALL = $00000100; // Query both IP6 and IP4 with AI_V4MAPPED
+ AI_ADDRCONFIG = $00000400; // Resolution only if global address configured
+ AI_V4MAPPED = $00000800; // On v6 failure, query v4 and convert to V4MAPPED format
+ AI_SECURE = $00008000; // LUP_SECURE
+ AI_FQDN = $00020000; // Return the FQDN in ai_canonname
+ AI_FILESERVER = $00040000; // Resolving fileserver name resolution
+ PAddrInfoA = ^TAddrInfoA;
+ TAddrInfoA = record
+ ai_flags: Integer;
+ ai_family: Integer;
+ ai_socktype: Integer;
+ ai_protocol: Integer;
+ ai_addrlen: NativeUInt;
+ ai_canonname: PAnsiChar;
+ ai_addr: PSockAddr;
+ ai_next: PAddrInfoA;
+ end;
+ PAddrInfoW = ^TAddrInfoW;
+ TAddrInfoW = record
+ ai_flags: Integer;
+ ai_family: Integer;
+ ai_socktype: Integer;
+ ai_protocol: Integer;
+ ai_addrlen: NativeUInt;
+ ai_canonname: PChar;
+ ai_addr: PSockAddr;
+ ai_next: PAddrInfoW;
+ end;
+ TAddressFamily = USHORT;
+ TIn6Addr = record
+ case Integer of
+ 0: (_Byte: array[0..15] of UCHAR);
+ 1: (_Word: array[0..7] of USHORT);
+ end;
+ TScopeId = record
+ public
+ Value: ULONG;
+ private
+ function GetBitField(Loc: Integer): Integer; inline;
+ procedure SetBitField(Loc: Integer; const aValue: Integer); inline;
+ public
+ property Zone: Integer index $0028 read GetBitField write SetBitField;
+ property Level: Integer index $2804 read GetBitField write SetBitField;
+ end;
+ TSockAddrIn6 = record
+ sin6_family: TAddressFamily;
+ sin6_port: USHORT;
+ sin6_flowinfo: ULONG;
+ sin6_addr: TIn6Addr;
+ case Integer of
+ 0: (sin6_scope_id: ULONG);
+ 1: (sin6_scope_struct: TScopeId);
+ end;
+ PSockAddrIn6 = ^TSockAddrIn6;
+ NI_NOFQDN = $01; // Only return nodename portion for local hosts
+ NI_NUMERICHOST = $02; // Return numeric form of the host's address
+ NI_NAMEREQD = $04; // Error if the host's name not in DNS
+ NI_NUMERICSERV = $08; // Return numeric form of the service (port #)
+ NI_DGRAM = $10; // Service is a datagram service
+ NI_MAXHOST = 1025; // Max size of a fully-qualified domain name
+ NI_MAXSERV = 32; // Max size of a service name
+function getaddrinfo(pNodeName, pServiceName: PAnsiChar; const pHints: TAddrInfoA; var ppResult: PAddrInfoA): Integer; stdcall;
+function GetAddrInfoW(pNodeName, pServiceName: PWideChar; const pHints: TAddrInfoW; var ppResult: PAddrInfoW): Integer; stdcall;
+procedure freeaddrinfo(pAddrInfo: PAddrInfoA); stdcall;
+procedure FreeAddrInfoW(pAddrInfo: PAddrInfoW); stdcall;
+function getnameinfo(const pSockaddr: TSockAddr; SockaddrLength: Integer; pNodeBuffer: PAnsiChar; NodeBufferSize: DWORD; pServiceBuffer: PAnsiChar;
+ ServiceBufferSize: DWORD; Flags: Integer): Integer; stdcall;
+function GetNameInfoW(const pSockaddr: TSockAddr; SockaddrLength: Integer; pNodeBuffer: PWideChar; NodeBufferSize: DWORD; pServiceBuffer: PWideChar;
+ ServiceBufferSize: DWORD; Flags: Integer): Integer; stdcall;
+ TSmartPointerDestroyer<T> = reference to procedure(Value: T);
+ ISmartPointer<T> = reference to function: T;
+ TSmartPointer<T> = class(TInterfacedObject, ISmartPointer<T>)
+ private
+ FValue: T;
+ FDestroyer: TSmartPointerDestroyer<T>;
+ public
+ constructor Create(AValue: T; ADestroyer: TSmartPointerDestroyer<T>);
+ destructor Destroy; override;
+ function Invoke: T;
+ end;
+ TBaseSocket = class abstract
+ public type
+ TLogDelegate = reference to procedure( const str: string);
+ strict private
+ FPort: Integer;
+ FSocket: Winapi.Winsock2.TSocket;
+ FSendTimeout,
+ FRecvTimeout: Longword;
+ FKeepAlive: Boolean;
+ FLogDelegate: TLogDelegate;
+ class constructor Create;
+ class destructor Destroy;
+ class procedure DefaultLogDelegate(const Str: string);
+ protected type
+ IGetAddrInfoWrapper = interface
+ function Init: Integer;
+ function GetRes: PAddrInfoW;
+ property Res: PAddrInfoW read GetRes;
+ end;
+ TGetAddrInfoWrapper = class(TInterfacedObject, IGetAddrInfoWrapper)
+ strict private
+ FNode: string;
+ FService: string;
+ FHints,
+ FRes: PAddrInfoW;
+ public
+ constructor Create(ANode, AService: string; AHints: PAddrInfoW);
+ destructor Destroy; override;
+ function Init: Integer;
+ function GetRes: PAddrInfoW;
+ property Res: PAddrInfoW read GetRes;
+ end;
+ strict protected
+ procedure CommonInit; virtual;
+ function CreateSocket(AAddress: string; APort: Integer): IGetAddrInfoWrapper;
+ procedure SetRecvTimeout(ARecvTimeout: Longword); virtual;
+ procedure SetSendTimeout(ASendTimeout: Longword); virtual;
+ procedure SetKeepAlive(AKeepAlive: Boolean); virtual;
+ procedure SetSocket(ASocket: Winapi.Winsock2.TSocket);
+ property LogDelegate: TLogDelegate read FLogDelegate;
+ public
+ //
+ // Constructs a new socket. Note that this does NOT actually connect the
+ // socket.
+ //
+ constructor Create(ALogDelegate: TLogDelegate = nil); overload;
+ constructor Create(APort: Integer; ALogDelegate: TLogDelegate = nil); overload;
+ //
+ // Destroys the socket object, closing it if necessary.
+ //
+ destructor Destroy; override;
+ //
+ // Shuts down communications on the socket
+ //
+ procedure Close; virtual;
+ // The port that the socket is connected to
+ property Port: Integer read FPort write FPort;
+ // The receive timeout
+ property RecvTimeout: Longword read FRecvTimeout write SetRecvTimeout;
+ // The send timeout
+ property SendTimeout: Longword read FSendTimeout write SetSendTimeout;
+ property KeepAlive: Boolean read FKeepAlive write SetKeepAlive;
+ // The underlying socket descriptor
+ property Socket: Winapi.Winsock2.TSocket read FSocket write SetSocket;
+ end;
+ TSocket = class(TBaseSocket)
+ strict private type
+ TCachedPeerAddr = record
+ case Integer of
+ 0: (ipv4: TSockAddrIn);
+ 1: (ipv6: TSockAddrIn6);
+ end;
+ strict private
+ FHost: string;
+ FPeerHost: string;
+ FPeerAddress: string;
+ FPeerPort: Integer;
+ FInterruptListener: ISmartPointer<Winapi.Winsock2.TSocket>;
+ FConnTimeout: Longword;
+ FLingerOn: Boolean;
+ FLingerVal: Integer;
+ FNoDelay: Boolean;
+ FMaxRecvRetries: Longword;
+ FCachedPeerAddr: TCachedPeerAddr;
+ procedure InitPeerInfo;
+ procedure OpenConnection(Res: TBaseSocket.IGetAddrInfoWrapper);
+ procedure LocalOpen;
+ procedure SetGenericTimeout(S: Winapi.Winsock2.TSocket; Timeout: Longword; OptName: Integer);
+ function GetIsOpen: Boolean;
+ procedure SetNoDelay(ANoDelay: Boolean);
+ function GetSocketInfo: string;
+ function GetPeerHost: string;
+ function GetPeerAddress: string;
+ function GetPeerPort: Integer;
+ function GetOrigin: string;
+ strict protected
+ procedure CommonInit; override;
+ procedure SetRecvTimeout(ARecvTimeout: Longword); override;
+ procedure SetSendTimeout(ASendTimeout: Longword); override;
+ procedure SetKeepAlive(AKeepAlive: Boolean); override;
+ public
+ //
+ // Constructs a new socket. Note that this does NOT actually connect the
+ // socket.
+ //
+ constructor Create(ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;
+ //
+ // Constructs a new socket. Note that this does NOT actually connect the
+ // socket.
+ //
+ // @param host An IP address or hostname to connect to
+ // @param port The port to connect on
+ //
+ constructor Create(AHost: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;
+ //
+ // Constructor to create socket from socket descriptor.
+ //
+ constructor Create(ASocket: Winapi.Winsock2.TSocket; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;
+ //
+ // Constructor to create socket from socket descriptor that
+ // can be interrupted safely.
+ //
+ constructor Create(ASocket: Winapi.Winsock2.TSocket; AInterruptListener: ISmartPointer<Winapi.Winsock2.TSocket>;
+ ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;
+ //
+ // Creates and opens the socket
+ //
+ // @throws ETransportationException If the socket could not connect
+ //
+ procedure Open;
+ //
+ // Shuts down communications on the socket
+ //
+ procedure Close; override;
+ //
+ // Reads from the underlying socket.
+ // \returns the number of bytes read or 0 indicates EOF
+ // \throws TTransportException of types:
+ // Interrupted means the socket was interrupted
+ // out of a blocking call
+ // NotOpen means the socket has been closed
+ // TimedOut means the receive timeout expired
+ // Unknown means something unexpected happened
+ //
+ function Read(var Buf; Len: Integer): Integer;
+ //
+ // Writes to the underlying socket. Loops until done or fail.
+ //
+ procedure Write(const Buf; Len: Integer);
+ //
+ // Writes to the underlying socket. Does single send() and returns result.
+ //
+ function WritePartial(const Buf; Len: Integer): Integer;
+ //
+ // Returns a cached copy of the peer address.
+ //
+ function GetCachedAddress(out Len: Integer): PSockAddr;
+ //
+ // Set a cache of the peer address (used when trivially available: e.g.
+ // accept() or connect()). Only caches IPV4 and IPV6; unset for others.
+ //
+ procedure SetCachedAddress(const Addr: TSockAddr; Len: Integer);
+ //
+ // Controls whether the linger option is set on the socket.
+ //
+ // @param on Whether SO_LINGER is on
+ // @param linger If linger is active, the number of seconds to linger for
+ //
+ procedure SetLinger(LingerOn: Boolean; LingerVal: Integer);
+ //
+ // Calls select() on the socket to see if there is more data available.
+ //
+ function Peek: Boolean;
+ // Whether the socket is alive
+ property IsOpen: Boolean read GetIsOpen;
+ // The host that the socket is connected to
+ property Host: string read FHost write FHost;
+ // Whether to enable or disable Nagle's algorithm
+ property NoDelay: Boolean read FNoDelay write SetNoDelay;
+ // Connect timeout
+ property ConnTimeout: Longword read FConnTimeout write FConnTimeout;
+ // The max number of recv retries in the case of a WSAEWOULDBLOCK
+ property MaxRecvRetries: Longword read FMaxRecvRetries write FMaxRecvRetries;
+ // Socket information formatted as a string <Host: x Port: x>
+ property SocketInfo: string read GetSocketInfo;
+ // The DNS name of the host to which the socket is connected
+ property PeerHost: string read GetPeerHost;
+ // The address of the host to which the socket is connected
+ property PeerAddress: string read GetPeerAddress;
+ // The port of the host to which the socket is connected
+ property PeerPort: Integer read GetPeerPort;
+ // The origin the socket is connected to
+ property Origin: string read GetOrigin;
+ end;
+ TServerSocketFunc = reference to procedure(sock: Winapi.Winsock2.TSocket);
+ TServerSocket = class(TBaseSocket)
+ strict private
+ FAddress: string;
+ FAcceptBacklog,
+ FRetryLimit,
+ FRetryDelay,
+ FTcpSendBuffer,
+ FTcpRecvBuffer: Integer;
+ FAcceptTimeout: Longword;
+ FListening,
+ FInterruptableChildren: Boolean;
+ FInterruptSockWriter, // is notified on Interrupt()
+ FInterruptSockReader, // is used in select with FSocket for interruptability
+ FChildInterruptSockWriter: Winapi.Winsock2.TSocket; // is notified on InterruptChildren()
+ FChildInterruptSockReader: ISmartPointer<Winapi.Winsock2.TSocket>; // if FnterruptableChildren this is shared with child TSockets
+ FListenCallback,
+ FAcceptCallback: TServerSocketFunc;
+ function CreateSocketObj(Client: Winapi.Winsock2.TSocket): TSocket;
+ procedure Notify(NotifySocket: Winapi.Winsock2.TSocket);
+ procedure SetInterruptableChildren(AValue: Boolean);
+ strict protected
+ procedure CommonInit; override;
+ public const
+ public
+ //
+ // Constructor.
+ //
+ // @param port Port number to bind to
+ //
+ constructor Create(APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;
+ //
+ // Constructor.
+ //
+ // @param port Port number to bind to
+ // @param sendTimeout Socket send timeout
+ // @param recvTimeout Socket receive timeout
+ //
+ constructor Create(APort: Integer; ASendTimeout, ARecvTimeout: Longword; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;
+ //
+ // Constructor.
+ //
+ // @param address Address to bind to
+ // @param port Port number to bind to
+ //
+ constructor Create(AAddress: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;
+ procedure Listen;
+ function Accept: TSocket;
+ procedure Interrupt;
+ procedure InterruptChildren;
+ procedure Close; override;
+ property AcceptBacklog: Integer read FAcceptBacklog write FAcceptBacklog;
+ property AcceptTimeout: Longword read FAcceptTimeout write FAcceptTimeout;
+ property RetryLimit: Integer read FRetryLimit write FRetryLimit;
+ property RetryDelay: Integer read FRetryDelay write FRetryDelay;
+ property TcpSendBuffer: Integer read FTcpSendBuffer write FTcpSendBuffer;
+ property TcpRecvBuffer: Integer read FTcpRecvBuffer write FTcpRecvBuffer;
+ // When enabled (the default), new children TSockets will be constructed so
+ // they can be interrupted by TServerTransport.InterruptChildren().
+ // This is more expensive in terms of system calls (poll + recv) however
+ // ensures a connected client cannot interfere with TServer.Stop().
+ //
+ // When disabled, TSocket children do not incur an additional poll() call.
+ // Server-side reads are more efficient, however a client can interfere with
+ // the server's ability to shutdown properly by staying connected.
+ //
+ // Must be called before listen(); mode cannot be switched after that.
+ // \throws EPropertyError if listen() has been called
+ property InterruptableChildren: Boolean read FInterruptableChildren write SetInterruptableChildren;
+ // listenCallback gets called just before listen, and after all Thrift
+ // setsockopt calls have been made. If you have custom setsockopt
+ // things that need to happen on the listening socket, this is the place to do it.
+ property ListenCallback: TServerSocketFunc read FListenCallback write FListenCallback;
+ // acceptCallback gets called after each accept call, on the newly created socket.
+ // It is called after all Thrift setsockopt calls have been made. If you have
+ // custom setsockopt things that need to happen on the accepted
+ // socket, this is the place to do it.
+ property AcceptCallback: TServerSocketFunc read FAcceptCallback write FAcceptCallback;
+ end;
+{$ENDIF} // not for OLD_SOCKETS
+ System.SysUtils, System.Math, System.DateUtils, Thrift.Transport;
+constructor TBaseSocket.TGetAddrInfoWrapper.Create(ANode, AService: string; AHints: PAddrInfoW);
+ inherited Create;
+ FNode := ANode;
+ FService := AService;
+ FHints := AHints;
+ FRes := nil;
+destructor TBaseSocket.TGetAddrInfoWrapper.Destroy;
+ if Assigned(FRes) then
+ FreeAddrInfoW(FRes);
+ inherited Destroy;
+function TBaseSocket.TGetAddrInfoWrapper.Init: Integer;
+ if FRes = nil then
+ Exit(GetAddrInfoW(@FNode[1], @FService[1], FHints^, FRes));
+ Result := 0;
+function TBaseSocket.TGetAddrInfoWrapper.GetRes: PAddrInfoW;
+ Result := FRes;
+procedure DestroyerOfFineSockets(ssock: Winapi.Winsock2.TSocket);
+ closesocket(ssock);
+function TScopeId.GetBitField(Loc: Integer): Integer;
+ Result := (Value shr (Loc shr 8)) and ((1 shl (Loc and $FF)) - 1);
+procedure TScopeId.SetBitField(Loc: Integer; const aValue: Integer);
+ Value := (Value and ULONG((not ((1 shl (Loc and $FF)) - 1)))) or ULONG(aValue shl (Loc shr 8));
+function getaddrinfo; external 'ws2_32.dll' name 'getaddrinfo';
+function GetAddrInfoW; external 'ws2_32.dll' name 'GetAddrInfoW';
+procedure freeaddrinfo; external 'ws2_32.dll' name 'freeaddrinfo';
+procedure FreeAddrInfoW; external 'ws2_32.dll' name 'FreeAddrInfoW';
+function getnameinfo; external 'ws2_32.dll' name 'getnameinfo';
+function GetNameInfoW; external 'ws2_32.dll' name 'GetNameInfoW';
+constructor TSmartPointer<T>.Create(AValue: T; ADestroyer: TSmartPointerDestroyer<T>);
+ inherited Create;
+ FValue := AValue;
+ FDestroyer := ADestroyer;
+destructor TSmartPointer<T>.Destroy;
+ if Assigned(FDestroyer) then FDestroyer(FValue);
+ inherited Destroy;
+function TSmartPointer<T>.Invoke: T;
+ Result := FValue;
+class constructor TBaseSocket.Create;
+ Version: WORD;
+ Data: WSAData;
+ Error: Integer;
+ Version := $0202;
+ FillChar(Data, SizeOf(Data), 0);
+ Error := WSAStartup(Version, Data);
+ if Error <> 0 then
+ raise Exception.Create('Failed to initialize Winsock.');
+class destructor TBaseSocket.Destroy;
+ WSACleanup;
+class procedure TBaseSocket.DefaultLogDelegate(const Str: string);
+ OutStr: string;
+ OutStr := Format('Thrift: %s %s', [DateTimeToStr(Now, TFormatSettings.Create), Str]);
+ try
+ Writeln(OutStr);
+ if IoResult <> 0 then OutputDebugString(PChar(OutStr));
+ except
+ OutputDebugString(PChar(OutStr));
+ end;
+procedure TBaseSocket.CommonInit;
+ FPort := 0;
+ FSendTimeout := 0;
+ FRecvTimeout := 0;
+ FKeepAlive := False;
+ FLogDelegate := DefaultLogDelegate;
+function TBaseSocket.CreateSocket(AAddress: string; APort: Integer): IGetAddrInfoWrapper;
+ Hints: TAddrInfoW;
+ Res: PAddrInfoW;
+ ThePort: array[0..5] of Char;
+ Error: Integer;
+ FillChar(Hints, SizeOf(Hints), 0);
+ Hints.ai_family := PF_UNSPEC;
+ Hints.ai_socktype := SOCK_STREAM;
+ Hints.ai_flags := AI_PASSIVE or AI_ADDRCONFIG;
+ StrFmt(ThePort, '%d', [FPort]);
+ Result := TGetAddrInfoWrapper.Create(AAddress, ThePort, @Hints);
+ Error := Result.Init;
+ if Error <> 0 then begin
+ LogDelegate(Format('GetAddrInfoW %d: %s', [Error, SysErrorMessage(Error)]));
+ Close;
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, 'Could not resolve host for server socket.');
+ end;
+ // Pick the ipv6 address first since ipv4 addresses can be mapped
+ // into ipv6 space.
+ Res := Result.Res;
+ while Assigned(Res) do begin
+ if (Res^.ai_family = AF_INET6) or (not Assigned(Res^.ai_next)) then
+ Break;
+ Res := Res^.ai_next;
+ end;
+ FSocket := Winapi.Winsock2.socket(Res^.ai_family, Res^.ai_socktype, Res^.ai_protocol);
+ if FSocket = INVALID_SOCKET then begin
+ Error := WSAGetLastError;
+ LogDelegate(Format('TBaseSocket.CreateSocket() socket() %s', [SysErrorMessage(Error)]));
+ Close;
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('socket(): %s', [SysErrorMessage(Error)]));
+ end;
+procedure TBaseSocket.SetRecvTimeout(ARecvTimeout: Longword);
+ FRecvTimeout := ARecvTimeout;
+procedure TBaseSocket.SetSendTimeout(ASendTimeout: Longword);
+ FSendTimeout := ASendTimeout;
+procedure TBaseSocket.SetKeepAlive(AKeepAlive: Boolean);
+ FKeepAlive := AKeepAlive;
+procedure TBaseSocket.SetSocket(ASocket: Winapi.Winsock2.TSocket);
+ if FSocket <> INVALID_SOCKET then
+ Close;
+ FSocket := ASocket;
+constructor TBaseSocket.Create(ALogDelegate: TLogDelegate);
+ inherited Create;
+ CommonInit;
+ if Assigned(ALogDelegate) then FLogDelegate := ALogDelegate;
+constructor TBaseSocket.Create(APort: Integer; ALogDelegate: TLogDelegate);
+ inherited Create;
+ CommonInit;
+ FPort := APort;
+ if Assigned(ALogDelegate) then FLogDelegate := ALogDelegate;
+destructor TBaseSocket.Destroy;
+ Close;
+ inherited Destroy;
+procedure TBaseSocket.Close;
+ if FSocket <> INVALID_SOCKET then begin
+ shutdown(FSocket, SD_BOTH);
+ closesocket(FSocket);
+ end;
+procedure TSocket.InitPeerInfo;
+ FCachedPeerAddr.ipv4.sin_family := AF_UNSPEC;
+ FPeerHost := '';
+ FPeerAddress := '';
+ FPeerPort := 0;
+procedure TSocket.CommonInit;
+ inherited CommonInit;
+ FHost := '';
+ FInterruptListener := nil;
+ FConnTimeout := 0;
+ FLingerOn := True;
+ FLingerVal := 0;
+ FNoDelay := True;
+ FMaxRecvRetries := 5;
+ InitPeerInfo;
+procedure TSocket.OpenConnection(Res: TBaseSocket.IGetAddrInfoWrapper);
+ Done;
+ ErrnoCopy: Integer;
+ Ret,
+ Ret2: Integer;
+ Fds: TFdSet;
+ TVal: TTimeVal;
+ PTVal: PTimeVal;
+ Val,
+ Lon: Integer;
+ One,
+ Zero: Cardinal;
+ if SendTimeout > 0 then SetSendTimeout(SendTimeout);
+ if RecvTimeout > 0 then SetRecvTimeout(RecvTimeout);
+ if KeepAlive then SetKeepAlive(KeepAlive);
+ SetLinger(FLingerOn, FLingerVal);
+ SetNoDelay(FNoDelay);
+ // Set the socket to be non blocking for connect if a timeout exists
+ Zero := 0;
+ if FConnTimeout > 0 then begin
+ One := 1;
+ if ioctlsocket(Socket, Integer(FIONBIO), One) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TSocket.OpenConnection() ioctlsocket() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('ioctlsocket() failed: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ end
+ else begin
+ if ioctlsocket(Socket, Integer(FIONBIO), Zero) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TSocket.OpenConnection() ioctlsocket() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('ioctlsocket() failed: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ end;
+ Ret := connect(Socket, Res.Res^.ai_addr^, Res.Res^.ai_addrlen);
+ if Ret = 0 then goto Done;
+ ErrnoCopy := WSAGetLastError;
+ if (ErrnoCopy <> WSAEINPROGRESS) and (ErrnoCopy <> WSAEWOULDBLOCK) then begin
+ LogDelegate(Format('TSocket.OpenConnection() connect() ', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('connect() failed: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ FD_ZERO(Fds);
+ _FD_SET(Socket, Fds);
+ if FConnTimeout > 0 then begin
+ TVal.tv_sec := FConnTimeout div 1000;
+ TVal.tv_usec := (FConnTimeout mod 1000) * 1000;
+ PTVal := @TVal;
+ end
+ else
+ PTVal := nil;
+ Ret := select(1, nil, @Fds, nil, PTVal);
+ if Ret > 0 then begin
+ // Ensure the socket is connected and that there are no errors set
+ Lon := SizeOf(Val);
+ Ret2 := getsockopt(Socket, SOL_SOCKET, SO_ERROR, @Val, Lon);
+ if Ret2 = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TSocket.OpenConnection() getsockopt() ', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('getsockopt(): %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // no errors on socket, go to town
+ if Val = 0 then goto Done;
+ LogDelegate(Format('TSocket.OpenConnection() error on socket (after select()) ', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('socket OpenConnection() error: %s', [SysErrorMessage(Val)]));
+ end
+ else if Ret = 0 then begin
+ // socket timed out
+ LogDelegate(Format('TSocket.OpenConnection() timed out ', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, 'OpenConnection() timed out');
+ end
+ else begin
+ // error on select()
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TSocket.OpenConnection() select() ', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('select() failed: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // Set socket back to normal mode (blocking)
+ ioctlsocket(Socket, Integer(FIONBIO), Zero);
+ SetCachedAddress(Res.Res^.ai_addr^, Res.Res^.ai_addrlen);
+procedure TSocket.LocalOpen;
+ Res: TBaseSocket.IGetAddrInfoWrapper;
+ if IsOpen then Exit;
+ // Validate port number
+ if (Port < 0) or (Port > $FFFF) then
+ raise TTransportException.Create(TTransportException.TExceptionType.BadArgs, 'Specified port is invalid');
+ Res := CreateSocket(Host, Port);
+ OpenConnection(Res);
+procedure TSocket.SetGenericTimeout(S: Winapi.Winsock2.TSocket; Timeout: Longword; OptName: Integer);
+ Time: DWORD;
+ if S = INVALID_SOCKET then
+ Exit;
+ Time := Timeout;
+ if setsockopt(S, SOL_SOCKET, OptName, @Time, SizeOf(Time)) = SOCKET_ERROR then
+ LogDelegate(Format('SetGenericTimeout() setsockopt() %s', [SysErrorMessage(WSAGetLastError)]));
+function TSocket.GetIsOpen: Boolean;
+ Result := Socket <> INVALID_SOCKET;
+procedure TSocket.SetNoDelay(ANoDelay: Boolean);
+ V: Integer;
+ FNoDelay := ANoDelay;
+ if Socket = INVALID_SOCKET then
+ Exit;
+ V := IfThen(FNoDelay, 1, 0);
+ if setsockopt(Socket, IPPROTO_TCP, TCP_NODELAY, @V, SizeOf(V)) = SOCKET_ERROR then
+ LogDelegate(Format('TSocket.SetNoDelay() setsockopt() %s %s', [SocketInfo, SysErrorMessage(WSAGetLastError)]));
+function TSocket.GetSocketInfo: string;
+ if (FHost = '') or (Port = 0) then
+ Result := '<Host: ' + GetPeerAddress + ' Port: ' + GetPeerPort.ToString + '>'
+ else
+ Result := '<Host: ' + FHost + ' Port: ' + Port.ToString + '>';
+function TSocket.GetPeerHost: string;
+ Addr: TSockAddrStorage;
+ AddrPtr: PSockAddr;
+ AddrLen: Integer;
+ ClientHost: array[0..NI_MAXHOST-1] of Char;
+ ClientService: array[0..NI_MAXSERV-1] of Char;
+ if FPeerHost = '' then begin
+ if Socket = INVALID_SOCKET then
+ Exit(FPeerHost);
+ AddrPtr := GetCachedAddress(AddrLen);
+ if AddrPtr = nil then begin
+ AddrLen := SizeOf(Addr);
+ if getpeername(Socket, PSockAddr(@Addr)^, AddrLen) <> 0 then
+ Exit(FPeerHost);
+ AddrPtr := PSockAddr(@Addr);
+ SetCachedAddress(AddrPtr^, AddrLen);
+ end;
+ GetNameInfoW(AddrPtr^, AddrLen, ClientHost, NI_MAXHOST, ClientService, NI_MAXSERV, 0);
+ FPeerHost := ClientHost;
+ end;
+ Result := FPeerHost;
+function TSocket.GetPeerAddress: string;
+ Addr: TSockAddrStorage;
+ AddrPtr: PSockAddr;
+ AddrLen: Integer;
+ ClientHost: array[0..NI_MAXHOST-1] of Char;
+ ClientService: array[0..NI_MAXSERV-1] of Char;
+ if FPeerAddress = '' then begin
+ if Socket = INVALID_SOCKET then
+ Exit(FPeerAddress);
+ AddrPtr := GetCachedAddress(AddrLen);
+ if AddrPtr = nil then begin
+ AddrLen := SizeOf(Addr);
+ if getpeername(Socket, PSockAddr(@Addr)^, AddrLen) <> 0 then
+ Exit(FPeerHost);
+ AddrPtr := PSockAddr(@Addr);
+ SetCachedAddress(AddrPtr^, AddrLen);
+ end;
+ GetNameInfoW(AddrPtr^, AddrLen, ClientHost, NI_MAXHOST, ClientService, NI_MAXSERV, NI_NUMERICHOST or NI_NUMERICSERV);
+ FPeerAddress := ClientHost;
+ TryStrToInt(ClientService, FPeerPort);
+ end;
+ Result := FPeerAddress
+function TSocket.GetPeerPort: Integer;
+ GetPeerAddress;
+ Result := FPeerPort;
+function TSocket.GetOrigin: string;
+ Result := GetPeerHost + ':' + GetPeerPort.ToString;
+procedure TSocket.SetRecvTimeout(ARecvTimeout: Longword);
+ inherited SetRecvTimeout(ARecvTimeout);
+ SetGenericTimeout(Socket, ARecvTimeout, SO_RCVTIMEO);
+procedure TSocket.SetSendTimeout(ASendTimeout: Longword);
+ inherited SetSendTimeout(ASendTimeout);
+ SetGenericTimeout(Socket, ASendTimeout, SO_SNDTIMEO);
+procedure TSocket.SetKeepAlive(AKeepAlive: Boolean);
+ Value: Integer;
+ inherited SetKeepAlive(AKeepAlive);
+ Value := IfThen(KeepAlive, 1, 0);
+ if setsockopt(Socket, SOL_SOCKET, SO_KEEPALIVE, @Value, SizeOf(Value)) = SOCKET_ERROR then
+ LogDelegate(Format('TSocket.SetKeepAlive() setsockopt() %s %s', [SocketInfo, SysErrorMessage(WSAGetLastError)]));
+constructor TSocket.Create(ALogDelegate: TBaseSocket.TLogDelegate = nil);
+ // Not needed, but just a placeholder
+ inherited Create(ALogDelegate);
+constructor TSocket.Create(AHost: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate);
+ inherited Create(APort, ALogDelegate);
+ FHost := AHost;
+constructor TSocket.Create(ASocket: Winapi.Winsock2.TSocket; ALogDelegate: TBaseSocket.TLogDelegate);
+ inherited Create(ALogDelegate);
+ Socket := ASocket;
+constructor TSocket.Create(ASocket: Winapi.Winsock2.TSocket; AInterruptListener: ISmartPointer<Winapi.Winsock2.TSocket>;
+ ALogDelegate: TBaseSocket.TLogDelegate);
+ inherited Create(ALogDelegate);
+ Socket := ASocket;
+ FInterruptListener := AInterruptListener;
+procedure TSocket.Open;
+ if IsOpen then Exit;
+ LocalOpen;
+procedure TSocket.Close;
+ inherited Close;
+ InitPeerInfo;
+function TSocket.Read(var Buf; Len: Integer): Integer;
+ TryAgain;
+ Retries: Longword;
+ EAgainThreshold,
+ ReadElapsed: UInt64;
+ Start: TDateTime;
+ Got: Integer;
+ Fds: TFdSet;
+ ErrnoCopy: Integer;
+ TVal: TTimeVal;
+ PTVal: PTimeVal;
+ Ret: Integer;
+ if Socket = INVALID_SOCKET then
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, 'Called read on non-open socket');
+ Retries := 0;
+ // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
+ // the system is out of resources (an awesome undocumented feature).
+ // The following is an approximation of the time interval under which
+ // THRIFT_EAGAIN is taken to indicate an out of resources error.
+ EAgainThreshold := 0;
+ if RecvTimeout <> 0 then
+ // if a readTimeout is specified along with a max number of recv retries, then
+ // the threshold will ensure that the read timeout is not exceeded even in the
+ // case of resource errors
+ EAgainThreshold := RecvTimeout div IfThen(FMaxRecvRetries > 0, FMaxRecvRetries, 2);
+ // Read from the socket
+ if RecvTimeout > 0 then
+ Start := Now
+ else
+ // if there is no read timeout we don't need the TOD to determine whether
+ // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
+ Start := 0;
+ if Assigned(FInterruptListener) then begin
+ FD_ZERO(Fds);
+ _FD_SET(Socket, Fds);
+ _FD_SET(FInterruptListener, Fds);
+ if RecvTimeout > 0 then begin
+ TVal.tv_sec := RecvTimeout div 1000;
+ TVal.tv_usec := (RecvTimeout mod 1000) * 1000;
+ PTVal := @TVal;
+ end
+ else
+ PTVal := nil;
+ Ret := select(2, @Fds, nil, nil, PTVal);
+ ErrnoCopy := WSAGetLastError;
+ if Ret < 0 then begin
+ // error cases
+ if (ErrnoCopy = WSAEINTR) and (Retries < FMaxRecvRetries) then begin
+ Inc(Retries);
+ goto TryAgain;
+ end;
+ LogDelegate(Format('TSocket.Read() select() %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('Unknown: %s', [SysErrorMessage(ErrnoCopy)]));
+ end
+ else if Ret > 0 then begin
+ // Check the interruptListener
+ if FD_ISSET(FInterruptListener, Fds) then
+ raise TTransportException.Create(TTransportException.TExceptionType.Interrupted, 'Interrupted');
+ end
+ else // Ret = 0
+ raise TTransportException.Create(TTransportException.TExceptionType.TimedOut, 'WSAEWOULDBLOCK (timed out)');
+ // falling through means there is something to recv and it cannot block
+ end;
+ Got := recv(Socket, Buf, Len, 0);
+ ErrnoCopy := WSAGetLastError;
+ // Check for error on read
+ if Got < 0 then begin
+ if ErrnoCopy = WSAEWOULDBLOCK then begin
+ // if no timeout we can assume that resource exhaustion has occurred.
+ if RecvTimeout = 0 then
+ raise TTransportException.Create(TTransportException.TExceptionType.TimedOut, 'WSAEWOULDBLOCK (unavailable resources)');
+ // check if this is the lack of resources or timeout case
+ ReadElapsed := MilliSecondsBetween(Now, Start);
+ if (EAgainThreshold = 0) or (ReadElapsed < EAgainThreshold) then begin
+ if Retries < FMaxRecvRetries then begin
+ Inc(Retries);
+ Sleep(1);
+ goto TryAgain;
+ end
+ else
+ raise TTransportException.Create(TTransportException.TExceptionType.TimedOut, 'WSAEWOULDBLOCK (unavailable resources)');
+ end
+ else
+ // infer that timeout has been hit
+ raise TTransportException.Create(TTransportException.TExceptionType.TimedOut, 'WSAEWOULDBLOCK (timed out)');
+ end;
+ // If interrupted, try again
+ if (ErrnoCopy = WSAEINTR) and (Retries < FMaxRecvRetries) then begin
+ Inc(Retries);
+ goto TryAgain;
+ end;
+ if ErrnoCopy = WSAECONNRESET then
+ Exit(0);
+ // This ish isn't open
+ if ErrnoCopy = WSAENOTCONN then
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, 'WSAENOTCONN');
+ // Timed out!
+ if ErrnoCopy = WSAETIMEDOUT then
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, 'WSAETIMEDOUT');
+ // Now it's not a try again case, but a real probblez
+ LogDelegate(Format('TSocket.Read() recv() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ // Some other error, whatevz
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('Unknown: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ Result := Got;
+procedure TSocket.Write(const Buf; Len: Integer);
+ Sent, B: Integer;
+ Sent := 0;
+ while Sent < Len do begin
+ B := WritePartial((PByte(@Buf) + Sent)^, Len - Sent);
+ if B = 0 then
+ // This should only happen if the timeout set with SO_SNDTIMEO expired.
+ // Raise an exception.
+ raise TTransportException.Create(TTransportException.TExceptionType.TimedOut, 'send timeout expired');
+ Inc(Sent, B);
+ end;
+function TSocket.WritePartial(const Buf; Len: Integer): Integer;
+ B: Integer;
+ ErrnoCopy: Integer;
+ if Socket = INVALID_SOCKET then
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, 'Called write on non-open socket');
+ B := send(Socket, Buf, Len, 0);
+ if B < 0 then begin
+ // Fail on a send error
+ ErrnoCopy := WSAGetLastError;
+ if ErrnoCopy = WSAEWOULDBLOCK then
+ Exit(0);
+ LogDelegate(Format('TSocket.WritePartial() send() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ if (ErrnoCopy = WSAECONNRESET) or (ErrnoCopy = WSAENOTCONN) then begin
+ Close;
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('write() send(): %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('write() send(): %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // Fail on blocked send
+ if B = 0 then
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, 'Socket send returned 0.');
+ Result := B;
+function TSocket.GetCachedAddress(out Len: Integer): PSockAddr;
+ case FCachedPeerAddr.ipv4.sin_family of
+ AF_INET: begin
+ Len := SizeOf(TSockAddrIn);
+ Result := PSockAddr(@FCachedPeerAddr.ipv4);
+ end;
+ AF_INET6: begin
+ Len := SizeOf(TSockAddrIn6);
+ Result := PSockAddr(@FCachedPeerAddr.ipv6);
+ end;
+ else
+ Len := 0;
+ Result := nil;
+ end;
+procedure TSocket.SetCachedAddress(const Addr: TSockAddr; Len: Integer);
+ case Addr.sa_family of
+ AF_INET: if Len = SizeOf(TSockAddrIn) then FCachedPeerAddr.ipv4 := PSockAddrIn(@Addr)^;
+ AF_INET6: if Len = SizeOf(TSockAddrIn6) then FCachedPeerAddr.ipv6 := PSockAddrIn6(@Addr)^;
+ end;
+ FPeerAddress := '';
+ FPeerHost := '';
+ FPeerPort := 0;
+procedure TSocket.SetLinger(LingerOn: Boolean; LingerVal: Integer);
+ L: TLinger;
+ FLingerOn := LingerOn;
+ FLingerVal := LingerVal;
+ if Socket = INVALID_SOCKET then
+ Exit;
+ L.l_onoff := IfThen(FLingerOn, 1, 0);
+ L.l_linger := LingerVal;
+ if setsockopt(Socket, SOL_SOCKET, SO_LINGER, @L, SizeOf(L)) = SOCKET_ERROR then
+ LogDelegate(Format('TSocket.SetLinger() setsockopt() %s %s', [SocketInfo, SysErrorMessage(WSAGetLastError)]));
+function TSocket.Peek: Boolean;
+ Retries: Longword;
+ Fds: TFdSet;
+ TVal: TTimeVal;
+ PTVal: PTimeVal;
+ Ret: Integer;
+ ErrnoCopy: Integer;
+ Buf: Byte;
+ if not IsOpen then Exit(False);
+ if Assigned(FInterruptListener) then begin
+ Retries := 0;
+ while true do begin
+ FD_ZERO(Fds);
+ _FD_SET(Socket, Fds);
+ _FD_SET(FInterruptListener, Fds);
+ if RecvTimeout > 0 then begin
+ TVal.tv_sec := RecvTimeout div 1000;
+ TVal.tv_usec := (RecvTimeout mod 1000) * 1000;
+ PTVal := @TVal;
+ end
+ else
+ PTVal := nil;
+ Ret := select(2, @Fds, nil, nil, PTVal);
+ ErrnoCopy := WSAGetLastError;
+ if Ret < 0 then begin
+ // error cases
+ if (ErrnoCopy = WSAEINTR) and (Retries < FMaxRecvRetries) then begin
+ Inc(Retries);
+ Continue;
+ end;
+ LogDelegate(Format('TSocket.Peek() select() %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('Unknown: %s', [SysErrorMessage(ErrnoCopy)]));
+ end
+ else if Ret > 0 then begin
+ // Check the interruptListener
+ if FD_ISSET(FInterruptListener, Fds) then
+ Exit(False);
+ // There must be data or a disconnection, fall through to the PEEK
+ Break;
+ end
+ else
+ // timeout
+ Exit(False);
+ end;
+ end;
+ // Check to see if data is available or if the remote side closed
+ Ret := recv(Socket, Buf, 1, MSG_PEEK);
+ if Ret = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ if ErrnoCopy = WSAECONNRESET then begin
+ Close;
+ Exit(False);
+ end;
+ LogDelegate(Format('TSocket.Peek() recv() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('recv(): %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ Result := Ret > 0;
+function TServerSocket.CreateSocketObj(Client: Winapi.Winsock2.TSocket): TSocket;
+ if FInterruptableChildren then
+ Result := TSocket.Create(Client, FChildInterruptSockReader)
+ else
+ Result := TSocket.Create(Client);
+procedure TServerSocket.Notify(NotifySocket: Winapi.Winsock2.TSocket);
+ Byt: Byte;
+ if NotifySocket <> INVALID_SOCKET then begin
+ Byt := 0;
+ if send(NotifySocket, Byt, SizeOf(Byt), 0) = SOCKET_ERROR then
+ LogDelegate(Format('TServerSocket.Notify() send() %s', [SysErrorMessage(WSAGetLastError)]));
+ end;
+procedure TServerSocket.SetInterruptableChildren(AValue: Boolean);
+ if FListening then
+ raise Exception.Create('InterruptableChildren cannot be set after listen()');
+ FInterruptableChildren := AValue;
+procedure TServerSocket.CommonInit;
+ inherited CommonInit;
+ FInterruptableChildren := True;
+ FAcceptBacklog := DEFAULT_BACKLOG;
+ FAcceptTimeout := 0;
+ FRetryLimit := 0;
+ FRetryDelay := 0;
+ FTcpSendBuffer := 0;
+ FTcpRecvBuffer := 0;
+ FListening := False;
+ FInterruptSockWriter := INVALID_SOCKET;
+ FInterruptSockReader := INVALID_SOCKET;
+ FChildInterruptSockWriter := INVALID_SOCKET;
+constructor TServerSocket.Create(APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil);
+ // Unnecessary, but here for documentation purposes
+ inherited Create(APort, ALogDelegate);
+constructor TServerSocket.Create(APort: Integer; ASendTimeout, ARecvTimeout: Longword; ALogDelegate: TBaseSocket.TLogDelegate);
+ inherited Create(APort, ALogDelegate);
+ SendTimeout := ASendTimeout;
+ RecvTimeout := ARecvTimeout;
+constructor TServerSocket.Create(AAddress: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate);
+ inherited Create(APort, ALogDelegate);
+ FAddress := AAddress;
+procedure TServerSocket.Listen;
+ function CreateSocketPair(var Reader, Writer: Winapi.Winsock2.TSocket): Integer;
+ label
+ Error;
+ type
+ TSAUnion = record
+ case Integer of
+ 0: (inaddr: TSockAddrIn);
+ 1: (addr: TSockAddr);
+ end;
+ var
+ a: TSAUnion;
+ listener: Winapi.Winsock2.TSocket;
+ e: Integer;
+ addrlen: Integer;
+ flags: DWORD;
+ reuse: Integer;
+ begin
+ addrlen := SizeOf(a.inaddr);
+ flags := 0;
+ reuse := 1;
+ listener := Winapi.Winsock2.socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if listener = INVALID_SOCKET then
+ FillChar(a, SizeOf(a), 0);
+ a.inaddr.sin_family := AF_INET;
+ a.inaddr.sin_addr.s_addr := htonl(INADDR_LOOPBACK);
+ a.inaddr.sin_port := 0;
+ // ignore errors coming out of this setsockopt. This is because
+ // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
+ // want to force socket pairs to be an admin.
+ setsockopt(listener, SOL_SOCKET, Integer(SO_EXCLUSIVEADDRUSE), @reuse, SizeOf(reuse));
+ if bind(listener, a.addr, SizeOf(a.inaddr)) = SOCKET_ERROR then
+ goto Error;
+ if getsockname(listener, a.addr, addrlen) = SOCKET_ERROR then
+ goto Error;
+ if Winapi.Winsock2.listen(listener, 1) = SOCKET_ERROR then
+ goto Error;
+ Reader := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, flags);
+ if Reader = INVALID_SOCKET then
+ goto Error;
+ if connect(Reader, a.addr, SizeOf(a.inaddr)) = SOCKET_ERROR then
+ goto Error;
+ Writer := Winapi.Winsock2.accept(listener, nil, nil);
+ if Writer = INVALID_SOCKET then
+ goto Error;
+ closesocket(listener);
+ Exit(0);
+ Error:
+ e := WSAGetLastError;
+ closesocket(listener);
+ closesocket(Reader);
+ closesocket(Writer);
+ WSASetLastError(e);
+ Result := SOCKET_ERROR;
+ end;
+ TempIntReader,
+ TempIntWriter: Winapi.Winsock2.TSocket;
+ One: Cardinal;
+ ErrnoCopy: Integer;
+ Ling: TLinger;
+ Retries: Integer;
+ AddrInfo: IGetAddrInfoWrapper;
+ SA: TSockAddrStorage;
+ Len: Integer;
+ // Create the socket pair used to interrupt
+ if CreateSocketPair(TempIntReader, TempIntWriter) = SOCKET_ERROR then begin
+ LogDelegate(Format('TServerSocket.Listen() CreateSocketPair() Interrupt %s', [SysErrorMessage(WSAGetLastError)]));
+ FInterruptSockReader := INVALID_SOCKET;
+ FInterruptSockWriter := INVALID_SOCKET;
+ end
+ else begin
+ FInterruptSockReader := TempIntReader;
+ FInterruptSockWriter := TempIntWriter;
+ end;
+ // Create the socket pair used to interrupt all clients
+ if CreateSocketPair(TempIntReader, TempIntWriter) = SOCKET_ERROR then begin
+ LogDelegate(Format('TServerSocket.Listen() CreateSocketPair() ChildInterrupt %s', [SysErrorMessage(WSAGetLastError)]));
+ FChildInterruptSockReader := TSmartPointer<Winapi.Winsock2.TSocket>.Create(INVALID_SOCKET, nil);
+ FChildInterruptSockWriter := INVALID_SOCKET;
+ end
+ else begin
+ FChildInterruptSockReader := TSmartPointer<Winapi.Winsock2.TSocket>.Create(TempIntReader, DestroyerOfFineSockets);
+ FChildInterruptSockWriter := TempIntWriter;
+ end;
+ if (Port < 0) or (Port > $FFFF) then
+ raise TTransportException.Create(TTransportException.TExceptionType.BadArgs, 'Specified port is invalid');
+ AddrInfo := CreateSocket(FAddress, Port);
+ // Set SO_EXCLUSIVEADDRUSE to prevent 2MSL delay on accept
+ One := 1;
+ setsockopt(Socket, SOL_SOCKET, Integer(SO_EXCLUSIVEADDRUSE), @one, SizeOf(One));
+ // ignore errors coming out of this setsockopt on Windows. This is because
+ // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
+ // want to force servers to be an admin.
+ // Set TCP buffer sizes
+ if FTcpSendBuffer > 0 then begin
+ if setsockopt(Socket, SOL_SOCKET, SO_SNDBUF, @FTcpSendBuffer, SizeOf(FTcpSendBuffer)) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Listen() setsockopt() SO_SNDBUF %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('Could not set SO_SNDBUF: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ end;
+ if FTcpRecvBuffer > 0 then begin
+ if setsockopt(Socket, SOL_SOCKET, SO_RCVBUF, @FTcpRecvBuffer, SizeOf(FTcpRecvBuffer)) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Listen() setsockopt() SO_RCVBUF %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('Could not set SO_RCVBUF: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ end;
+ // Turn linger off, don't want to block on calls to close
+ Ling.l_onoff := 0;
+ Ling.l_linger := 0;
+ if setsockopt(Socket, SOL_SOCKET, SO_LINGER, @Ling, SizeOf(Ling)) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Listen() setsockopt() SO_LINGER %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('Could not set SO_LINGER: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // TCP Nodelay, speed over bandwidth
+ if setsockopt(Socket, IPPROTO_TCP, TCP_NODELAY, @One, SizeOf(One)) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Listen() setsockopt() TCP_NODELAY %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('Could not set TCP_NODELAY: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // Set NONBLOCK on the accept socket
+ if ioctlsocket(Socket, Integer(FIONBIO), One) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Listen() ioctlsocket() FIONBIO %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('ioctlsocket() FIONBIO: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // prepare the port information
+ // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
+ // always seem to work. The client can configure the retry variables.
+ Retries := 0;
+ while True do begin
+ if bind(Socket, AddrInfo.Res^.ai_addr^, AddrInfo.Res^.ai_addrlen) = 0 then
+ Break;
+ Inc(Retries);
+ if Retries > FRetryLimit then
+ Break;
+ Sleep(FRetryDelay * 1000);
+ end;
+ // retrieve bind info
+ if (Port = 0) and (Retries < FRetryLimit) then begin
+ Len := SizeOf(SA);
+ FillChar(SA, Len, 0);
+ if getsockname(Socket, PSockAddr(@SA)^, Len) = SOCKET_ERROR then
+ LogDelegate(Format('TServerSocket.Listen() getsockname() %s', [SysErrorMessage(WSAGetLastError)]))
+ else begin
+ if SA.ss_family = AF_INET6 then
+ Port := ntohs(PSockAddrIn6(@SA)^.sin6_port)
+ else
+ Port := ntohs(PSockAddrIn(@SA)^.sin_port);
+ end;
+ end;
+ // throw an error if we failed to bind properly
+ if (Retries > FRetryLimit) then begin
+ LogDelegate(Format('TServerSocket.Listen() BIND %d', [Port]));
+ Close;
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('Could not bind: %s', [SysErrorMessage(WSAGetLastError)]));
+ end;
+ if Assigned(FListenCallback) then
+ FListenCallback(Socket);
+ // Call listen
+ if Winapi.Winsock2.listen(Socket, FAcceptBacklog) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Listen() listen() %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.NotOpen, Format('Could not listen: %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // The socket is now listening!
+function TServerSocket.Accept: TSocket;
+ Fds: TFdSet;
+ MaxEInters,
+ NumEInters: Integer;
+ TVal: TTimeVal;
+ PTVal: PTimeVal;
+ ErrnoCopy: Integer;
+ Buf: Byte;
+ ClientAddress: TSockAddrStorage;
+ Size: Integer;
+ ClientSocket: Winapi.Winsock2.TSocket;
+ Zero: Cardinal;
+ Client: TSocket;
+ Ret: Integer;
+ MaxEInters := 5;
+ NumEInters := 0;
+ while True do begin
+ FD_ZERO(Fds);
+ _FD_SET(Socket, Fds);
+ _FD_SET(FInterruptSockReader, Fds);
+ if FAcceptTimeout > 0 then begin
+ TVal.tv_sec := FAcceptTimeout div 1000;
+ TVal.tv_usec := (FAcceptTimeout mod 1000) * 1000;
+ PTVal := @TVal;
+ end
+ else
+ PTVal := nil;
+ // TODO: if WSAEINTR is received, we'll restart the timeout.
+ // To be accurate, we need to fix this in the future.
+ Ret := select(2, @Fds, nil, nil, PTVal);
+ if Ret < 0 then begin
+ // error cases
+ if (WSAGetLastError = WSAEINTR) and (NumEInters < MaxEInters) then begin
+ // THRIFT_EINTR needs to be handled manually and we can tolerate
+ // a certain number
+ Inc(NumEInters);
+ Continue;
+ end;
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Accept() select() %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('Unknown: %s', [SysErrorMessage(ErrnoCopy)]));
+ end
+ else if Ret > 0 then begin
+ // Check for an interrupt signal
+ if (FInterruptSockReader <> INVALID_SOCKET) and FD_ISSET(FInterruptSockReader, Fds) then begin
+ if recv(FInterruptSockReader, Buf, SizeOf(Buf), 0) = SOCKET_ERROR then
+ LogDelegate(Format('TServerSocket.Accept() recv() interrupt %s', [SysErrorMessage(WSAGetLastError)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.Interrupted);
+ end;
+ // Check for the actual server socket being ready
+ if FD_ISSET(Socket, Fds) then
+ Break;
+ end
+ else begin
+ LogDelegate('TServerSocket.Accept() select() 0');
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown);
+ end;
+ end;
+ Size := SizeOf(ClientAddress);
+ ClientSocket := Winapi.Winsock2.accept(Socket, @ClientAddress, @Size);
+ if ClientSocket = INVALID_SOCKET then begin
+ ErrnoCopy := WSAGetLastError;
+ LogDelegate(Format('TServerSocket.Accept() accept() %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('accept(): %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ // Make sure client socket is blocking
+ Zero := 0;
+ if ioctlsocket(ClientSocket, Integer(FIONBIO), Zero) = SOCKET_ERROR then begin
+ ErrnoCopy := WSAGetLastError;
+ closesocket(ClientSocket);
+ LogDelegate(Format('TServerSocket.Accept() ioctlsocket() FIONBIO %s', [SysErrorMessage(ErrnoCopy)]));
+ raise TTransportException.Create(TTransportException.TExceptionType.Unknown, Format('ioctlsocket(): %s', [SysErrorMessage(ErrnoCopy)]));
+ end;
+ Client := CreateSocketObj(ClientSocket);
+ if SendTimeout > 0 then
+ Client.SendTimeout := SendTimeout;
+ if RecvTimeout > 0 then
+ Client.RecvTimeout := RecvTimeout;
+ if KeepAlive then
+ Client.KeepAlive := KeepAlive;
+ Client.SetCachedAddress(PSockAddr(@ClientAddress)^, Size);
+ if Assigned(FAcceptCallback) then
+ FAcceptCallback(ClientSocket);
+ Result := Client;
+procedure TServerSocket.Interrupt;
+ Notify(FInterruptSockWriter);
+procedure TServerSocket.InterruptChildren;
+ Notify(FChildInterruptSockWriter);
+procedure TServerSocket.Close;
+ inherited Close;
+ if FInterruptSockWriter <> INVALID_SOCKET then
+ closesocket(FInterruptSockWriter);
+ if FInterruptSockReader <> INVALID_SOCKET then
+ closesocket(FInterruptSockReader);
+ if FChildInterruptSockWriter <> INVALID_SOCKET then
+ closesocket(FChildInterruptSockWriter);
+ FChildInterruptSockReader := TSmartPointer<Winapi.Winsock2.TSocket>.Create(INVALID_SOCKET, nil);
+ FListening := False;
+{$ENDIF} // not for OLD_SOCKETS
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
@@ -33,9 +33,9 @@
Winapi.ActiveX, Winapi.msxml, Winapi.WinSock,
- Web.Win.Sockets,
- {$ELSE}
- System.Win.ScktComp,
+ Web.Win.Sockets,
+ {$ELSE}
+ Thrift.Socket,
@@ -79,7 +79,9 @@
- EndOfFile
+ EndOfFile,
+ BadArgs,
+ Interrupted
FType : TExceptionType;
@@ -158,25 +160,22 @@
function GetTransport( const ATrans: ITransport): ITransport; virtual;
- TThriftCustomIpClient = TCustomIpClient;
- TThriftTcpServer = TTcpServer;
- TThriftTcpClient = TTcpClient;
- {$ELSE}
- // TODO
- {$ENDIF}
TTcpSocketStreamImpl = class( TThriftStreamImpl )
private type
TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
- FTcpClient : TThriftCustomIpClient;
+ FTcpClient : TCustomIpClient;
FTimeout : Integer;
function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
TimeOut: Integer; var wsaError : Integer): Integer;
function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
var wsaError, bytesReady : Integer): TWaitForData;
+ FTcpClient: TSocket;
+ protected const
+ SLEEP_TIME = 200;
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
@@ -187,9 +186,12 @@
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
- constructor Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer = 0);
+ constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+ constructor Create( const ATcpClient: TSocket; const aTimeout : Longword = 0);
- {$ENDIF}
IStreamTransport = interface( ITransport )
@@ -240,24 +242,31 @@
destructor Destroy; override;
TServerSocketImpl = class( TServerTransportImpl)
- FServer : TThriftTcpServer;
+ FServer : TTcpServer;
FPort : Integer;
FClientTimeout : Integer;
+ FServer: TServerSocket;
FUseBufferedSocket : Boolean;
FOwnsServer : Boolean;
function Accept( const fnAccepting: TProc) : ITransport; override;
- constructor Create( const AServer: TThriftTcpServer; AClientTimeout: Integer = 0); overload;
+ constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
+ constructor Create( const AServer: TServerSocket; AClientTimeout: Longword = 0); overload;
+ constructor Create( APort: Integer; AClientTimeout: Longword = 0; AUseBufferedSockets: Boolean = FALSE); overload;
destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
- {$ENDIF}
TBufferedTransportImpl = class( TTransportImpl )
@@ -282,29 +291,44 @@
property IsOpen: Boolean read GetIsOpen;
TSocketImpl = class(TStreamTransportImpl)
- FClient : TThriftCustomIpClient;
+ FClient : TCustomIpClient;
+ FClient: TSocket;
FOwnsClient : Boolean;
FHost : string;
FPort : Integer;
FTimeout : Integer;
+ FTimeout : Longword;
procedure InitSocket;
function GetIsOpen: Boolean; override;
procedure Open; override;
- constructor Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
+ constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
+ constructor Create(const AClient: TSocket; aOwnsClient: Boolean); overload;
+ constructor Create( const AHost: string; APort: Integer; ATimeout: Longword = 0); overload;
destructor Destroy; override;
procedure Close; override;
- property TcpClient: TThriftCustomIpClient read FClient;
+ property TcpClient: TCustomIpClient read FClient;
+ property TcpClient: TSocket read FClient;
property Host : string read FHost;
property Port: Integer read FPort;
- {$ENDIF}
TFramedTransportImpl = class( TTransportImpl)
private const
@@ -373,8 +397,8 @@
while got < len do begin
ret := Read( buf, off + got, len - got);
if ret > 0
- then Inc( got, ret)
- else raise TTransportException.Create( 'Cannot read, Remote side has closed' );
+ then Inc( got, ret)
+ else raise TTransportException.Create( 'Cannot read, Remote side has closed' );
Result := got;
@@ -546,27 +570,44 @@
{ TServerSocket }
-constructor TServerSocketImpl.Create( const AServer: TThriftTcpServer; AClientTimeout: Integer);
+constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
inherited Create;
FServer := AServer;
FClientTimeout := AClientTimeout;
-constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
+constructor TServerSocketImpl.Create( const AServer: TServerSocket; AClientTimeout: Longword);
inherited Create;
+ FServer := AServer;
+ FServer.RecvTimeout := AClientTimeout;
+ FServer.SendTimeout := AClientTimeout;
+constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
+constructor TServerSocketImpl.Create(APort: Integer; AClientTimeout: Longword; AUseBufferedSockets: Boolean);
+ inherited Create;
FPort := APort;
FClientTimeout := AClientTimeout;
- FUseBufferedSocket := AUseBufferedSockets;
- FOwnsServer := True;
- FServer := TThriftTcpServer.Create( nil );
+ FServer := TTcpServer.Create( nil );
FServer.BlockMode := bmBlocking;
{$IF CompilerVersion >= 21.0}
FServer.LocalPort := AnsiString( IntToStr( FPort));
FServer.LocalPort := IntToStr( FPort);
+ FServer := TServerSocket.Create(APort, AClientTimeout, AClientTimeout);
+ FUseBufferedSocket := AUseBufferedSockets;
+ FOwnsServer := True;
destructor TServerSocketImpl.Destroy;
@@ -580,7 +621,11 @@
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
- client : TThriftCustomIpClient;
+ client : TCustomIpClient;
+ client: TSocket;
trans : IStreamTransport;
if FServer = nil then begin
@@ -588,9 +633,10 @@
'No underlying server socket.');
client := nil;
- client := TThriftCustomIpClient.Create(nil);
+ client := TCustomIpClient.Create(nil);
if Assigned(fnAccepting)
then fnAccepting();
@@ -619,35 +665,62 @@
raise TTransportException.Create( E.ToString );
+ if Assigned(fnAccepting) then
+ fnAccepting();
+ client := FServer.Accept;
+ try
+ trans := TSocketImpl.Create(client, True);
+ client := nil;
+ if FUseBufferedSocket then
+ Result := TBufferedTransportImpl.Create(trans)
+ else
+ Result := trans;
+ except
+ client.Free;
+ raise;
+ end;
procedure TServerSocketImpl.Listen;
if FServer <> nil then
FServer.Active := True;
- on E: Exception
+ on E: Exception
do raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
+ FServer.Listen;
procedure TServerSocketImpl.Close;
- if FServer <> nil
- then try
- FServer.Active := False;
- except
- on E: Exception
- do raise TTransportException.Create('Error on closing socket : ' + E.Message);
- end;
+ if FServer <> nil then
+ try
+ FServer.Active := False;
+ except
+ on E: Exception
+ do raise TTransportException.Create('Error on closing socket : ' + E.Message);
+ end;
+ FServer.Close;
{ TSocket }
-constructor TSocketImpl.Create( const AClient : TThriftCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
+constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
var stream : IThriftStream;
FClient := AClient;
@@ -656,8 +729,23 @@
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
inherited Create( stream, stream);
+constructor TSocketImpl.Create(const AClient: TSocket; aOwnsClient: Boolean);
+var stream : IThriftStream;
+ FClient := AClient;
+ FTimeout := AClient.RecvTimeout;
+ FOwnsClient := aOwnsClient;
+ stream := TTcpSocketStreamImpl.Create(FClient, FTimeout);
+ inherited Create(stream, stream);
constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
+constructor TSocketImpl.Create(const AHost: string; APort: Integer; ATimeout: Longword);
inherited Create(nil,nil);
FHost := AHost;
@@ -682,7 +770,11 @@
function TSocketImpl.GetIsOpen: Boolean;
Result := (FClient <> nil) and FClient.Connected;
+ Result := (FClient <> nil) and FClient.IsOpen
procedure TSocketImpl.InitSocket;
@@ -693,7 +785,11 @@
then FreeAndNil( FClient)
else FClient := nil;
- FClient := TThriftTcpClient.Create( nil);
+ FClient := TTcpClient.Create( nil);
+ FClient := TSocket.Create(FHost, FPort);
FOwnsClient := True;
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
@@ -721,14 +817,17 @@
if FClient = nil
then InitSocket;
FClient.RemoteHost := TSocketHost( Host);
FClient.RemotePort := TSocketPort( IntToStr( Port));
+ FClient.Open;
FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
FOutputStream := FInputStream;
{ TBufferedStream }
@@ -1140,18 +1239,30 @@
{ TTcpSocketStreamImpl }
procedure TTcpSocketStreamImpl.Close;
-constructor TTcpSocketStreamImpl.Create( const ATcpClient: TThriftCustomIpClient; const aTimeout : Integer);
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
inherited Create;
FTcpClient := ATcpClient;
FTimeout := aTimeout;
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TSocket; const aTimeout : Longword);
+ inherited Create;
+ FTcpClient := ATcpClient;
+ if aTimeout = 0 then
+ FTcpClient.RecvTimeout := SLEEP_TIME
+ else
+ FTcpClient.RecvTimeout := aTimeout;
+ FTcpClient.SendTimeout := aTimeout;
procedure TTcpSocketStreamImpl.Flush;
@@ -1160,7 +1271,11 @@
function TTcpSocketStreamImpl.IsOpen: Boolean;
Result := FTcpClient.Active;
+ Result := FTcpClient.IsOpen;
procedure TTcpSocketStreamImpl.Open;
@@ -1169,6 +1284,7 @@
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
TimeOut: Integer; var wsaError : Integer): Integer;
@@ -1254,7 +1370,9 @@
if Assigned(ExceptFlag) then
ExceptFlag^ := FD_ISSET(socket, ExceptFds);
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
DesiredBytes : Integer;
var wsaError, bytesReady : Integer): TWaitForData;
@@ -1286,12 +1404,16 @@
bytesReady := Min( retval, DesiredBytes);
result := TWaitForData.wfd_HaveData;
function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
+// old sockets version
var wfd : TWaitForData;
- wsaError, nBytes : Integer;
- pDest : PByte;
+ wsaError,
msecs : Integer;
+ nBytes : Integer;
+ pDest : PByte;
@@ -1337,8 +1459,8 @@
function TTcpSocketStreamImpl.ToArray: TBytes;
- len : Integer;
+// old sockets version
+var len : Integer;
len := 0;
if IsOpen then begin
@@ -1353,6 +1475,7 @@
procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
+// old sockets version
var bCanWrite, bError : Boolean;
retval, wsaError : Integer;
@@ -1378,8 +1501,60 @@
FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
+function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
+// new sockets version
+var nBytes : Integer;
+ pDest : PByte;
+ inherited;
+ result := 0;
+ pDest := Pointer(@buffer[offset]);
+ while count > 0 do begin
+ nBytes := FTcpClient.Read(pDest^, count);
+ if nBytes = 0 then Exit;
+ Inc( pDest, nBytes);
+ Dec( count, nBytes);
+ Inc( result, nBytes);
+ end;
+function TTcpSocketStreamImpl.ToArray: TBytes;
+// new sockets version
+var len : Integer;
+ len := 0;
+ try
+ if FTcpClient.Peek then
+ repeat
+ SetLength(Result, Length(Result) + 1024);
+ len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
+ until len < 1024;
+ except
+ on TTransportException do begin { don't allow default exceptions } end;
+ else raise;
+ end;
+ if len > 0 then
+ SetLength(Result, Length(Result) - 1024 + len);
+procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
+// new sockets version
+ inherited;
+ if not FTcpClient.IsOpen
+ then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
+ FTcpClient.Write(buffer[offset], count);
{$IF CompilerVersion < 21.0}
diff --git a/lib/delphi/test/TestClient.pas b/lib/delphi/test/TestClient.pas
@@ -308,11 +308,7 @@
case endpoint of
trns_Sockets: begin
Console.WriteLine('Using sockets ('+host+' port '+IntToStr(port)+')');
streamtrans := TSocketImpl.Create( host, port );
- {$ELSE}
- raise Exception.Create(ENDPOINT_TRANSPORTS[endpoint]+' transport not implemented');
- {$ENDIF}
trns_Http: begin
diff --git a/lib/delphi/test/TestServer.pas b/lib/delphi/test/TestServer.pas
index e3576dd..d7917ca 100644
--- a/lib/delphi/test/TestServer.pas
+++ b/lib/delphi/test/TestServer.pas
@@ -658,11 +658,7 @@
trns_Sockets : begin
Console.WriteLine('- sockets (port '+IntToStr(port)+')');
if (trns_Buffered in layered) then Console.WriteLine('- buffered');
servertrans := TServerSocketImpl.Create( Port, 0, (trns_Buffered in layered));
- {$ELSE}
- raise Exception.Create(ENDPOINT_TRANSPORTS[endpoint]+' server transport not implemented');
- {$ENDIF}
trns_Http : begin
@@ -28,6 +28,7 @@
Thrift.Test, // in 'gen-delphi\Thrift.Test.pas',
Thrift in '..\src\Thrift.pas',
Thrift.Transport in '..\src\Thrift.Transport.pas',
+ Thrift.Socket in '..\src\Thrift.Socket.pas',
Thrift.Transport.Pipes in '..\src\Thrift.Transport.Pipes.pas',
Thrift.Protocol in '..\src\Thrift.Protocol.pas',
Thrift.Protocol.JSON in '..\src\Thrift.Protocol.JSON.pas',
@@ -26,6 +26,7 @@
Multiplex.Client.Main in 'Multiplex.Client.Main.pas',
Thrift in '..\..\src\Thrift.pas',
+ Thrift.Socket in '..\..\src\Thrift.Socket.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
@@ -25,6 +25,7 @@
Multiplex.Server.Main in 'Multiplex.Server.Main.pas',
Thrift in '..\..\src\Thrift.pas',
+ Thrift.Socket in '..\..\src\Thrift.Socket.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
Thrift.Transport.Pipes in '..\..\src\Thrift.Transport.Pipes.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
@@ -24,6 +24,7 @@
Classes, Windows, SysUtils, Generics.Collections,
Thrift in '..\..\src\Thrift.pas',
+ Thrift.Socket in '..\..\src\Thrift.Socket.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas',
@@ -28,6 +28,7 @@
Thrift.Test, // in gen-delphi folder
Thrift in '..\src\Thrift.pas',
Thrift.Transport in '..\src\Thrift.Transport.pas',
+ Thrift.Socket in '..\src\Thrift.Socket.pas',
Thrift.Transport.Pipes in '..\src\Thrift.Transport.Pipes.pas',
Thrift.Protocol in '..\src\Thrift.Protocol.pas',
Thrift.Protocol.JSON in '..\src\Thrift.Protocol.JSON.pas',
@@ -25,6 +25,7 @@
Classes, Windows, SysUtils,
Thrift in '..\..\src\Thrift.pas',
+ Thrift.Socket in '..\..\src\Thrift.Socket.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas',
@@ -25,6 +25,7 @@
Classes, Windows, SysUtils,
Thrift in '..\..\src\Thrift.pas',
+ Thrift.Socket in '..\..\src\Thrift.Socket.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas',
@@ -25,6 +25,7 @@
Classes, Windows, SysUtils, Generics.Collections, TypInfo,
Thrift in '..\..\src\Thrift.pas',
Thrift.Transport in '..\..\src\Thrift.Transport.pas',
+ Thrift.Socket in '..\..\src\Thrift.Socket.pas',
Thrift.Protocol in '..\..\src\Thrift.Protocol.pas',
Thrift.Protocol.JSON in '..\..\src\Thrift.Protocol.JSON.pas',
Thrift.Collections in '..\..\src\Thrift.Collections.pas',