THRIFT-5449: Use poll instead of select in Python TNonblockingServer if available
Client: Python
diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py
index ac06496..fdf6779 100644
--- a/lib/py/src/server/TNonblockingServer.py
+++ b/lib/py/src/server/TNonblockingServer.py
@@ -253,6 +253,7 @@
self._read, self._write = socket.socketpair()
self.prepared = False
self._stop = False
+ self.poll = select.poll() if hasattr(select, 'poll') else None
def setNumThreads(self, num):
"""Set the number of worker threads that should be created."""
@@ -318,13 +319,53 @@
else:
return select.select(readable, writable, readable) + (True,)
+ def _poll_select(self):
+ """Does poll on open connections, if available."""
+ remaining = []
+
+ self.poll.register(self.socket.handle.fileno(), select.POLLIN | select.POLLRDNORM)
+ self.poll.register(self._read.fileno(), select.POLLIN | select.POLLRDNORM)
+
+ for i, connection in list(self.clients.items()):
+ if connection.is_readable():
+ self.poll.register(connection.fileno(), select.POLLIN | select.POLLRDNORM | select.POLLERR | select.POLLHUP | select.POLLNVAL)
+ if connection.remaining or connection.received:
+ remaining.append(connection.fileno())
+ if connection.is_writeable():
+ self.poll.register(connection.fileno(), select.POLLOUT | select.POLLWRNORM)
+ if connection.is_closed():
+ try:
+ self.poll.unregister(i)
+ except KeyError:
+ logger.debug("KeyError in unregistering connections...")
+ del self.clients[i]
+ if remaining:
+ return remaining, [], [], False
+
+ rlist = []
+ wlist = []
+ xlist = []
+ pollres = self.poll.poll()
+ for fd, event in pollres:
+ if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
+ xlist.append(fd)
+ elif event & (select.POLLOUT | select.POLLWRNORM):
+ wlist.append(fd)
+ elif event & (select.POLLIN | select.POLLRDNORM):
+ rlist.append(fd)
+ else: # should be impossible
+ logger.debug("reached an impossible state in _poll_select")
+ xlist.append(fd)
+
+ return rlist, wlist, xlist, True
+
def handle(self):
"""Handle requests.
WARNING! You must call prepare() BEFORE calling handle()
"""
assert self.prepared, "You have to call prepare before handle"
- rset, wset, xset, selected = self._select()
+ rset, wset, xset, selected = self._select() if not self.poll else self._poll_select()
for readable in rset:
if readable == self._read.fileno():
# don't care i just need to clean readable flag
@@ -343,6 +384,8 @@
connection.read()
if connection.received:
connection.status = WAIT_PROCESS
+ if self.poll:
+ self.poll.unregister(connection.fileno())
msg = connection.received.popleft()
itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
otransport = TTransport.TMemoryBuffer()
@@ -354,7 +397,6 @@
self.clients[writeable].write()
for oob in xset:
self.clients[oob].close()
- del self.clients[oob]
def close(self):
"""Closes the server."""