blob: e9ab5166a57c1eadbd34ce0cae5bed33ee2a6015 [file] [log] [blame]
Roger Meier87e49802011-04-19 19:47:03 +00001using System;
2using ZMQ;
3using System.IO;
4using Thrift.Transport;
5
namespace ZmqClient
{
    /// <summary>
    /// Thrift transport that sends and receives data through a ZMQ socket.
    /// Written bytes are accumulated in memory and handed to the socket as a
    /// single message by <see cref="Flush"/>; reads are served from a buffer
    /// that is refilled with one received message at a time.
    /// </summary>
    public class TZmqClient : TTransport
    {
        private readonly Socket _sock;
        private readonly String _endpoint;

        // Outgoing bytes collected by Write until the next Flush.
        private MemoryStream _wbuf = new MemoryStream ();

        // Bytes of the most recently received message not yet handed to the caller.
        private readonly MemoryStream _rbuf = new MemoryStream ();

        /// <summary>
        /// Trace hook; a no-op unless the Console.WriteLine line below is uncommented.
        /// </summary>
        /// <param name="msg">Text to trace.</param>
        private void debug (string msg)
        {
            //Uncomment to enable debug
//          Console.WriteLine (msg);
        }

        /// <summary>
        /// Creates a socket of the requested type on <paramref name="ctx"/> and
        /// remembers the endpoint; no connection is made until <see cref="Open"/>.
        /// </summary>
        /// <param name="ctx">ZMQ context used to create the socket.</param>
        /// <param name="endpoint">Endpoint string later passed to the socket's Connect.</param>
        /// <param name="sockType">Type of socket to create.</param>
        public TZmqClient (Context ctx, String endpoint, SocketType sockType)
        {
            _sock = ctx.Socket (sockType);
            _endpoint = endpoint;
        }

        /// <summary>
        /// Connects the socket to the endpoint given at construction.
        /// </summary>
        public override void Open ()
        {
            _sock.Connect (_endpoint);
        }

        /// <summary>
        /// Not implemented: always throws.
        /// </summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public override void Close ()
        {
            // NOTE(review): the socket is never released by this class — confirm
            // who owns its lifetime before implementing this.
            throw new NotImplementedException ();
        }

        /// <summary>
        /// Not implemented: the getter always throws.
        /// </summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public override bool IsOpen {
            get {
                throw new NotImplementedException ();
            }
        }

        /// <summary>
        /// Copies up to <paramref name="len"/> buffered bytes into
        /// <paramref name="buf"/> starting at index <paramref name="off"/>.
        /// When nothing is buffered, one message is first received from the
        /// socket and buffered.
        /// </summary>
        /// <param name="buf">Destination array.</param>
        /// <param name="off">Index in <paramref name="buf"/> at which to start storing bytes.</param>
        /// <param name="len">Maximum number of bytes to copy.</param>
        /// <returns>The number of bytes actually copied; may be less than <paramref name="len"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buf"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="off"/> or <paramref name="len"/> is negative, or the
        /// range they describe does not fit inside <paramref name="buf"/>.
        /// </exception>
        public override int Read (byte[] buf, int off, int len)
        {
            debug ("Client_Read");

            // Validate before touching the socket so that a bad call cannot
            // consume a message and then fail.
            if (buf == null)
                throw new ArgumentNullException ("buf");
            if (off < 0)
                throw new ArgumentOutOfRangeException ("off", "Offset must not be negative.");
            if (len < 0 || len > buf.Length - off)
                throw new ArgumentOutOfRangeException ("len", "Offset and length must describe a range inside the buffer.");

            if (_rbuf.Length == 0) {
                //Fill the buffer with one complete ZMQ message, which is
                //presumably expected to be the complete Thrift response (the
                //original author was unsure as well) - TODO confirm
                debug ("Client_Read Filling buffer..");
                byte[] tmpBuf = _sock.Recv ();
                debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
                _rbuf.Write (tmpBuf, 0, tmpBuf.Length);
                _rbuf.Position = 0; //For reading
            }

            // Honour the caller's offset instead of always writing at index 0.
            int ret = _rbuf.Read (buf, off, len);
            if (_rbuf.Length == _rbuf.Position) //Finished reading
                _rbuf.SetLength (0); // empty buffer makes the next Read receive a new message
            debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position));
            return ret;
        }

        /// <summary>
        /// Appends bytes to the outgoing buffer; nothing is sent until <see cref="Flush"/>.
        /// </summary>
        /// <param name="buf">Source array.</param>
        /// <param name="off">Index of the first byte to append.</param>
        /// <param name="len">Number of bytes to append.</param>
        public override void Write (byte[] buf, int off, int len)
        {
            debug ("Client_Write");
            _wbuf.Write (buf, off, len);
        }

        /// <summary>
        /// Sends everything written since the previous flush through the
        /// socket in one Send call and starts a fresh outgoing buffer.
        /// </summary>
        public override void Flush ()
        {
            debug ("Client_Flush");
            // ToArray copies exactly Length bytes. GetBuffer would hand over the
            // whole backing array, including unused capacity past Length, so the
            // message would carry trailing garbage bytes.
            _sock.Send (_wbuf.ToArray ());
            _wbuf = new MemoryStream ();
        }
    }
}
78