THRIFT-2768: Whitespace Fixup
Client: C#, Delphi
Patch: Jens Geyer
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index bc66c64..96735ec 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -1,1389 +1,1389 @@
-(*

- * Licensed to the Apache Software Foundation (ASF) under one

- * or more contributor license agreements. See the NOTICE file

- * distributed with this work for additional information

- * regarding copyright ownership. The ASF licenses this file

- * to you under the Apache License, Version 2.0 (the

- * "License"); you may not use this file except in compliance

- * with the License. You may obtain a copy of the License at

- *

- *   http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing,

- * software distributed under the License is distributed on an

- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

- * KIND, either express or implied. See the License for the

- * specific language governing permissions and limitations

- * under the License.

- *)

-

- {$SCOPEDENUMS ON}

-

-unit Thrift.Transport;

-

-interface

-

-uses

-  Classes,

-  SysUtils,

-  Math,

-  Sockets, WinSock,

-  Generics.Collections,

-  Thrift.Collections,

-  Thrift.Utils,

-  Thrift.Stream,

-  ActiveX,

-  msxml;

-

-type

-  ITransport = interface

-    ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']

-    function GetIsOpen: Boolean;

-    property IsOpen: Boolean read GetIsOpen;

-    function Peek: Boolean;

-    procedure Open;

-    procedure Close;

-    function Read(var buf: TBytes; off: Integer; len: Integer): Integer;

-    function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;

-    procedure Write( const buf: TBytes); overload;

-    procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;

-    procedure Flush;

-  end;

-

-  TTransportImpl = class( TInterfacedObject, ITransport)

-  protected

-    function GetIsOpen: Boolean; virtual; abstract;

-    property IsOpen: Boolean read GetIsOpen;

-    function Peek: Boolean; virtual;

-    procedure Open(); virtual; abstract;

-    procedure Close(); virtual; abstract;

-    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;

-    function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;

-    procedure Write( const buf: TBytes); overload; virtual;

-    procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;

-    procedure Flush; virtual;

-  end;

-

-  TTransportException = class( Exception )

-  public

-    type

-      TExceptionType = (

-        Unknown,

-        NotOpen,

-        AlreadyOpen,

-        TimedOut,

-        EndOfFile

-      );

-  private

-    FType : TExceptionType;

-  public

-    constructor Create( AType: TExceptionType); overload;

-    constructor Create( const msg: string); overload;

-    constructor Create( AType: TExceptionType; const msg: string); overload;

-    property Type_: TExceptionType read FType;

-  end;

-

-  IHTTPClient = interface( ITransport )

-    ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']

-    procedure SetConnectionTimeout(const Value: Integer);

-    function GetConnectionTimeout: Integer;

-    procedure SetReadTimeout(const Value: Integer);

-    function GetReadTimeout: Integer;

-    function GetCustomHeaders: IThriftDictionary<string,string>;

-    procedure SendRequest;

-    property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;

-    property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;

-    property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;

-  end;

-

-  THTTPClientImpl = class( TTransportImpl, IHTTPClient)

-  private

-    FUri : string;

-    FInputStream : IThriftStream;

-    FOutputStream : IThriftStream;

-    FConnectionTimeout : Integer;

-    FReadTimeout : Integer;

-    FCustomHeaders : IThriftDictionary<string,string>;

-

-    function CreateRequest: IXMLHTTPRequest;

-  protected

-    function GetIsOpen: Boolean; override;

-    procedure Open(); override;

-    procedure Close(); override;

-    function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;

-    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;

-    procedure Flush; override;

-

-    procedure SetConnectionTimeout(const Value: Integer);

-    function GetConnectionTimeout: Integer;

-    procedure SetReadTimeout(const Value: Integer);

-    function GetReadTimeout: Integer;

-    function GetCustomHeaders: IThriftDictionary<string,string>;

-    procedure SendRequest;

-    property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;

-    property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;

-    property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;

-  public

-    constructor Create( const AUri: string);

-    destructor Destroy; override;

-  end;

-

-  IServerTransport = interface

-    ['{C43B87ED-69EA-47C4-B77C-15E288252900}']

-    procedure Listen;

-    procedure Close;

-    function Accept( const fnAccepting: TProc): ITransport;

-  end;

-

-  TServerTransportImpl = class( TInterfacedObject, IServerTransport)

-  protected

-    procedure Listen; virtual; abstract;

-    procedure Close; virtual; abstract;

-    function Accept( const fnAccepting: TProc): ITransport;  virtual; abstract;

-  end;

-

-  ITransportFactory = interface

-    ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']

-    function GetTransport( const ATrans: ITransport): ITransport;

-  end;

-

-  TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)

-    function GetTransport( const ATrans: ITransport): ITransport; virtual;

-  end;

-

-  TTcpSocketStreamImpl = class( TThriftStreamImpl )

-  private type

-    TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);

-  private

-    FTcpClient : TCustomIpClient;

-    FTimeout : Integer;

-    function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;

-                     TimeOut: Integer; var wsaError : Integer): Integer;

-    function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;

-                          var wsaError : Integer): TWaitForData;

-  protected

-    procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;

-    function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;

-    procedure Open; override;

-    procedure Close; override;

-    procedure Flush; override;

-

-    function IsOpen: Boolean; override;

-    function ToArray: TBytes; override;

-  public

-    constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);

-  end;

-

-  IStreamTransport = interface( ITransport )

-    ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']

-    function GetInputStream: IThriftStream;

-    function GetOutputStream: IThriftStream;

-    property InputStream : IThriftStream read GetInputStream;

-    property OutputStream : IThriftStream read GetOutputStream;

-  end;

-

-  TStreamTransportImpl = class( TTransportImpl, IStreamTransport)

-  protected

-    FInputStream : IThriftStream;

-    FOutputStream : IThriftStream;

-  protected

-    function GetIsOpen: Boolean; override;

-

-    function GetInputStream: IThriftStream;

-    function GetOutputStream: IThriftStream;

-  public

-    property InputStream : IThriftStream read GetInputStream;

-    property OutputStream : IThriftStream read GetOutputStream;

-

-    procedure Open; override;

-    procedure Close; override;

-    procedure Flush; override;

-    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;

-    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;

-    constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);

-    destructor Destroy; override;

-  end;

-

-  TBufferedStreamImpl = class( TThriftStreamImpl)

-  private

-    FStream : IThriftStream;

-    FBufSize : Integer;

-    FReadBuffer : TMemoryStream;

-    FWriteBuffer : TMemoryStream;

-  protected

-    procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;

-    function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;

-    procedure Open;  override;

-    procedure Close; override;

-    procedure Flush; override;

-    function IsOpen: Boolean; override;

-    function ToArray: TBytes; override;

-  public

-    constructor Create( const AStream: IThriftStream; ABufSize: Integer);

-    destructor Destroy; override;

-  end;

-

-  TServerSocketImpl = class( TServerTransportImpl)

-  private

-    FServer : TTcpServer;

-    FPort : Integer;

-    FClientTimeout : Integer;

-    FUseBufferedSocket : Boolean;

-    FOwnsServer : Boolean;

-  protected

-    function Accept( const fnAccepting: TProc) : ITransport; override;

-  public

-    constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;

-    constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;

-    destructor Destroy; override;

-    procedure Listen; override;

-    procedure Close; override;

-  end;

-

-  TBufferedTransportImpl = class( TTransportImpl )

-  private

-    FInputBuffer : IThriftStream;

-    FOutputBuffer : IThriftStream;

-    FTransport : IStreamTransport;

-    FBufSize : Integer;

-

-    procedure InitBuffers;

-    function GetUnderlyingTransport: ITransport;

-  protected

-    function GetIsOpen: Boolean; override;

-    procedure Flush; override;

-  public

-    procedure Open(); override;

-    procedure Close(); override;

-    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;

-    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;

-    constructor Create( const ATransport : IStreamTransport ); overload;

-    constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;

-    property UnderlyingTransport: ITransport read GetUnderlyingTransport;

-    property IsOpen: Boolean read GetIsOpen;

-  end;

-

-  TSocketImpl = class(TStreamTransportImpl)

-  private

-    FClient : TCustomIpClient;

-    FOwnsClient : Boolean;

-    FHost : string;

-    FPort : Integer;

-    FTimeout : Integer;

-

-    procedure InitSocket;

-  protected

-    function GetIsOpen: Boolean; override;

-  public

-    procedure Open; override;

-    constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;

-    constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;

-    destructor Destroy; override;

-    procedure Close; override;

-    property TcpClient: TCustomIpClient read FClient;

-    property Host : string read FHost;

-    property Port: Integer read FPort;

-  end;

-

-  TFramedTransportImpl = class( TTransportImpl)

-  private const

-    FHeaderSize : Integer = 4;

-  private class var

-    FHeader_Dummy : array of Byte;

-  protected

-    FTransport : ITransport;

-    FWriteBuffer : TMemoryStream;

-    FReadBuffer : TMemoryStream;

-

-    procedure InitWriteBuffer;

-    procedure ReadFrame;

-  public

-    type

