THRIFT-5012 Centralize configuration aspects into a commonly used configuration object [ci skip]
Client: Delphi
Patch: Jens Geyer
This closes #1955
diff --git a/lib/delphi/src/Thrift.Configuration.pas b/lib/delphi/src/Thrift.Configuration.pas
new file mode 100644
index 0000000..0cb11af
--- /dev/null
+++ b/lib/delphi/src/Thrift.Configuration.pas
@@ -0,0 +1,121 @@
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *)
+
+unit Thrift.Configuration;
+
+interface
+
+uses
+ SysUtils, Generics.Collections, Generics.Defaults;
+
+const
+ DEFAULT_RECURSION_LIMIT = 64;  // initial RecursionLimit of a new TThriftConfigurationImpl
+ DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; // 100 MB; initial MaxMessageSize of a new TThriftConfigurationImpl
+ DEFAULT_MAX_FRAME_SIZE = 16384000; // this value is used consistently across all Thrift libraries; initial MaxFrameSize
+
+ DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
+
+type
+ IThriftConfiguration = interface  // read/write access to three limits; implemented in this unit by TThriftConfigurationImpl
+ ['{ADD75449-1A67-4B78-9B75-502A1E338CFC}']
+ function GetRecursionLimit : Cardinal;
+ procedure SetRecursionLimit( const value : Cardinal);
+ function GetMaxFrameSize : Cardinal;
+ procedure SetMaxFrameSize( const value : Cardinal);
+ function GetMaxMessageSize : Cardinal;
+ procedure SetMaxMessageSize( const value : Cardinal);
+
+ property RecursionLimit : Cardinal read GetRecursionLimit write SetRecursionLimit;  // TThriftConfigurationImpl starts with DEFAULT_RECURSION_LIMIT
+ property MaxFrameSize : Cardinal read GetMaxFrameSize write SetMaxFrameSize;  // TThriftConfigurationImpl starts with DEFAULT_MAX_FRAME_SIZE; presumably bytes - TODO confirm
+ property MaxMessageSize : Cardinal read GetMaxMessageSize write SetMaxMessageSize;  // TThriftConfigurationImpl starts with DEFAULT_MAX_MESSAGE_SIZE; presumably bytes - TODO confirm
+ end;
+
+
+ TThriftConfigurationImpl = class( TInterfacedObject, IThriftConfiguration)  // field-backed implementation of IThriftConfiguration
+ strict protected
+ FRecursionLimit : Cardinal;  // backing field of RecursionLimit
+ FMaxFrameSize : Cardinal;  // backing field of MaxFrameSize
+ FMaxMessageSize : Cardinal;  // backing field of MaxMessageSize
+
+ // IThriftConfiguration - the accessors are not public on the class, they are reachable through the interface
+ function GetRecursionLimit : Cardinal;
+ procedure SetRecursionLimit( const value : Cardinal);
+ function GetMaxFrameSize : Cardinal;
+ procedure SetMaxFrameSize( const value : Cardinal);
+ function GetMaxMessageSize : Cardinal;
+ procedure SetMaxMessageSize( const value : Cardinal);
+
+ public
+ constructor Create;  // initializes the three fields from the DEFAULT_* constants of this unit
+ end;
+
+
+implementation
+
+
+{ TThriftConfigurationImpl }
+
+
+constructor TThriftConfigurationImpl.Create;
+// Builds a configuration object in which every limit starts at its DEFAULT_* value.
+begin
+  inherited Create;
+  FMaxMessageSize := DEFAULT_MAX_MESSAGE_SIZE;
+  FMaxFrameSize   := DEFAULT_MAX_FRAME_SIZE;
+  FRecursionLimit := DEFAULT_RECURSION_LIMIT;
+end;
+
+
+function TThriftConfigurationImpl.GetRecursionLimit : Cardinal;
+begin
+  Result := Self.FRecursionLimit;  // read accessor behind IThriftConfiguration.RecursionLimit
+end;
+
+
+procedure TThriftConfigurationImpl.SetRecursionLimit( const value : Cardinal);
+begin
+  Self.FRecursionLimit := value;  // write accessor behind RecursionLimit; stored as given, no validation
+end;
+
+
+function TThriftConfigurationImpl.GetMaxFrameSize : Cardinal;
+begin
+  Result := Self.FMaxFrameSize;  // read accessor behind IThriftConfiguration.MaxFrameSize
+end;
+
+
+procedure TThriftConfigurationImpl.SetMaxFrameSize( const value : Cardinal);
+begin
+  Self.FMaxFrameSize := value;  // write accessor behind MaxFrameSize; stored as given, no validation
+end;
+
+
+function TThriftConfigurationImpl.GetMaxMessageSize : Cardinal;
+begin
+  Result := Self.FMaxMessageSize;  // read accessor behind IThriftConfiguration.MaxMessageSize
+end;
+
+
+procedure TThriftConfigurationImpl.SetMaxMessageSize( const value : Cardinal);
+begin
+  Self.FMaxMessageSize := value;  // write accessor behind MaxMessageSize; stored as given, no validation
+end;
+
+
+end.
diff --git a/lib/delphi/src/Thrift.Protocol.Compact.pas b/lib/delphi/src/Thrift.Protocol.Compact.pas
index 109e660..665cfc4 100644
--- a/lib/delphi/src/Thrift.Protocol.Compact.pas
+++ b/lib/delphi/src/Thrift.Protocol.Compact.pas
@@ -28,6 +28,7 @@
SysUtils,
Math,
Generics.Collections,
+ Thrift.Configuration,
Thrift.Transport,
Thrift.Protocol,
Thrift.Utils;
@@ -268,7 +269,7 @@
//--- TCompactProtocolImpl -------------------------------------------------
-constructor TCompactProtocolImpl.Create(const trans: ITransport);
+constructor TCompactProtocolImpl.Create( const trans : ITransport);
begin
inherited Create( trans);
diff --git a/lib/delphi/src/Thrift.Protocol.JSON.pas b/lib/delphi/src/Thrift.Protocol.JSON.pas
index e72a81d..61cad8b 100644
--- a/lib/delphi/src/Thrift.Protocol.JSON.pas
+++ b/lib/delphi/src/Thrift.Protocol.JSON.pas
@@ -29,6 +29,7 @@
SysUtils,
Math,
Generics.Collections,
+ Thrift.Configuration,
Thrift.Transport,
Thrift.Protocol,
Thrift.Utils;
@@ -298,7 +299,7 @@
function TJSONProtocolImpl.TFactory.GetProtocol( const trans: ITransport): IProtocol;
begin
- result := TJSONProtocolImpl.Create(trans);
+ result := TJSONProtocolImpl.Create( trans);  // factory method: wraps the given transport in a new JSON protocol instance
end;
class function TJSONProtocolImpl.GetTypeNameForTypeID(typeID : TType) : string;
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 94e6e18..d5a7587 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -31,6 +31,7 @@
Thrift.Stream,
Thrift.Utils,
Thrift.Collections,
+ Thrift.Configuration,
Thrift.Transport;
type
@@ -67,9 +68,6 @@
VALID_MESSAGETYPES = [Low(TMessageType)..High(TMessageType)];
-const
- DEFAULT_RECURSION_LIMIT = 64;
-
type
IProtocol = interface;
@@ -196,7 +194,7 @@
end;
IProtocol = interface
- ['{7F3640D7-5082-49E7-B562-84202F323C3A}']
+ ['{F0040D99-937F-400D-9932-AF04F665899F}']
function GetTransport: ITransport;
procedure WriteMessageBegin( const msg: TThriftMessage);
procedure WriteMessageEnd;
@@ -243,15 +241,13 @@
function ReadString: string;
function ReadAnsiString: AnsiString;
- procedure SetRecursionLimit( value : Integer);
- function GetRecursionLimit : Integer;
function NextRecursionLevel : IProtocolRecursionTracker;
procedure IncrementRecursionDepth;
procedure DecrementRecursionDepth;
function GetMinSerializedSize( const aType : TType) : Integer;
property Transport: ITransport read GetTransport;
- property RecursionLimit : Integer read GetRecursionLimit write SetRecursionLimit;
+ function Configuration : IThriftConfiguration;
end;
TProtocolImpl = class abstract( TInterfacedObject, IProtocol)
@@ -260,8 +256,6 @@
FRecursionLimit : Integer;
FRecursionDepth : Integer;
- procedure SetRecursionLimit( value : Integer);
- function GetRecursionLimit : Integer;
function NextRecursionLevel : IProtocolRecursionTracker;
procedure IncrementRecursionDepth;
procedure DecrementRecursionDepth;
@@ -272,8 +266,9 @@
procedure CheckReadBytesAvailable( const value : TThriftMap); overload; inline;
procedure Reset; virtual;
- function GetTransport: ITransport;
- public
+ function GetTransport: ITransport;
+ function Configuration : IThriftConfiguration;
+
procedure WriteMessageBegin( const msg: TThriftMessage); virtual; abstract;
procedure WriteMessageEnd; virtual; abstract;
procedure WriteStructBegin( const struc: TThriftStruct); virtual; abstract;
@@ -319,9 +314,10 @@
function ReadString: string; virtual;
function ReadAnsiString: AnsiString; virtual;
- property Transport: ITransport read GetTransport;
+ property Transport: ITransport read GetTransport;
- constructor Create( trans: ITransport );
+ public
+ constructor Create( const aTransport : ITransport);
end;
IBase = interface( ISupportsToString)
@@ -554,24 +550,14 @@
{ TProtocolImpl }
-constructor TProtocolImpl.Create(trans: ITransport);
+constructor TProtocolImpl.Create( const aTransport : ITransport);  // aTransport is dereferenced below, so it must not be nil
begin
inherited Create;
- FTrans := trans;
- FRecursionLimit := DEFAULT_RECURSION_LIMIT;
+ FTrans := aTransport;
+ FRecursionLimit := aTransport.Configuration.RecursionLimit;  // copied once here (Cardinal into an Integer field); NOTE(review): later changes to the configuration do not reach this field as far as this diff shows
FRecursionDepth := 0;
end;
-procedure TProtocolImpl.SetRecursionLimit( value : Integer);
-begin
- FRecursionLimit := value;
-end;
-
-function TProtocolImpl.GetRecursionLimit : Integer;
-begin
- result := FRecursionLimit;
-end;
-
function TProtocolImpl.NextRecursionLevel : IProtocolRecursionTracker;
begin
result := TProtocolRecursionTrackerImpl.Create(Self);
@@ -594,10 +580,14 @@
Result := FTrans;
end;
+function TProtocolImpl.Configuration : IThriftConfiguration;
+begin
+  Result := Self.FTrans.Configuration;  // forwarded: the protocol hands out the configuration object of its transport
+end;
+
procedure TProtocolImpl.Reset;
begin
- if FTrans.TransportControl <> nil
- then FTrans.TransportControl.ResetConsumedMessageSize;
+ FTrans.ResetConsumedMessageSize;  // called unconditionally on the transport itself; presumably restarts its message-size accounting - TODO confirm in Thrift.Transport
end;
function TProtocolImpl.ReadAnsiString: AnsiString;
@@ -654,15 +644,12 @@
procedure TProtocolImpl.CheckReadBytesAvailable( const value : TThriftMap);
-var nPairSize : Integer
-;
+var nPairSize : Integer;  // minimum serialized size of one key/value pair; multiplied by the entry count and passed to the transport's check
begin
nPairSize := GetMinSerializedSize(value.KeyType) + GetMinSerializedSize(value.ValueType);
FTrans.CheckReadBytesAvailable( value.Count * nPairSize);
end;
-
-
{ TProtocolUtil }
class procedure TProtocolUtil.Skip( prot: IProtocol; type_: TType);
@@ -1486,7 +1473,5 @@
-
-
end.
diff --git a/lib/delphi/src/Thrift.Serializer.pas b/lib/delphi/src/Thrift.Serializer.pas
index 1cbcbec..cb62603 100644
--- a/lib/delphi/src/Thrift.Serializer.pas
+++ b/lib/delphi/src/Thrift.Serializer.pas
@@ -28,6 +28,7 @@
{$ELSE}
System.Classes, Winapi.Windows, System.SysUtils,
{$ENDIF}
+ Thrift.Configuration,
Thrift.Protocol,
Thrift.Transport,
Thrift.Stream;
@@ -42,16 +43,9 @@
FProtocol : IProtocol;
public
- // Create a new TSerializer that uses the TBinaryProtocol by default.
- constructor Create; overload;
-
- // Create a new TSerializer.
- // It will use the TProtocol specified by the factory that is passed in.
- constructor Create( const factory : IProtocolFactory); overload;
-
- // Create a new TSerializer.
- // It will use the TProtocol and layered transports specified by the factories that are passed in.
- constructor Create( const protfact : IProtocolFactory; const transfact : ITransportFactory); overload;
+ constructor Create( const aProtFact : IProtocolFactory = nil; // defaults to TBinaryProtocol
+ const aTransFact : ITransportFactory = nil;
+ const aConfig : IThriftConfiguration = nil);
// DTOR
destructor Destroy; override;
@@ -70,19 +64,9 @@
FProtocol : IProtocol;
public
- // Create a new TDeserializer that uses the TBinaryProtocol by default.
- constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload;
-
- // Create a new TDeserializer.
- // It will use the TProtocol specified by the factory that is passed in.
- constructor Create( const factory : IProtocolFactory;
- const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload;
-
- // Create a new TDeserializer.
- // It will use the TProtocol and layered transports specified by the factories that are passed in.
- constructor Create( const protfact : IProtocolFactory;
- const transfact : ITransportFactory;
- const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); overload;
+ constructor Create( const aProtFact : IProtocolFactory = nil; // defaults to TBinaryProtocol
+ const aTransFact : ITransportFactory = nil;
+ const aConfig : IThriftConfiguration = nil);
// DTOR
destructor Destroy; override;
@@ -100,34 +84,24 @@
{ TSerializer }
-constructor TSerializer.Create;
-// Create a new TSerializer that uses the TBinaryProtocol by default.
-begin
- //no inherited;
- Create( TBinaryProtocolImpl.TFactory.Create, nil);
-end;
-
-
-constructor TSerializer.Create( const factory : IProtocolFactory);
-// Create a new TSerializer.
-// It will use the TProtocol specified by the factory that is passed in.
-begin
- //no inherited;
- Create( factory, nil);
-end;
-
-
-constructor TSerializer.Create( const protfact : IProtocolFactory; const transfact : ITransportFactory);
-// Create a new TSerializer.
-// It will use the TProtocol specified by the factory that is passed in.
+constructor TSerializer.Create( const aProtFact : IProtocolFactory;
+ const aTransFact : ITransportFactory;
+ const aConfig : IThriftConfiguration);
var adapter : IThriftStream;
+ protfact : IProtocolFactory;
begin
inherited Create;
+
FStream := TMemoryStream.Create;
adapter := TThriftStreamAdapterDelphi.Create( FStream, FALSE);
- FTransport := TStreamTransportImpl.Create( nil, adapter, TTransportControlImpl.Create(0)); // we don't read anything here
- if transfact <> nil then FTransport := transfact.GetTransport( FTransport);
- FProtocol := protfact.GetProtocol( FTransport);
+
+ FTransport := TStreamTransportImpl.Create( nil, adapter, aConfig);
+ if aTransfact <> nil then FTransport := aTransfact.GetTransport( FTransport);
+
+ if aProtFact <> nil
+ then protfact := aProtFact
+ else protfact := TBinaryProtocolImpl.TFactory.Create;
+ FProtocol := protfact.GetProtocol( FTransport);
if not FTransport.IsOpen
then FTransport.Open;
@@ -188,36 +162,24 @@
{ TDeserializer }
-constructor TDeserializer.Create( const aMaxMessageSize : Integer);
-// Create a new TDeserializer that uses the TBinaryProtocol by default.
-begin
- //no inherited;
- Create( TBinaryProtocolImpl.TFactory.Create, nil, aMaxMessageSize);
-end;
-
-
-constructor TDeserializer.Create( const factory : IProtocolFactory; const aMaxMessageSize : Integer);
-// Create a new TDeserializer.
-// It will use the TProtocol specified by the factory that is passed in.
-begin
- //no inherited;
- Create( factory, nil, aMaxMessageSize);
-end;
-
-
-constructor TDeserializer.Create( const protfact : IProtocolFactory;
- const transfact : ITransportFactory;
- const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE);
-// Create a new TDeserializer.
-// It will use the TProtocol specified by the factory that is passed in.
+constructor TDeserializer.Create( const aProtFact : IProtocolFactory;
+ const aTransFact : ITransportFactory;
+ const aConfig : IThriftConfiguration);
var adapter : IThriftStream;
+ protfact : IProtocolFactory;
begin
inherited Create;
+
FStream := TMemoryStream.Create;
adapter := TThriftStreamAdapterDelphi.Create( FStream, FALSE);
- FTransport := TStreamTransportImpl.Create( adapter, nil, TTransportControlImpl.Create(aMaxMessageSize));
- if transfact <> nil then FTransport := transfact.GetTransport( FTransport);
- FProtocol := protfact.GetProtocol( FTransport);
+
+ FTransport := TStreamTransportImpl.Create( adapter, nil, aConfig);
+ if aTransfact <> nil then FTransport := aTransfact.GetTransport( FTransport);
+
+ if aProtFact <> nil
+ then protfact := aProtFact
+ else protfact := TBinaryProtocolImpl.TFactory.Create;
+ FProtocol := protfact.GetProtocol( FTransport);
if not FTransport.IsOpen
then FTransport.Open;
diff --git a/lib/delphi/src/Thrift.Server.pas b/lib/delphi/src/Thrift.Server.pas
index da053b9..a73e6cb 100644
--- a/lib/delphi/src/Thrift.Server.pas
+++ b/lib/delphi/src/Thrift.Server.pas
@@ -32,7 +32,8 @@
{$ENDIF}
Thrift,
Thrift.Protocol,
- Thrift.Transport;
+ Thrift.Transport,
+ Thrift.Configuration;
type
IServerEvents = interface
@@ -70,6 +71,7 @@
FOutputProtocolFactory : IProtocolFactory;
FLogDelegate : TLogDelegate;
FServerEvents : IServerEvents;
+ FConfiguration : IThriftConfiguration;
class procedure DefaultLogDelegate( const str: string);
@@ -86,31 +88,17 @@
const aOutputTransportFactory : ITransportFactory;
const aInputProtocolFactory : IProtocolFactory;
const aOutputProtocolFactory : IProtocolFactory;
+ const aConfig : IThriftConfiguration;
const aLogDelegate : TLogDelegate
); overload;
constructor Create(
- const aProcessor :IProcessor;
- const aServerTransport: IServerTransport
- ); overload;
-
- constructor Create(
- const aProcessor :IProcessor;
+ const aProcessor: IProcessor;
const aServerTransport: IServerTransport;
- const aLogDelegate: TLogDelegate
- ); overload;
-
- constructor Create(
- const aProcessor :IProcessor;
- const aServerTransport: IServerTransport;
- const aTransportFactory : ITransportFactory
- ); overload;
-
- constructor Create(
- const aProcessor :IProcessor;
- const aServerTransport: IServerTransport;
- const aTransportFactory : ITransportFactory;
- const aProtocolFactory : IProtocolFactory
+ const aTransportFactory: ITransportFactory = nil;
+ const aProtocolFactory: IProtocolFactory = nil;
+ const aConfig : IThriftConfiguration = nil;
+ const aLogDel: TServerImpl.TLogDelegate = nil
); overload;
end;
@@ -119,30 +107,6 @@
private
FStop : Boolean;
public
- constructor Create(
- const aProcessor: IProcessor;
- const aServerTransport: IServerTransport
- ); overload;
-
- constructor Create(
- const aProcessor: IProcessor;
- const aServerTransport: IServerTransport;
- const ALogDel: TServerImpl.TLogDelegate
- ); overload;
-
- constructor Create(
- const aProcessor: IProcessor;
- const aServerTransport: IServerTransport;
- const aTransportFactory: ITransportFactory
- ); overload;
-
- constructor Create(
- const aProcessor: IProcessor;
- const aServerTransport: IServerTransport;
- const aTransportFactory: ITransportFactory;
- const aProtocolFactory: IProtocolFactory
- ); overload;
-
procedure Serve; override;
procedure Stop; override;
end;
@@ -154,83 +118,55 @@
constructor TServerImpl.Create( const aProcessor: IProcessor;
const aServerTransport: IServerTransport;
- const aLogDelegate: TLogDelegate);
-var
- InputFactory, OutputFactory : IProtocolFactory;
- InputTransFactory, OutputTransFactory : ITransportFactory;
-
-begin
- InputFactory := TBinaryProtocolImpl.TFactory.Create;
- OutputFactory := TBinaryProtocolImpl.TFactory.Create;
- InputTransFactory := TTransportFactoryImpl.Create;
- OutputTransFactory := TTransportFactoryImpl.Create;
-
- //no inherited;
- Create(
- aProcessor,
- aServerTransport,
- InputTransFactory,
- OutputTransFactory,
- InputFactory,
- OutputFactory,
- ALogDelegate
- );
-end;
-
-constructor TServerImpl.Create(const aProcessor: IProcessor;
- const aServerTransport: IServerTransport);
-var
- InputFactory, OutputFactory : IProtocolFactory;
- InputTransFactory, OutputTransFactory : ITransportFactory;
-
-begin
- InputFactory := TBinaryProtocolImpl.TFactory.Create;
- OutputFactory := TBinaryProtocolImpl.TFactory.Create;
- InputTransFactory := TTransportFactoryImpl.Create;
- OutputTransFactory := TTransportFactoryImpl.Create;
-
- //no inherited;
- Create(
- aProcessor,
- aServerTransport,
- InputTransFactory,
- OutputTransFactory,
- InputFactory,
- OutputFactory,
- DefaultLogDelegate
- );
-end;
-
-constructor TServerImpl.Create(const aProcessor: IProcessor;
- const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory);
-var
- InputProtocolFactory : IProtocolFactory;
- OutputProtocolFactory : IProtocolFactory;
-begin
- InputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;
- OutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;
-
- //no inherited;
- Create( aProcessor, aServerTransport, aTransportFactory, aTransportFactory,
- InputProtocolFactory, OutputProtocolFactory, DefaultLogDelegate);
-end;
-
-constructor TServerImpl.Create(const aProcessor: IProcessor;
- const aServerTransport: IServerTransport;
- const aInputTransportFactory, aOutputTransportFactory: ITransportFactory;
- const aInputProtocolFactory, aOutputProtocolFactory: IProtocolFactory;
- const aLogDelegate : TLogDelegate);
+ const aInputTransportFactory, aOutputTransportFactory: ITransportFactory;  // nil -> TTransportFactoryImpl
+ const aInputProtocolFactory, aOutputProtocolFactory: IProtocolFactory;  // nil -> binary protocol factory
+ const aConfig : IThriftConfiguration;  // nil -> new TThriftConfigurationImpl
+ const aLogDelegate : TLogDelegate);  // unassigned -> DefaultLogDelegate
begin
inherited Create;
FProcessor := aProcessor;
FServerTransport := aServerTransport;
- FInputTransportFactory := aInputTransportFactory;
- FOutputTransportFactory := aOutputTransportFactory;
- FInputProtocolFactory := aInputProtocolFactory;
- FOutputProtocolFactory := aOutputProtocolFactory;
- FLogDelegate := aLogDelegate;
+ // Processor and server transport are stored as passed; every other argument falls back to a default when nil/unassigned.
+ if aConfig <> nil
+ then FConfiguration := aConfig
+ else FConfiguration := TThriftConfigurationImpl.Create;  // new object holding the default limits
+
+ if aInputTransportFactory <> nil
+ then FInputTransportFactory := aInputTransportFactory
+ else FInputTransportFactory := TTransportFactoryImpl.Create;
+
+ if aOutputTransportFactory <> nil
+ then FOutputTransportFactory := aOutputTransportFactory
+ else FOutputTransportFactory := TTransportFactoryImpl.Create;  // separate instance, not shared with the input side
+
+ if aInputProtocolFactory <> nil
+ then FInputProtocolFactory := aInputProtocolFactory
+ else FInputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;
+
+ if aOutputProtocolFactory <> nil
+ then FOutputProtocolFactory := aOutputProtocolFactory
+ else FOutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;  // separate instance, not shared with the input side
+
+ if Assigned(aLogDelegate)
+ then FLogDelegate := aLogDelegate
+ else FLogDelegate := DefaultLogDelegate;  // class-level default logger of TServerImpl
end;
+
+constructor TServerImpl.Create( const aProcessor: IProcessor;
+                                const aServerTransport: IServerTransport;
+                                const aTransportFactory: ITransportFactory;
+                                const aProtocolFactory: IProtocolFactory;
+                                const aConfig : IThriftConfiguration;
+                                const aLogDel: TServerImpl.TLogDelegate);
+// Convenience overload: one transport factory and one protocol factory are used for both input and output.
+begin
+  Create( aProcessor, aServerTransport,
+          aTransportFactory, aTransportFactory,   // input side, output side
+          aProtocolFactory, aProtocolFactory, aConfig, aLogDel);
+end;
+
+
class procedure TServerImpl.DefaultLogDelegate( const str: string);
begin
try
@@ -241,16 +177,6 @@
end;
end;
-constructor TServerImpl.Create( const aProcessor: IProcessor;
- const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory;
- const aProtocolFactory: IProtocolFactory);
-begin
- //no inherited;
- Create( aProcessor, aServerTransport,
- aTransportFactory, aTransportFactory,
- aProtocolFactory, aProtocolFactory,
- DefaultLogDelegate);
-end;
function TServerImpl.GetServerEvents : IServerEvents;
@@ -268,55 +194,6 @@
{ TSimpleServer }
-constructor TSimpleServer.Create( const aProcessor: IProcessor;
- const aServerTransport: IServerTransport);
-var
- InputProtocolFactory : IProtocolFactory;
- OutputProtocolFactory : IProtocolFactory;
- InputTransportFactory : ITransportFactory;
- OutputTransportFactory : ITransportFactory;
-begin
- InputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;
- OutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;
- InputTransportFactory := TTransportFactoryImpl.Create;
- OutputTransportFactory := TTransportFactoryImpl.Create;
-
- inherited Create( aProcessor, aServerTransport, InputTransportFactory,
- OutputTransportFactory, InputProtocolFactory, OutputProtocolFactory, DefaultLogDelegate);
-end;
-
-constructor TSimpleServer.Create( const aProcessor: IProcessor;
- const aServerTransport: IServerTransport; const ALogDel: TServerImpl.TLogDelegate);
-var
- InputProtocolFactory : IProtocolFactory;
- OutputProtocolFactory : IProtocolFactory;
- InputTransportFactory : ITransportFactory;
- OutputTransportFactory : ITransportFactory;
-begin
- InputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;
- OutputProtocolFactory := TBinaryProtocolImpl.TFactory.Create;
- InputTransportFactory := TTransportFactoryImpl.Create;
- OutputTransportFactory := TTransportFactoryImpl.Create;
-
- inherited Create( aProcessor, aServerTransport, InputTransportFactory,
- OutputTransportFactory, InputProtocolFactory, OutputProtocolFactory, ALogDel);
-end;
-
-constructor TSimpleServer.Create( const aProcessor: IProcessor;
- const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory);
-begin
- inherited Create( aProcessor, aServerTransport, aTransportFactory,
- aTransportFactory, TBinaryProtocolImpl.TFactory.Create, TBinaryProtocolImpl.TFactory.Create, DefaultLogDelegate);
-end;
-
-constructor TSimpleServer.Create( const aProcessor: IProcessor;
- const aServerTransport: IServerTransport; const aTransportFactory: ITransportFactory;
- const aProtocolFactory: IProtocolFactory);
-begin
- inherited Create( aProcessor, aServerTransport, aTransportFactory,
- aTransportFactory, aProtocolFactory, aProtocolFactory, DefaultLogDelegate);
-end;
-
procedure TSimpleServer.Serve;
var
client : ITransport;
diff --git a/lib/delphi/src/Thrift.Stream.pas b/lib/delphi/src/Thrift.Stream.pas
index 0f4e723..1668059 100644
--- a/lib/delphi/src/Thrift.Stream.pas
+++ b/lib/delphi/src/Thrift.Stream.pas
@@ -37,22 +37,16 @@
type
IThriftStream = interface
- ['{DBE61E28-2A77-42DB-A5A3-3CCB8A2D09FA}']
+ ['{3A61A8A6-3639-4B91-A260-EFCA23944F3A}']
procedure Write( const buffer: TBytes; offset: Integer; count: Integer); overload;
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
- procedure CheckReadBytesAvailable( const value : Integer);
procedure Open;
procedure Close;
procedure Flush;
function IsOpen: Boolean;
function ToArray: TBytes;
- end;
-
-
- IThriftStream2 = interface( IThriftStream)
- ['{1F55D9FE-F617-4B80-B8CA-4A300D8E33F6}']
function Size : Int64;
function Position : Int64;
end;
@@ -67,15 +61,16 @@
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); overload; virtual;
function Read( var buffer: TBytes; offset: Integer; count: Integer): Integer; overload; inline;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload; virtual;
- procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract;
procedure Open; virtual; abstract;
procedure Close; virtual; abstract;
procedure Flush; virtual; abstract;
function IsOpen: Boolean; virtual; abstract;
function ToArray: TBytes; virtual; abstract;
+ function Size : Int64; virtual;
+ function Position : Int64; virtual;
end;
- TThriftStreamAdapterDelphi = class( TThriftStreamImpl, IThriftStream2)
+ TThriftStreamAdapterDelphi = class( TThriftStreamImpl)
strict private
FStream : TStream;
FOwnsStream : Boolean;
@@ -83,38 +78,32 @@
// IThriftStream
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
-
- // IThriftStream2
- function Size : Int64;
- function Position : Int64;
+ function Size : Int64; override;
+ function Position : Int64; override;
public
constructor Create( const aStream: TStream; aOwnsStream : Boolean);
destructor Destroy; override;
end;
- TThriftStreamAdapterCOM = class( TThriftStreamImpl, IThriftStream2)
+ TThriftStreamAdapterCOM = class( TThriftStreamImpl)
strict private
FStream : IStream;
strict protected
// IThriftStream
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
-
- // IThriftStream2
- function Size : Int64;
- function Position : Int64;
+ function Size : Int64; override;
+ function Position : Int64; override;
public
constructor Create( const aStream: IStream);
end;
@@ -191,14 +180,6 @@
end;
end;
-procedure TThriftStreamAdapterCOM.CheckReadBytesAvailable( const value : Integer);
-var nRemaining : Int64;
-begin
- nRemaining := Self.Size - Self.Position;
- if nRemaining < value
- then raise TTransportExceptionEndOfFile.Create('Not enough input data');
-end;
-
function TThriftStreamAdapterCOM.ToArray: TBytes;
var
len : Int64;
@@ -267,6 +248,19 @@
CheckSizeAndOffset( pBuf, offset+count, offset, count);
end;
+function TThriftStreamImpl.Size : Int64;
+begin  // base-class fallback; the stream adapters declared in this unit override Size
+  Assert( False);
+  raise ENotImplemented.Create( Self.ClassName + '.Size');
+end;
+
+function TThriftStreamImpl.Position : Int64;
+begin  // base-class fallback; the stream adapters declared in this unit override Position
+  Assert( False);
+  raise ENotImplemented.Create( Self.ClassName + '.Position');
+end;
+
+
{ TThriftStreamAdapterDelphi }
constructor TThriftStreamAdapterDelphi.Create( const aStream: TStream; aOwnsStream: Boolean);
@@ -332,13 +326,6 @@
else Result := 0;
end;
-procedure TThriftStreamAdapterDelphi.CheckReadBytesAvailable( const value : Integer);
-var nRemaining : Int64;
-begin
- nRemaining := FStream.Size - FStream.Position;
- if nRemaining < value then raise TTransportExceptionEndOfFile.Create('Not enough input data');
-end;
-
function TThriftStreamAdapterDelphi.ToArray: TBytes;
var
OrgPos : Integer;
diff --git a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
index b92cce1..bdc65d1 100644
--- a/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.MsxmlHTTP.pas
@@ -34,13 +34,14 @@
Winapi.ActiveX, Winapi.msxml,
{$ENDIF}
Thrift.Collections,
+ Thrift.Configuration,
Thrift.Transport,
Thrift.Exception,
Thrift.Utils,
Thrift.Stream;
type
- TMsxmlHTTPClientImpl = class( TTransportImpl, IHTTPClient)
+ TMsxmlHTTPClientImpl = class( TEndpointTransportBase, IHTTPClient)
strict private
FUri : string;
FInputStream : IThriftStream;
@@ -59,7 +60,6 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure Flush; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure SetDnsResolveTimeout(const Value: Integer);
function GetDnsResolveTimeout: Integer;
@@ -81,26 +81,29 @@
property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
- constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil);
+ constructor Create( const aUri: string; const aConfig : IThriftConfiguration); reintroduce;
destructor Destroy; override;
end;
implementation
+const
+ XMLHTTP_CONNECTION_TIMEOUT = 60 * 1000;  // initial FConnectionTimeout set by the constructor; presumably milliseconds - TODO confirm
+ XMLHTTP_SENDRECV_TIMEOUT = 30 * 1000;  // initial FSendTimeout and FReadTimeout set by the constructor; presumably milliseconds - TODO confirm
{ TMsxmlHTTPClientImpl }
-constructor TMsxmlHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl);
+constructor TMsxmlHTTPClientImpl.Create( const aUri: string; const aConfig : IThriftConfiguration);
begin
- inherited Create( aTransportCtl);
- FUri := AUri;
+ inherited Create( aConfig);
+ FUri := aUri;
// defaults according to MSDN
FDnsResolveTimeout := 0; // no timeout
- FConnectionTimeout := 60 * 1000;
- FSendTimeout := 30 * 1000;
- FReadTimeout := 30 * 1000;
+ FConnectionTimeout := XMLHTTP_CONNECTION_TIMEOUT;
+ FSendTimeout := XMLHTTP_SENDRECV_TIMEOUT;
+ FReadTimeout := XMLHTTP_SENDRECV_TIMEOUT;
FCustomHeaders := TThriftDictionaryImpl<string,string>.Create;
FOutputStream := TThriftStreamAdapterDelphi.Create( TMemoryStream.Create, True);
@@ -219,13 +222,6 @@
end;
end;
-procedure TMsxmlHTTPClientImpl.CheckReadBytesAvailable( const value : Integer);
-begin
- if FInputStream <> nil
- then FInputStream.CheckReadBytesAvailable( value)
- else raise TTransportExceptionNotOpen.Create('No request has been sent');
-end;
-
function TMsxmlHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputStream = nil then begin
@@ -234,7 +230,6 @@
try
Result := FInputStream.Read( pBuf, buflen, off, len);
- ConsumeReadBytes( result);
except
on E: Exception
do raise TTransportExceptionUnknown.Create(E.Message);
@@ -261,6 +256,7 @@
xmlhttp.send( IUnknown( TStreamAdapter.Create( ms, soReference )));
FInputStream := nil;
FInputStream := TThriftStreamAdapterCOM.Create( IUnknown( xmlhttp.responseStream) as IStream);
+ UpdateKnownMessageSize( FInputStream.Size);
finally
ms.Free;
end;
diff --git a/lib/delphi/src/Thrift.Transport.Pipes.pas b/lib/delphi/src/Thrift.Transport.Pipes.pas
index b602b64..635a841 100644
--- a/lib/delphi/src/Thrift.Transport.Pipes.pas
+++ b/lib/delphi/src/Thrift.Transport.Pipes.pas
@@ -29,6 +29,7 @@
{$ELSE}
Winapi.Windows, System.SysUtils, System.Math, Winapi.AccCtrl, Winapi.AclAPI, System.SyncObjs,
{$ENDIF}
+ Thrift.Configuration,
Thrift.Transport,
Thrift.Utils,
Thrift.Stream;
@@ -53,7 +54,6 @@
//procedure Open; override; - see derived classes
procedure Close; override;
procedure Flush; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
function ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
function ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
@@ -65,7 +65,9 @@
public
constructor Create( aEnableOverlapped : Boolean;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
- const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT);
+ const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT
+ ); reintroduce; overload;
+
destructor Destroy; override;
end;
@@ -85,7 +87,8 @@
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
- const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT); overload;
+ const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT
+ ); reintroduce; overload;
end;
@@ -99,7 +102,9 @@
public
constructor Create( const aPipeHandle : THandle;
const aOwnsHandle, aEnableOverlapped : Boolean;
- const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT); overload;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT
+ ); reintroduce; overload;
+
destructor Destroy; override;
end;
@@ -113,7 +118,7 @@
TPipeTransportBase = class( TStreamTransportImpl, IPipeTransport)
- public
+ strict protected
// ITransport
function GetIsOpen: Boolean; override;
procedure Open; override;
@@ -127,27 +132,32 @@
constructor Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD;
- const aTransportCtl : ITransportControl); overload;
+ const aConfig : IThriftConfiguration = nil
+ ); reintroduce; overload;
constructor Create( const aPipeName : string;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT;
- const aTransportCtl : ITransportControl = nil); overload;
+ const aConfig : IThriftConfiguration = nil
+ ); reintroduce; overload;
end;
TNamedPipeTransportServerEndImpl = class( TNamedPipeTransportClientEndImpl)
strict private
FHandle : THandle;
- public
+ strict protected
// ITransport
procedure Close; override;
+ public
constructor Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
- const aTransportCtl : ITransportControl = nil); reintroduce;
+ const aConfig : IThriftConfiguration = nil
+ ); reintroduce; overload;
+
end;
@@ -157,7 +167,8 @@
constructor Create( const aPipeRead, aPipeWrite : THandle;
const aOwnsHandles : Boolean;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
- const aTransportCtl : ITransportControl = nil); overload;
+ const aConfig : IThriftConfiguration = nil
+ ); reintroduce; overload;
end;
@@ -187,7 +198,7 @@
procedure InternalClose; virtual; abstract;
function QueryStopServer : Boolean;
public
- constructor Create;
+ constructor Create( const aConfig : IThriftConfiguration);
destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
@@ -221,7 +232,10 @@
procedure InternalClose; override;
public
- constructor Create(aBufsize : Cardinal = 4096; aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT);
+ constructor Create( const aBufsize : Cardinal = 4096;
+ const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
+ const aConfig : IThriftConfiguration = nil
+ ); reintroduce; overload;
end;
@@ -245,9 +259,12 @@
procedure InternalClose; override;
public
- constructor Create( aPipename : string; aBufsize : Cardinal = 4096;
- aMaxConns : Cardinal = PIPE_UNLIMITED_INSTANCES;
- aTimeOut : Cardinal = INFINITE);
+ constructor Create( const aPipename : string;
+ const aBufsize : Cardinal = 4096;
+ const aMaxConns : Cardinal = PIPE_UNLIMITED_INSTANCES;
+ const aTimeOut : Cardinal = INFINITE;
+ const aConfig : IThriftConfiguration = nil
+ ); reintroduce; overload;
end;
@@ -278,15 +295,14 @@
{ TPipeStreamBase }
-constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean;
- const aTimeOut, aOpenTimeOut : DWORD);
+constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean; const aTimeOut, aOpenTimeOut : DWORD);
begin
inherited Create;
- ASSERT( aTimeout > 0); // aOpenTimeout may be 0
FPipe := INVALID_HANDLE_VALUE;
FTimeout := aTimeOut;
FOpenTimeOut := aOpenTimeOut;
FOverlapped := aEnableOverlapped;
+ ASSERT( FTimeout > 0); // FOpenTimeout may be 0
end;
@@ -318,12 +334,6 @@
end;
-procedure TPipeStreamBase.CheckReadBytesAvailable( const value : Integer);
-begin
- // can't tell how much we can suck out of the pipe
-end;
-
-
procedure TPipeStreamBase.Write( const pBuf : Pointer; offset, count : Integer);
begin
if FOverlapped
@@ -538,7 +548,7 @@
const aSecurityAttributes: PSecurityAttributes;
const aTimeOut, aOpenTimeOut : DWORD);
begin
- inherited Create( aEnableOverlapped, aTimeout, aOpenTimeOut);
+ inherited Create( aEnableOverlapped, aTimeOut, aOpenTimeOut);
FPipeName := aPipeName;
FShareMode := aShareMode;
@@ -601,7 +611,7 @@
const aOwnsHandle, aEnableOverlapped : Boolean;
const aTimeOut : DWORD);
begin
- inherited Create( aEnableOverlapped, aTimeOut);
+ inherited Create( aEnableOverlapped, aTimeout, aTimeout);
if aOwnsHandle
then FSrcHandle := aPipeHandle
@@ -655,13 +665,14 @@
{ TNamedPipeTransportClientEndImpl }
-constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string; const aShareMode: DWORD;
+constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string;
+ const aShareMode: DWORD;
const aSecurityAttributes: PSecurityAttributes;
const aTimeOut, aOpenTimeOut : DWORD;
- const aTransportCtl : ITransportControl);
+ const aConfig : IThriftConfiguration);
// Named pipe constructor
begin
- inherited Create( nil, nil, aTransportCtl);
+ inherited Create( nil, nil, aConfig);
FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
@@ -670,11 +681,11 @@
constructor TNamedPipeTransportClientEndImpl.Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD;
- const aTransportCtl : ITransportControl);
+ const aConfig : IThriftConfiguration);
// Named pipe constructor
begin
- inherited Create( nil, nil, aTransportCtl);
- FInputStream := THandlePipeStreamImpl.Create( aPipe, TRUE, aOwnsHandle, aTimeOut);
+ inherited Create( nil, nil, aConfig);
+ FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
@@ -685,11 +696,11 @@
constructor TNamedPipeTransportServerEndImpl.Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD;
- const aTransportCtl : ITransportControl);
+ const aConfig : IThriftConfiguration);
// Named pipe constructor
begin
FHandle := DuplicatePipeHandle( aPipe);
- inherited Create( aPipe, aOwnsHandle, aTimeOut, aTransportCtl);
+ inherited Create( aPipe, aOwnsHandle, aTimeout, aConfig);
end;
@@ -709,22 +720,22 @@
constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : THandle;
const aOwnsHandles : Boolean;
const aTimeOut : DWORD;
- const aTransportCtl : ITransportControl);
+ const aConfig : IThriftConfiguration);
// Anonymous pipe constructor
begin
- inherited Create( nil, nil, aTransportCtl);
+ inherited Create( nil, nil, aConfig);
// overlapped is not supported with AnonPipes, see MSDN
- FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeOut);
- FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeOut);
+ FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout);
+ FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout);
end;
{ TPipeServerTransportBase }
-constructor TPipeServerTransportBase.Create;
+constructor TPipeServerTransportBase.Create( const aConfig : IThriftConfiguration);
begin
- inherited Create;
+ inherited Create( aConfig);
FStopServer := TEvent.Create(nil,TRUE,FALSE,''); // manual reset
end;
@@ -761,11 +772,12 @@
{ TAnonymousPipeServerTransportImpl }
-
-constructor TAnonymousPipeServerTransportImpl.Create(aBufsize : Cardinal; aTimeOut : DWORD);
+constructor TAnonymousPipeServerTransportImpl.Create( const aBufsize : Cardinal;
+ const aTimeOut : DWORD;
+ const aConfig : IThriftConfiguration);
// Anonymous pipe CTOR
begin
- inherited Create;
+ inherited Create(aConfig);
FBufsize := aBufSize;
FReadHandle := INVALID_HANDLE_VALUE;
FWriteHandle := INVALID_HANDLE_VALUE;
@@ -794,7 +806,7 @@
then raise TTransportExceptionNotOpen.Create('TServerPipe unable to initiate pipe communication');
// create the transport impl
- result := TAnonymousPipeTransportImpl.Create( FReadHandle, FWriteHandle, FALSE, FTimeOut);
+ result := TAnonymousPipeTransportImpl.Create( FReadHandle, FWriteHandle, FALSE, FTimeOut, Configuration);
end;
@@ -872,17 +884,19 @@
{ TNamedPipeServerTransportImpl }
-constructor TNamedPipeServerTransportImpl.Create( aPipename : string; aBufsize, aMaxConns, aTimeOut : Cardinal);
+constructor TNamedPipeServerTransportImpl.Create( const aPipename : string;
+ const aBufsize, aMaxConns, aTimeOut : Cardinal;
+ const aConfig : IThriftConfiguration);
// Named Pipe CTOR
begin
- inherited Create;
- ASSERT( aTimeout > 0);
+ inherited Create( aConfig);
FPipeName := aPipename;
FBufsize := aBufSize;
FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
FHandle := INVALID_HANDLE_VALUE;
FTimeout := aTimeOut;
FConnected := FALSE;
+ ASSERT( FTimeout > 0);
if Copy(FPipeName,1,2) <> '\\'
then FPipeName := '\\.\pipe\' + FPipeName; // assume localhost
@@ -951,7 +965,7 @@
hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
try
FConnected := FALSE;
- result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout);
+ result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout, Configuration);
except
ClosePipeHandle(hPipe);
raise;
diff --git a/lib/delphi/src/Thrift.Transport.WinHTTP.pas b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
index 2f961a0..7a1b48f 100644
--- a/lib/delphi/src/Thrift.Transport.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.Transport.WinHTTP.pas
@@ -29,6 +29,7 @@
Math,
Generics.Collections,
Thrift.Collections,
+ Thrift.Configuration,
Thrift.Transport,
Thrift.Exception,
Thrift.Utils,
@@ -36,7 +37,7 @@
Thrift.Stream;
type
- TWinHTTPClientImpl = class( TTransportImpl, IHTTPClient)
+ TWinHTTPClientImpl = class( TEndpointTransportBase, IHTTPClient)
strict private
FUri : string;
FInputStream : IThriftStream;
@@ -58,19 +59,16 @@
THTTPResponseStream = class( TThriftStreamImpl)
strict private
FRequest : IWinHTTPRequest;
- FTransportControl : ITransportControl;
strict protected
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
- procedure ConsumeReadBytes( const count : Integer);
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
- constructor Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl);
+ constructor Create( const aRequest : IWinHTTPRequest);
destructor Destroy; override;
end;
@@ -81,7 +79,6 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure Flush; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure SetDnsResolveTimeout(const Value: Integer);
function GetDnsResolveTimeout: Integer;
@@ -103,25 +100,29 @@
property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
public
- constructor Create( const AUri: string; const aTransportCtl : ITransportControl = nil);
+ constructor Create( const aUri: string; const aConfig : IThriftConfiguration = nil);
destructor Destroy; override;
end;
implementation
+const
+ WINHTTP_CONNECTION_TIMEOUT = 60 * 1000;
+ WINHTTP_SENDRECV_TIMEOUT = 30 * 1000;
+
{ TWinHTTPClientImpl }
-constructor TWinHTTPClientImpl.Create(const AUri: string; const aTransportCtl : ITransportControl);
+constructor TWinHTTPClientImpl.Create( const aUri: string; const aConfig : IThriftConfiguration);
begin
- inherited Create( aTransportCtl);
+ inherited Create( aConfig);
FUri := AUri;
// defaults according to MSDN
FDnsResolveTimeout := 0; // no timeout
- FConnectionTimeout := 60 * 1000;
- FSendTimeout := 30 * 1000;
- FReadTimeout := 30 * 1000;
+ FConnectionTimeout := WINHTTP_CONNECTION_TIMEOUT;
+ FSendTimeout := WINHTTP_SENDRECV_TIMEOUT;
+ FReadTimeout := WINHTTP_SENDRECV_TIMEOUT;
FSecureProtocols := DEFAULT_THRIFT_SECUREPROTOCOLS;
@@ -288,13 +289,6 @@
end;
end;
-procedure TWinHTTPClientImpl.CheckReadBytesAvailable( const value : Integer);
-begin
- if FInputStream <> nil
- then FInputStream.CheckReadBytesAvailable( value)
- else raise TTransportExceptionNotOpen.Create('No request has been sent');
-end;
-
function TWinHTTPClientImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputStream = nil then begin
@@ -303,7 +297,7 @@
try
Result := FInputStream.Read( pBuf, buflen, off, len);
- ConsumeReadBytes( result);
+ CountConsumedMessageBytes( result);
except
on E: Exception
do raise TTransportExceptionUnknown.Create(E.Message);
@@ -313,7 +307,6 @@
procedure TWinHTTPClientImpl.SendRequest;
var
http : IWinHTTPRequest;
- ctrl : ITransportControl;
pData : PByte;
len : Integer;
error : Cardinal;
@@ -340,8 +333,8 @@
else raise TTransportExceptionInterrupted.Create( sMsg);
end;
- ctrl := TTransportControlImpl.Create( TransportControl.MaxAllowedMessageSize);
- FInputStream := THTTPResponseStream.Create( http, ctrl);
+ FInputStream := THTTPResponseStream.Create( http);
+ UpdateKnownMessageSize( http.QueryTotalResponseSize);
end;
procedure TWinHTTPClientImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -355,12 +348,10 @@
{ TWinHTTPClientImpl.THTTPResponseStream }
-constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest; const aTransportCtl : ITransportControl);
+constructor TWinHTTPClientImpl.THTTPResponseStream.Create( const aRequest : IWinHTTPRequest);
begin
inherited Create;
FRequest := aRequest;
- FTransportControl := aTransportCtl;
- ASSERT( FTransportControl <> nil);
end;
destructor TWinHTTPClientImpl.THTTPResponseStream.Destroy;
@@ -406,8 +397,6 @@
if count >= buflen-offset
then count := buflen-offset;
- CheckReadBytesAvailable(count);
-
if count > 0 then begin
pTmp := pBuf;
Inc( pTmp, offset);
@@ -415,20 +404,6 @@
ASSERT( Result >= 0);
end
else Result := 0;
-
- ConsumeReadBytes( result);
-end;
-
-procedure TWinHTTPClientImpl.THTTPResponseStream.ConsumeReadBytes( const count : Integer);
-begin
- if FTransportControl <> nil
- then FTransportControl.ConsumeReadBytes( count);
-end;
-
-procedure TWinHTTPClientImpl.THTTPResponseStream.CheckReadBytesAvailable( const value : Integer);
-begin
- if Int64(value) > Int64(FRequest.QueryTotalResponseSize)
- then raise TTransportExceptionEndOfFile.Create('Not enough input data');
end;
function TWinHTTPClientImpl.THTTPResponseStream.ToArray: TBytes;
diff --git a/lib/delphi/src/Thrift.Transport.pas b/lib/delphi/src/Thrift.Transport.pas
index 0a9a39e..af62548 100644
--- a/lib/delphi/src/Thrift.Transport.pas
+++ b/lib/delphi/src/Thrift.Transport.pas
@@ -38,6 +38,7 @@
Thrift.Socket,
{$ENDIF}
{$ENDIF}
+ Thrift.Configuration,
Thrift.Collections,
Thrift.Exception,
Thrift.Utils,
@@ -49,28 +50,10 @@
DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
type
- ITransportControl = interface
- ['{CDA35E2C-F1D2-4BE3-9927-7F1540923265}']
- function MaxAllowedMessageSize : Integer;
- procedure ConsumeReadBytes( const count : Integer);
- procedure ResetConsumedMessageSize;
- end;
-
- TTransportControlImpl = class( TInterfacedObject, ITransportControl)
- strict private
- FMaxAllowedMsgSize : Integer;
- FRemainingMsgSize : Integer;
- strict protected
- // ITransportControl
- function MaxAllowedMessageSize : Integer;
- procedure ConsumeReadBytes( const count : Integer);
- procedure ResetConsumedMessageSize;
- public
- constructor Create( const aMaxMessageSize : Integer = DEFAULT_MAX_MESSAGE_SIZE); reintroduce;
- end;
+ IStreamTransport = interface;
ITransport = interface
- ['{938F6EB5-1848-43D5-8AC4-07633C55B229}']
+ ['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
function GetIsOpen: Boolean;
property IsOpen: Boolean read GetIsOpen;
function Peek: Boolean;
@@ -87,14 +70,14 @@
procedure Write( const pBuf : Pointer; len : Integer); overload;
procedure Flush;
- function TransportControl : ITransportControl;
- procedure CheckReadBytesAvailable( const value : Integer);
+ function Configuration : IThriftConfiguration;
+ function MaxMessageSize : Integer;
+ procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
+ procedure CheckReadBytesAvailable( const numBytes : Int64);
+ procedure UpdateKnownMessageSize( const size : Int64);
end;
- TTransportImpl = class( TInterfacedObject, ITransport)
- strict private
- FTransportControl : ITransportControl;
-
+ TTransportBase = class abstract( TInterfacedObject)
strict protected
function GetIsOpen: Boolean; virtual; abstract;
property IsOpen: Boolean read GetIsOpen;
@@ -112,12 +95,44 @@
procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
procedure Flush; virtual;
- function TransportControl : ITransportControl; inline;
- procedure ConsumeReadBytes( const count : Integer); inline;
- procedure CheckReadBytesAvailable( const value : Integer); virtual; abstract;
+ function Configuration : IThriftConfiguration; virtual; abstract;
+ procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
+ end;
+ // base class for all endpoint transports, e.g. sockets, pipes or HTTP
+ TEndpointTransportBase = class abstract( TTransportBase, ITransport)
+ strict private
+ FRemainingMessageSize : Int64;
+ FKnownMessageSize : Int64;
+ FConfiguration : IThriftConfiguration;
+ strict protected
+ function Configuration : IThriftConfiguration; override;
+ function MaxMessageSize : Integer;
+ property RemainingMessageSize : Int64 read FRemainingMessageSize;
+ property KnownMessageSize : Int64 read FKnownMessageSize;
+ procedure ResetConsumedMessageSize( const newSize : Int64 = -1); inline;
+ procedure UpdateKnownMessageSize(const size : Int64); override;
+ procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
+ procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
public
- constructor Create( const aTransportCtl : ITransportControl); reintroduce;
+ constructor Create( const aConfig : IThriftConfiguration); reintroduce;
+ end;
+
+ // base class for all layered transports, e.g. framed
+ TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
+ strict private
+ FTransport : T;
+ strict protected
+ property InnerTransport : T read FTransport;
+ function GetUnderlyingTransport: ITransport;
+ function Configuration : IThriftConfiguration; override;
+ procedure UpdateKnownMessageSize( const size : Int64); override;
+ function MaxMessageSize : Integer; inline;
+ procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
+ procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
+ public
+ constructor Create( const aTransport: T); reintroduce;
+ property UnderlyingTransport: ITransport read GetUnderlyingTransport;
end;
TTransportException = class abstract( TException)
@@ -220,17 +235,23 @@
end;
IServerTransport = interface
- ['{C43B87ED-69EA-47C4-B77C-15E288252900}']
+ ['{FA01363F-6B40-482F-971E-4A085535EFC8}']
procedure Listen;
procedure Close;
function Accept( const fnAccepting: TProc): ITransport;
+ function Configuration : IThriftConfiguration;
end;
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
+ strict private
+ FConfig : IThriftConfiguration;
strict protected
+ function Configuration : IThriftConfiguration;
procedure Listen; virtual; abstract;
procedure Close; virtual; abstract;
- function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
+ function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
+ public
+ constructor Create( const aConfig : IThriftConfiguration);
end;
ITransportFactory = interface
@@ -238,11 +259,13 @@
function GetTransport( const aTransport: ITransport): ITransport;
end;
- TTransportFactoryImpl = class( TInterfacedObject, ITransportFactory)
+ TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
+ strict protected
function GetTransport( const aTransport: ITransport): ITransport; virtual;
end;
- TTcpSocketStreamImpl = class( TThriftStreamImpl )
+
+ TTcpSocketStreamImpl = class( TThriftStreamImpl)
{$IFDEF OLD_SOCKETS}
strict private type
TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
@@ -261,7 +284,6 @@
strict protected
procedure Write( const pBuf : Pointer; offset, count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
@@ -270,9 +292,9 @@
function ToArray: TBytes; override;
public
{$IFDEF OLD_SOCKETS}
- constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = 0);
+ constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
{$ELSE}
- constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = 0);
+ constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
{$ENDIF}
end;
@@ -284,7 +306,7 @@
property OutputStream : IThriftStream read GetOutputStream;
end;
- TStreamTransportImpl = class( TTransportImpl, IStreamTransport)
+ TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
strict protected
FInputStream : IThriftStream;
FOutputStream : IThriftStream;
@@ -294,7 +316,6 @@
function GetInputStream: IThriftStream;
function GetOutputStream: IThriftStream;
- procedure CheckReadBytesAvailable( const value : Integer); override;
strict protected
procedure Open; override;
procedure Close; override;
@@ -302,7 +323,7 @@
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
public
- constructor Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl = nil);
+ constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
destructor Destroy; override;
property InputStream : IThriftStream read GetInputStream;
@@ -318,12 +339,13 @@
strict protected
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
+ function Size : Int64; override;
+ function Position : Int64; override;
public
constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
destructor Destroy; override;
@@ -340,38 +362,34 @@
{$ENDIF}
FUseBufferedSocket : Boolean;
FOwnsServer : Boolean;
- FTransportControl : ITransportControl;
strict protected
function Accept( const fnAccepting: TProc) : ITransport; override;
- property TransportControl : ITransportControl read FTransportControl;
public
-{$IFDEF OLD_SOCKETS}
- constructor Create( const aServer: TTcpServer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
-{$ELSE}
- constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aTransportCtl : ITransportControl = nil); overload;
-{$ENDIF}
+ {$IFDEF OLD_SOCKETS}
+ constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
+ {$ELSE}
+ constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
+ {$ENDIF}
+
destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
end;
- TBufferedTransportImpl = class( TTransportImpl )
+ TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
strict private
FInputBuffer : IThriftStream;
FOutputBuffer : IThriftStream;
- FTransport : IStreamTransport;
FBufSize : Integer;
procedure InitBuffers;
- function GetUnderlyingTransport: ITransport;
strict protected
function GetIsOpen: Boolean; override;
procedure Flush; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
public
type
TFactory = class( TTransportFactoryImpl )
@@ -384,7 +402,7 @@
procedure Close(); override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
- property UnderlyingTransport: ITransport read GetUnderlyingTransport;
+ procedure CheckReadBytesAvailable( const value : Int64); override;
property IsOpen: Boolean read GetIsOpen;
end;
@@ -408,15 +426,16 @@
strict protected
function GetIsOpen: Boolean; override;
public
- procedure Open; override;
{$IFDEF OLD_SOCKETS}
- constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
{$ELSE}
- constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl = nil); overload;
- constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aTransportCtl : ITransportControl = nil); overload;
+ constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
+ constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
{$ENDIF}
destructor Destroy; override;
+
+ procedure Open; override;
procedure Close; override;
{$IFDEF OLD_SOCKETS}
property TcpClient: TCustomIpClient read FClient;
@@ -427,29 +446,25 @@
property Port: Integer read FPort;
end;
- TFramedTransportImpl = class( TTransportImpl)
- strict protected const
- DEFAULT_MAX_LENGTH = 16384000; // this value is used by all Thrift libraries
+ TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
strict protected type
TFramedHeader = Int32;
strict protected
- FTransport : ITransport;
FWriteBuffer : TMemoryStream;
FReadBuffer : TMemoryStream;
- FMaxFrameSize : Integer;
- procedure InitMaxFrameSize;
procedure InitWriteBuffer;
procedure ReadFrame;
procedure Open(); override;
- function GetIsOpen: Boolean; override;
+ function GetIsOpen: Boolean; override;
procedure Close(); override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
+ procedure CheckReadBytesAvailable( const value : Int64); override;
procedure Flush; override;
- procedure CheckReadBytesAvailable( const value : Integer); override;
+
public
type
TFactory = class( TTransportFactoryImpl )
@@ -457,7 +472,6 @@
function GetTransport( const aTransport: ITransport): ITransport; override;
end;
- constructor Create( const aTransportCtl : ITransportControl); overload;
constructor Create( const aTransport: ITransport); overload;
destructor Destroy; override;
end;
@@ -469,80 +483,33 @@
implementation
-{ TTransportControlImpl }
+{ TTransportBase }
-constructor TTransportControlImpl.Create( const aMaxMessageSize : Integer);
-begin
- inherited Create;
-
- if aMaxMessageSize > 0
- then FMaxAllowedMsgSize := aMaxMessageSize
- else FMaxAllowedMsgSize := DEFAULT_MAX_MESSAGE_SIZE;
-
- ResetConsumedMessageSize;
-end;
-
-function TTransportControlImpl.MaxAllowedMessageSize : Integer;
-begin
- result := FMaxAllowedMsgSize;
-end;
-
-procedure TTransportControlImpl.ResetConsumedMessageSize;
-begin
- FRemainingMsgSize := MaxAllowedMessageSize;
-end;
-
-
-procedure TTransportControlImpl.ConsumeReadBytes( const count : Integer);
-begin
- if FRemainingMsgSize >= count
- then Dec( FRemainingMsgSize, count)
- else begin
- FRemainingMsgSize := 0;
- if FRemainingMsgSize < count
- then raise TTransportExceptionEndOfFile.Create('Maximum message size reached');
- end;
-end;
-
-
-{ TTransportImpl }
-
-constructor TTransportImpl.Create( const aTransportCtl : ITransportControl);
-begin
- inherited Create;
-
- if aTransportCtl <> nil
- then FTransportControl := aTransportCtl
- else FTransportControl := TTransportControlImpl.Create;
- ASSERT( FTransportControl <> nil);
-end;
-
-
-procedure TTransportImpl.Flush;
+procedure TTransportBase.Flush;
begin
// nothing to do
end;
-function TTransportImpl.Peek: Boolean;
+function TTransportBase.Peek: Boolean;
begin
Result := IsOpen;
end;
-function TTransportImpl.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
+function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
if Length(buf) > 0
then result := Read( @buf[0], Length(buf), off, len)
else result := 0;
end;
-function TTransportImpl.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
+function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
if Length(buf) > 0
then result := ReadAll( @buf[0], Length(buf), off, len)
else result := 0;
end;
-function TTransportImpl.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
+function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var ret : Integer;
begin
result := 0;
@@ -554,37 +521,144 @@
end;
end;
-procedure TTransportImpl.Write( const buf: TBytes);
+procedure TTransportBase.Write( const buf: TBytes);
begin
if Length(buf) > 0
then Write( @buf[0], 0, Length(buf));
end;
-procedure TTransportImpl.Write( const buf: TBytes; off: Integer; len: Integer);
+procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
begin
if Length(buf) > 0
then Write( @buf[0], off, len);
end;
-procedure TTransportImpl.Write( const pBuf : Pointer; len : Integer);
+procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
begin
Self.Write( pBuf, 0, len);
end;
-function TTransportImpl.TransportControl : ITransportControl;
+{ TEndpointTransportBase }
+
+constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
+// Keeps the given configuration, or creates a fresh TThriftConfigurationImpl when aConfig is nil,
+// then performs a full reset of the message size counters via ResetConsumedMessageSize.
begin
- result := FTransportControl;
+ inherited Create;
+
+ if aConfig <> nil
+ then FConfiguration := aConfig
+ else FConfiguration := TThriftConfigurationImpl.Create;
+
+ ResetConsumedMessageSize;
end;
-procedure TTransportImpl.ConsumeReadBytes( const count : Integer);
+function TEndpointTransportBase.Configuration : IThriftConfiguration;
+// Returns the configuration object stored in FConfiguration.
begin
- if FTransportControl <> nil
- then FTransportControl.ConsumeReadBytes( count);
+ result := FConfiguration;
end;
+function TEndpointTransportBase.MaxMessageSize : Integer;
+// Returns the MaxMessageSize value of this transport's configuration object.
+// NOTE(review): IThriftConfiguration.GetMaxMessageSize is declared as Cardinal while this
+// function returns Integer - presumably values are expected to stay below MaxInt; confirm.
+begin
+ ASSERT( Configuration <> nil);
+ result := Configuration.MaxMessageSize;
+end;
+
+
+procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
+// Resets the message size accounting of this transport.
+// - newSize < 0 : full reset, known and remaining size both become MaxMessageSize.
+// - newSize >= 0: the known size is replaced by newSize and the remaining size restarts from there.
+//                 The known size may shrink but never grow; TTransportExceptionEndOfFile is
+//                 raised when newSize exceeds the currently known message size.
+begin
+  // full reset
+  if newSize < 0 then begin
+    FKnownMessageSize := MaxMessageSize;
+    FRemainingMessageSize := MaxMessageSize;
+    Exit;
+  end;
+
+  // update only: message size can shrink, but not grow
+  ASSERT( KnownMessageSize <= MaxMessageSize);
+  if newSize > KnownMessageSize  // must actually be raised, otherwise the larger size is accepted below
+  then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+
+  FKnownMessageSize := newSize;
+  FRemainingMessageSize := newSize;
+end;
+
+
+procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
+// Updates RemainingMessageSize to reflect the now known real message size (e.g. framed transport).
+// Bytes consumed so far are carried over into the new accounting.
+// Will throw if we already consumed more bytes than the new size permits.
+var alreadyConsumed : Int64;
+begin
+  // capture the progress before the counters are re-based
+  alreadyConsumed := KnownMessageSize - RemainingMessageSize;
+
+  ResetConsumedMessageSize( size);
+  CountConsumedMessageBytes( alreadyConsumed);
+end;
+
+
+procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
+// Verifies that a read of numBytes bytes still fits into RemainingMessageSize.
+// Raises TTransportExceptionEndOfFile if it does not.
+begin
+  if numBytes > RemainingMessageSize then begin
+    raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+  end;
+end;
+
+
+procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
+// Deducts numBytes from RemainingMessageSize.
+// If fewer than numBytes remain, the remaining size is set to zero and
+// TTransportExceptionEndOfFile is raised.
+begin
+  // budget exceeded: clamp to zero, then report
+  if RemainingMessageSize < numBytes then begin
+    FRemainingMessageSize := 0;
+    raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
+  end;
+
+  Dec( FRemainingMessageSize, numBytes);
+end;
+
+{ TLayeredTransportBase }
+
+constructor TLayeredTransportBase<T>.Create( const aTransport: T);
+// Stores the transport to be wrapped in FTransport.
+begin
+ inherited Create;
+ FTransport := aTransport;
+end;
+
+function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
+// Returns the wrapped (inner) transport as ITransport.
+begin
+ result := InnerTransport;
+end;
+
+function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
+// A layered transport holds no configuration of its own: returns the inner transport's configuration.
+begin
+ result := InnerTransport.Configuration;
+end;
+
+procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
+// Forwards the now known message size to the inner transport.
+begin
+ InnerTransport.UpdateKnownMessageSize( size);
+end;
+
+
+function TLayeredTransportBase<T>.MaxMessageSize : Integer;
+// Returns the maximum message size reported by the inner transport.
+begin
+ result := InnerTransport.MaxMessageSize;
+end;
+
+
+procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
+// Forwards the reset request, including the knownSize argument (default -1), to the inner transport.
+// NOTE(review): for TEndpointTransportBase a negative size means "full reset";
+// presumably other inner transports follow the same convention - confirm.
+begin
+ InnerTransport.ResetConsumedMessageSize( knownSize);
+end;
+
+
+procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
+// Default behaviour for layered transports: delegates the availability check to the inner transport.
+begin
+ InnerTransport.CheckReadBytesAvailable( numBytes);
+end;
+
+
+
{ TTransportException }
constructor TTransportException.HiddenCreate(const Msg: string);
@@ -676,18 +750,33 @@
Result := aTransport;
end;
+
+{ TServerTransportImpl }
+
+constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
+// Keeps the supplied configuration; when nil is passed, a new
+// TThriftConfigurationImpl instance is created and used instead.
+begin
+  inherited Create;
+
+  FConfig := aConfig;
+  if FConfig = nil
+  then FConfig := TThriftConfigurationImpl.Create;
+end;
+
+function TServerTransportImpl.Configuration : IThriftConfiguration;
+// Returns the configuration object stored in FConfig.
+begin
+ result := FConfig;
+end;
+
{ TServerSocket }
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
- inherited Create;
+ inherited Create( aConfig);
FServer := aServer;
- FTransportControl := aTransportCtl;
- ASSERT( FTransportControl <> nil);
+
{$IFDEF OLD_SOCKETS}
FClientTimeout := aClientTimeout;
@@ -699,17 +788,12 @@
{$IFDEF OLD_SOCKETS}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aTransportCtl : ITransportControl);
+constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
- inherited Create;
-
- if aTransportCtl <> nil
- then FTransportControl := aTransportCtl
- else FTransportControl := TTransportControlImpl.Create;
- ASSERT( FTransportControl <> nil);
+ inherited Create( aConfig);
{$IFDEF OLD_SOCKETS}
FPort := aPort;
@@ -772,7 +856,7 @@
Exit;
end;
- trans := TSocketImpl.Create( client, TRUE, FClientTimeout, TransportControl);
+ trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
client := nil; // trans owns it now
if FUseBufferedSocket
@@ -791,7 +875,7 @@
client := FServer.Accept;
try
- trans := TSocketImpl.Create(client, True, TransportControl);
+ trans := TSocketImpl.Create(client, TRUE, Configuration);
client := nil;
if FUseBufferedSocket then
@@ -840,9 +924,9 @@
{ TSocket }
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
var stream : IThriftStream;
begin
@@ -856,16 +940,17 @@
{$ENDIF}
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
- inherited Create( stream, stream, aTransportCtl);
+ inherited Create( stream, stream, aConfig);
end;
+
{$IFDEF OLD_SOCKETS}
-constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
-constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aTransportCtl : ITransportControl);
+constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
- inherited Create(nil,nil, aTransportCtl);
+ inherited Create(nil,nil, aConfig);
FHost := aHost;
FPort := aPort;
FTimeout := aTimeout;
@@ -1043,30 +1128,12 @@
end;
-procedure TBufferedStreamImpl.CheckReadBytesAvailable( const value : Integer);
-var nRequired : Integer;
-begin
- nRequired := value;
-
- if FReadBuffer <> nil then begin
- Dec( nRequired, (FReadBuffer.Position - FReadBuffer.Size));
- if nRequired <= 0 then Exit;
- end;
-
- if FStream <> nil
- then FStream.CheckReadBytesAvailable( nRequired)
- else raise TTransportExceptionEndOfFile.Create('Not enough input data');
-end;
-
-
function TBufferedStreamImpl.ToArray: TBytes;
var len : Integer;
begin
- len := 0;
-
- if IsOpen then begin
- len := FReadBuffer.Size;
- end;
+ if IsOpen
+ then len := FReadBuffer.Size
+ else len := 0;
SetLength( Result, len);
@@ -1092,11 +1159,24 @@
end;
end;
+
+function TBufferedStreamImpl.Size : Int64;
+// Returns the size of the internal read buffer, or 0 when no read buffer exists.
+begin
+  // FReadBuffer may be nil; report an empty buffer instead of dereferencing nil
+  if FReadBuffer <> nil
+  then result := FReadBuffer.Size
+  else result := 0;
+end;
+
+
+function TBufferedStreamImpl.Position : Int64;
+// Returns the current position within the internal read buffer, or 0 when no read buffer exists.
+begin
+  // FReadBuffer may be nil; report position 0 instead of dereferencing nil
+  if FReadBuffer <> nil
+  then result := FReadBuffer.Position
+  else result := 0;
+end;
+
+
{ TStreamTransportImpl }
-constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aTransportCtl : ITransportControl);
+constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
begin
- inherited Create( aTransportCtl);
+ inherited Create( aConfig);
FInputStream := aInputStream;
FOutputStream := aOutputStream;
end;
@@ -1149,7 +1229,7 @@
then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Result := FInputStream.Read( pBuf,buflen, off, len );
- ConsumeReadBytes( result);
+ CountConsumedMessageBytes( result);
end;
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
@@ -1160,28 +1240,19 @@
FOutputStream.Write( pBuf, off, len );
end;
-procedure TStreamTransportImpl.CheckReadBytesAvailable( const value : Integer);
-begin
- if FInputStream <> nil
- then FInputStream.CheckReadBytesAvailable( value)
- else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
-end;
-
-
{ TBufferedTransportImpl }
constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
begin
ASSERT( aTransport <> nil);
- inherited Create( aTransport.TransportControl);
- FTransport := aTransport;
+ inherited Create( aTransport);
FBufSize := aBufSize;
InitBuffers;
end;
procedure TBufferedTransportImpl.Close;
begin
- FTransport.Close;
+ InnerTransport.Close;
FInputBuffer := nil;
FOutputBuffer := nil;
end;
@@ -1195,34 +1266,29 @@
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
- Result := FTransport.IsOpen;
-end;
-
-function TBufferedTransportImpl.GetUnderlyingTransport: ITransport;
-begin
- Result := FTransport;
+ Result := InnerTransport.IsOpen;
end;
procedure TBufferedTransportImpl.InitBuffers;
begin
- if FTransport.InputStream <> nil then begin
- FInputBuffer := TBufferedStreamImpl.Create( FTransport.InputStream, FBufSize );
+ if InnerTransport.InputStream <> nil then begin
+ FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
end;
- if FTransport.OutputStream <> nil then begin
- FOutputBuffer := TBufferedStreamImpl.Create( FTransport.OutputStream, FBufSize );
+ if InnerTransport.OutputStream <> nil then begin
+ FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
end;
end;
procedure TBufferedTransportImpl.Open;
begin
- FTransport.Open;
+ InnerTransport.Open;
InitBuffers; // we need to get the buffers to match FTransport substreams again
end;
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputBuffer <> nil
- then Result := FInputBuffer.Read( pBuf,buflen, off, len)
+ then Result := FInputBuffer.Read( pBuf,buflen, off, len )
else Result := 0;
end;
@@ -1233,23 +1299,18 @@
end;
end;
-procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Integer);
-var stm2 : IThriftStream2;
- need : Integer;
+procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
+// Checks whether value bytes can still be read. Bytes already held in the input buffer
+// are counted first; only the part not covered by the buffer is checked against the inner transport.
+// A missing input buffer (e.g. after Close) counts as zero buffered bytes.
+var buffered, need : Int64;
begin
need := value;
// buffered bytes
- if Supports( FInputBuffer, IThriftStream2, stm2) then begin
- Dec( need, stm2.Size - stm2.Position);
- if need <= 0 then Exit;
- end;
-
- if FInputBuffer <> nil
- then FInputBuffer.CheckReadBytesAvailable( need)
- else raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
+ if FInputBuffer <> nil
+ then buffered := FInputBuffer.Size - FInputBuffer.Position
+ else buffered := 0; // no buffer: nothing buffered, do not dereference nil
+ if buffered < need
+ then InnerTransport.CheckReadBytesAvailable( need - buffered);
end;
+
{ TBufferedTransportImpl.TFactory }
function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
@@ -1260,53 +1321,33 @@
{ TFramedTransportImpl }
-constructor TFramedTransportImpl.Create( const aTransportCtl : ITransportControl);
-begin
- inherited Create( aTransportCtl);
-
- InitMaxFrameSize;
- InitWriteBuffer;
-end;
-
constructor TFramedTransportImpl.Create( const aTransport: ITransport);
begin
ASSERT( aTransport <> nil);
- inherited Create( aTransport.TransportControl);
+ inherited Create( aTransport);
- InitMaxFrameSize;
InitWriteBuffer;
- FTransport := aTransport;
end;
destructor TFramedTransportImpl.Destroy;
begin
FWriteBuffer.Free;
+ FWriteBuffer := nil;
FReadBuffer.Free;
+ FReadBuffer := nil;
inherited;
end;
-procedure TFramedTransportImpl.InitMaxFrameSize;
-var maxLen : Integer;
-begin
- FMaxFrameSize := DEFAULT_MAX_LENGTH;
-
- // MaxAllowedMessageSize may be smaller, but not larger
- if TransportControl <> nil then begin
- maxLen := TransportControl.MaxAllowedMessageSize - SizeOf(TFramedHeader);
- FMaxFrameSize := Min( FMaxFrameSize, maxLen);
- end;
-end;
-
procedure TFramedTransportImpl.Close;
begin
- FTransport.Close;
+ InnerTransport.Close;
end;
procedure TFramedTransportImpl.Flush;
var
buf : TBytes;
len : Integer;
- data_len : Integer;
+ data_len : Int64;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('not open');
@@ -1318,9 +1359,9 @@
end;
data_len := len - SizeOf(TFramedHeader);
- if (data_len < 0) then begin
- raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: data_len < 0' );
- end;
+ if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
+ then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
+ else UpdateKnownMessageSize( len);
InitWriteBuffer;
@@ -1329,13 +1370,13 @@
buf[2] := Byte($FF and (data_len shr 8));
buf[3] := Byte($FF and data_len);
- FTransport.Write( buf, 0, len );
- FTransport.Flush;
+ InnerTransport.Write( buf, 0, len );
+ InnerTransport.Flush;
end;
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
- Result := FTransport.IsOpen;
+ Result := InnerTransport.IsOpen;
end;
type
@@ -1353,7 +1394,7 @@
procedure TFramedTransportImpl.Open;
begin
- FTransport.Open;
+ InnerTransport.Open;
end;
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
@@ -1382,7 +1423,7 @@
size : Integer;
buff : TBytes;
begin
- FTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
+ InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
size :=
((i32rd[0] and $FF) shl 24) or
((i32rd[1] and $FF) shl 16) or
@@ -1394,14 +1435,15 @@
raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
end;
- if size > FMaxFrameSize then begin
+ if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
Close();
- raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(FMaxFrameSize)+')');
+ raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
end;
- FTransport.CheckReadBytesAvailable( size);
+ UpdateKnownMessageSize(size + SizeOf(size));
+
SetLength( buff, size );
- FTransport.ReadAll( buff, 0, size );
+ InnerTransport.ReadAll( buff, 0, size );
FreeAndNil( FReadBuffer);
FReadBuffer := TMemoryStream.Create;
@@ -1422,15 +1464,15 @@
end;
-procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Integer);
-var nRemaining : Int64;
+procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
+// Checks whether value bytes can still be read. Bytes left in the current frame buffer
+// are counted first; only the remainder is checked against the inner transport.
+// A read buffer that does not exist yet counts as zero buffered bytes.
+var buffered, need : Int64;
begin
- if FReadBuffer = nil
- then raise TTransportExceptionEndOfFile.Create('Cannot read from null inputstream');
+ need := value;
- nRemaining := FReadBuffer.Size - FReadBuffer.Position;
- if value > nRemaining
- then raise TTransportExceptionEndOfFile.Create('Not enough input data');
+ // buffered bytes
+ if FReadBuffer <> nil
+ then buffered := FReadBuffer.Size - FReadBuffer.Position
+ else buffered := 0; // no frame read yet: nothing buffered, do not dereference nil
+ if buffered < need
+ then InnerTransport.CheckReadBytesAvailable( need - buffered);
end;
@@ -1470,9 +1512,10 @@
procedure TTcpSocketStreamImpl.Flush;
begin
-
+ // nothing to do
end;
+
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
{$IFDEF OLD_SOCKETS}
@@ -1557,7 +1600,7 @@
{$IFDEF LINUX}
result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
{$ENDIF}
-
+
if result = SOCKET_ERROR
then wsaError := WSAGetLastError;
@@ -1638,10 +1681,7 @@
TWaitForData.wfd_Timeout : begin
if (FTimeout = 0)
then Exit
- else begin
- raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
-
- end;
+ else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
end;
else
ASSERT( FALSE);
@@ -1764,10 +1804,5 @@
{$ENDIF}
-procedure TTcpSocketStreamImpl.CheckReadBytesAvailable( const value : Integer);
-begin
- // we can't really tell, no further checks possible
-end;
-
end.
diff --git a/lib/delphi/src/Thrift.WinHTTP.pas b/lib/delphi/src/Thrift.WinHTTP.pas
index 6d886fe..d060066 100644
--- a/lib/delphi/src/Thrift.WinHTTP.pas
+++ b/lib/delphi/src/Thrift.WinHTTP.pas
@@ -1224,8 +1224,8 @@
dwIndex)
then begin
dwError := GetLastError;
- ASSERT( dwError = ERROR_WINHTTP_HEADER_NOT_FOUND); // anything else would be an real error
- result := MAXINT; // we don't know
+ if dwError <> ERROR_WINHTTP_HEADER_NOT_FOUND then ASSERT(FALSE); // anything else would be an real error
+ result := MAXINT; // we don't know
end;
end;