THRIFT-2911 fix c++ version zeromq transport, the old version cannot work
Client: contrib
Patch: tiny <lox.xiao@gmail.com>
This closes #315
diff --git a/contrib/zeromq/Makefile b/contrib/zeromq/Makefile
index a1d7156..b09f4ee 100644
--- a/contrib/zeromq/Makefile
+++ b/contrib/zeromq/Makefile
@@ -1,6 +1,4 @@
THRIFT = thrift
-CXXFLAGS = `pkg-config --cflags libzmq thrift`
-LDLIBS = `pkg-config --libs libzmq thrift`
CXXFLAGS += -g3 -O0
@@ -16,9 +14,13 @@
all: $(PYLIBS) $(PROGS)
test-client: test-client.o TZmqClient.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
test-server: test-server.o TZmqServer.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
test-sender: test-sender.o TZmqClient.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
test-receiver: test-receiver.o TZmqServer.o $(GENOBJS)
+ $(CXX) $^ -o $@ -lzmq -lthrift
test-client.o test-server.o test-sender.o test-receiver.o: $(GENSRCS)
diff --git a/contrib/zeromq/TZmqClient.cpp b/contrib/zeromq/TZmqClient.cpp
index 133204e..56278f3 100644
--- a/contrib/zeromq/TZmqClient.cpp
+++ b/contrib/zeromq/TZmqClient.cpp
@@ -22,7 +22,7 @@
namespace apache { namespace thrift { namespace transport {
-uint32_t TZmqClient::read(uint8_t* buf, uint32_t len) {
+uint32_t TZmqClient::read_virt(uint8_t* buf, uint32_t len) {
if (rbuf_.available_read() == 0) {
(void)sock_.recv(&msg_);
rbuf_.resetBuffer((uint8_t*)msg_.data(), msg_.size());
@@ -30,11 +30,11 @@
return rbuf_.read(buf, len);
}
-void TZmqClient::write(const uint8_t* buf, uint32_t len) {
+void TZmqClient::write_virt(const uint8_t* buf, uint32_t len) {
return wbuf_.write(buf, len);
}
-void TZmqClient::writeEnd() {
+uint32_t TZmqClient::writeEnd() {
uint8_t* buf;
uint32_t size;
wbuf_.getBuffer(&buf, &size);
@@ -42,6 +42,7 @@
std::memcpy(msg.data(), buf, size);
(void)sock_.send(msg);
wbuf_.resetBuffer(true);
+ return size;
}
}}} // apache::thrift::transport
diff --git a/contrib/zeromq/TZmqClient.h b/contrib/zeromq/TZmqClient.h
index 9fcfc06..df16e03 100644
--- a/contrib/zeromq/TZmqClient.h
+++ b/contrib/zeromq/TZmqClient.h
@@ -45,15 +45,15 @@
}
}
- uint32_t read(uint8_t* buf, uint32_t len);
+ uint32_t read_virt(uint8_t* buf, uint32_t len);
- void write(const uint8_t* buf, uint32_t len);
+ void write_virt(const uint8_t* buf, uint32_t len);
- void writeEnd();
+ uint32_t writeEnd();
protected:
- std::string endpoint_;
zmq::socket_t sock_;
+ std::string endpoint_;
TMemoryBuffer wbuf_;
TMemoryBuffer rbuf_;
zmq::message_t msg_;
diff --git a/contrib/zeromq/TZmqServer.cpp b/contrib/zeromq/TZmqServer.cpp
index f255a66..f031458 100644
--- a/contrib/zeromq/TZmqServer.cpp
+++ b/contrib/zeromq/TZmqServer.cpp
@@ -27,7 +27,6 @@
namespace apache { namespace thrift { namespace server {
-
bool TZmqServer::serveOne(int recv_flags) {
zmq::message_t msg;
bool received = sock_.recv(&msg, recv_flags);
@@ -40,8 +39,9 @@
inputProtocolFactory_->getProtocol(inputTransport));
shared_ptr<TProtocol> outputProtocol(
outputProtocolFactory_->getProtocol(outputTransport));
+ shared_ptr<TMemoryBuffer> transport(new TMemoryBuffer);
- processor_->process(inputProtocol, outputProtocol);
+ processor_->process(inputProtocol, outputProtocol, NULL);
if (zmq_type_ == ZMQ_REP) {
uint8_t* buf;
diff --git a/contrib/zeromq/TZmqServer.h b/contrib/zeromq/TZmqServer.h
index f91c6e8..a840c86 100644
--- a/contrib/zeromq/TZmqServer.h
+++ b/contrib/zeromq/TZmqServer.h
@@ -31,6 +31,7 @@
boost::shared_ptr<TProcessor> processor,
zmq::context_t& ctx, const std::string& endpoint, int type)
: TServer(processor)
+ , processor_(processor)
, zmq_type_(type)
, sock_(ctx, type)
{
@@ -55,6 +56,7 @@
}
private:
+ boost::shared_ptr<TProcessor> processor_;
int zmq_type_;
zmq::socket_t sock_;
};