Roger Meier | 87e4980 | 2011-04-19 19:47:03 +0000 | [diff] [blame] | 1 | using System; |
| 2 | using ZMQ; |
| 3 | using System.IO; |
| 4 | using Thrift.Transport; |
| 5 | |
| 6 | namespace ZmqClient |
| 7 | { |
| 8 | public class TZmqClient : TTransport |
| 9 | { |
| 10 | Socket _sock; |
| 11 | String _endpoint; |
| 12 | MemoryStream _wbuf = new MemoryStream (); |
| 13 | MemoryStream _rbuf = new MemoryStream (); |
| 14 | |
| 15 | void debug (string msg) |
| 16 | { |
| 17 | //Uncomment to enable debug |
| 18 | // Console.WriteLine (msg); |
| 19 | } |
| 20 | |
| 21 | public TZmqClient (Context ctx, String endpoint, SocketType sockType) |
| 22 | { |
| 23 | _sock = ctx.Socket (sockType); |
| 24 | _endpoint = endpoint; |
| 25 | } |
| 26 | |
| 27 | public override void Open () |
| 28 | { |
| 29 | _sock.Connect (_endpoint); |
| 30 | } |
| 31 | |
| 32 | public override void Close () |
| 33 | { |
| 34 | throw new NotImplementedException (); |
| 35 | } |
| 36 | |
| 37 | public override bool IsOpen { |
| 38 | get { |
| 39 | throw new NotImplementedException (); |
| 40 | } |
| 41 | } |
| 42 | |
| 43 | public override int Read (byte[] buf, int off, int len) |
| 44 | { |
| 45 | debug ("Client_Read"); |
| 46 | if (off != 0 || len != buf.Length) |
| 47 | throw new NotImplementedException (); |
| 48 | |
| 49 | if (_rbuf.Length == 0) { |
| 50 | //Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift reponse |
| 51 | debug ("Client_Read Filling buffer.."); |
| 52 | byte[] tmpBuf = _sock.Recv (); |
| 53 | debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length)); |
| 54 | _rbuf.Write (tmpBuf, 0, tmpBuf.Length); |
| 55 | _rbuf.Position = 0; //For reading |
| 56 | } |
| 57 | int ret = _rbuf.Read (buf, 0, len); |
| 58 | if (_rbuf.Length == _rbuf.Position) //Finished reading |
| 59 | _rbuf.SetLength (0); |
| 60 | debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position)); |
| 61 | return ret; |
| 62 | } |
| 63 | |
| 64 | public override void Write (byte[] buf, int off, int len) |
| 65 | { |
| 66 | debug ("Client_Write"); |
| 67 | _wbuf.Write (buf, off, len); |
| 68 | } |
| 69 | |
| 70 | public override void Flush () |
| 71 | { |
| 72 | debug ("Client_Flush"); |
| 73 | _sock.Send (_wbuf.GetBuffer ()); |
| 74 | _wbuf = new MemoryStream (); |
| 75 | } |
| 76 | } |
| 77 | } |
| 78 | |