blob: c792882cd5e85fb9f092e32a53f6b0a0bb1547da [file] [log] [blame]
using System;
using ZMQ;
using System.IO;
using Thrift.Transport;
namespace ZmqClient
{
public class TZmqClient : TTransport
{
Socket _sock;
String _endpoint;
MemoryStream _wbuf = new MemoryStream ();
MemoryStream _rbuf = new MemoryStream ();
void debug (string msg)
{
//Uncomment to enable debug
// Console.WriteLine (msg);
}
public TZmqClient (Context ctx, String endpoint, SocketType sockType)
{
_sock = ctx.Socket (sockType);
_endpoint = endpoint;
}
public override void Open ()
{
_sock.Connect (_endpoint);
}
public override void Close ()
{
throw new NotImplementedException ();
}
public override bool IsOpen {
get {
throw new NotImplementedException ();
}
}
public override int Read (byte[] buf, int off, int len)
{
debug ("Client_Read");
if (off != 0 || len != buf.Length)
throw new NotImplementedException ();
if (_rbuf.Length == 0) {
//Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift reponse
debug ("Client_Read Filling buffer..");
byte[] tmpBuf = _sock.Recv ();
debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
_rbuf.Write (tmpBuf, 0, tmpBuf.Length);
_rbuf.Position = 0; //For reading
}
int ret = _rbuf.Read (buf, 0, len);
if (_rbuf.Length == _rbuf.Position) //Finished reading
_rbuf.SetLength (0);
debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position));
return ret;
}
public override void Write (byte[] buf, int off, int len)
{
debug ("Client_Write");
_wbuf.Write (buf, off, len);
}
public override void Flush ()
{
debug ("Client_Flush");
_sock.Send (_wbuf.GetBuffer ());
_wbuf = new MemoryStream ();
}
}
}