-      TFactory = class( TTransportFactoryImpl )

-      public

-        function GetTransport( const ATrans: ITransport): ITransport; override;

-      end;

-

-{$IF CompilerVersion >= 21.0}

-    class constructor Create;

-{$IFEND}

-    constructor Create; overload;

-    constructor Create( const ATrans: ITransport); overload;

-    destructor Destroy; override;

-

-    procedure Open(); override;

-    function GetIsOpen: Boolean; override;

-

-    procedure Close(); override;

-    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;

-    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;

-    procedure Flush; override;

-  end;

-

-{$IF CompilerVersion < 21.0}

-procedure TFramedTransportImpl_Initialize;

-{$IFEND}

-

-const

-  DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms

-

-

-implementation

-

-{ TTransportImpl }

-

-procedure TTransportImpl.Flush;

-begin

-

-end;

-

-function TTransportImpl.Peek: Boolean;

-begin

-  Result := IsOpen;

-end;

-

-function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;

-var

-  got : Integer;

-  ret : Integer;

-begin

-  got := 0;

-  while ( got < len) do

-  begin

-    ret := Read( buf, off + got, len - got);

-    if ( ret <= 0 ) then

-    begin

-      raise TTransportException.Create( 'Cannot read, Remote side has closed' );

-    end;

-    got := got + ret;

-  end;

-  Result := got;

-end;

-

-procedure TTransportImpl.Write( const buf: TBytes);

-begin

-  Self.Write( buf, 0, Length(buf) );

-end;

-

-{ THTTPClientImpl }

-

-procedure THTTPClientImpl.Close;

-begin

-  FInputStream := nil;

-  FOutputStream := nil;

-end;

-

-constructor THTTPClientImpl.Create(const AUri: string);

-begin

-  inherited Create;

-  FUri := AUri;

-  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;

-  FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);

-end;

-

-function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;

-var

-  pair : TPair<string,string>;

-begin

-{$IF CompilerVersion >= 21.0}

-  Result := CoXMLHTTP.Create;

-{$ELSE}

-  Result := CoXMLHTTPRequest.Create;

-{$IFEND}

-

-  Result.open('POST', FUri, False, '', '');

-  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');

-  Result.setRequestHeader( 'Accept', 'application/x-thrift');

-  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');

-

-  for pair in FCustomHeaders do

-  begin

-    Result.setRequestHeader( pair.Key, pair.Value );

-  end;

-end;

-

-destructor THTTPClientImpl.Destroy;

-begin

-  Close;

-  inherited;

-end;

-

-procedure THTTPClientImpl.Flush;

-begin

-  try

-    SendRequest;

-  finally

-    FOutputStream := nil;

-    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);

-  end;

-end;

-

-function THTTPClientImpl.GetConnectionTimeout: Integer;

-begin

-  Result := FConnectionTimeout;

-end;

-

-function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;

-begin

-  Result := FCustomHeaders;

-end;

-

-function THTTPClientImpl.GetIsOpen: Boolean;

-begin

-  Result := True;

-end;

-

-function THTTPClientImpl.GetReadTimeout: Integer;

-begin

-  Result := FReadTimeout;

-end;

-

-procedure THTTPClientImpl.Open;

-begin

-

-end;

-

-function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;

-begin

-  if FInputStream = nil then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,

-      'No request has been sent');

-  end;

-  try

-    Result := FInputStream.Read( buf, off, len )

-  except

-    on E: Exception do

-    begin

-      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,

-        E.Message);

-    end;

-  end;

-end;

-

-procedure THTTPClientImpl.SendRequest;

-var

-  xmlhttp : IXMLHTTPRequest;

-  ms : TMemoryStream;

-  a : TBytes;

-  len : Integer;

-begin

-  xmlhttp := CreateRequest;

-

-  ms := TMemoryStream.Create;

-  try

-    a := FOutputStream.ToArray;

-    len := Length(a);

-    if len > 0 then

-    begin

-      ms.WriteBuffer( Pointer(@a[0])^, len);

-    end;

-    ms.Position := 0;

-    xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));

-    FInputStream := nil;

-    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);

-  finally

-    ms.Free;

-  end;

-end;

-

-procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);

-begin

-  FConnectionTimeout := Value;

-end;

-

-procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);

-begin

-  FReadTimeout := Value

-end;

-

-procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);

-begin

-  FOutputStream.Write( buf, off, len);

-end;

-

-{ TTransportException }

-

-constructor TTransportException.Create(AType: TExceptionType);

-begin

-  //no inherited;  

-  Create( AType, '' )

-end;

-

-constructor TTransportException.Create(AType: TExceptionType;

-  const msg: string);

-begin

-  inherited Create(msg);

-  FType := AType;

-end;

-

-constructor TTransportException.Create(const msg: string);

-begin

-  inherited Create(msg);

-end;

-

-{ TTransportFactoryImpl }

-

-function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;

-begin

-  Result := ATrans;

-end;

-

-{ TServerSocket }

-

-constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);

-begin

-  inherited Create;

-  FServer := AServer;

-  FClientTimeout := AClientTimeout;

-end;

-

-constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);

-begin

-  inherited Create;

-  FPort := APort;

-  FClientTimeout := AClientTimeout;

-  FUseBufferedSocket := AUseBufferedSockets;

-  FOwnsServer := True;

-  FServer := TTcpServer.Create( nil );

-  FServer.BlockMode := bmBlocking;

-{$IF CompilerVersion >= 21.0}

-  FServer.LocalPort := AnsiString( IntToStr( FPort));

-{$ELSE}

-  FServer.LocalPort := IntToStr( FPort);

-{$IFEND}

-end;

-

-destructor TServerSocketImpl.Destroy;

-begin

-  if FOwnsServer then begin

-    FServer.Free;

-    FServer := nil;

-  end;

-  inherited;

-end;

-

-function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;

-var

-  client : TCustomIpClient;

-  trans  : IStreamTransport;

-begin

-  if FServer = nil then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,

-      'No underlying server socket.');

-  end;

-

-  client := nil;

-  try

-    client := TCustomIpClient.Create(nil);

-

-    if Assigned(fnAccepting)

-    then fnAccepting();

-

-    if not FServer.Accept( client) then

-    begin

-      client.Free;

-      Result := nil;

-      Exit;

-    end;

-

-    if client = nil then

-    begin

-      Result := nil;

-      Exit;

-    end;

-

-    trans := TSocketImpl.Create( client, TRUE, FClientTimeout);

-    client := nil;  // trans owns it now

-

-    if FUseBufferedSocket

-    then result := TBufferedTransportImpl.Create( trans)

-    else result := trans;

-

-  except

-    on E: Exception do begin

-      client.Free;

-      raise TTransportException.Create( E.ToString );

-    end;

-  end;

-end;

-

-procedure TServerSocketImpl.Listen;

-begin

-  if FServer <> nil then

-  begin

-    try

-      FServer.Active := True;

-    except

-      on E: Exception do

-      begin

-        raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);

-      end;

-    end;

-  end;

-end;

-

-procedure TServerSocketImpl.Close;

-begin

-  if FServer <> nil then

-  begin

-    try

-      FServer.Active := False;

-    except

-      on E: Exception do

-      begin

-        raise TTransportException.Create('Error on closing socket : ' + E.Message);

-      end;

-    end;

-  end;

-end;

-

-{ TSocket }

-

-constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);

-var stream : IThriftStream;

-begin

-  FClient := AClient;

-  FTimeout := ATimeout;

-  FOwnsClient := aOwnsClient;

-  stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);

-  inherited Create( stream, stream);

-end;

-

-constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);

-begin

-  inherited Create(nil,nil);

-  FHost := AHost;

-  FPort := APort;

-  FTimeout := ATimeout;

-  InitSocket;

-end;

-

-destructor TSocketImpl.Destroy;

-begin

-  if FOwnsClient

-  then FreeAndNil( FClient);

-  inherited;

-end;

-

-procedure TSocketImpl.Close;

-begin

-  inherited Close;

-  if FOwnsClient

-  then FreeAndNil( FClient);

-end;

-

-function TSocketImpl.GetIsOpen: Boolean;

-begin

-  Result := (FClient <> nil) and FClient.Connected;

-end;

-

-procedure TSocketImpl.InitSocket;

-var

-  stream : IThriftStream;

-begin

-  if FOwnsClient

-  then FreeAndNil( FClient)

-  else FClient := nil;

-

-  FClient := TTcpClient.Create( nil);

-  FOwnsClient := True;

-

-  stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);

-  FInputStream := stream;

-  FOutputStream := stream;

-end;

-

-procedure TSocketImpl.Open;

-begin

-  if IsOpen then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,

-      'Socket already connected');

-  end;

-

-  if FHost =  '' then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,

-      'Cannot open null host');

-  end;

-

-  if Port <= 0 then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,

-      'Cannot open without port');

-  end;

-

-  if FClient = nil then

-  begin

-    InitSocket;

-  end;

-

-  FClient.RemoteHost := TSocketHost( Host);

-  FClient.RemotePort := TSocketPort( IntToStr( Port));

