THRIFT-812. contrib: Add a demo of using Thrift over ZeroMQ

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@991260 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/contrib/zeromq/Makefile b/contrib/zeromq/Makefile
new file mode 100644
index 0000000..a1d7156
--- /dev/null
+++ b/contrib/zeromq/Makefile
@@ -0,0 +1,37 @@
+THRIFT = thrift
+CXXFLAGS = `pkg-config --cflags libzmq thrift`
+LDLIBS = `pkg-config --libs libzmq thrift`
+
+CXXFLAGS += -g3 -O0
+
+GENNAMES = Storage storage_types
+GENHEADERS = $(addsuffix .h, $(GENNAMES))
+GENSRCS = $(addsuffix .cpp, $(GENNAMES))
+GENOBJS = $(addsuffix .o, $(GENNAMES))
+
+PYLIBS = storage/__init__.py
+
+PROGS =  test-client test-server test-sender test-receiver
+
+all: $(PYLIBS) $(PROGS)
+
+test-client: test-client.o TZmqClient.o $(GENOBJS)
+test-server: test-server.o TZmqServer.o $(GENOBJS)
+test-sender: test-sender.o TZmqClient.o $(GENOBJS)
+test-receiver: test-receiver.o TZmqServer.o $(GENOBJS)
+
+test-client.o test-server.o test-sender.o test-receiver.o: $(GENSRCS)
+
+storage/__init__.py: storage.thrift
+	$(RM) $(dir $@)
+	$(THRIFT) --gen py:newstyle $<
+	mv gen-py/$(dir $@) .
+
+$(GENSRCS): storage.thrift
+	$(THRIFT) --gen cpp $<
+	mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) .
+
+clean:
+	$(RM) -r *.o $(PROGS) storage $(GENSRCS) $(GENHEADERS)
+
+.PHONY: clean
diff --git a/contrib/zeromq/README b/contrib/zeromq/README
new file mode 100644
index 0000000..9e0b5bd
--- /dev/null
+++ b/contrib/zeromq/README
@@ -0,0 +1,30 @@
+This directory contains some glue code to allow Thrift RPCs to be sent over
+ZeroMQ.  Included are client and server implementations for Python and C++,
+along with a simple demo interface (with a working client and server for
+each language).
+
+Thrift was designed for stream-based interfaces like TCP, but ZeroMQ is
+message-based, so there is a small impedance mismatch.  Most of issues are
+hidden from developers, but one cannot be: oneway methods have to be handled
+differently from normal ones.  ZeroMQ requires the messaging pattern to be
+declared at socket creation time, so an application cannot decide on a
+message-by-message basis whether to send a reply.  Therefore, this
+implementation makes it the client's responsibility to ensure that ZMQ_REQ
+sockets are used for normal methods and ZMQ_DOWNSTREAM sockets are used for
+oneway methods.  In addition, services that expose both types of methods
+have to expose two servers (on two ports), but the TZmqMultiServer makes it
+easy to run the two together in the same thread.
+
+This code was tested with ZeroMQ 2.0.7 and pyzmq afabbb5b9bd3.
+
+To build, simply install Thrift and ZeroMQ, then run "make".  If you install
+in a non-standard location, make sure to set THRIFT to the location of the
+Thrift code generator on the make command line and PKG_CONFIG_PATH to a path
+that includes the pkgconfig files for both Thrift and ZeroMQ.  The test
+servers take no arguments.  Run the test clients with no arguments to
+retrieve the stored value or with an integer argument to increment it by
+that amount.
+
+This code is not quite what I would consider production-ready.  It doesn't
+support all of the normal hooks into Thrift, and its performance is
+sub-optimal because it does some unnecessary copying.
diff --git a/contrib/zeromq/TZmqClient.cpp b/contrib/zeromq/TZmqClient.cpp
new file mode 100644
index 0000000..133204e
--- /dev/null
+++ b/contrib/zeromq/TZmqClient.cpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TZmqClient.h"
+#include <cstring>
+
+namespace apache { namespace thrift { namespace transport {
+
+uint32_t TZmqClient::read(uint8_t* buf, uint32_t len) {
+  if (rbuf_.available_read() == 0) {
+    (void)sock_.recv(&msg_);
+    rbuf_.resetBuffer((uint8_t*)msg_.data(), msg_.size());
+  }
+  return rbuf_.read(buf, len);
+}
+
+void TZmqClient::write(const uint8_t* buf, uint32_t len) {
+  return wbuf_.write(buf, len);
+}
+
+void TZmqClient::writeEnd() {
+  uint8_t* buf;
+  uint32_t size;
+  wbuf_.getBuffer(&buf, &size);
+  zmq::message_t msg(size);
+  std::memcpy(msg.data(), buf, size);
+  (void)sock_.send(msg);
+  wbuf_.resetBuffer(true);
+}
+
+}}} // apache::thrift::transport
diff --git a/contrib/zeromq/TZmqClient.h b/contrib/zeromq/TZmqClient.h
new file mode 100644
index 0000000..9544503
--- /dev/null
+++ b/contrib/zeromq/TZmqClient.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_
+#define _THRIFT_TRANSPORT_TZMQCLIENT_H_ 1
+
+#include <zmq.hpp>
+#include <transport/TBufferTransports.h>
+
+namespace apache { namespace thrift { namespace transport {
+
+class TZmqClient : public TTransport {
+ public:
+  TZmqClient(zmq::context_t& ctx, const std::string& endpoint, int type)
+    : sock_(ctx, type)
+    , endpoint_(endpoint)
+    , wbuf_()
+    , rbuf_()
+    , msg_()
+    , zmq_type_(type)
+  {}
+
+  void open() {
+    if(zmq_type_ == ZMQ_PUB) {
+      sock_.bind(endpoint_.c_str());
+    }
+    else {
+      sock_.connect(endpoint_.c_str());
+    }
+  }
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void writeEnd();
+
+ protected:
+  std::string endpoint_;
+  zmq::socket_t sock_;
+  TMemoryBuffer wbuf_;
+  TMemoryBuffer rbuf_;
+  zmq::message_t msg_;
+  int zmq_type_;
+};
+
+}}} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_
diff --git a/contrib/zeromq/TZmqClient.py b/contrib/zeromq/TZmqClient.py
new file mode 100644
index 0000000..d560697
--- /dev/null
+++ b/contrib/zeromq/TZmqClient.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import zmq
+from cStringIO import StringIO
+from thrift.transport.TTransport import TTransportBase, CReadableTransport
+
+class TZmqClient(TTransportBase, CReadableTransport):
+  def __init__(self, ctx, endpoint, sock_type):
+    self._sock = ctx.socket(sock_type)
+    self._endpoint = endpoint
+    self._wbuf = StringIO()
+    self._rbuf = StringIO()
+
+  def open(self):
+    self._sock.connect(self._endpoint)
+
+  def read(self, size):
+    ret = self._rbuf.read(size)
+    if len(ret) != 0:
+      return ret
+    self._read_message()
+    return self._rbuf.read(size)
+
+  def _read_message(self):
+    msg = self._sock.recv()
+    self._rbuf = StringIO(msg)
+
+  def write(self, buf):
+    self._wbuf.write(buf)
+
+  def flush(self):
+    msg = self._wbuf.getvalue()
+    self._wbuf = StringIO()
+    self._sock.send(msg)
+
+  # Implement the CReadableTransport interface.
+  @property
+  def cstringio_buf(self):
+    return self._rbuf
+
+  # NOTE: This will probably not actually work.
+  def cstringio_refill(self, prefix, reqlen):
+    while len(prefix) < reqlen:
+      self.read_message()
+      prefix += self._rbuf.getvalue()
+    self._rbuf = StringIO(prefix)
+    return self._rbuf
diff --git a/contrib/zeromq/TZmqServer.cpp b/contrib/zeromq/TZmqServer.cpp
new file mode 100644
index 0000000..c6142d7
--- /dev/null
+++ b/contrib/zeromq/TZmqServer.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TZmqServer.h"
+#include <transport/TBufferTransports.h>
+#include <boost/scoped_ptr.hpp>
+
+using boost::shared_ptr;
+using apache::thrift::transport::TMemoryBuffer;
+using apache::thrift::protocol::TProtocol;
+
+namespace apache { namespace thrift { namespace server {
+
+
+bool TZmqServer::serveOne(int recv_flags) {
+  zmq::message_t msg;
+  bool received = sock_.recv(&msg, recv_flags);
+  if (!received) {
+    return false;
+  }
+  shared_ptr<TMemoryBuffer> inputTransport(new TMemoryBuffer((uint8_t*)msg.data(), msg.size()));
+  shared_ptr<TMemoryBuffer> outputTransport(new TMemoryBuffer());
+  shared_ptr<TProtocol> inputProtocol(
+      inputProtocolFactory_->getProtocol(inputTransport));
+  shared_ptr<TProtocol> outputProtocol(
+      outputProtocolFactory_->getProtocol(outputTransport));
+
+  processor_->process(inputProtocol, outputProtocol);
+
+  if (zmq_type_ == ZMQ_REP) {
+    uint8_t* buf;
+    uint32_t size;
+    outputTransport->getBuffer(&buf, &size);
+    msg.rebuild(size);
+    std::memcpy(msg.data(), buf, size);
+    (void)sock_.send(msg);
+  }
+
+  return true;
+}
+
+
+void TZmqMultiServer::serveOne(long timeout) {
+  boost::scoped_ptr<zmq::pollitem_t> items(setupPoll());
+  serveActive(items.get(), timeout);
+}
+
+
+void TZmqMultiServer::serveForever() {
+  boost::scoped_ptr<zmq::pollitem_t> items(setupPoll());
+  while (true) {
+    serveActive(items.get(), -1);
+  }
+}
+
+
+zmq::pollitem_t* TZmqMultiServer::setupPoll() {
+  zmq::pollitem_t* items = new zmq::pollitem_t[servers_.size()];
+  for (int i = 0; i < servers_.size(); ++i) {
+    items[i].socket = servers_[i]->getSocket();
+    items[i].events = ZMQ_POLLIN;
+  }
+  return items;
+}
+
+void TZmqMultiServer::serveActive(zmq::pollitem_t* items, long timeout) {
+  int rc = zmq::poll(items, servers_.size(), timeout);
+  if (rc == 0) {
+    return;
+  }
+  for (int i = 0; i < servers_.size(); ++i) {
+    if ((items[i].revents & ZMQ_POLLIN) != 0) {
+      // Should we pass ZMQ_NOBLOCK here to be safe?
+      servers_[i]->serveOne();
+    }
+  }
+}
+
+
+}}} // apache::thrift::server
diff --git a/contrib/zeromq/TZmqServer.h b/contrib/zeromq/TZmqServer.h
new file mode 100644
index 0000000..1603eac
--- /dev/null
+++ b/contrib/zeromq/TZmqServer.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TZMQSERVER_H_
+#define _THRIFT_SERVER_TZMQSERVER_H_ 1
+
+#include <zmq.hpp>
+#include <server/TServer.h>
+
+namespace apache { namespace thrift { namespace server {
+
+class TZmqServer : public TServer {
+ public:
+  TZmqServer(
+      boost::shared_ptr<TProcessor> processor,
+      zmq::context_t& ctx, const std::string& endpoint, int type)
+    : TServer(processor)
+    , zmq_type_(type)
+    , sock_(ctx, type)
+  {
+    if(zmq_type_ == ZMQ_SUB) {
+      sock_.setsockopt(ZMQ_SUBSCRIBE, "", 0) ; // listen to all messages
+      sock_.connect(endpoint.c_str()) ;
+    }
+    else {
+      sock_.bind(endpoint.c_str());
+    }
+  }
+
+  bool serveOne(int recv_flags = 0);
+  void serve() {
+    while (true) {
+      serveOne();
+    }
+  }
+
+  zmq::socket_t& getSocket() {
+    return sock_;
+  }
+
+ private:
+  int zmq_type_;
+  zmq::socket_t sock_;
+};
+
+
+class TZmqMultiServer {
+ public:
+  void serveOne(long timeout = -1);
+  void serveForever();
+
+  std::vector<TZmqServer*>& servers() {
+    return servers_;
+  }
+
+ private:
+  zmq::pollitem_t* setupPoll();
+  void serveActive(zmq::pollitem_t* items, long timeout);
+  std::vector<TZmqServer*> servers_;
+};
+
+
+}}} // apache::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TZMQSERVER_H_
diff --git a/contrib/zeromq/TZmqServer.py b/contrib/zeromq/TZmqServer.py
new file mode 100644
index 0000000..c83cc8d
--- /dev/null
+++ b/contrib/zeromq/TZmqServer.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import logging
+import zmq
+import thrift.server.TServer
+import thrift.transport.TTransport
+
+class TZmqServer(thrift.server.TServer.TServer):
+  def __init__(self, processor, ctx, endpoint, sock_type):
+    thrift.server.TServer.TServer.__init__(self, processor, None)
+    self.zmq_type = sock_type
+    self.socket = ctx.socket(sock_type)
+    self.socket.bind(endpoint)
+
+  def serveOne(self):
+    msg = self.socket.recv()
+    itrans = thrift.transport.TTransport.TMemoryBuffer(msg)
+    otrans = thrift.transport.TTransport.TMemoryBuffer()
+    iprot = self.inputProtocolFactory.getProtocol(itrans)
+    oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+    try:
+      self.processor.process(iprot, oprot)
+    except Exception:
+      logging.exception("Exception while processing request")
+      # Fall through and send back a response, even if empty or incomplete.
+
+    if self.zmq_type == zmq.REP:
+      msg = otrans.getvalue()
+      self.socket.send(msg)
+
+  def serve(self):
+    while True:
+      self.serveOne()
+
+
+class TZmqMultiServer(object):
+  def __init__(self):
+    self.servers = []
+
+  def serveOne(self, timeout = -1):
+    self._serveActive(self._setupPoll(), timeout)
+
+  def serveForever(self):
+    poll_info = self._setupPoll()
+    while True:
+      self._serveActive(poll_info, -1)
+
+  def _setupPoll(self):
+    server_map = {}
+    poller = zmq.Poller()
+    for server in self.servers:
+      server_map[server.socket] = server
+      poller.register(server.socket, zmq.POLLIN)
+    return (server_map, poller)
+
+  def _serveActive(self, poll_info, timeout):
+    (server_map, poller) = poll_info
+    ready = dict(poller.poll())
+    for sock, state in ready.items():
+      assert (state & zmq.POLLIN) != 0
+      server_map[sock].serveOne()
diff --git a/contrib/zeromq/storage.thrift b/contrib/zeromq/storage.thrift
new file mode 100644
index 0000000..a1ea967
--- /dev/null
+++ b/contrib/zeromq/storage.thrift
@@ -0,0 +1,4 @@
+service Storage {
+  oneway void incr(1: i32 amount);
+  i32 get();
+}
diff --git a/contrib/zeromq/test-client.cpp b/contrib/zeromq/test-client.cpp
new file mode 100644
index 0000000..64e20f6
--- /dev/null
+++ b/contrib/zeromq/test-client.cpp
@@ -0,0 +1,40 @@
+#include <iostream>
+#include <cstdlib>
+#include <protocol/TBinaryProtocol.h>
+
+#include "zmq.hpp"
+#include "TZmqClient.h"
+#include "Storage.h"
+
+using boost::shared_ptr;
+using apache::thrift::transport::TZmqClient;
+using apache::thrift::protocol::TBinaryProtocol;
+
+int main(int argc, char** argv) {
+  const char* endpoint = "tcp://127.0.0.1:9090";
+  int socktype = ZMQ_REQ;
+  int incr = 0;
+  if (argc > 1) {
+    incr = atoi(argv[1]);
+    if (incr) {
+      socktype = ZMQ_DOWNSTREAM;
+      endpoint = "tcp://127.0.0.1:9091";
+    }
+  }
+
+  zmq::context_t ctx(1);
+  shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype));
+  shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+  StorageClient client(protocol);
+  transport->open();
+
+  if (incr) {
+    client.incr(incr);
+    usleep(50000);
+  } else {
+    int value = client.get();
+    std::cout << value << std::endl;
+  }
+
+  return 0;
+}
diff --git a/contrib/zeromq/test-client.py b/contrib/zeromq/test-client.py
new file mode 100755
index 0000000..1886d9c
--- /dev/null
+++ b/contrib/zeromq/test-client.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import sys
+import time
+import zmq
+import TZmqClient
+import thrift.protocol.TBinaryProtocol
+import storage.ttypes
+import storage.Storage
+
+
+def main(args):
+  endpoint = "tcp://127.0.0.1:9090"
+  socktype = zmq.REQ
+  incr = 0
+  if len(args) > 1:
+    incr = int(args[1])
+    if incr:
+      socktype = zmq.DOWNSTREAM
+      endpoint = "tcp://127.0.0.1:9091"
+
+  ctx = zmq.Context()
+  transport = TZmqClient.TZmqClient(ctx, endpoint, socktype)
+  protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocolAccelerated(transport)
+  client = storage.Storage.Client(protocol)
+  transport.open()
+
+  if incr:
+    client.incr(incr)
+    time.sleep(0.05)
+  else:
+    value = client.get()
+    print value
+
+
+if __name__ == "__main__":
+  main(sys.argv)
diff --git a/contrib/zeromq/test-receiver.cpp b/contrib/zeromq/test-receiver.cpp
new file mode 100644
index 0000000..8fe69da
--- /dev/null
+++ b/contrib/zeromq/test-receiver.cpp
@@ -0,0 +1,40 @@
+#include "zmq.hpp"
+#include "TZmqServer.h"
+#include "Storage.h"
+
+using boost::shared_ptr;
+using apache::thrift::TProcessor;
+using apache::thrift::server::TZmqServer;
+using apache::thrift::server::TZmqMultiServer;
+
+class StorageHandler : virtual public StorageIf {
+ public:
+  StorageHandler()
+    : value_(0)
+  {}
+
+  void incr(const int32_t amount) {
+    value_ += amount;
+    printf("value_: %i\n", value_) ;
+  }
+
+  int32_t get() {
+    return value_;
+  }
+
+ private:
+  int32_t value_;
+
+};
+
+
+int main(int argc, char *argv[]) {
+  shared_ptr<StorageHandler> handler(new StorageHandler());
+  shared_ptr<TProcessor> processor(new StorageProcessor(handler));
+
+  zmq::context_t ctx(1);
+  TZmqServer oneway_server(processor, ctx, "epgm://eth0;239.192.1.1:5555", ZMQ_SUB);
+  oneway_server.serve();
+
+  return 0;
+}
diff --git a/contrib/zeromq/test-sender.cpp b/contrib/zeromq/test-sender.cpp
new file mode 100644
index 0000000..ca05709
--- /dev/null
+++ b/contrib/zeromq/test-sender.cpp
@@ -0,0 +1,32 @@
+#include <iostream>
+#include <cstdlib>
+#include <protocol/TBinaryProtocol.h>
+
+#include "zmq.hpp"
+#include "TZmqClient.h"
+#include "Storage.h"
+
+using boost::shared_ptr;
+using apache::thrift::transport::TZmqClient;
+using apache::thrift::protocol::TBinaryProtocol;
+
+int main(int argc, char** argv) {
+  const char* endpoint = "epgm://eth0;239.192.1.1:5555";
+  int socktype = ZMQ_PUB;
+  int incr = 1;
+  if (argc > 1) {
+    incr = atoi(argv[1]);
+  }
+
+  zmq::context_t ctx(1);
+  shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype));
+  shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+  StorageClient client(protocol);
+
+  transport->open();
+
+  client.incr(incr);
+  usleep(50000);
+
+  return 0;
+}
diff --git a/contrib/zeromq/test-server.cpp b/contrib/zeromq/test-server.cpp
new file mode 100644
index 0000000..c624b0d
--- /dev/null
+++ b/contrib/zeromq/test-server.cpp
@@ -0,0 +1,43 @@
+#include "zmq.hpp"
+#include "TZmqServer.h"
+#include "Storage.h"
+
+using boost::shared_ptr;
+using apache::thrift::TProcessor;
+using apache::thrift::server::TZmqServer;
+using apache::thrift::server::TZmqMultiServer;
+
+class StorageHandler : virtual public StorageIf {
+ public:
+  StorageHandler()
+    : value_(0)
+  {}
+
+  void incr(const int32_t amount) {
+    value_ += amount;
+  }
+
+  int32_t get() {
+    return value_;
+  }
+
+ private:
+  int32_t value_;
+
+};
+
+
+int main(int argc, char *argv[]) {
+  shared_ptr<StorageHandler> handler(new StorageHandler());
+  shared_ptr<TProcessor> processor(new StorageProcessor(handler));
+
+  zmq::context_t ctx(1);
+  TZmqServer reqrep_server(processor, ctx, "tcp://0.0.0.0:9090", ZMQ_REP);
+  TZmqServer oneway_server(processor, ctx, "tcp://0.0.0.0:9091", ZMQ_UPSTREAM);
+  TZmqMultiServer multiserver;
+  multiserver.servers().push_back(&reqrep_server);
+  multiserver.servers().push_back(&oneway_server);
+  multiserver.serveForever();
+
+  return 0;
+}
diff --git a/contrib/zeromq/test-server.py b/contrib/zeromq/test-server.py
new file mode 100755
index 0000000..5767b71
--- /dev/null
+++ b/contrib/zeromq/test-server.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+import zmq
+import TZmqServer
+import storage.ttypes
+import storage.Storage
+
+
+class StorageHandler(storage.Storage.Iface):
+  def __init__(self):
+    self.value = 0
+
+  def incr(self, amount):
+    self.value += amount
+
+  def get(self):
+    return self.value
+
+
+def main():
+  handler = StorageHandler()
+  processor = storage.Storage.Processor(handler)
+
+  ctx = zmq.Context()
+  reqrep_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9090", zmq.REP)
+  oneway_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9091", zmq.UPSTREAM)
+  multiserver = TZmqServer.TZmqMultiServer()
+  multiserver.servers.append(reqrep_server)
+  multiserver.servers.append(oneway_server)
+  multiserver.serveForever()
+
+
+if __name__ == "__main__":
+  main()