blob: c83cc8d5d8c4caaa5c4bf2bd38bb53c236f4310a [file] [log] [blame]
David Reiss9f3296b2010-08-31 16:58:41 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19import logging
20import zmq
21import thrift.server.TServer
22import thrift.transport.TTransport
23
class TZmqServer(thrift.server.TServer.TServer):
    """Thrift server that reads each request from a ZeroMQ socket.

    One ZeroMQ message is treated as one serialized request. A reply is
    sent only when the socket type is ``zmq.REP``.
    """

    def __init__(self, processor, ctx, endpoint, sock_type):
        """Create a socket of ``sock_type`` from ``ctx`` and bind it.

        Args:
            processor: Object whose ``process(iprot, oprot)`` handles a request.
            ctx: Object providing ``socket(sock_type)`` (presumably a
                ``zmq.Context`` -- confirm against callers).
            endpoint: Address handed to the socket's ``bind``.
            sock_type: ZeroMQ socket type; remembered in ``self.zmq_type``.
        """
        # Explicit base-class call with no server transport (None).
        thrift.server.TServer.TServer.__init__(self, processor, None)
        self.zmq_type = sock_type
        self.socket = ctx.socket(sock_type)
        self.socket.bind(endpoint)

    def serveOne(self):
        """Receive one message, run the processor on it, reply if REP."""
        request = self.socket.recv()
        in_buffer = thrift.transport.TTransport.TMemoryBuffer(request)
        out_buffer = thrift.transport.TTransport.TMemoryBuffer()
        in_protocol = self.inputProtocolFactory.getProtocol(in_buffer)
        out_protocol = self.outputProtocolFactory.getProtocol(out_buffer)

        try:
            self.processor.process(in_protocol, out_protocol)
        except Exception:
            logging.exception("Exception while processing request")
            # Fall through and send back a response, even if empty or incomplete.

        if self.zmq_type != zmq.REP:
            # Only REP sockets get a reply written back.
            return
        self.socket.send(out_buffer.getvalue())

    def serve(self):
        """Call ``serveOne`` in an endless loop."""
        while True:
            self.serveOne()
51
52
class TZmqMultiServer(object):
    """Poll the sockets of several servers and dispatch to the ready ones.

    Servers are added by appending to ``self.servers``. Each entry must
    expose a ``socket`` attribute and a ``serveOne()`` method.
    """

    def __init__(self):
        """Start with an empty list of servers."""
        self.servers = []

    def serveOne(self, timeout=-1):
        """Poll once and serve every server whose socket is readable.

        Args:
            timeout: Value forwarded to the poller's ``poll`` call. With
                pyzmq this is in milliseconds and a negative value waits
                indefinitely -- see the pyzmq ``Poller.poll`` docs.
        """
        self._serveActive(self._setupPoll(), timeout)

    def serveForever(self):
        """Poll and serve in an endless loop, blocking between requests.

        The poller is built once up front, so servers appended to
        ``self.servers`` after this call starts are not polled.
        """
        poll_info = self._setupPoll()
        while True:
            self._serveActive(poll_info, -1)

    def _setupPoll(self):
        """Return ``(server_map, poller)`` for the current server list.

        ``server_map`` maps each server's socket to the server itself;
        ``poller`` has every one of those sockets registered for POLLIN.
        """
        server_map = {}
        poller = zmq.Poller()
        for server in self.servers:
            server_map[server.socket] = server
            poller.register(server.socket, zmq.POLLIN)
        return (server_map, poller)

    def _serveActive(self, poll_info, timeout):
        """Poll with ``timeout`` and call ``serveOne`` on each ready server.

        Args:
            poll_info: The ``(server_map, poller)`` pair from ``_setupPoll``.
            timeout: Passed to ``poller.poll``.
        """
        server_map, poller = poll_info
        # Forward the caller's timeout; previously it was dropped and the
        # poll always blocked until some socket became readable.
        ready = dict(poller.poll(timeout))
        for sock, state in ready.items():
            # Only POLLIN was registered, so that is the only event expected.
            assert (state & zmq.POLLIN) != 0
            server_map[sock].serveOne()