-  FClient.Connect;

-

-  FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);

-  FOutputStream := FInputStream;

-end;

-

-{ TBufferedStream }

-

-procedure TBufferedStreamImpl.Close;

-begin

-  Flush;

-  FStream := nil;

-

-  FReadBuffer.Free;

-  FReadBuffer := nil;

-

-  FWriteBuffer.Free;

-  FWriteBuffer := nil;

-end;

-

-constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);

-begin

-  inherited Create;

-  FStream := AStream;

-  FBufSize := ABufSize;

-  FReadBuffer := TMemoryStream.Create;

-  FWriteBuffer := TMemoryStream.Create;

-end;

-

-destructor TBufferedStreamImpl.Destroy;

-begin

-  Close;

-  inherited;

-end;

-

-procedure TBufferedStreamImpl.Flush;

-var

-  buf : TBytes;

-  len : Integer;

-begin

-  if IsOpen then

-  begin

-    len := FWriteBuffer.Size;

-    if len > 0 then

-    begin

-      SetLength( buf, len );

-      FWriteBuffer.Position := 0;

-      FWriteBuffer.Read( Pointer(@buf[0])^, len );

-      FStream.Write( buf, 0, len );

-    end;

-    FWriteBuffer.Clear;

-  end;

-end;

-

-function TBufferedStreamImpl.IsOpen: Boolean;

-begin

-  Result := (FWriteBuffer <> nil)

-        and (FReadBuffer <> nil)

-        and (FStream <> nil);

-end;

-

-procedure TBufferedStreamImpl.Open;

-begin

-

-end;

-

-function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;

-var

-  nRead : Integer;

-  tempbuf : TBytes;

-begin

-  inherited;

-  Result := 0;

-  if IsOpen then

-  begin

-    while count > 0 do begin

-

-      if FReadBuffer.Position >= FReadBuffer.Size then

-      begin

-        FReadBuffer.Clear;

-        SetLength( tempbuf, FBufSize);

-        nRead := FStream.Read( tempbuf, 0, FBufSize );

-        if nRead = 0 then Break; // avoid infinite loop

-

-        FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );

-        FReadBuffer.Position := 0;

-      end;

-

-      if FReadBuffer.Position < FReadBuffer.Size then

-      begin

-        nRead  := Min( FReadBuffer.Size - FReadBuffer.Position, count);

-        Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));

-        Dec( count, nRead);

-        Inc( offset, nRead);

-      end;

-    end;

-  end;

-end;

-

-function TBufferedStreamImpl.ToArray: TBytes;

-var

-  len : Integer;

-begin

-  len := 0;

-

-  if IsOpen then

-  begin

-    len := FReadBuffer.Size;

-  end;

-

-  SetLength( Result, len);

-

-  if len > 0 then

-  begin

-    FReadBuffer.Position := 0;

-    FReadBuffer.Read( Pointer(@Result[0])^, len );

-  end;

-end;

-

-procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);

-begin

-  inherited;

-  if count > 0 then

-  begin

-    if IsOpen then

-    begin

-      FWriteBuffer.Write( Pointer(@buffer[offset])^, count );

-      if FWriteBuffer.Size > FBufSize then

-      begin

-        Flush;

-      end;

-    end;

-  end;

-end;

-

-{ TStreamTransportImpl }

-

-procedure TStreamTransportImpl.Close;

-begin

-  if FInputStream <> FOutputStream then

-  begin

-    if FInputStream <> nil then

-    begin

-      FInputStream := nil;

-    end;

-    if FOutputStream <> nil then

-    begin

-      FOutputStream := nil;

-    end;

-  end else

-  begin

-    FInputStream := nil;

-    FOutputStream := nil;

-  end;

-end;

-

-constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);

-begin

-  inherited Create;

-  FInputStream := AInputStream;

-  FOutputStream := AOutputStream;

-end;

-

-destructor TStreamTransportImpl.Destroy;

-begin

-  FInputStream := nil;

-  FOutputStream := nil;

-  inherited;

-end;

-

-procedure TStreamTransportImpl.Flush;

-begin

-  if FOutputStream = nil then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );

-  end;

-

-  FOutputStream.Flush;

-end;

-

-function TStreamTransportImpl.GetInputStream: IThriftStream;

-begin

-  Result := FInputStream;

-end;

-

-function TStreamTransportImpl.GetIsOpen: Boolean;

-begin

-  Result := True;

-end;

-

-function TStreamTransportImpl.GetOutputStream: IThriftStream;

-begin

-  Result := FInputStream;

-end;

-

-procedure TStreamTransportImpl.Open;

-begin

-

-end;

-

-function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;

-begin

-  if FInputStream = nil then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );

-  end;

-  Result := FInputStream.Read( buf, off, len );

-end;

-

-procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);

-begin

-  if FOutputStream = nil then

-  begin

-    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );

-  end;

-

-  FOutputStream.Write( buf, off, len );

-end;

-

-{ TBufferedTransportImpl }

-

-constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);

-begin

-  //no inherited;  

-  Create( ATransport, 1024 );

-end;

-

-procedure TBufferedTransportImpl.Close;

-begin

-  FTransport.Close;

-end;

-

-constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;

-  ABufSize: Integer);

-begin

-  inherited Create;

-  FTransport := ATransport;

-  FBufSize := ABufSize;

-  InitBuffers;

-end;

-

-procedure TBufferedTransportImpl.Flush;

-begin

-  if FOutputBuffer <> nil then

-  begin

-    FOutputBuffer.Flush;

-  end;

-end;

-

-function TBufferedTransportImpl.GetIsOpen: Boolean;

-begin

-  Result := FTransport.IsOpen;

-end;

-

-function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;

-begin

-  Result := FTransport;

-end;

-

-procedure TBufferedTransportImpl.InitBuffers;

-begin

-  if FTransport.InputStream <> nil then

-  begin

-    FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );

-  end;

-  if FTransport.OutputStream <> nil then

-  begin

-    FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );

-  end;

-end;

-

-procedure TBufferedTransportImpl.Open;

-begin

-  FTransport.Open

-end;

-

-function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;

-begin

-  Result := 0;

-  if FInputBuffer <> nil then

-  begin

-    Result := FInputBuffer.Read( buf, off, len );

-  end;

-end;

-

-procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);

-begin

-  if FOutputBuffer <> nil then

-  begin

-    FOutputBuffer.Write( buf, off, len );

-  end;

-end;

-

-{ TFramedTransportImpl }

-

-{$IF CompilerVersion < 21.0}

-procedure TFramedTransportImpl_Initialize;

-begin

-  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);

-  FillChar( TFramedTransportImpl.FHeader_Dummy[0],

-    Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);

-end;

-{$ELSE}

-class constructor TFramedTransportImpl.Create;

-begin

-  SetLength( FHeader_Dummy, FHeaderSize);

-  FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);

-end;

-{$IFEND}

-

-constructor TFramedTransportImpl.Create;

-begin

-  inherited Create;

-  InitWriteBuffer;

-end;

-

-procedure TFramedTransportImpl.Close;

-begin

-  FTransport.Close;

-end;

-

-constructor TFramedTransportImpl.Create( const ATrans: ITransport);

-begin

-  inherited Create;

-  InitWriteBuffer;

-  FTransport := ATrans;

-end;

-

-destructor TFramedTransportImpl.Destroy;

-begin

-  FWriteBuffer.Free;

-  FReadBuffer.Free;

-  inherited;

-end;

-

-procedure TFramedTransportImpl.Flush;

-var

-  buf : TBytes;

-  len : Integer;

-  data_len : Integer;

-

-begin

-  len := FWriteBuffer.Size;

-  SetLength( buf, len);

-  if len > 0 then

-  begin

-    System.Move( FWriteBuffer.Memory^, buf[0], len );

-  end;

-

-  data_len := len - FHeaderSize;

-  if (data_len < 0) then

-  begin

-    raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );

-  end;

-

-  InitWriteBuffer;

-

-  buf[0] := Byte($FF and (data_len shr 24));

-  buf[1] := Byte($FF and (data_len shr 16));

-  buf[2] := Byte($FF and (data_len shr 8));

-  buf[3] := Byte($FF and data_len);

-

-  FTransport.Write( buf, 0, len );

-  FTransport.Flush;

-end;

-

-function TFramedTransportImpl.GetIsOpen: Boolean;

-begin

-  Result := FTransport.IsOpen;

-end;

-

-type

-  TAccessMemoryStream = class(TMemoryStream)

-  end;

-

-procedure TFramedTransportImpl.InitWriteBuffer;

-begin

-  FWriteBuffer.Free;

-  FWriteBuffer := TMemoryStream.Create;

-  TAccessMemoryStream(FWriteBuffer).Capacity := 1024;

-  FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);

-end;

-

-procedure TFramedTransportImpl.Open;

-begin

-  FTransport.Open;

-end;

-

-function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;

-var

-  got : Integer;

-begin

-  if FReadBuffer <> nil then

-  begin

-    if len > 0

-    then got := FReadBuffer.Read( Pointer(@buf[off])^, len )

