Thrift now a TLP - INFRA-3116

git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/server/THttpServer.py b/lib/py/src/server/THttpServer.py
new file mode 100644
index 0000000..21fc314
--- /dev/null
+++ b/lib/py/src/server/THttpServer.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import BaseHTTPServer
+
+from thrift.server import TServer
+from thrift.transport import TTransport
+
+class THttpServer(TServer.TServer):
+  """A simple HTTP-based Thrift server
+
+  This class is not very performant, but it is useful (for example) for
+  acting as a mock version of an Apache-based PHP Thrift endpoint."""
+
+  def __init__(self, processor, server_address,
+      inputProtocolFactory, outputProtocolFactory = None):
+    """Set up protocol factories and HTTP server.
+
+    See BaseHTTPServer for server_address.
+    See TServer for protocol factories."""
+
+    if outputProtocolFactory is None:
+      outputProtocolFactory = inputProtocolFactory
+
+    TServer.TServer.__init__(self, processor, None, None, None,
+        inputProtocolFactory, outputProtocolFactory)
+
+    thttpserver = self
+
+    class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
+      def do_POST(self):
+        # Don't care about the request path.
+        self.send_response(200)
+        self.send_header("content-type", "application/x-thrift")
+        self.end_headers()
+
+        itrans = TTransport.TFileObjectTransport(self.rfile)
+        otrans = TTransport.TFileObjectTransport(self.wfile)
+        iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
+        oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
+        thttpserver.processor.process(iprot, oprot)
+        otrans.flush()
+
+    self.httpd = BaseHTTPServer.HTTPServer(server_address, RequestHander)
+
+  def serve(self):
+    self.httpd.serve_forever()
diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py
new file mode 100644
index 0000000..deec708
--- /dev/null
+++ b/lib/py/src/server/TNonblockingServer.py
@@ -0,0 +1,309 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""Implementation of non-blocking server.
+
+The main idea of the server is reciving and sending requests
+only from main thread.
+
+It also makes thread pool server in tasks terms, not connections.
+"""
+import threading
+import socket
+import Queue
+import select
+import struct
+import logging
+
+from thrift.transport import TTransport
+from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
+
+__all__ = ['TNonblockingServer']
+
+class Worker(threading.Thread):
+    """Worker is a small helper to process incoming connection."""
+    def __init__(self, queue):
+        threading.Thread.__init__(self)
+        self.queue = queue
+
+    def run(self):
+        """Process queries from task queue, stop if processor is None."""
+        while True:
+            try:
+                processor, iprot, oprot, otrans, callback = self.queue.get()
+                if processor is None:
+                    break
+                processor.process(iprot, oprot)
+                callback(True, otrans.getvalue())
+            except Exception:
+                logging.exception("Exception while processing request")
+                callback(False, '')
+
+WAIT_LEN = 0
+WAIT_MESSAGE = 1
+WAIT_PROCESS = 2
+SEND_ANSWER = 3
+CLOSED = 4
+
+def locked(func):
+    "Decorator which locks self.lock."
+    def nested(self, *args, **kwargs):
+        self.lock.acquire()
+        try:
+            return func(self, *args, **kwargs)
+        finally:
+            self.lock.release()
+    return nested
+
+def socket_exception(func):
+    "Decorator close object on socket.error."
+    def read(self, *args, **kwargs):
+        try:
+            return func(self, *args, **kwargs)
+        except socket.error:
+            self.close()
+    return read
+
+class Connection:
+    """Basic class is represented connection.
+    
+    It can be in state:
+        WAIT_LEN --- connection is reading request len.
+        WAIT_MESSAGE --- connection is reading request.
+        WAIT_PROCESS --- connection has just read whole request and 
+            waits for call ready routine.
+        SEND_ANSWER --- connection is sending answer string (including length
+            of answer).
+        CLOSED --- socket was closed and connection should be deleted.
+    """
+    def __init__(self, new_socket, wake_up):
+        self.socket = new_socket
+        self.socket.setblocking(False)
+        self.status = WAIT_LEN
+        self.len = 0
+        self.message = ''
+        self.lock = threading.Lock()
+        self.wake_up = wake_up
+
+    def _read_len(self):
+        """Reads length of request.
+        
+        It's really paranoic routine and it may be replaced by 
+        self.socket.recv(4)."""
+        read = self.socket.recv(4 - len(self.message))
+        if len(read) == 0:
+            # if we read 0 bytes and self.message is empty, it means client close 
+            # connection
+            if len(self.message) != 0:
+                logging.error("can't read frame size from socket")
+            self.close()
+            return
+        self.message += read
+        if len(self.message) == 4:
+            self.len, = struct.unpack('!i', self.message)
+            if self.len < 0:
+                logging.error("negative frame size, it seems client"\
+                    " doesn't use FramedTransport")
+                self.close()
+            elif self.len == 0:
+                logging.error("empty frame, it's really strange")
+                self.close()
+            else:
+                self.message = ''
+                self.status = WAIT_MESSAGE
+
+    @socket_exception
+    def read(self):
+        """Reads data from stream and switch state."""
+        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
+        if self.status == WAIT_LEN:
+            self._read_len()
+            # go back to the main loop here for simplicity instead of
+            # falling through, even though there is a good chance that
+            # the message is already available
+        elif self.status == WAIT_MESSAGE:
+            read = self.socket.recv(self.len - len(self.message))
+            if len(read) == 0:
+                logging.error("can't read frame from socket (get %d of %d bytes)" %
+                    (len(self.message), self.len))
+                self.close()
+                return
+            self.message += read
+            if len(self.message) == self.len:
+                self.status = WAIT_PROCESS
+
+    @socket_exception
+    def write(self):
+        """Writes data from socket and switch state."""
+        assert self.status == SEND_ANSWER
+        sent = self.socket.send(self.message)
+        if sent == len(self.message):
+            self.status = WAIT_LEN
+            self.message = ''
+            self.len = 0
+        else:
+            self.message = self.message[sent:]
+
+    @locked
+    def ready(self, all_ok, message):
+        """Callback function for switching state and waking up main thread.
+        
+        This function is the only function witch can be called asynchronous.
+        
+        The ready can switch Connection to three states:
+            WAIT_LEN if request was oneway.
+            SEND_ANSWER if request was processed in normal way.
+            CLOSED if request throws unexpected exception.
+        
+        The one wakes up main thread.
+        """
+        assert self.status == WAIT_PROCESS
+        if not all_ok:
+            self.close()
+            self.wake_up()
+            return
+        self.len = ''
+        self.message = struct.pack('!i', len(message)) + message
+        if len(message) == 0:
+            # it was a oneway request, do not write answer
+            self.status = WAIT_LEN
+        else:
+            self.status = SEND_ANSWER
+        self.wake_up()
+
+    @locked
+    def is_writeable(self):
+        "Returns True if connection should be added to write list of select."
+        return self.status == SEND_ANSWER
+
+    # it's not necessary, but...
+    @locked
+    def is_readable(self):
+        "Returns True if connection should be added to read list of select."
+        return self.status in (WAIT_LEN, WAIT_MESSAGE)
+
+    @locked
+    def is_closed(self):
+        "Returns True if connection is closed."
+        return self.status == CLOSED
+
+    def fileno(self):
+        "Returns the file descriptor of the associated socket."
+        return self.socket.fileno()
+
+    def close(self):
+        "Closes connection"
+        self.status = CLOSED
+        self.socket.close()
+
+class TNonblockingServer:
+    """Non-blocking server."""
+    def __init__(self, processor, lsocket, inputProtocolFactory=None, 
+            outputProtocolFactory=None, threads=10):
+        self.processor = processor
+        self.socket = lsocket
+        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
+        self.out_protocol = outputProtocolFactory or self.in_protocol
+        self.threads = int(threads)
+        self.clients = {}
+        self.tasks = Queue.Queue()
+        self._read, self._write = socket.socketpair()
+        self.prepared = False
+
+    def setNumThreads(self, num):
+        """Set the number of worker threads that should be created."""
+        # implement ThreadPool interface
+        assert not self.prepared, "You can't change number of threads for working server"
+        self.threads = num
+
+    def prepare(self):
+        """Prepares server for serve requests."""
+        self.socket.listen()
+        for _ in xrange(self.threads):
+            thread = Worker(self.tasks)
+            thread.setDaemon(True)
+            thread.start()
+        self.prepared = True
+
+    def wake_up(self):
+        """Wake up main thread.
+        
+        The server usualy waits in select call in we should terminate one.
+        The simplest way is using socketpair.
+        
+        Select always wait to read from the first socket of socketpair.
+        
+        In this case, we can just write anything to the second socket from
+        socketpair."""
+        self._write.send('1')
+
+    def _select(self):
+        """Does select on open connections."""
+        readable = [self.socket.handle.fileno(), self._read.fileno()]
+        writable = []
+        for i, connection in self.clients.items():
+            if connection.is_readable():
+                readable.append(connection.fileno())
+            if connection.is_writeable():
+                writable.append(connection.fileno())
+            if connection.is_closed():
+                del self.clients[i]
+        return select.select(readable, writable, readable)
+        
+    def handle(self):
+        """Handle requests.
+       
+        WARNING! You must call prepare BEFORE calling handle.
+        """
+        assert self.prepared, "You have to call prepare before handle"
+        rset, wset, xset = self._select()
+        for readable in rset:
+            if readable == self._read.fileno():
+                # don't care i just need to clean readable flag
+                self._read.recv(1024) 
+            elif readable == self.socket.handle.fileno():
+                client = self.socket.accept().handle
+                self.clients[client.fileno()] = Connection(client, self.wake_up)
+            else:
+                connection = self.clients[readable]
+                connection.read()
+                if connection.status == WAIT_PROCESS:
+                    itransport = TTransport.TMemoryBuffer(connection.message)
+                    otransport = TTransport.TMemoryBuffer()
+                    iprot = self.in_protocol.getProtocol(itransport)
+                    oprot = self.out_protocol.getProtocol(otransport)
+                    self.tasks.put([self.processor, iprot, oprot, 
+                                    otransport, connection.ready])
+        for writeable in wset:
+            self.clients[writeable].write()
+        for oob in xset:
+            self.clients[oob].close()
+            del self.clients[oob]
+
+    def close(self):
+        """Closes the server."""
+        for _ in xrange(self.threads):
+            self.tasks.put([None, None, None, None, None])
+        self.socket.close()
+        self.prepared = False
+        
+    def serve(self):
+        """Serve forever."""
+        self.prepare()
+        while True:
+            self.handle()
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
new file mode 100644
index 0000000..6152911
--- /dev/null
+++ b/lib/py/src/server/TServer.py
@@ -0,0 +1,270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import logging
+import sys
+import os
+import traceback
+import threading
+import Queue
+
+from thrift.Thrift import TProcessor
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+
+class TServer:
+
+  """Base interface for a server, which must have a serve method."""
+
+  """ 3 constructors for all servers:
+  1) (processor, serverTransport)
+  2) (processor, serverTransport, transportFactory, protocolFactory)
+  3) (processor, serverTransport,
+      inputTransportFactory, outputTransportFactory,
+      inputProtocolFactory, outputProtocolFactory)"""
+  def __init__(self, *args):
+    if (len(args) == 2):
+      self.__initArgs__(args[0], args[1],
+                        TTransport.TTransportFactoryBase(),
+                        TTransport.TTransportFactoryBase(),
+                        TBinaryProtocol.TBinaryProtocolFactory(),
+                        TBinaryProtocol.TBinaryProtocolFactory())
+    elif (len(args) == 4):
+      self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
+    elif (len(args) == 6):
+      self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
+
+  def __initArgs__(self, processor, serverTransport,
+                   inputTransportFactory, outputTransportFactory,
+                   inputProtocolFactory, outputProtocolFactory):
+    self.processor = processor
+    self.serverTransport = serverTransport
+    self.inputTransportFactory = inputTransportFactory
+    self.outputTransportFactory = outputTransportFactory
+    self.inputProtocolFactory = inputProtocolFactory
+    self.outputProtocolFactory = outputProtocolFactory
+
+  def serve(self):
+    pass
+
+class TSimpleServer(TServer):
+
+  """Simple single-threaded server that just pumps around one transport."""
+
+  def __init__(self, *args):
+    TServer.__init__(self, *args)
+
+  def serve(self):
+    self.serverTransport.listen()
+    while True:
+      client = self.serverTransport.accept()
+      itrans = self.inputTransportFactory.getTransport(client)
+      otrans = self.outputTransportFactory.getTransport(client)
+      iprot = self.inputProtocolFactory.getProtocol(itrans)
+      oprot = self.outputProtocolFactory.getProtocol(otrans)
+      try:
+        while True:
+          self.processor.process(iprot, oprot)
+      except TTransport.TTransportException, tx:
+        pass
+      except Exception, x:
+        logging.exception(x)
+
+      itrans.close()
+      otrans.close()
+
+class TThreadedServer(TServer):
+
+  """Threaded server that spawns a new thread per each connection."""
+
+  def __init__(self, *args):
+    TServer.__init__(self, *args)
+
+  def serve(self):
+    self.serverTransport.listen()
+    while True:
+      try:
+        client = self.serverTransport.accept()
+        t = threading.Thread(target = self.handle, args=(client,))
+        t.start()
+      except KeyboardInterrupt:
+        raise
+      except Exception, x:
+        logging.exception(x)
+
+  def handle(self, client):
+    itrans = self.inputTransportFactory.getTransport(client)
+    otrans = self.outputTransportFactory.getTransport(client)
+    iprot = self.inputProtocolFactory.getProtocol(itrans)
+    oprot = self.outputProtocolFactory.getProtocol(otrans)
+    try:
+      while True:
+        self.processor.process(iprot, oprot)
+    except TTransport.TTransportException, tx:
+      pass
+    except Exception, x:
+      logging.exception(x)
+
+    itrans.close()
+    otrans.close()
+
+class TThreadPoolServer(TServer):
+
+  """Server with a fixed size pool of threads which service requests."""
+
+  def __init__(self, *args):
+    TServer.__init__(self, *args)
+    self.clients = Queue.Queue()
+    self.threads = 10
+
+  def setNumThreads(self, num):
+    """Set the number of worker threads that should be created"""
+    self.threads = num
+
+  def serveThread(self):
+    """Loop around getting clients from the shared queue and process them."""
+    while True:
+      try:
+        client = self.clients.get()
+        self.serveClient(client)
+      except Exception, x:
+        logging.exception(x)
+
+  def serveClient(self, client):
+    """Process input/output from a client for as long as possible"""
+    itrans = self.inputTransportFactory.getTransport(client)
+    otrans = self.outputTransportFactory.getTransport(client)
+    iprot = self.inputProtocolFactory.getProtocol(itrans)
+    oprot = self.outputProtocolFactory.getProtocol(otrans)
+    try:
+      while True:
+        self.processor.process(iprot, oprot)
+    except TTransport.TTransportException, tx:
+      pass
+    except Exception, x:
+      logging.exception(x)
+
+    itrans.close()
+    otrans.close()
+
+  def serve(self):
+    """Start a fixed number of worker threads and put client into a queue"""
+    for i in range(self.threads):
+      try:
+        t = threading.Thread(target = self.serveThread)
+        t.start()
+      except Exception, x:
+        logging.exception(x)
+
+    # Pump the socket for clients
+    self.serverTransport.listen()
+    while True:
+      try:
+        client = self.serverTransport.accept()
+        self.clients.put(client)
+      except Exception, x:
+        logging.exception(x)
+
+
+class TForkingServer(TServer):
+
+  """A Thrift server that forks a new process for each request"""
+  """
+  This is more scalable than the threaded server as it does not cause
+  GIL contention.
+
+  Note that this has different semantics from the threading server.
+  Specifically, updates to shared variables will no longer be shared.
+  It will also not work on windows.
+
+  This code is heavily inspired by SocketServer.ForkingMixIn in the
+  Python stdlib.
+  """
+
+  def __init__(self, *args):
+    TServer.__init__(self, *args)
+    self.children = []
+
+  def serve(self):
+    def try_close(file):
+      try:
+        file.close()
+      except IOError, e:
+        logging.warning(e, exc_info=True)
+
+
+    self.serverTransport.listen()
+    while True:
+      client = self.serverTransport.accept()
+      try:
+        pid = os.fork()
+
+        if pid: # parent
+          # add before collect, otherwise you race w/ waitpid
+          self.children.append(pid)
+          self.collect_children()
+
+          # Parent must close socket or the connection may not get
+          # closed promptly
+          itrans = self.inputTransportFactory.getTransport(client)
+          otrans = self.outputTransportFactory.getTransport(client)
+          try_close(itrans)
+          try_close(otrans)
+        else:
+          itrans = self.inputTransportFactory.getTransport(client)
+          otrans = self.outputTransportFactory.getTransport(client)
+
+          iprot = self.inputProtocolFactory.getProtocol(itrans)
+          oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+          ecode = 0
+          try:
+            try:
+              while True:
+                self.processor.process(iprot, oprot)
+            except TTransport.TTransportException, tx:
+              pass
+            except Exception, e:
+              logging.exception(e)
+              ecode = 1
+          finally:
+            try_close(itrans)
+            try_close(otrans)
+
+          os._exit(ecode)
+
+      except TTransport.TTransportException, tx:
+        pass
+      except Exception, x:
+        logging.exception(x)
+
+
+  def collect_children(self):
+    while self.children:
+      try:
+        pid, status = os.waitpid(0, os.WNOHANG)
+      except os.error:
+        pid = None
+
+      if pid:
+        self.children.remove(pid)
+      else:
+        break
+
+
diff --git a/lib/py/src/server/__init__.py b/lib/py/src/server/__init__.py
new file mode 100644
index 0000000..1bf6e25
--- /dev/null
+++ b/lib/py/src/server/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+__all__ = ['TServer', 'TNonblockingServer']