blob: 15c1543ac6d55b0dbd071ba11075330bfce7f931 [file] [log] [blame]
David Reiss9f3296b2010-08-31 16:58:41 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19import logging
20import zmq
21import thrift.server.TServer
22import thrift.transport.TTransport
23
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090024
David Reiss9f3296b2010-08-31 16:58:41 +000025class TZmqServer(thrift.server.TServer.TServer):
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090026 def __init__(self, processor, ctx, endpoint, sock_type):
27 thrift.server.TServer.TServer.__init__(self, processor, None)
28 self.zmq_type = sock_type
29 self.socket = ctx.socket(sock_type)
30 self.socket.bind(endpoint)
David Reiss9f3296b2010-08-31 16:58:41 +000031
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090032 def serveOne(self):
33 msg = self.socket.recv()
34 itrans = thrift.transport.TTransport.TMemoryBuffer(msg)
35 otrans = thrift.transport.TTransport.TMemoryBuffer()
36 iprot = self.inputProtocolFactory.getProtocol(itrans)
37 oprot = self.outputProtocolFactory.getProtocol(otrans)
David Reiss9f3296b2010-08-31 16:58:41 +000038
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090039 try:
40 self.processor.process(iprot, oprot)
41 except Exception:
42 logging.exception("Exception while processing request")
43 # Fall through and send back a response, even if empty or incomplete.
David Reiss9f3296b2010-08-31 16:58:41 +000044
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090045 if self.zmq_type == zmq.REP:
46 msg = otrans.getvalue()
47 self.socket.send(msg)
David Reiss9f3296b2010-08-31 16:58:41 +000048
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090049 def serve(self):
50 while True:
51 self.serveOne()
David Reiss9f3296b2010-08-31 16:58:41 +000052
53
54class TZmqMultiServer(object):
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090055 def __init__(self):
56 self.servers = []
David Reiss9f3296b2010-08-31 16:58:41 +000057
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090058 def serveOne(self, timeout=-1):
59 self._serveActive(self._setupPoll(), timeout)
David Reiss9f3296b2010-08-31 16:58:41 +000060
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090061 def serveForever(self):
62 poll_info = self._setupPoll()
63 while True:
64 self._serveActive(poll_info, -1)
David Reiss9f3296b2010-08-31 16:58:41 +000065
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090066 def _setupPoll(self):
67 server_map = {}
68 poller = zmq.Poller()
69 for server in self.servers:
70 server_map[server.socket] = server
71 poller.register(server.socket, zmq.POLLIN)
72 return (server_map, poller)
David Reiss9f3296b2010-08-31 16:58:41 +000073
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090074 def _serveActive(self, poll_info, timeout):
75 (server_map, poller) = poll_info
76 ready = dict(poller.poll())
77 for sock, state in ready.items():
78 assert (state & zmq.POLLIN) != 0
79 server_map[sock].serveOne()