-    else got := 0;

-    if got > 0 then

-    begin

-      Result := got;

-      Exit;

-    end;

-  end;

-

-  ReadFrame;

-  if len > 0

-  then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)

-  else Result := 0;

-end;

-

-procedure TFramedTransportImpl.ReadFrame;

-var

-  i32rd : TBytes;

-  size : Integer;

-  buff : TBytes;

-begin

-  SetLength( i32rd, FHeaderSize );

-  FTransport.ReadAll( i32rd, 0, FHeaderSize);

-  size :=

-    ((i32rd[0] and $FF) shl 24) or

-    ((i32rd[1] and $FF) shl 16) or

-    ((i32rd[2] and $FF) shl 8) or

-     (i32rd[3] and $FF);

-  SetLength( buff, size );

-  FTransport.ReadAll( buff, 0, size );

-  FReadBuffer.Free;

-  FReadBuffer := TMemoryStream.Create;

-  FReadBuffer.Write( Pointer(@buff[0])^, size );

-  FReadBuffer.Position := 0;

-end;

-

-procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);

-begin

-  if len > 0

-  then FWriteBuffer.Write( Pointer(@buf[off])^, len );

-end;

-

-{ TFramedTransport.TFactory }

-

-function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;

-begin

-  Result := TFramedTransportImpl.Create( ATrans );

-end;

-

-{ TTcpSocketStreamImpl }

-

-procedure TTcpSocketStreamImpl.Close;

-begin

-  FTcpClient.Close;

-end;

-

-constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);

-begin

-  inherited Create;

-  FTcpClient := ATcpClient;

-  FTimeout := aTimeout;

-end;

-

-procedure TTcpSocketStreamImpl.Flush;

-begin

-

-end;

-

-function TTcpSocketStreamImpl.IsOpen: Boolean;

-begin

-  Result := FTcpClient.Active;

-end;

-

-procedure TTcpSocketStreamImpl.Open;

-begin

-  FTcpClient.Open;

-end;

-

-

-function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;

-                                      TimeOut: Integer; var wsaError : Integer): Integer;

-var

-  ReadFds: TFDset;

-  ReadFdsptr: PFDset;

-  WriteFds: TFDset;

-  WriteFdsptr: PFDset;

-  ExceptFds: TFDset;

-  ExceptFdsptr: PFDset;

-  tv: timeval;

-  Timeptr: PTimeval;

-  socket : TSocket;

-begin

-  if not FTcpClient.Active then begin

-    wsaError := WSAEINVAL;

-    Exit( SOCKET_ERROR);

-  end;

-

-  socket := FTcpClient.Handle;

-

-  if Assigned(ReadReady) then

-  begin

-    ReadFdsptr := @ReadFds;

-    FD_ZERO(ReadFds);

-    FD_SET(socket, ReadFds);

-  end

-  else

-    ReadFdsptr := nil;

-

-  if Assigned(WriteReady) then

-  begin

-    WriteFdsptr := @WriteFds;

-    FD_ZERO(WriteFds);

-    FD_SET(socket, WriteFds);

-  end

-  else

-    WriteFdsptr := nil;

-

-  if Assigned(ExceptFlag) then

-  begin

-    ExceptFdsptr := @ExceptFds;

-    FD_ZERO(ExceptFds);

-    FD_SET(socket, ExceptFds);

-  end

-  else

-    ExceptFdsptr := nil;

-

-  if TimeOut >= 0 then

-  begin

-    tv.tv_sec := TimeOut div 1000;

-    tv.tv_usec :=  1000 * (TimeOut mod 1000);

-    Timeptr := @tv;

-  end

-  else

-    Timeptr := nil;  // wait forever

-

-  wsaError := 0;

-  try

-{$IFDEF MSWINDOWS}

-    result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);

-{$ENDIF}

-{$IFDEF LINUX}

-    result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);

-{$ENDIF}

-    if result = SOCKET_ERROR

-    then wsaError := WSAGetLastError;

-

-  except

-    result := SOCKET_ERROR;

-  end;

-

-  if Assigned(ReadReady) then

-    ReadReady^ := FD_ISSET(socket, ReadFds);

-  if Assigned(WriteReady) then

-    WriteReady^ := FD_ISSET(socket, WriteFds);

-  if Assigned(ExceptFlag) then

-    ExceptFlag^ := FD_ISSET(socket, ExceptFds);

-end;

-

-function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;

-                                           DesiredBytes : Integer;

-                                           var wsaError : Integer): TWaitForData;

-var bCanRead, bError : Boolean;

-    retval : Integer;

-begin

-  // The select function returns the total number of socket handles that are ready

-  // and contained in the fd_set structures, zero if the time limit expired,

-  // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,

-  // WSAGetLastError can be used to retrieve a specific error code.

-  retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);

-  if retval = SOCKET_ERROR

-  then Exit( TWaitForData.wfd_Error);

-  if (retval = 0) or not bCanRead

-  then Exit( TWaitForData.wfd_Timeout);

-

-  // recv() returns the number of bytes received, or -1 if an error occurred.

-  // The return value will be 0 when the peer has performed an orderly shutdown.

-  retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);

-  if retval <= 0

-  then Exit( TWaitForData.wfd_Error);

-

-  // Enough data ready to be read?

-  if retval = DesiredBytes

-  then result := TWaitForData.wfd_HaveData

-  else result := TWaitForData.wfd_Timeout;

-end;

-

-function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;

-var wfd : TWaitForData;

-    wsaError : Integer;

-    pDest : Pointer;

-const

-  SLEEP_TIME = 200;

-begin

-  inherited;

-

-  pDest := Pointer(@buffer[offset]);

-

-  while TRUE do begin

-    if FTimeout > 0

-    then wfd := WaitForData( FTimeout,   pDest, count, wsaError)

-    else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError);

-

-    case wfd of

-      TWaitForData.wfd_Error    :  Exit(0);

-      TWaitForData.wfd_HaveData :  Break;

-      TWaitForData.wfd_Timeout  :  begin

-        if (FTimeout > 0)

-        then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,

-                                               SysErrorMessage(Cardinal(wsaError)));

-      end;

-    else

-      ASSERT( FALSE);

-    end;

-  end;

-

-  Result := FTcpClient.ReceiveBuf( pDest^, count);

-end;

-

-function TTcpSocketStreamImpl.ToArray: TBytes;

-var

-  len : Integer;

-begin

-  len := 0;

-  if IsOpen then

-  begin

-    len := FTcpClient.BytesReceived;

-  end;

-

-  SetLength( Result, len );

-

-  if len > 0 then

-  begin

-    FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);

-  end;

-end;

-

-procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);

-var bCanWrite, bError : Boolean;

-    retval, wsaError : Integer;

-begin

-  inherited;

-

-  if not FTcpClient.Active

-  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);

-

-  // The select function returns the total number of socket handles that are ready

-  // and contained in the fd_set structures, zero if the time limit expired,

-  // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,

-  // WSAGetLastError can be used to retrieve a specific error code.

-  retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);

-  if retval = SOCKET_ERROR

-  then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,

-                                         SysErrorMessage(Cardinal(wsaError)));

-  if (retval = 0)

-  then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);

-  if bError or not bCanWrite

-  then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);

-

-  FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);

-end;

-

-{$IF CompilerVersion < 21.0}

-initialization

-begin

-  TFramedTransportImpl_Initialize;

-end;

-{$IFEND}

-

-

-end.

