THRIFT-3596 Better conformance to PEP8
This closes #832
diff --git a/lib/py/src/server/THttpServer.py b/lib/py/src/server/THttpServer.py
index bf3b0e3..1b501a7 100644
--- a/lib/py/src/server/THttpServer.py
+++ b/lib/py/src/server/THttpServer.py
@@ -24,64 +24,64 @@
class ResponseException(Exception):
- """Allows handlers to override the HTTP response
+ """Allows handlers to override the HTTP response
- Normally, THttpServer always sends a 200 response. If a handler wants
- to override this behavior (e.g., to simulate a misconfigured or
- overloaded web server during testing), it can raise a ResponseException.
- The function passed to the constructor will be called with the
- RequestHandler as its only argument.
- """
- def __init__(self, handler):
- self.handler = handler
+ Normally, THttpServer always sends a 200 response. If a handler wants
+ to override this behavior (e.g., to simulate a misconfigured or
+ overloaded web server during testing), it can raise a ResponseException.
+ The function passed to the constructor will be called with the
+ RequestHandler as its only argument.
+ """
+ def __init__(self, handler):
+ self.handler = handler
class THttpServer(TServer.TServer):
- """A simple HTTP-based Thrift server
+ """A simple HTTP-based Thrift server
- This class is not very performant, but it is useful (for example) for
- acting as a mock version of an Apache-based PHP Thrift endpoint.
- """
- def __init__(self,
- processor,
- server_address,
- inputProtocolFactory,
- outputProtocolFactory=None,
- server_class=BaseHTTPServer.HTTPServer):
- """Set up protocol factories and HTTP server.
-
- See BaseHTTPServer for server_address.
- See TServer for protocol factories.
+ This class is not very performant, but it is useful (for example) for
+ acting as a mock version of an Apache-based PHP Thrift endpoint.
"""
- if outputProtocolFactory is None:
- outputProtocolFactory = inputProtocolFactory
+ def __init__(self,
+ processor,
+ server_address,
+ inputProtocolFactory,
+ outputProtocolFactory=None,
+ server_class=BaseHTTPServer.HTTPServer):
+ """Set up protocol factories and HTTP server.
- TServer.TServer.__init__(self, processor, None, None, None,
- inputProtocolFactory, outputProtocolFactory)
+ See BaseHTTPServer for server_address.
+ See TServer for protocol factories.
+ """
+ if outputProtocolFactory is None:
+ outputProtocolFactory = inputProtocolFactory
- thttpserver = self
+ TServer.TServer.__init__(self, processor, None, None, None,
+ inputProtocolFactory, outputProtocolFactory)
- class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
- def do_POST(self):
- # Don't care about the request path.
- itrans = TTransport.TFileObjectTransport(self.rfile)
- otrans = TTransport.TFileObjectTransport(self.wfile)
- itrans = TTransport.TBufferedTransport(
- itrans, int(self.headers['Content-Length']))
- otrans = TTransport.TMemoryBuffer()
- iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
- oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
- try:
- thttpserver.processor.process(iprot, oprot)
- except ResponseException as exn:
- exn.handler(self)
- else:
- self.send_response(200)
- self.send_header("content-type", "application/x-thrift")
- self.end_headers()
- self.wfile.write(otrans.getvalue())
+ thttpserver = self
- self.httpd = server_class(server_address, RequestHander)
+ class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_POST(self):
+ # Don't care about the request path.
+ itrans = TTransport.TFileObjectTransport(self.rfile)
+ otrans = TTransport.TFileObjectTransport(self.wfile)
+ itrans = TTransport.TBufferedTransport(
+ itrans, int(self.headers['Content-Length']))
+ otrans = TTransport.TMemoryBuffer()
+ iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
+ oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
+ try:
+ thttpserver.processor.process(iprot, oprot)
+ except ResponseException as exn:
+ exn.handler(self)
+ else:
+ self.send_response(200)
+ self.send_header("content-type", "application/x-thrift")
+ self.end_headers()
+ self.wfile.write(otrans.getvalue())
- def serve(self):
- self.httpd.serve_forever()
+ self.httpd = server_class(server_address, RequestHander)
+
+ def serve(self):
+ self.httpd.serve_forever()
diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py
index a930a80..87031c1 100644
--- a/lib/py/src/server/TNonblockingServer.py
+++ b/lib/py/src/server/TNonblockingServer.py
@@ -24,13 +24,12 @@
The thread pool should be sized for concurrent tasks, not
maximum connections
"""
-import threading
-import socket
-import select
-import struct
import logging
-logger = logging.getLogger(__name__)
+import select
+import socket
+import struct
+import threading
from six.moves import queue
@@ -39,6 +38,8 @@
__all__ = ['TNonblockingServer']
+logger = logging.getLogger(__name__)
+
class Worker(threading.Thread):
"""Worker is a small helper to process incoming connection."""
@@ -127,7 +128,7 @@
self.len, = struct.unpack('!i', self.message)
if self.len < 0:
logger.error("negative frame size, it seems client "
- "doesn't use FramedTransport")
+ "doesn't use FramedTransport")
self.close()
elif self.len == 0:
logger.error("empty frame, it's really strange")
@@ -149,7 +150,7 @@
read = self.socket.recv(self.len - len(self.message))
if len(read) == 0:
logger.error("can't read frame from socket (get %d of "
- "%d bytes)" % (len(self.message), self.len))
+ "%d bytes)" % (len(self.message), self.len))
self.close()
return
self.message += read
diff --git a/lib/py/src/server/TProcessPoolServer.py b/lib/py/src/server/TProcessPoolServer.py
index b2c2308..fe6dc81 100644
--- a/lib/py/src/server/TProcessPoolServer.py
+++ b/lib/py/src/server/TProcessPoolServer.py
@@ -19,13 +19,14 @@
import logging
-logger = logging.getLogger(__name__)
-from multiprocessing import Process, Value, Condition, reduction
+from multiprocessing import Process, Value, Condition
from .TServer import TServer
from thrift.transport.TTransport import TTransportException
+logger = logging.getLogger(__name__)
+
class TProcessPoolServer(TServer):
"""Server with a fixed size pool of worker subprocesses to service requests
@@ -59,7 +60,7 @@
try:
client = self.serverTransport.accept()
if not client:
- continue
+ continue
self.serveClient(client)
except (KeyboardInterrupt, SystemExit):
return 0
@@ -76,7 +77,7 @@
try:
while True:
self.processor.process(iprot, oprot)
- except TTransportException as tx:
+ except TTransportException:
pass
except Exception as x:
logger.exception(x)
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
index 30f063b..d5d9c98 100644
--- a/lib/py/src/server/TServer.py
+++ b/lib/py/src/server/TServer.py
@@ -18,262 +18,259 @@
#
from six.moves import queue
-import os
-import sys
-import threading
-import traceback
-
import logging
-logger = logging.getLogger(__name__)
+import os
+import threading
-from thrift.Thrift import TProcessor
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
+logger = logging.getLogger(__name__)
+
class TServer(object):
- """Base interface for a server, which must have a serve() method.
+ """Base interface for a server, which must have a serve() method.
- Three constructors for all servers:
- 1) (processor, serverTransport)
- 2) (processor, serverTransport, transportFactory, protocolFactory)
- 3) (processor, serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory)
- """
- def __init__(self, *args):
- if (len(args) == 2):
- self.__initArgs__(args[0], args[1],
- TTransport.TTransportFactoryBase(),
- TTransport.TTransportFactoryBase(),
- TBinaryProtocol.TBinaryProtocolFactory(),
- TBinaryProtocol.TBinaryProtocolFactory())
- elif (len(args) == 4):
- self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
- elif (len(args) == 6):
- self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
+ Three constructors for all servers:
+ 1) (processor, serverTransport)
+ 2) (processor, serverTransport, transportFactory, protocolFactory)
+ 3) (processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory)
+ """
+ def __init__(self, *args):
+ if (len(args) == 2):
+ self.__initArgs__(args[0], args[1],
+ TTransport.TTransportFactoryBase(),
+ TTransport.TTransportFactoryBase(),
+ TBinaryProtocol.TBinaryProtocolFactory(),
+ TBinaryProtocol.TBinaryProtocolFactory())
+ elif (len(args) == 4):
+ self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
+ elif (len(args) == 6):
+ self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
- def __initArgs__(self, processor, serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory):
- self.processor = processor
- self.serverTransport = serverTransport
- self.inputTransportFactory = inputTransportFactory
- self.outputTransportFactory = outputTransportFactory
- self.inputProtocolFactory = inputProtocolFactory
- self.outputProtocolFactory = outputProtocolFactory
+ def __initArgs__(self, processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory):
+ self.processor = processor
+ self.serverTransport = serverTransport
+ self.inputTransportFactory = inputTransportFactory
+ self.outputTransportFactory = outputTransportFactory
+ self.inputProtocolFactory = inputProtocolFactory
+ self.outputProtocolFactory = outputProtocolFactory
- def serve(self):
- pass
+ def serve(self):
+ pass
class TSimpleServer(TServer):
- """Simple single-threaded server that just pumps around one transport."""
+ """Simple single-threaded server that just pumps around one transport."""
- def __init__(self, *args):
- TServer.__init__(self, *args)
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
- def serve(self):
- self.serverTransport.listen()
- while True:
- client = self.serverTransport.accept()
- if not client:
- continue
- itrans = self.inputTransportFactory.getTransport(client)
- otrans = self.outputTransportFactory.getTransport(client)
- iprot = self.inputProtocolFactory.getProtocol(itrans)
- oprot = self.outputProtocolFactory.getProtocol(otrans)
- try:
+ def serve(self):
+ self.serverTransport.listen()
while True:
- self.processor.process(iprot, oprot)
- except TTransport.TTransportException as tx:
- pass
- except Exception as x:
- logger.exception(x)
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
- itrans.close()
- otrans.close()
+ itrans.close()
+ otrans.close()
class TThreadedServer(TServer):
- """Threaded server that spawns a new thread per each connection."""
+ """Threaded server that spawns a new thread per each connection."""
- def __init__(self, *args, **kwargs):
- TServer.__init__(self, *args)
- self.daemon = kwargs.get("daemon", False)
+ def __init__(self, *args, **kwargs):
+ TServer.__init__(self, *args)
+ self.daemon = kwargs.get("daemon", False)
- def serve(self):
- self.serverTransport.listen()
- while True:
- try:
- client = self.serverTransport.accept()
- if not client:
- continue
- t = threading.Thread(target=self.handle, args=(client,))
- t.setDaemon(self.daemon)
- t.start()
- except KeyboardInterrupt:
- raise
- except Exception as x:
- logger.exception(x)
+ def serve(self):
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ t = threading.Thread(target=self.handle, args=(client,))
+ t.setDaemon(self.daemon)
+ t.start()
+ except KeyboardInterrupt:
+ raise
+ except Exception as x:
+ logger.exception(x)
- def handle(self, client):
- itrans = self.inputTransportFactory.getTransport(client)
- otrans = self.outputTransportFactory.getTransport(client)
- iprot = self.inputProtocolFactory.getProtocol(itrans)
- oprot = self.outputProtocolFactory.getProtocol(otrans)
- try:
- while True:
- self.processor.process(iprot, oprot)
- except TTransport.TTransportException as tx:
- pass
- except Exception as x:
- logger.exception(x)
+ def handle(self, client):
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
- itrans.close()
- otrans.close()
+ itrans.close()
+ otrans.close()
class TThreadPoolServer(TServer):
- """Server with a fixed size pool of threads which service requests."""
+ """Server with a fixed size pool of threads which service requests."""
- def __init__(self, *args, **kwargs):
- TServer.__init__(self, *args)
- self.clients = queue.Queue()
- self.threads = 10
- self.daemon = kwargs.get("daemon", False)
+ def __init__(self, *args, **kwargs):
+ TServer.__init__(self, *args)
+ self.clients = queue.Queue()
+ self.threads = 10
+ self.daemon = kwargs.get("daemon", False)
- def setNumThreads(self, num):
- """Set the number of worker threads that should be created"""
- self.threads = num
+ def setNumThreads(self, num):
+ """Set the number of worker threads that should be created"""
+ self.threads = num
- def serveThread(self):
- """Loop around getting clients from the shared queue and process them."""
- while True:
- try:
- client = self.clients.get()
- self.serveClient(client)
- except Exception as x:
- logger.exception(x)
+ def serveThread(self):
+ """Loop around getting clients from the shared queue and process them."""
+ while True:
+ try:
+ client = self.clients.get()
+ self.serveClient(client)
+ except Exception as x:
+ logger.exception(x)
- def serveClient(self, client):
- """Process input/output from a client for as long as possible"""
- itrans = self.inputTransportFactory.getTransport(client)
- otrans = self.outputTransportFactory.getTransport(client)
- iprot = self.inputProtocolFactory.getProtocol(itrans)
- oprot = self.outputProtocolFactory.getProtocol(otrans)
- try:
- while True:
- self.processor.process(iprot, oprot)
- except TTransport.TTransportException as tx:
- pass
- except Exception as x:
- logger.exception(x)
+ def serveClient(self, client):
+ """Process input/output from a client for as long as possible"""
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as x:
+ logger.exception(x)
- itrans.close()
- otrans.close()
+ itrans.close()
+ otrans.close()
- def serve(self):
- """Start a fixed number of worker threads and put client into a queue"""
- for i in range(self.threads):
- try:
- t = threading.Thread(target=self.serveThread)
- t.setDaemon(self.daemon)
- t.start()
- except Exception as x:
- logger.exception(x)
+ def serve(self):
+ """Start a fixed number of worker threads and put client into a queue"""
+ for i in range(self.threads):
+ try:
+ t = threading.Thread(target=self.serveThread)
+ t.setDaemon(self.daemon)
+ t.start()
+ except Exception as x:
+ logger.exception(x)
- # Pump the socket for clients
- self.serverTransport.listen()
- while True:
- try:
- client = self.serverTransport.accept()
- if not client:
- continue
- self.clients.put(client)
- except Exception as x:
- logger.exception(x)
+ # Pump the socket for clients
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ self.clients.put(client)
+ except Exception as x:
+ logger.exception(x)
class TForkingServer(TServer):
- """A Thrift server that forks a new process for each request
+ """A Thrift server that forks a new process for each request
- This is more scalable than the threaded server as it does not cause
- GIL contention.
+ This is more scalable than the threaded server as it does not cause
+ GIL contention.
- Note that this has different semantics from the threading server.
- Specifically, updates to shared variables will no longer be shared.
- It will also not work on windows.
+ Note that this has different semantics from the threading server.
+ Specifically, updates to shared variables will no longer be shared.
+ It will also not work on windows.
- This code is heavily inspired by SocketServer.ForkingMixIn in the
- Python stdlib.
- """
- def __init__(self, *args):
- TServer.__init__(self, *args)
- self.children = []
+ This code is heavily inspired by SocketServer.ForkingMixIn in the
+ Python stdlib.
+ """
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
+ self.children = []
- def serve(self):
- def try_close(file):
- try:
- file.close()
- except IOError as e:
- logger.warning(e, exc_info=True)
-
- self.serverTransport.listen()
- while True:
- client = self.serverTransport.accept()
- if not client:
- continue
- try:
- pid = os.fork()
-
- if pid: # parent
- # add before collect, otherwise you race w/ waitpid
- self.children.append(pid)
- self.collect_children()
-
- # Parent must close socket or the connection may not get
- # closed promptly
- itrans = self.inputTransportFactory.getTransport(client)
- otrans = self.outputTransportFactory.getTransport(client)
- try_close(itrans)
- try_close(otrans)
- else:
- itrans = self.inputTransportFactory.getTransport(client)
- otrans = self.outputTransportFactory.getTransport(client)
-
- iprot = self.inputProtocolFactory.getProtocol(itrans)
- oprot = self.outputProtocolFactory.getProtocol(otrans)
-
- ecode = 0
- try:
+ def serve(self):
+ def try_close(file):
try:
- while True:
- self.processor.process(iprot, oprot)
+ file.close()
+ except IOError as e:
+ logger.warning(e, exc_info=True)
+
+ self.serverTransport.listen()
+ while True:
+ client = self.serverTransport.accept()
+ if not client:
+ continue
+ try:
+ pid = os.fork()
+
+ if pid: # parent
+ # add before collect, otherwise you race w/ waitpid
+ self.children.append(pid)
+ self.collect_children()
+
+ # Parent must close socket or the connection may not get
+ # closed promptly
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ try_close(itrans)
+ try_close(otrans)
+ else:
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ ecode = 0
+ try:
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException:
+ pass
+ except Exception as e:
+ logger.exception(e)
+ ecode = 1
+ finally:
+ try_close(itrans)
+ try_close(otrans)
+
+ os._exit(ecode)
+
except TTransport.TTransportException:
- pass
- except Exception as e:
- logger.exception(e)
- ecode = 1
- finally:
- try_close(itrans)
- try_close(otrans)
+ pass
+ except Exception as x:
+ logger.exception(x)
- os._exit(ecode)
+ def collect_children(self):
+ while self.children:
+ try:
+ pid, status = os.waitpid(0, os.WNOHANG)
+ except os.error:
+ pid = None
- except TTransport.TTransportException:
- pass
- except Exception as x:
- logger.exception(x)
-
- def collect_children(self):
- while self.children:
- try:
- pid, status = os.waitpid(0, os.WNOHANG)
- except os.error:
- pid = None
-
- if pid:
- self.children.remove(pid)
- else:
- break
+ if pid:
+ self.children.remove(pid)
+ else:
+ break