THRIFT-1480. py: remove tabs, adjust whitespace and address PEP8 warnings
This patch addresses a host of PEP8 lint problems.
Patch: Will Pierce
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1226890 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/server/THttpServer.py b/lib/py/src/server/THttpServer.py
index 3047d9c..be54bab 100644
--- a/lib/py/src/server/THttpServer.py
+++ b/lib/py/src/server/THttpServer.py
@@ -22,6 +22,7 @@
from thrift.server import TServer
from thrift.transport import TTransport
+
class ResponseException(Exception):
"""Allows handlers to override the HTTP response
@@ -39,16 +40,19 @@
"""A simple HTTP-based Thrift server
This class is not very performant, but it is useful (for example) for
- acting as a mock version of an Apache-based PHP Thrift endpoint."""
-
- def __init__(self, processor, server_address,
- inputProtocolFactory, outputProtocolFactory = None,
- server_class = BaseHTTPServer.HTTPServer):
+ acting as a mock version of an Apache-based PHP Thrift endpoint.
+ """
+ def __init__(self,
+ processor,
+ server_address,
+ inputProtocolFactory,
+ outputProtocolFactory=None,
+ server_class=BaseHTTPServer.HTTPServer):
"""Set up protocol factories and HTTP server.
See BaseHTTPServer for server_address.
- See TServer for protocol factories."""
-
+ See TServer for protocol factories.
+ """
if outputProtocolFactory is None:
outputProtocolFactory = inputProtocolFactory
@@ -62,7 +66,8 @@
# Don't care about the request path.
itrans = TTransport.TFileObjectTransport(self.rfile)
otrans = TTransport.TFileObjectTransport(self.wfile)
- itrans = TTransport.TBufferedTransport(itrans, int(self.headers['Content-Length']))
+ itrans = TTransport.TBufferedTransport(
+ itrans, int(self.headers['Content-Length']))
otrans = TTransport.TMemoryBuffer()
iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py
index ea348a0..cd90b4f 100644
--- a/lib/py/src/server/TNonblockingServer.py
+++ b/lib/py/src/server/TNonblockingServer.py
@@ -18,10 +18,11 @@
#
"""Implementation of non-blocking server.
-The main idea of the server is reciving and sending requests
-only from main thread.
+The main idea of the server is to receive and send requests
+only from the main thread.
-It also makes thread pool server in tasks terms, not connections.
+The thread pool should be sized for concurrent tasks, not
+maximum connections.
"""
import threading
import socket
@@ -35,8 +36,10 @@
__all__ = ['TNonblockingServer']
+
class Worker(threading.Thread):
"""Worker is a small helper to process incoming connection."""
+
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
@@ -60,8 +63,9 @@
SEND_ANSWER = 3
CLOSED = 4
+
def locked(func):
- "Decorator which locks self.lock."
+ """Decorator which locks self.lock."""
def nested(self, *args, **kwargs):
self.lock.acquire()
try:
@@ -70,8 +74,9 @@
self.lock.release()
return nested
+
def socket_exception(func):
- "Decorator close object on socket.error."
+ """Decorator close object on socket.error."""
def read(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
@@ -79,16 +84,17 @@
self.close()
return read
+
class Connection:
"""Basic class is represented connection.
-
+
It can be in state:
WAIT_LEN --- connection is reading request len.
WAIT_MESSAGE --- connection is reading request.
- WAIT_PROCESS --- connection has just read whole request and
- waits for call ready routine.
+ WAIT_PROCESS --- connection has just read whole request and
+ waits for call ready routine.
SEND_ANSWER --- connection is sending answer string (including length
- of answer).
+ of answer).
CLOSED --- socket was closed and connection should be deleted.
"""
def __init__(self, new_socket, wake_up):
@@ -102,13 +108,13 @@
def _read_len(self):
"""Reads length of request.
-
- It's really paranoic routine and it may be replaced by
- self.socket.recv(4)."""
+
+ It's a safer alternative to self.socket.recv(4)
+ """
read = self.socket.recv(4 - len(self.message))
if len(read) == 0:
- # if we read 0 bytes and self.message is empty, it means client close
- # connection
+ # if we read 0 bytes and self.message is empty, then
+ # the client closed the connection
if len(self.message) != 0:
logging.error("can't read frame size from socket")
self.close()
@@ -117,8 +123,8 @@
if len(self.message) == 4:
self.len, = struct.unpack('!i', self.message)
if self.len < 0:
- logging.error("negative frame size, it seems client"\
- " doesn't use FramedTransport")
+ logging.error("negative frame size, it seems client "
+ "doesn't use FramedTransport")
self.close()
elif self.len == 0:
logging.error("empty frame, it's really strange")
@@ -139,8 +145,8 @@
elif self.status == WAIT_MESSAGE:
read = self.socket.recv(self.len - len(self.message))
if len(read) == 0:
- logging.error("can't read frame from socket (get %d of %d bytes)" %
- (len(self.message), self.len))
+ logging.error("can't read frame from socket (get %d of "
+ "%d bytes)" % (len(self.message), self.len))
self.close()
return
self.message += read
@@ -162,14 +168,14 @@
@locked
def ready(self, all_ok, message):
"""Callback function for switching state and waking up main thread.
-
+
This function is the only function witch can be called asynchronous.
-
+
The ready can switch Connection to three states:
WAIT_LEN if request was oneway.
SEND_ANSWER if request was processed in normal way.
CLOSED if request throws unexpected exception.
-
+
The one wakes up main thread.
"""
assert self.status == WAIT_PROCESS
@@ -189,33 +195,39 @@
@locked
def is_writeable(self):
- "Returns True if connection should be added to write list of select."
+ """Return True if connection should be added to write list of select"""
return self.status == SEND_ANSWER
# it's not necessary, but...
@locked
def is_readable(self):
- "Returns True if connection should be added to read list of select."
+ """Return True if connection should be added to read list of select"""
return self.status in (WAIT_LEN, WAIT_MESSAGE)
@locked
def is_closed(self):
- "Returns True if connection is closed."
+ """Returns True if connection is closed."""
return self.status == CLOSED
def fileno(self):
- "Returns the file descriptor of the associated socket."
+ """Returns the file descriptor of the associated socket."""
return self.socket.fileno()
def close(self):
- "Closes connection"
+ """Closes connection"""
self.status = CLOSED
self.socket.close()
+
class TNonblockingServer:
"""Non-blocking server."""
- def __init__(self, processor, lsocket, inputProtocolFactory=None,
- outputProtocolFactory=None, threads=10):
+
+ def __init__(self,
+ processor,
+ lsocket,
+ inputProtocolFactory=None,
+ outputProtocolFactory=None,
+ threads=10):
self.processor = processor
self.socket = lsocket
self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
@@ -229,7 +241,7 @@
def setNumThreads(self, num):
"""Set the number of worker threads that should be created."""
# implement ThreadPool interface
- assert not self.prepared, "You can't change number of threads for working server"
+ assert not self.prepared, "Can't change number of threads after start"
self.threads = num
def prepare(self):
@@ -243,14 +255,15 @@
def wake_up(self):
"""Wake up main thread.
-
+
The server usualy waits in select call in we should terminate one.
The simplest way is using socketpair.
-
+
Select always wait to read from the first socket of socketpair.
-
+
In this case, we can just write anything to the second socket from
- socketpair."""
+ socketpair.
+ """
self._write.send('1')
def _select(self):
@@ -265,21 +278,22 @@
if connection.is_closed():
del self.clients[i]
return select.select(readable, writable, readable)
-
+
def handle(self):
"""Handle requests.
-
- WARNING! You must call prepare BEFORE calling handle.
+
+ WARNING! You must call prepare() BEFORE calling handle()
"""
assert self.prepared, "You have to call prepare before handle"
rset, wset, xset = self._select()
for readable in rset:
if readable == self._read.fileno():
# don't care i just need to clean readable flag
- self._read.recv(1024)
+ self._read.recv(1024)
elif readable == self.socket.handle.fileno():
client = self.socket.accept().handle
- self.clients[client.fileno()] = Connection(client, self.wake_up)
+ self.clients[client.fileno()] = Connection(client,
+ self.wake_up)
else:
connection = self.clients[readable]
connection.read()
@@ -288,7 +302,7 @@
otransport = TTransport.TMemoryBuffer()
iprot = self.in_protocol.getProtocol(itransport)
oprot = self.out_protocol.getProtocol(otransport)
- self.tasks.put([self.processor, iprot, oprot,
+ self.tasks.put([self.processor, iprot, oprot,
otransport, connection.ready])
for writeable in wset:
self.clients[writeable].write()
@@ -302,7 +316,7 @@
self.tasks.put([None, None, None, None, None])
self.socket.close()
self.prepared = False
-
+
def serve(self):
"""Serve forever."""
self.prepare()
diff --git a/lib/py/src/server/TProcessPoolServer.py b/lib/py/src/server/TProcessPoolServer.py
index 7ed814a..7a695a8 100644
--- a/lib/py/src/server/TProcessPoolServer.py
+++ b/lib/py/src/server/TProcessPoolServer.py
@@ -24,15 +24,14 @@
from TServer import TServer
from thrift.transport.TTransport import TTransportException
-class TProcessPoolServer(TServer):
- """
- Server with a fixed size pool of worker subprocesses which service requests.
+class TProcessPoolServer(TServer):
+ """Server with a fixed size pool of worker subprocesses to service requests
+
Note that if you need shared state between the handlers - it's up to you!
Written by Dvir Volk, doat.com
"""
-
- def __init__(self, * args):
+ def __init__(self, *args):
TServer.__init__(self, *args)
self.numWorkers = 10
self.workers = []
@@ -50,12 +49,11 @@
self.numWorkers = num
def workerProcess(self):
- """Loop around getting clients from the shared queue and process them."""
-
+ """Loop getting clients from the shared queue and process them"""
if self.postForkCallback:
self.postForkCallback()
- while self.isRunning.value == True:
+ while self.isRunning.value:
try:
client = self.serverTransport.accept()
self.serveClient(client)
@@ -82,17 +80,15 @@
itrans.close()
otrans.close()
-
def serve(self):
- """Start a fixed number of worker threads and put client into a queue"""
-
- #this is a shared state that can tell the workers to exit when set as false
+ """Start workers and put into queue"""
+ # this is a shared state that can tell the workers to exit when False
self.isRunning.value = True
- #first bind and listen to the port
+ # first bind and listen to the port
self.serverTransport.listen()
- #fork the children
+ # fork the children
for i in range(self.numWorkers):
try:
w = Process(target=self.workerProcess)
@@ -102,16 +98,14 @@
except Exception, x:
logging.exception(x)
- #wait until the condition is set by stop()
-
+ # wait until the condition is set by stop()
while True:
-
self.stopCondition.acquire()
try:
self.stopCondition.wait()
break
except (SystemExit, KeyboardInterrupt):
- break
+ break
except Exception, x:
logging.exception(x)
@@ -122,4 +116,3 @@
self.stopCondition.acquire()
self.stopCondition.notify()
self.stopCondition.release()
-
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
index 8456e2d..2f24842 100644
--- a/lib/py/src/server/TServer.py
+++ b/lib/py/src/server/TServer.py
@@ -17,27 +17,28 @@
# under the License.
#
-import logging
-import sys
-import os
-import traceback
-import threading
import Queue
+import logging
+import os
+import sys
+import threading
+import traceback
from thrift.Thrift import TProcessor
-from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
+from thrift.transport import TTransport
+
class TServer:
+ """Base interface for a server, which must have a serve() method.
- """Base interface for a server, which must have a serve method."""
-
- """ 3 constructors for all servers:
+ Three constructors for all servers:
1) (processor, serverTransport)
2) (processor, serverTransport, transportFactory, protocolFactory)
3) (processor, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory)"""
+ inputProtocolFactory, outputProtocolFactory)
+ """
def __init__(self, *args):
if (len(args) == 2):
self.__initArgs__(args[0], args[1],
@@ -63,8 +64,8 @@
def serve(self):
pass
-class TSimpleServer(TServer):
+class TSimpleServer(TServer):
"""Simple single-threaded server that just pumps around one transport."""
def __init__(self, *args):
@@ -89,8 +90,8 @@
itrans.close()
otrans.close()
-class TThreadedServer(TServer):
+class TThreadedServer(TServer):
"""Threaded server that spawns a new thread per each connection."""
def __init__(self, *args, **kwargs):
@@ -102,7 +103,7 @@
while True:
try:
client = self.serverTransport.accept()
- t = threading.Thread(target = self.handle, args=(client,))
+ t = threading.Thread(target=self.handle, args=(client,))
t.setDaemon(self.daemon)
t.start()
except KeyboardInterrupt:
@@ -126,8 +127,8 @@
itrans.close()
otrans.close()
-class TThreadPoolServer(TServer):
+class TThreadPoolServer(TServer):
"""Server with a fixed size pool of threads which service requests."""
def __init__(self, *args, **kwargs):
@@ -170,7 +171,7 @@
"""Start a fixed number of worker threads and put client into a queue"""
for i in range(self.threads):
try:
- t = threading.Thread(target = self.serveThread)
+ t = threading.Thread(target=self.serveThread)
t.setDaemon(self.daemon)
t.start()
except Exception, x:
@@ -187,9 +188,8 @@
class TForkingServer(TServer):
+ """A Thrift server that forks a new process for each request
- """A Thrift server that forks a new process for each request"""
- """
This is more scalable than the threaded server as it does not cause
GIL contention.
@@ -200,7 +200,6 @@
This code is heavily inspired by SocketServer.ForkingMixIn in the
Python stdlib.
"""
-
def __init__(self, *args):
TServer.__init__(self, *args)
self.children = []
@@ -212,14 +211,13 @@
except IOError, e:
logging.warning(e, exc_info=True)
-
self.serverTransport.listen()
while True:
client = self.serverTransport.accept()
try:
pid = os.fork()
- if pid: # parent
+ if pid: # parent
# add before collect, otherwise you race w/ waitpid
self.children.append(pid)
self.collect_children()
@@ -258,7 +256,6 @@
except Exception, x:
logging.exception(x)
-
def collect_children(self):
while self.children:
try:
@@ -270,5 +267,3 @@
self.children.remove(pid)
else:
break
-
-