+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+ {$SCOPEDENUMS ON}
+
+unit Thrift.Transport;
+
+interface
+
+uses
+  Classes,
+  SysUtils,
+  Math,
+  Sockets, WinSock,
+  Generics.Collections,
+  Thrift.Collections,
+  Thrift.Utils,
+  Thrift.Stream,
+  ActiveX,
+  msxml;
+
+type
+  ITransport = interface
+    ['{A4A9FC37-D620-44DC-AD21-662D16364CE4}']
+    function GetIsOpen: Boolean;
+    property IsOpen: Boolean read GetIsOpen;
+    function Peek: Boolean;
+    procedure Open;
+    procedure Close;
+    function Read(var buf: TBytes; off: Integer; len: Integer): Integer;
+    function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
+    procedure Write( const buf: TBytes); overload;
+    procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
+    procedure Flush;
+  end;
+
+  TTransportImpl = class( TInterfacedObject, ITransport)
+  protected
+    function GetIsOpen: Boolean; virtual; abstract;
+    property IsOpen: Boolean read GetIsOpen;
+    function Peek: Boolean; virtual;
+    procedure Open(); virtual; abstract;
+    procedure Close(); virtual; abstract;
+    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; virtual; abstract;
+    function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; virtual;
+    procedure Write( const buf: TBytes); overload; virtual;
+    procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; virtual; abstract;
+    procedure Flush; virtual;
+  end;
+
+  TTransportException = class( Exception )
+  public
+    type
+      TExceptionType = (
+        Unknown,
+        NotOpen,
+        AlreadyOpen,
+        TimedOut,
+        EndOfFile
+      );
+  private
+    FType : TExceptionType;
+  public
+    constructor Create( AType: TExceptionType); overload;
+    constructor Create( const msg: string); overload;
+    constructor Create( AType: TExceptionType; const msg: string); overload;
+    property Type_: TExceptionType read FType;
+  end;
+
+  IHTTPClient = interface( ITransport )
+    ['{0F5DB8AB-710D-4338-AAC9-46B5734C5057}']
+    procedure SetConnectionTimeout(const Value: Integer);
+    function GetConnectionTimeout: Integer;
+    procedure SetReadTimeout(const Value: Integer);
+    function GetReadTimeout: Integer;
+    function GetCustomHeaders: IThriftDictionary<string,string>;
+    procedure SendRequest;
+    property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
+    property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
+    property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
+  end;
+
+  THTTPClientImpl = class( TTransportImpl, IHTTPClient)
+  private
+    FUri : string;
+    FInputStream : IThriftStream;
+    FOutputStream : IThriftStream;
+    FConnectionTimeout : Integer;
+    FReadTimeout : Integer;
+    FCustomHeaders : IThriftDictionary<string,string>;
+
+    function CreateRequest: IXMLHTTPRequest;
+  protected
+    function GetIsOpen: Boolean; override;
+    procedure Open(); override;
+    procedure Close(); override;
+    function Read( var buf: TBytes; off: Integer; len: Integer): Integer; override;
+    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
+    procedure Flush; override;
+
+    procedure SetConnectionTimeout(const Value: Integer);
+    function GetConnectionTimeout: Integer;
+    procedure SetReadTimeout(const Value: Integer);
+    function GetReadTimeout: Integer;
+    function GetCustomHeaders: IThriftDictionary<string,string>;
+    procedure SendRequest;
+    property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
+    property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
+    property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
+  public
+    constructor Create( const AUri: string);
+    destructor Destroy; override;
+  end;
+
+  IServerTransport = interface
+    ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
+    procedure Listen;
+    procedure Close;
+    function Accept( const fnAccepting: TProc): ITransport;
+  end;
+
+  TServerTransportImpl = class( TInterfacedObject, IServerTransport)
+  protected
+    procedure Listen; virtual; abstract;
+    procedure Close; virtual; abstract;
+    function Accept( const fnAccepting: TProc): ITransport;  virtual; abstract;
+  end;
+
+  ITransportFactory = interface
+    ['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
+    function GetTransport( const ATrans: ITransport): ITransport;
+  end;
+
+  TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
+    function GetTransport( const ATrans: ITransport): ITransport; virtual;
+  end;
+
+  TTcpSocketStreamImpl = class( TThriftStreamImpl )
+  private type
+    TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
+  private
+    FTcpClient : TCustomIpClient;
+    FTimeout : Integer;
+    function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
+                     TimeOut: Integer; var wsaError : Integer): Integer;
+    function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
+                          var wsaError : Integer): TWaitForData;
+  protected
+    procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
+    function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
+    procedure Open; override;
+    procedure Close; override;
+    procedure Flush; override;
+
+    function IsOpen: Boolean; override;
+    function ToArray: TBytes; override;
+  public
+    constructor Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+  end;
+
+  IStreamTransport = interface( ITransport )
+    ['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
+    function GetInputStream: IThriftStream;
+    function GetOutputStream: IThriftStream;
+    property InputStream : IThriftStream read GetInputStream;
+    property OutputStream : IThriftStream read GetOutputStream;
+  end;
+
+  TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
+  protected
+    FInputStream : IThriftStream;
+    FOutputStream : IThriftStream;
+  protected
+    function GetIsOpen: Boolean; override;
+
+    function GetInputStream: IThriftStream;
+    function GetOutputStream: IThriftStream;
+  public
+    property InputStream : IThriftStream read GetInputStream;
+    property OutputStream : IThriftStream read GetOutputStream;
+
+    procedure Open; override;
+    procedure Close; override;
+    procedure Flush; override;
+    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
+    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
+    constructor Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
+    destructor Destroy; override;
+  end;
+
+  TBufferedStreamImpl = class( TThriftStreamImpl)
+  private
+    FStream : IThriftStream;
+    FBufSize : Integer;
+    FReadBuffer : TMemoryStream;
+    FWriteBuffer : TMemoryStream;
+  protected
+    procedure Write( const buffer: TBytes; offset: Integer; count: Integer); override;
+    function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; override;
+    procedure Open;  override;
+    procedure Close; override;
+    procedure Flush; override;
+    function IsOpen: Boolean; override;
+    function ToArray: TBytes; override;
+  public
+    constructor Create( const AStream: IThriftStream; ABufSize: Integer);
+    destructor Destroy; override;
+  end;
+
+  TServerSocketImpl = class( TServerTransportImpl)
+  private
+    FServer : TTcpServer;
+    FPort : Integer;
+    FClientTimeout : Integer;
+    FUseBufferedSocket : Boolean;
+    FOwnsServer : Boolean;
+  protected
+    function Accept( const fnAccepting: TProc) : ITransport; override;
+  public
+    constructor Create( const AServer: TTcpServer; AClientTimeout: Integer = 0); overload;
+    constructor Create( APort: Integer; AClientTimeout: Integer = 0; AUseBufferedSockets: Boolean = FALSE); overload;
+    destructor Destroy; override;
+    procedure Listen; override;
+    procedure Close; override;
+  end;
+
+  TBufferedTransportImpl = class( TTransportImpl )
+  private
+    FInputBuffer : IThriftStream;
+    FOutputBuffer : IThriftStream;
+    FTransport : IStreamTransport;
+    FBufSize : Integer;
+
+    procedure InitBuffers;
+    function GetUnderlyingTransport: ITransport;
+  protected
+    function GetIsOpen: Boolean; override;
+    procedure Flush; override;
+  public
+    procedure Open(); override;
+    procedure Close(); override;
+    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
+    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
+    constructor Create( const ATransport : IStreamTransport ); overload;
+    constructor Create( const ATransport : IStreamTransport; ABufSize: Integer); overload;
+    property UnderlyingTransport: ITransport read GetUnderlyingTransport;
+    property IsOpen: Boolean read GetIsOpen;
+  end;
+
+  TSocketImpl = class(TStreamTransportImpl)
+  private
+    FClient : TCustomIpClient;
+    FOwnsClient : Boolean;
+    FHost : string;
+    FPort : Integer;
+    FTimeout : Integer;
+
+    procedure InitSocket;
+  protected
+    function GetIsOpen: Boolean; override;
+  public
+    procedure Open; override;
+    constructor Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0); overload;
+    constructor Create( const AHost: string; APort: Integer; ATimeout: Integer = 0); overload;
+    destructor Destroy; override;
+    procedure Close; override;
+    property TcpClient: TCustomIpClient read FClient;
+    property Host : string read FHost;
+    property Port: Integer read FPort;
+  end;
+
+  TFramedTransportImpl = class( TTransportImpl)
+  private const
+    FHeaderSize : Integer = 4;
+  private class var
+    FHeader_Dummy : array of Byte;
+  protected
+    FTransport : ITransport;
+    FWriteBuffer : TMemoryStream;
+    FReadBuffer : TMemoryStream;
+
+    procedure InitWriteBuffer;
+    procedure ReadFrame;
+  public
+    type
+      TFactory = class( TTransportFactoryImpl )
+      public
+        function GetTransport( const ATrans: ITransport): ITransport; override;
+      end;
+
+{$IF CompilerVersion >= 21.0}
+    class constructor Create;
+{$IFEND}
+    constructor Create; overload;
+    constructor Create( const ATrans: ITransport); overload;
+    destructor Destroy; override;
+
+    procedure Open(); override;
+    function GetIsOpen: Boolean; override;
+
+    procedure Close(); override;
+    function Read(var buf: TBytes; off: Integer; len: Integer): Integer; override;
+    procedure Write( const buf: TBytes; off: Integer; len: Integer); override;
+    procedure Flush; override;
+  end;
+
+{$IF CompilerVersion < 21.0}
+procedure TFramedTransportImpl_Initialize;
+{$IFEND}
+
+const
+  DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
+
+
+implementation
+
+{ TTransportImpl }
+
+procedure TTransportImpl.Flush;
+begin
+
+end;
+
+function TTransportImpl.Peek: Boolean;
+begin
+  Result := IsOpen;
+end;
+
+function TTransportImpl.ReadAll( var buf: TBytes; off, len: Integer): Integer;
+var
+  got : Integer;
+  ret : Integer;
+begin
+  got := 0;
+  while ( got < len) do
+  begin
+    ret := Read( buf, off + got, len - got);
+    if ( ret <= 0 ) then
+    begin
+      raise TTransportException.Create( 'Cannot read, Remote side has closed' );
+    end;
+    got := got + ret;
+  end;
+  Result := got;
+end;
+
+procedure TTransportImpl.Write( const buf: TBytes);
+begin
+  Self.Write( buf, 0, Length(buf) );
+end;
+
+{ THTTPClientImpl }
+
+procedure THTTPClientImpl.Close;
+begin
+  FInputStream := nil;
+  FOutputStream := nil;
+end;
+
+constructor THTTPClientImpl.Create(const AUri: string);
+begin
+  inherited Create;
+  FUri := AUri;
+  FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
+  FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
+end;
+
+function THTTPClientImpl.CreateRequest: IXMLHTTPRequest;
+var
+  pair : TPair<string,string>;
+begin
+{$IF CompilerVersion >= 21.0}
+  Result := CoXMLHTTP.Create;
+{$ELSE}
+  Result := CoXMLHTTPRequest.Create;
+{$IFEND}
+
+  Result.open('POST', FUri, False, '', '');
+  Result.setRequestHeader( 'Content-Type', 'application/x-thrift');
+  Result.setRequestHeader( 'Accept', 'application/x-thrift');
+  Result.setRequestHeader( 'User-Agent', 'Delphi/IHTTPClient');
+
+  for pair in FCustomHeaders do
+  begin
+    Result.setRequestHeader( pair.Key, pair.Value );
+  end;
+end;
+
+destructor THTTPClientImpl.Destroy;
+begin
+  Close;
+  inherited;
+end;
+
+procedure THTTPClientImpl.Flush;
+begin
+  try
+    SendRequest;
+  finally
+    FOutputStream := nil;
+    FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
+  end;
+end;
+
+function THTTPClientImpl.GetConnectionTimeout: Integer;
+begin
+  Result := FConnectionTimeout;
+end;
+
+function THTTPClientImpl.GetCustomHeaders: IThriftDictionary<string,string>;
+begin
+  Result := FCustomHeaders;
+end;
+
+function THTTPClientImpl.GetIsOpen: Boolean;
+begin
+  Result := True;
+end;
+
+function THTTPClientImpl.GetReadTimeout: Integer;
+begin
+  Result := FReadTimeout;
+end;
+
+procedure THTTPClientImpl.Open;
+begin
+
+end;
+
+function THTTPClientImpl.Read( var buf: TBytes; off, len: Integer): Integer;
+begin
+  if FInputStream = nil then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+      'No request has been sent');
+  end;
+  try
+    Result := FInputStream.Read( buf, off, len )
+  except
+    on E: Exception do
+    begin
+      raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+        E.Message);
+    end;
+  end;
+end;
+
+procedure THTTPClientImpl.SendRequest;
+var
+  xmlhttp : IXMLHTTPRequest;
+  ms : TMemoryStream;
+  a : TBytes;
+  len : Integer;
+begin
+  xmlhttp := CreateRequest;
+
+  ms := TMemoryStream.Create;
+  try
+    a := FOutputStream.ToArray;
+    len := Length(a);
+    if len > 0 then
+    begin
+      ms.WriteBuffer( Pointer(@a[0])^, len);
+    end;
+    ms.Position := 0;
+    xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
+    FInputStream := nil;
+    FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
+  finally
+    ms.Free;
+  end;
+end;
+
+procedure THTTPClientImpl.SetConnectionTimeout(const Value: Integer);
+begin
+  FConnectionTimeout := Value;
+end;
+
+procedure THTTPClientImpl.SetReadTimeout(const Value: Integer);
+begin
+  FReadTimeout := Value
+end;
+
+procedure THTTPClientImpl.Write( const buf: TBytes; off, len: Integer);
+begin
+  FOutputStream.Write( buf, off, len);
+end;
+
+{ TTransportException }
+
+constructor TTransportException.Create(AType: TExceptionType);
+begin
+  //no inherited;
+  Create( AType, '' )
+end;
+
+constructor TTransportException.Create(AType: TExceptionType;
+  const msg: string);
+begin
+  inherited Create(msg);
+  FType := AType;
+end;
+
+constructor TTransportException.Create(const msg: string);
+begin
+  inherited Create(msg);
+end;
+
+{ TTransportFactoryImpl }
+
+function TTransportFactoryImpl.GetTransport( const ATrans: ITransport): ITransport;
+begin
+  Result := ATrans;
+end;
+
+{ TServerSocket }
+
+constructor TServerSocketImpl.Create( const AServer: TTcpServer; AClientTimeout: Integer);
+begin
+  inherited Create;
+  FServer := AServer;
+  FClientTimeout := AClientTimeout;
+end;
+
+constructor TServerSocketImpl.Create(APort, AClientTimeout: Integer; AUseBufferedSockets: Boolean);
+begin
+  inherited Create;
+  FPort := APort;
+  FClientTimeout := AClientTimeout;
+  FUseBufferedSocket := AUseBufferedSockets;
+  FOwnsServer := True;
+  FServer := TTcpServer.Create( nil );
+  FServer.BlockMode := bmBlocking;
+{$IF CompilerVersion >= 21.0}
+  FServer.LocalPort := AnsiString( IntToStr( FPort));
+{$ELSE}
+  FServer.LocalPort := IntToStr( FPort);
+{$IFEND}
+end;
+
+destructor TServerSocketImpl.Destroy;
+begin
+  if FOwnsServer then begin
+    FServer.Free;
+    FServer := nil;
+  end;
+  inherited;
+end;
+
+function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
+var
+  client : TCustomIpClient;
+  trans  : IStreamTransport;
+begin
+  if FServer = nil then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+      'No underlying server socket.');
+  end;
+
+  client := nil;
+  try
+    client := TCustomIpClient.Create(nil);
+
+    if Assigned(fnAccepting)
+    then fnAccepting();
+
+    if not FServer.Accept( client) then
+    begin
+      client.Free;
+      Result := nil;
+      Exit;
+    end;
+
+    if client = nil then
+    begin
+      Result := nil;
+      Exit;
+    end;
+
+    trans := TSocketImpl.Create( client, TRUE, FClientTimeout);
+    client := nil;  // trans owns it now
+
+    if FUseBufferedSocket
+    then result := TBufferedTransportImpl.Create( trans)
+    else result := trans;
+
+  except
+    on E: Exception do begin
+      client.Free;
+      raise TTransportException.Create( E.ToString );
+    end;
+  end;
+end;
+
+procedure TServerSocketImpl.Listen;
+begin
+  if FServer <> nil then
+  begin
+    try
+      FServer.Active := True;
+    except
+      on E: Exception do
+      begin
+        raise TTransportException.Create('Could not accept on listening socket: ' + E.Message);
+      end;
+    end;
+  end;
+end;
+
+procedure TServerSocketImpl.Close;
+begin
+  if FServer <> nil then
+  begin
+    try
+      FServer.Active := False;
+    except
+      on E: Exception do
+      begin
+        raise TTransportException.Create('Error on closing socket : ' + E.Message);
+      end;
+    end;
+  end;
+end;
+
+{ TSocket }
+
+constructor TSocketImpl.Create( const AClient : TCustomIpClient; aOwnsClient : Boolean; ATimeout: Integer = 0);
+var stream : IThriftStream;
+begin
+  FClient := AClient;
+  FTimeout := ATimeout;
+  FOwnsClient := aOwnsClient;
+  stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
+  inherited Create( stream, stream);
+end;
+
+constructor TSocketImpl.Create(const AHost: string; APort, ATimeout: Integer);
+begin
+  inherited Create(nil,nil);
+  FHost := AHost;
+  FPort := APort;
+  FTimeout := ATimeout;
+  InitSocket;
+end;
+
+destructor TSocketImpl.Destroy;
+begin
+  if FOwnsClient
+  then FreeAndNil( FClient);
+  inherited;
+end;
+
+procedure TSocketImpl.Close;
+begin
+  inherited Close;
+  if FOwnsClient
+  then FreeAndNil( FClient);
+end;
+
+function TSocketImpl.GetIsOpen: Boolean;
+begin
+  Result := (FClient <> nil) and FClient.Connected;
+end;
+
+procedure TSocketImpl.InitSocket;
+var
+  stream : IThriftStream;
+begin
+  if FOwnsClient
+  then FreeAndNil( FClient)
+  else FClient := nil;
+
+  FClient := TTcpClient.Create( nil);
+  FOwnsClient := True;
+
+  stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
+  FInputStream := stream;
+  FOutputStream := stream;
+end;
+
+procedure TSocketImpl.Open;
+begin
+  if IsOpen then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen,
+      'Socket already connected');
+  end;
+
+  if FHost =  '' then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+      'Cannot open null host');
+  end;
+
+  if Port <= 0 then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
+      'Cannot open without port');
+  end;
+
+  if FClient = nil then
+  begin
+    InitSocket;
+  end;
+
+  FClient.RemoteHost := TSocketHost( Host);
+  FClient.RemotePort := TSocketPort( IntToStr( Port));
+  FClient.Connect;
+
+  FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
+  FOutputStream := FInputStream;
+end;
+
+{ TBufferedStream }
+
+procedure TBufferedStreamImpl.Close;
+begin
+  Flush;
+  FStream := nil;
+
+  FReadBuffer.Free;
+  FReadBuffer := nil;
+
+  FWriteBuffer.Free;
+  FWriteBuffer := nil;
+end;
+
+constructor TBufferedStreamImpl.Create( const AStream: IThriftStream; ABufSize: Integer);
+begin
+  inherited Create;
+  FStream := AStream;
+  FBufSize := ABufSize;
+  FReadBuffer := TMemoryStream.Create;
+  FWriteBuffer := TMemoryStream.Create;
+end;
+
+destructor TBufferedStreamImpl.Destroy;
+begin
+  Close;
+  inherited;
+end;
+
+procedure TBufferedStreamImpl.Flush;
+var
+  buf : TBytes;
+  len : Integer;
+begin
+  if IsOpen then
+  begin
+    len := FWriteBuffer.Size;
+    if len > 0 then
+    begin
+      SetLength( buf, len );
+      FWriteBuffer.Position := 0;
+      FWriteBuffer.Read( Pointer(@buf[0])^, len );
+      FStream.Write( buf, 0, len );
+    end;
+    FWriteBuffer.Clear;
+  end;
+end;
+
+function TBufferedStreamImpl.IsOpen: Boolean;
+begin
+  Result := (FWriteBuffer <> nil)
+        and (FReadBuffer <> nil)
+        and (FStream <> nil);
+end;
+
+procedure TBufferedStreamImpl.Open;
+begin
+
+end;
+
+function TBufferedStreamImpl.Read( var buffer: TBytes; offset: Integer; count: Integer): Integer;
+var
+  nRead : Integer;
+  tempbuf : TBytes;
+begin
+  inherited;
+  Result := 0;
+  if IsOpen then
+  begin
+    while count > 0 do begin
+
+      if FReadBuffer.Position >= FReadBuffer.Size then
+      begin
+        FReadBuffer.Clear;
+        SetLength( tempbuf, FBufSize);
+        nRead := FStream.Read( tempbuf, 0, FBufSize );
+        if nRead = 0 then Break; // avoid infinite loop
+
+        FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
+        FReadBuffer.Position := 0;
+      end;
+
+      if FReadBuffer.Position < FReadBuffer.Size then
+      begin
+        nRead  := Min( FReadBuffer.Size - FReadBuffer.Position, count);
+        Inc( Result, FReadBuffer.Read( Pointer(@buffer[offset])^, nRead));
+        Dec( count, nRead);
+        Inc( offset, nRead);
+      end;
+    end;
+  end;
+end;
+
+function TBufferedStreamImpl.ToArray: TBytes;
+var
+  len : Integer;
+begin
+  len := 0;
+
+  if IsOpen then
+  begin
+    len := FReadBuffer.Size;
+  end;
+
+  SetLength( Result, len);
+
+  if len > 0 then
+  begin
+    FReadBuffer.Position := 0;
+    FReadBuffer.Read( Pointer(@Result[0])^, len );
+  end;
+end;
+
+procedure TBufferedStreamImpl.Write( const buffer: TBytes; offset: Integer; count: Integer);
+begin
+  inherited;
+  if count > 0 then
+  begin
+    if IsOpen then
+    begin
+      FWriteBuffer.Write( Pointer(@buffer[offset])^, count );
+      if FWriteBuffer.Size > FBufSize then
+      begin
+        Flush;
+      end;
+    end;
+  end;
+end;
+
+{ TStreamTransportImpl }
+
+procedure TStreamTransportImpl.Close;
+begin
+  if FInputStream <> FOutputStream then
+  begin
+    if FInputStream <> nil then
+    begin
+      FInputStream := nil;
+    end;
+    if FOutputStream <> nil then
+    begin
+      FOutputStream := nil;
+    end;
+  end else
+  begin
+    FInputStream := nil;
+    FOutputStream := nil;
+  end;
+end;
+
+constructor TStreamTransportImpl.Create( const AInputStream : IThriftStream; const AOutputStream : IThriftStream);
+begin
+  inherited Create;
+  FInputStream := AInputStream;
+  FOutputStream := AOutputStream;
+end;
+
+destructor TStreamTransportImpl.Destroy;
+begin
+  FInputStream := nil;
+  FOutputStream := nil;
+  inherited;
+end;
+
+procedure TStreamTransportImpl.Flush;
+begin
+  if FOutputStream = nil then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot flush null outputstream' );
+  end;
+
+  FOutputStream.Flush;
+end;
+
+function TStreamTransportImpl.GetInputStream: IThriftStream;
+begin
+  Result := FInputStream;
+end;
+
+function TStreamTransportImpl.GetIsOpen: Boolean;
+begin
+  Result := True;
+end;
+
+function TStreamTransportImpl.GetOutputStream: IThriftStream;
+begin
+  Result := FInputStream;
+end;
+
+procedure TStreamTransportImpl.Open;
+begin
+
+end;
+
+function TStreamTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
+begin
+  if FInputStream = nil then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot read from null inputstream' );
+  end;
+  Result := FInputStream.Read( buf, off, len );
+end;
+
+procedure TStreamTransportImpl.Write(const buf: TBytes; off, len: Integer);
+begin
+  if FOutputStream = nil then
+  begin
+    raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'Cannot write to null outputstream' );
+  end;
+
+  FOutputStream.Write( buf, off, len );
+end;
+
+{ TBufferedTransportImpl }
+
+constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport);
+begin
+  //no inherited;
+  Create( ATransport, 1024 );
+end;
+
+procedure TBufferedTransportImpl.Close;
+begin
+  FTransport.Close;
+end;
+
+constructor TBufferedTransportImpl.Create( const ATransport: IStreamTransport;
+  ABufSize: Integer);
+begin
+  inherited Create;
+  FTransport := ATransport;
+  FBufSize := ABufSize;
+  InitBuffers;
+end;
+
+procedure TBufferedTransportImpl.Flush;
+begin
+  if FOutputBuffer <> nil then
+  begin
+    FOutputBuffer.Flush;
+  end;
+end;
+
+function TBufferedTransportImpl.GetIsOpen: Boolean;
+begin
+  Result := FTransport.IsOpen;
+end;
+
+function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
+begin
+  Result := FTransport;
+end;
+
+procedure TBufferedTransportImpl.InitBuffers;
+begin
+  if FTransport.InputStream <> nil then
+  begin
+    FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
+  end;
+  if FTransport.OutputStream <> nil then
+  begin
+    FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
+  end;
+end;
+
+procedure TBufferedTransportImpl.Open;
+begin
+  FTransport.Open
+end;
+
+function TBufferedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
+begin
+  Result := 0;
+  if FInputBuffer <> nil then
+  begin
+    Result := FInputBuffer.Read( buf, off, len );
+  end;
+end;
+
+procedure TBufferedTransportImpl.Write(const buf: TBytes; off, len: Integer);
+begin
+  if FOutputBuffer <> nil then
+  begin
+    FOutputBuffer.Write( buf, off, len );
+  end;
+end;
+
+{ TFramedTransportImpl }
+
+{$IF CompilerVersion < 21.0}
+procedure TFramedTransportImpl_Initialize;
+begin
+  SetLength( TFramedTransportImpl.FHeader_Dummy, TFramedTransportImpl.FHeaderSize);
+  FillChar( TFramedTransportImpl.FHeader_Dummy[0],
+    Length( TFramedTransportImpl.FHeader_Dummy) * SizeOf( Byte ), 0);
+end;
+{$ELSE}
+class constructor TFramedTransportImpl.Create;
+begin
+  SetLength( FHeader_Dummy, FHeaderSize);
+  FillChar( FHeader_Dummy[0], Length( FHeader_Dummy) * SizeOf( Byte ), 0);
+end;
+{$IFEND}
+
+constructor TFramedTransportImpl.Create;
+begin
+  inherited Create;
+  InitWriteBuffer;
+end;
+
+procedure TFramedTransportImpl.Close;
+begin
+  FTransport.Close;
+end;
+
+constructor TFramedTransportImpl.Create( const ATrans: ITransport);
+begin
+  inherited Create;
+  InitWriteBuffer;
+  FTransport := ATrans;
+end;
+
+destructor TFramedTransportImpl.Destroy;
+begin
+  FWriteBuffer.Free;
+  FReadBuffer.Free;
+  inherited;
+end;
+
+procedure TFramedTransportImpl.Flush;
+var
+  buf : TBytes;
+  len : Integer;
+  data_len : Integer;
+
+begin
+  len := FWriteBuffer.Size;
+  SetLength( buf, len);
+  if len > 0 then
+  begin
+    System.Move( FWriteBuffer.Memory^, buf[0], len );
+  end;
+
+  data_len := len - FHeaderSize;
+  if (data_len < 0) then
+  begin
+    raise Exception.Create( 'TFramedTransport.Flush: data_len < 0' );
+  end;
+
+  InitWriteBuffer;
+
+  buf[0] := Byte($FF and (data_len shr 24));
+  buf[1] := Byte($FF and (data_len shr 16));
+  buf[2] := Byte($FF and (data_len shr 8));
+  buf[3] := Byte($FF and data_len);
+
+  FTransport.Write( buf, 0, len );
+  FTransport.Flush;
+end;
+
+function TFramedTransportImpl.GetIsOpen: Boolean;
+begin
+  Result := FTransport.IsOpen;
+end;
+
+type
+  TAccessMemoryStream = class(TMemoryStream)
+  end;
+
+procedure TFramedTransportImpl.InitWriteBuffer;
+begin
+  FWriteBuffer.Free;
+  FWriteBuffer := TMemoryStream.Create;
+  TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
+  FWriteBuffer.Write( Pointer(@FHeader_Dummy[0])^, FHeaderSize);
+end;
+
+procedure TFramedTransportImpl.Open;
+begin
+  FTransport.Open;
+end;
+
+function TFramedTransportImpl.Read(var buf: TBytes; off, len: Integer): Integer;
+var
+  got : Integer;
+begin
+  if FReadBuffer <> nil then
+  begin
+    if len > 0
+    then got := FReadBuffer.Read( Pointer(@buf[off])^, len )
+    else got := 0;
+    if got > 0 then
+    begin
+      Result := got;
+      Exit;
+    end;
+  end;
+
+  ReadFrame;
+  if len > 0
+  then Result := FReadBuffer.Read( Pointer(@buf[off])^, len)
+  else Result := 0;
+end;
+
+procedure TFramedTransportImpl.ReadFrame;
+var
+  i32rd : TBytes;
+  size : Integer;
+  buff : TBytes;
+begin
+  SetLength( i32rd, FHeaderSize );
+  FTransport.ReadAll( i32rd, 0, FHeaderSize);
+  size :=
+    ((i32rd[0] and $FF) shl 24) or
+    ((i32rd[1] and $FF) shl 16) or
+    ((i32rd[2] and $FF) shl 8) or
+     (i32rd[3] and $FF);
+  SetLength( buff, size );
+  FTransport.ReadAll( buff, 0, size );
+  FReadBuffer.Free;
+  FReadBuffer := TMemoryStream.Create;
+  FReadBuffer.Write( Pointer(@buff[0])^, size );
+  FReadBuffer.Position := 0;
+end;
+
+procedure TFramedTransportImpl.Write(const buf: TBytes; off, len: Integer);
+begin
+  if len > 0
+  then FWriteBuffer.Write( Pointer(@buf[off])^, len );
+end;
+
+{ TFramedTransport.TFactory }
+
+function TFramedTransportImpl.TFactory.GetTransport( const ATrans: ITransport): ITransport;
+begin
+  Result := TFramedTransportImpl.Create( ATrans );
+end;
+
+{ TTcpSocketStreamImpl }
+
+procedure TTcpSocketStreamImpl.Close;
+begin
+  FTcpClient.Close;
+end;
+
+constructor TTcpSocketStreamImpl.Create( const ATcpClient: TCustomIpClient; const aTimeout : Integer);
+begin
+  inherited Create;
+  FTcpClient := ATcpClient;
+  FTimeout := aTimeout;
+end;
+
+procedure TTcpSocketStreamImpl.Flush;
+begin
+
+end;
+
+function TTcpSocketStreamImpl.IsOpen: Boolean;
+begin
+  Result := FTcpClient.Active;
+end;
+
+procedure TTcpSocketStreamImpl.Open;
+begin
+  FTcpClient.Open;
+end;
+
+
+function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
+                                      TimeOut: Integer; var wsaError : Integer): Integer;
+var
+  ReadFds: TFDset;
+  ReadFdsptr: PFDset;
+  WriteFds: TFDset;
+  WriteFdsptr: PFDset;
+  ExceptFds: TFDset;
+  ExceptFdsptr: PFDset;
+  tv: timeval;
+  Timeptr: PTimeval;
+  socket : TSocket;
+begin
+  if not FTcpClient.Active then begin
+    wsaError := WSAEINVAL;
+    Exit( SOCKET_ERROR);
+  end;
+
+  socket := FTcpClient.Handle;
+
+  if Assigned(ReadReady) then
+  begin
+    ReadFdsptr := @ReadFds;
+    FD_ZERO(ReadFds);
+    FD_SET(socket, ReadFds);
+  end
+  else
+    ReadFdsptr := nil;
+
+  if Assigned(WriteReady) then
+  begin
+    WriteFdsptr := @WriteFds;
+    FD_ZERO(WriteFds);
+    FD_SET(socket, WriteFds);
+  end
+  else
+    WriteFdsptr := nil;
+
+  if Assigned(ExceptFlag) then
+  begin
+    ExceptFdsptr := @ExceptFds;
+    FD_ZERO(ExceptFds);
+    FD_SET(socket, ExceptFds);
+  end
+  else
+    ExceptFdsptr := nil;
+
+  if TimeOut >= 0 then
+  begin
+    tv.tv_sec := TimeOut div 1000;
+    tv.tv_usec :=  1000 * (TimeOut mod 1000);
+    Timeptr := @tv;
+  end
+  else
+    Timeptr := nil;  // wait forever
+
+  wsaError := 0;
+  try
+{$IFDEF MSWINDOWS}
+    result := WinSock.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
+{$ENDIF}
+{$IFDEF LINUX}
+    result := Libc.select(socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
+{$ENDIF}
+    if result = SOCKET_ERROR
+    then wsaError := WSAGetLastError;
+
+  except
+    result := SOCKET_ERROR;
+  end;
+
+  if Assigned(ReadReady) then
+    ReadReady^ := FD_ISSET(socket, ReadFds);
+  if Assigned(WriteReady) then
+    WriteReady^ := FD_ISSET(socket, WriteFds);
+  if Assigned(ExceptFlag) then
+    ExceptFlag^ := FD_ISSET(socket, ExceptFds);
+end;
+
+function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
+                                           DesiredBytes : Integer;
+                                           var wsaError : Integer): TWaitForData;
+var bCanRead, bError : Boolean;
+    retval : Integer;
+begin
+  // The select function returns the total number of socket handles that are ready
+  // and contained in the fd_set structures, zero if the time limit expired,
+  // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
+  // WSAGetLastError can be used to retrieve a specific error code.
+  retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
+  if retval = SOCKET_ERROR
+  then Exit( TWaitForData.wfd_Error);
+  if (retval = 0) or not bCanRead
+  then Exit( TWaitForData.wfd_Timeout);
+
+  // recv() returns the number of bytes received, or -1 if an error occurred.
+  // The return value will be 0 when the peer has performed an orderly shutdown.
+  retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, WinSock.MSG_PEEK);
+  if retval <= 0
+  then Exit( TWaitForData.wfd_Error);
+
+  // Enough data ready to be read?
+  if retval = DesiredBytes
+  then result := TWaitForData.wfd_HaveData
+  else result := TWaitForData.wfd_Timeout;
+end;
+
+function TTcpSocketStreamImpl.Read(var buffer: TBytes; offset, count: Integer): Integer;
+var wfd : TWaitForData;
+    wsaError : Integer;
+    pDest : Pointer;
+const
+  SLEEP_TIME = 200;
+begin
+  inherited;
+
+  pDest := Pointer(@buffer[offset]);
+
+  while TRUE do begin
+    if FTimeout > 0
+    then wfd := WaitForData( FTimeout,   pDest, count, wsaError)
+    else wfd := WaitForData( SLEEP_TIME, pDest, count, wsaError);
+
+    case wfd of
+      TWaitForData.wfd_Error    :  Exit(0);
+      TWaitForData.wfd_HaveData :  Break;
+      TWaitForData.wfd_Timeout  :  begin
+        if (FTimeout > 0)
+        then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut,
+                                               SysErrorMessage(Cardinal(wsaError)));
+      end;
+    else
+      ASSERT( FALSE);
+    end;
+  end;
+
+  Result := FTcpClient.ReceiveBuf( pDest^, count);
+end;
+
+function TTcpSocketStreamImpl.ToArray: TBytes;
+var
+  len : Integer;
+begin
+  len := 0;
+  if IsOpen then
+  begin
+    len := FTcpClient.BytesReceived;
+  end;
+
+  SetLength( Result, len );
+
+  if len > 0 then
+  begin
+    FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
+  end;
+end;
+
+procedure TTcpSocketStreamImpl.Write(const buffer: TBytes; offset, count: Integer);
+var bCanWrite, bError : Boolean;
+    retval, wsaError : Integer;
+begin
+  inherited;
+
+  if not FTcpClient.Active
+  then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen);
+
+  // The select function returns the total number of socket handles that are ready
+  // and contained in the fd_set structures, zero if the time limit expired,
+  // or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
+  // WSAGetLastError can be used to retrieve a specific error code.
+  retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
+  if retval = SOCKET_ERROR
+  then raise TTransportException.Create( TTransportException.TExceptionType.Unknown,
+                                         SysErrorMessage(Cardinal(wsaError)));
+  if (retval = 0)
+  then raise TTransportException.Create( TTransportException.TExceptionType.TimedOut);
+  if bError or not bCanWrite
+  then raise TTransportException.Create( TTransportException.TExceptionType.Unknown);
+
+  FTcpClient.SendBuf( Pointer(@buffer[offset])^, count);
+end;
+
+{$IF CompilerVersion < 21.0}
+initialization
+begin
+  TFramedTransportImpl_Initialize;
+end;
+{$IFEND}
+
+
+end.