THRIFT-2709 c_glib: Support server implementation

Patch: Simon South
diff --git a/lib/c_glib/Makefile.am b/lib/c_glib/Makefile.am
index 35f1114..39e848a 100755
--- a/lib/c_glib/Makefile.am
+++ b/lib/c_glib/Makefile.am
@@ -36,12 +36,15 @@
                               src/thrift/c_glib/thrift_struct.c \
                               src/thrift/c_glib/thrift_application_exception.c \
                               src/thrift/c_glib/processor/thrift_processor.c \
+                              src/thrift/c_glib/processor/thrift_dispatch_processor.c \
                               src/thrift/c_glib/protocol/thrift_protocol.c \
                               src/thrift/c_glib/protocol/thrift_protocol_factory.c \
                               src/thrift/c_glib/protocol/thrift_binary_protocol.c \
                               src/thrift/c_glib/protocol/thrift_binary_protocol_factory.c \
                               src/thrift/c_glib/transport/thrift_transport.c \
                               src/thrift/c_glib/transport/thrift_transport_factory.c \
+                              src/thrift/c_glib/transport/thrift_buffered_transport_factory.c \
+                              src/thrift/c_glib/transport/thrift_framed_transport_factory.c \
                               src/thrift/c_glib/transport/thrift_socket.c \
                               src/thrift/c_glib/transport/thrift_server_transport.c \
                               src/thrift/c_glib/transport/thrift_server_socket.c \
@@ -74,14 +77,17 @@
                             src/thrift/c_glib/transport/thrift_server_transport.h \
                             src/thrift/c_glib/transport/thrift_socket.h \
                             src/thrift/c_glib/transport/thrift_transport.h \
-                            src/thrift/c_glib/transport/thrift_transport_factory.h
+                            src/thrift/c_glib/transport/thrift_transport_factory.h \
+                            src/thrift/c_glib/transport/thrift_buffered_transport_factory.h \
+                            src/thrift/c_glib/transport/thrift_framed_transport_factory.h
 
 include_serverdir = $(include_thriftdir)/server
 include_server_HEADERS = src/thrift/c_glib/server/thrift_server.h \
                          src/thrift/c_glib/server/thrift_simple_server.h
 
 include_processordir = $(include_thriftdir)/processor
-include_processor_HEADERS = src/thrift/c_glib/processor/thrift_processor.h
+include_processor_HEADERS = src/thrift/c_glib/processor/thrift_processor.h \
+                            src/thrift/c_glib/processor/thrift_dispatch_processor.h
 
 
 EXTRA_DIST = \
diff --git a/lib/c_glib/src/thrift/c_glib/processor/thrift_dispatch_processor.c b/lib/c_glib/src/thrift/c_glib/processor/thrift_dispatch_processor.c
new file mode 100644
index 0000000..7d223bf
--- /dev/null
+++ b/lib/c_glib/src/thrift/c_glib/processor/thrift_dispatch_processor.c
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/c_glib/thrift.h>
+#include <thrift/c_glib/thrift_application_exception.h>
+#include <thrift/c_glib/processor/thrift_dispatch_processor.h>
+
+G_DEFINE_ABSTRACT_TYPE (ThriftDispatchProcessor,
+                        thrift_dispatch_processor,
+                        THRIFT_TYPE_PROCESSOR);
+
+/* Implements ThriftProcessor's process method: reads a message header from
+   IN and passes the call to the class's dispatch_call virtual method.
+   Returns FALSE without dispatching when the header cannot be read or the
+   message is neither T_CALL nor T_ONEWAY; otherwise returns the result of
+   dispatch_call. */
+gboolean
+thrift_dispatch_processor_process (ThriftProcessor *processor,
+                                   ThriftProtocol *in,
+                                   ThriftProtocol *out,
+                                   GError **error)
+{
+  gchar *fname = NULL;
+  ThriftMessageType mtype;
+  gint32 seqid;
+  ThriftDispatchProcessor *self = THRIFT_DISPATCH_PROCESSOR (processor);
+
+  /* Read the start of the message, which we expect to be a method call */
+  if (thrift_protocol_read_message_begin (in, &fname, &mtype, &seqid,
+                                          error) < 0) {
+    /* Guard both levels: the callee may fail without filling in *error */
+    g_warning ("error reading start of message: %s",
+               (error != NULL && *error != NULL) ? (*error)->message
+                                                 : "(null)");
+    return FALSE;
+  }
+  if (mtype != T_CALL && mtype != T_ONEWAY) {
+    g_warning ("received invalid message type %d from client", mtype);
+    g_free (fname); /* the name was read but will never be dispatched */
+    return FALSE;
+  }
+
+  /* Dispatch the method call. NOTE(review): fname is handed to dispatch_call
+     and not freed here; presumably the callee takes ownership -- confirm. */
+  return THRIFT_DISPATCH_PROCESSOR_GET_CLASS (self)
+    ->dispatch_call (self, in, out, fname, seqid, error);
+}
+
+static gboolean
+thrift_dispatch_processor_real_dispatch_call (ThriftDispatchProcessor *self,
+                                              ThriftProtocol *in,
+                                              ThriftProtocol *out,
+                                              gchar *fname,
+                                              gint32 seqid,
+                                              GError **error)
+{
+  ThriftTransport *xport;
+  ThriftApplicationException *unknown_method;
+  gchar *text;
+  gint32 status;
+  gboolean sent;
+
+  THRIFT_UNUSED_VAR (self);
+
+  /* Fallback used when no override handles the call: reply to the client
+     with an application exception naming the unrecognized method. */
+
+  /* Skip over the call's argument struct and finish reading the message */
+  if (thrift_protocol_skip (in, T_STRUCT, error) < 0)
+    return FALSE;
+  if (thrift_protocol_read_message_end (in, error) < 0)
+    return FALSE;
+
+  g_object_get (in, "transport", &xport, NULL);
+  status = thrift_transport_read_end (xport, error);
+  g_object_unref (xport);
+  if (status < 0)
+    return FALSE;
+
+  /* Reply header: same method name and sequence ID, typed as an exception */
+  if (thrift_protocol_write_message_begin (out, fname, T_EXCEPTION, seqid,
+                                           error) < 0)
+    return FALSE;
+
+  /* Reply body: the exception struct carrying the offending method name */
+  text = g_strconcat ("Invalid method name: '", fname, "'", NULL);
+  unknown_method =
+    g_object_new (THRIFT_TYPE_APPLICATION_EXCEPTION,
+                  "type",    THRIFT_APPLICATION_EXCEPTION_ERROR_UNKNOWN_METHOD,
+                  "message", text,
+                  NULL);
+  g_free (text);
+  status = thrift_struct_write (THRIFT_STRUCT (unknown_method), out, error);
+  g_object_unref (unknown_method);
+  if ((status < 0) ||
+      (thrift_protocol_write_message_end (out, error) < 0))
+    return FALSE;
+
+  /* Finish the write, then flush only if finishing succeeded */
+  g_object_get (out, "transport", &xport, NULL);
+  sent = (thrift_transport_write_end (xport, error) >= 0);
+  if (sent)
+    sent = (thrift_transport_flush (xport, error) >= 0);
+  g_object_unref (xport);
+  return sent;
+}
+
+static void
+thrift_dispatch_processor_init (ThriftDispatchProcessor *self)
+{
+  THRIFT_UNUSED_VAR (self); /* instance initializer: performs no setup */
+}
+
+/* Class initializer: fills in the virtual-method slots of the class. */
+static void
+thrift_dispatch_processor_class_init (ThriftDispatchProcessorClass *klass)
+{
+  /* Implement ThriftProcessor's process method. The class struct declares
+     its own process slot as well, so fill both rather than leave one unset */
+  THRIFT_PROCESSOR_CLASS (klass)->process = thrift_dispatch_processor_process;
+  klass->process = thrift_dispatch_processor_process;
+
+  /* Provide a default implementation for dispatch_call, which returns an
+     exception to the client indicating the method name was not recognized */
+  klass->dispatch_call = thrift_dispatch_processor_real_dispatch_call;
+}
diff --git a/lib/c_glib/src/thrift/c_glib/processor/thrift_dispatch_processor.h b/lib/c_glib/src/thrift/c_glib/processor/thrift_dispatch_processor.h
new file mode 100644
index 0000000..5afb85e
--- /dev/null
+++ b/lib/c_glib/src/thrift/c_glib/processor/thrift_dispatch_processor.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_DISPATCH_PROCESSOR_H
+#define _THRIFT_DISPATCH_PROCESSOR_H
+
+#include <glib-object.h>
+
+#include <thrift/c_glib/processor/thrift_processor.h>
+
+G_BEGIN_DECLS
+
+/*! \file thrift_dispatch_processor.h
+ *  \brief Parses a method-call message header and invokes a function
+ *         to dispatch the call by function name.
+ *
+ * ThriftDispatchProcessor is an abstract helper class that parses the
+ * header of a method-call message and invokes a member function,
+ * dispatch_call, with the method's name.
+ *
+ * Subclasses override dispatch_call to route each call to its implementing
+ * function; the default replies with an "unknown method" exception.
+ */
+
+/* Type macros */
+#define THRIFT_TYPE_DISPATCH_PROCESSOR (thrift_dispatch_processor_get_type ())
+#define THRIFT_DISPATCH_PROCESSOR(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), THRIFT_TYPE_DISPATCH_PROCESSOR, ThriftDispatchProcessor))
+#define THRIFT_IS_DISPATCH_PROCESSOR(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), THRIFT_TYPE_DISPATCH_PROCESSOR))
+#define THRIFT_DISPATCH_PROCESSOR_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), THRIFT_TYPE_DISPATCH_PROCESSOR, ThriftDispatchProcessorClass))
+#define THRIFT_IS_DISPATCH_PROCESSOR_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), THRIFT_TYPE_DISPATCH_PROCESSOR))
+#define THRIFT_DISPATCH_PROCESSOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), THRIFT_TYPE_DISPATCH_PROCESSOR, ThriftDispatchProcessorClass))
+
+/*!
+ * Thrift Dispatch Processor instance; it adds no fields to ThriftProcessor.
+ */
+struct _ThriftDispatchProcessor
+{
+  ThriftProcessor parent;
+};
+typedef struct _ThriftDispatchProcessor ThriftDispatchProcessor;
+
+/*!
+ * Thrift Dispatch Processor class: ThriftProcessorClass plus two method slots.
+ */
+struct _ThriftDispatchProcessorClass
+{
+  ThriftProcessorClass parent;
+
+  /* public: same signature as the process slot of ThriftProcessorClass */
+  gboolean (*process) (ThriftProcessor *processor,
+                       ThriftProtocol *in,
+                       ThriftProtocol *out,
+                       GError **error);
+
+  /* protected: handles one call; fname and seqid come from the header */
+  gboolean (*dispatch_call) (ThriftDispatchProcessor *self,
+                             ThriftProtocol *in,
+                             ThriftProtocol *out,
+                             gchar *fname,
+                             gint32 seqid,
+                             GError **error);
+};
+typedef struct _ThriftDispatchProcessorClass ThriftDispatchProcessorClass;
+
+/* Used by THRIFT_TYPE_DISPATCH_PROCESSOR */
+GType thrift_dispatch_processor_get_type (void);
+
+/*!
+ * Processes a request.
+ * \public \memberof ThriftDispatchProcessorClass
+ */
+gboolean thrift_dispatch_processor_process (ThriftProcessor *processor,
+                                            ThriftProtocol *in,
+                                            ThriftProtocol *out,
+                                            GError **error);
+
+G_END_DECLS
+
+#endif /* _THRIFT_DISPATCH_PROCESSOR_H */
diff --git a/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.c b/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.c
index dac8783..c242c25 100644
--- a/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.c
+++ b/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.c
@@ -24,9 +24,10 @@
 
 gboolean
 thrift_processor_process (ThriftProcessor *processor, ThriftProtocol *in,
-                          ThriftProtocol *out)
+                          ThriftProtocol *out, GError **error)
 {
-  return THRIFT_PROCESSOR_GET_CLASS (processor)->process (processor, in, out);
+  return
+    THRIFT_PROCESSOR_GET_CLASS (processor)->process (processor, in, out, error);
 }
 
 /* class initializer for ThriftProcessor */
diff --git a/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.h b/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.h
index a96a555..ff2d2da 100644
--- a/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.h
+++ b/lib/c_glib/src/thrift/c_glib/processor/thrift_processor.h
@@ -56,7 +56,7 @@
 
   /* vtable */
   gboolean (*process) (ThriftProcessor *processor, ThriftProtocol *in,
-                       ThriftProtocol *out);
+                       ThriftProtocol *out, GError **error);
 };
 typedef struct _ThriftProcessorClass ThriftProcessorClass;
 
@@ -68,7 +68,8 @@
  * \public \memberof ThriftProcessorClass
  */
 gboolean thrift_processor_process (ThriftProcessor *processor,
-                                   ThriftProtocol *in, ThriftProtocol *out);
+                                   ThriftProtocol *in, ThriftProtocol *out,
+                                   GError **error);
 
 G_END_DECLS
 
diff --git a/lib/c_glib/src/thrift/c_glib/server/thrift_server.c b/lib/c_glib/src/thrift/c_glib/server/thrift_server.c
index 0171cee..e8aff45 100644
--- a/lib/c_glib/src/thrift/c_glib/server/thrift_server.c
+++ b/lib/c_glib/src/thrift/c_glib/server/thrift_server.c
@@ -96,10 +96,10 @@
   }
 }
 
-void
-thrift_server_serve (ThriftServer *server)
+gboolean
+thrift_server_serve (ThriftServer *server, GError **error)
 {
-  THRIFT_SERVER_GET_CLASS (server)->serve (server);
+  return THRIFT_SERVER_GET_CLASS (server)->serve (server, error);
 }
 
 void
diff --git a/lib/c_glib/src/thrift/c_glib/server/thrift_server.h b/lib/c_glib/src/thrift/c_glib/server/thrift_server.h
index 2744f97..49beddc 100644
--- a/lib/c_glib/src/thrift/c_glib/server/thrift_server.h
+++ b/lib/c_glib/src/thrift/c_glib/server/thrift_server.h
@@ -69,7 +69,7 @@
   GObjectClass parent;
 
   /* vtable */
-  void (*serve) (ThriftServer *server);
+  gboolean (*serve) (ThriftServer *server, GError **error);
   void (*stop) (ThriftServer *server);
 };
 
@@ -80,7 +80,7 @@
  * Processes the request.
  * \public \memberof ThriftServerClass
  */
-void thrift_server_serve (ThriftServer *server);
+gboolean thrift_server_serve (ThriftServer *server, GError **error);
 
 /*!
  * Stop handling requests.
diff --git a/lib/c_glib/src/thrift/c_glib/server/thrift_simple_server.c b/lib/c_glib/src/thrift/c_glib/server/thrift_simple_server.c
index 7a39bc2..cb20ab6 100644
--- a/lib/c_glib/src/thrift/c_glib/server/thrift_simple_server.c
+++ b/lib/c_glib/src/thrift/c_glib/server/thrift_simple_server.c
@@ -24,51 +24,71 @@
 
 G_DEFINE_TYPE(ThriftSimpleServer, thrift_simple_server, THRIFT_TYPE_SERVER)
 
-void
-thrift_simple_server_serve (ThriftServer *server)
+gboolean
+thrift_simple_server_serve (ThriftServer *server, GError **error)
 {
-  g_return_if_fail (THRIFT_IS_SIMPLE_SERVER (server));
+  g_return_val_if_fail (THRIFT_IS_SIMPLE_SERVER (server), FALSE);
 
   ThriftTransport *t = NULL;
   ThriftTransport *input_transport = NULL, *output_transport = NULL;
   ThriftProtocol *input_protocol = NULL, *output_protocol = NULL;
   ThriftSimpleServer *tss = THRIFT_SIMPLE_SERVER(server);
+  GError *process_error = NULL;
 
-  THRIFT_SERVER_TRANSPORT_GET_CLASS (server->server_transport)
-      ->listen (server->server_transport, NULL);
-
-  tss->running = TRUE;
-  while (tss->running == TRUE)
-  {
-    t = thrift_server_transport_accept (server->server_transport, NULL);
-    input_transport =
-        THRIFT_TRANSPORT_FACTORY_GET_CLASS (server->input_transport_factory)
-            ->get_transport (server->input_transport_factory, t);
-    output_transport = 
-        THRIFT_TRANSPORT_FACTORY_GET_CLASS (server->output_transport_factory)
-            ->get_transport (server->output_transport_factory, t);
-    input_protocol =
-        THRIFT_PROTOCOL_FACTORY_GET_CLASS (server->input_protocol_factory)
-            ->get_protocol (server->input_protocol_factory, t);
-    output_protocol =
-        THRIFT_PROTOCOL_FACTORY_GET_CLASS (server->output_protocol_factory)
-            ->get_protocol (server->output_protocol_factory, t);
-
-    while (THRIFT_PROCESSOR_GET_CLASS (server->processor)
-               ->process (server->processor, input_protocol, output_protocol))
+  if (thrift_server_transport_listen (server->server_transport, error)) {
+    tss->running = TRUE;
+    while (tss->running == TRUE)
     {
-      // TODO: implement transport peek ()
+      // A failed accept leaves *error set while the loop keeps running, so
+      // clear it first: a GError must not be set over a previous one
+      g_clear_error (error);
+      t = thrift_server_transport_accept (server->server_transport, error);
+      if (t != NULL && tss->running) {
+        input_transport =
+          THRIFT_TRANSPORT_FACTORY_GET_CLASS (server->input_transport_factory)
+          ->get_transport (server->input_transport_factory, t);
+        output_transport =
+          THRIFT_TRANSPORT_FACTORY_GET_CLASS (server->output_transport_factory)
+          ->get_transport (server->output_transport_factory, t);
+        input_protocol =
+          THRIFT_PROTOCOL_FACTORY_GET_CLASS (server->input_protocol_factory)
+          ->get_protocol (server->input_protocol_factory, input_transport);
+        output_protocol =
+          THRIFT_PROTOCOL_FACTORY_GET_CLASS (server->output_protocol_factory)
+          ->get_protocol (server->output_protocol_factory, output_transport);
+
+        while (THRIFT_PROCESSOR_GET_CLASS (server->processor)
+               ->process (server->processor, input_protocol, output_protocol,
+                          &process_error) &&
+               thrift_transport_peek (input_transport, &process_error))
+        {
+        }
+
+        if (process_error != NULL)
+        {
+          g_message ("thrift_simple_server_serve: %s", process_error->message);
+          g_clear_error (&process_error);
+
+          // Note we do not propagate processing errors to the caller as they
+          // normally are transient and not fatal to the server
+        }
+
+        // TODO: handle exceptions
+        THRIFT_TRANSPORT_GET_CLASS (input_transport)->close (input_transport,
+                                                             NULL);
+        THRIFT_TRANSPORT_GET_CLASS (output_transport)->close (output_transport,
+                                                              NULL);
+      }
     }
 
-    // TODO: handle exceptions
-    THRIFT_TRANSPORT_GET_CLASS (input_transport)->close (input_transport, NULL);
-    THRIFT_TRANSPORT_GET_CLASS (output_transport)->close (output_transport,
-                                                          NULL);
-  } 
+    // attempt to shutdown
+    THRIFT_SERVER_TRANSPORT_GET_CLASS (server->server_transport)
+      ->close (server->server_transport, NULL);
+  }
 
-  // attempt to shutdown
-  THRIFT_SERVER_TRANSPORT_GET_CLASS (server->server_transport)
-      ->close (server->server_transport, NULL); 
+  // Since this method is designed to run forever, it can only ever return on
+  // error
+  return FALSE;
 }
 
 void
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport.c b/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport.c
index 76d4fda..ee8e1aa 100644
--- a/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport.c
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport.c
@@ -47,6 +47,14 @@
   return THRIFT_TRANSPORT_GET_CLASS (t->transport)->is_open (t->transport);
 }
 
+/* overrides thrift_transport_peek; a non-empty read buffer skips the inner peek */
+gboolean
+thrift_buffered_transport_peek (ThriftTransport *transport, GError **error)
+{
+  ThriftBufferedTransport *t = THRIFT_BUFFERED_TRANSPORT (transport);
+  return (t->r_buf->len > 0) || thrift_transport_peek (t->transport, error);
+}
+
 /* implements thrift_transport_open */
 gboolean
 thrift_buffered_transport_open (ThriftTransport *transport, GError **error)
@@ -369,6 +377,7 @@
 
   gobject_class->finalize = thrift_buffered_transport_finalize;
   ttc->is_open = thrift_buffered_transport_is_open;
+  ttc->peek = thrift_buffered_transport_peek;
   ttc->open = thrift_buffered_transport_open;
   ttc->close = thrift_buffered_transport_close;
   ttc->read = thrift_buffered_transport_read;
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport_factory.c b/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport_factory.c
new file mode 100644
index 0000000..86050b6
--- /dev/null
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport_factory.c
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/c_glib/thrift.h>
+#include <thrift/c_glib/transport/thrift_buffered_transport.h>
+#include <thrift/c_glib/transport/thrift_buffered_transport_factory.h>
+
+G_DEFINE_TYPE (ThriftBufferedTransportFactory,
+               thrift_buffered_transport_factory,
+               THRIFT_TYPE_TRANSPORT_FACTORY)
+
+/* Builds a new ThriftBufferedTransport around TRANSPORT; FACTORY is unused. */
+ThriftTransport *
+thrift_buffered_transport_factory_get_transport (ThriftTransportFactory *factory,
+                                                 ThriftTransport *transport)
+{
+  gpointer wrapped = g_object_new (THRIFT_TYPE_BUFFERED_TRANSPORT,
+                                   "transport", transport, NULL);
+
+  THRIFT_UNUSED_VAR (factory);
+  return THRIFT_TRANSPORT (wrapped);
+}
+
+static void
+thrift_buffered_transport_factory_init (ThriftBufferedTransportFactory *self)
+{
+  THRIFT_UNUSED_VAR (self); /* instance initializer: performs no setup */
+}
+
+/* Class initializer: installs get_transport in both the base-class slot and
+   the slot this class declares itself. */
+static void
+thrift_buffered_transport_factory_class_init (ThriftBufferedTransportFactoryClass *klass)
+{
+  /* The subclass slot is assigned first, then mirrored into the base class */
+  klass->get_transport = thrift_buffered_transport_factory_get_transport;
+
+  THRIFT_TRANSPORT_FACTORY_CLASS (klass)->get_transport = klass->get_transport;
+}
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport_factory.h b/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport_factory.h
new file mode 100644
index 0000000..d43f4e4
--- /dev/null
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_buffered_transport_factory.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_BUFFERED_TRANSPORT_FACTORY_H
+#define _THRIFT_BUFFERED_TRANSPORT_FACTORY_H
+
+#include <glib-object.h>
+
+#include <thrift/c_glib/transport/thrift_transport.h>
+#include <thrift/c_glib/transport/thrift_transport_factory.h>
+
+G_BEGIN_DECLS
+
+/*! \file thrift_buffered_transport_factory.h
+ *  \brief Wraps a transport with a ThriftBufferedTransport.
+ */
+
+/* type macros */
+#define THRIFT_TYPE_BUFFERED_TRANSPORT_FACTORY          \
+  (thrift_buffered_transport_factory_get_type ())
+#define THRIFT_BUFFERED_TRANSPORT_FACTORY(obj)                          \
+  (G_TYPE_CHECK_INSTANCE_CAST ((obj),                                   \
+                               THRIFT_TYPE_BUFFERED_TRANSPORT_FACTORY,  \
+                               ThriftBufferedTransportFactory))
+#define THRIFT_IS_BUFFERED_TRANSPORT_FACTORY(obj)                       \
+  (G_TYPE_CHECK_INSTANCE_TYPE ((obj),                                   \
+                               THRIFT_TYPE_BUFFERED_TRANSPORT_FACTORY))
+#define THRIFT_BUFFERED_TRANSPORT_FACTORY_CLASS(c)                      \
+  (G_TYPE_CHECK_CLASS_CAST ((c),                                        \
+                            THRIFT_TYPE_BUFFERED_TRANSPORT_FACTORY,     \
+                            ThriftBufferedTransportFactoryClass))
+#define THRIFT_IS_BUFFERED_TRANSPORT_FACTORY_CLASS(c)                   \
+  (G_TYPE_CHECK_CLASS_TYPE ((c),                                        \
+                            THRIFT_TYPE_BUFFERED_TRANSPORT_FACTORY))
+#define THRIFT_BUFFERED_TRANSPORT_FACTORY_GET_CLASS(obj)                \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj),                                    \
+                              THRIFT_TYPE_BUFFERED_TRANSPORT_FACTORY,   \
+                              ThriftBufferedTransportFactoryClass))
+
+typedef struct _ThriftBufferedTransportFactory ThriftBufferedTransportFactory;
+
+/* Thrift Buffered-Transport Factory instance; holds only the parent object */
+struct _ThriftBufferedTransportFactory
+{
+  ThriftTransportFactory parent;
+};
+
+typedef struct _ThriftBufferedTransportFactoryClass ThriftBufferedTransportFactoryClass;
+
+/* Thrift Buffered-Transport Factory class: parent class plus one method slot */
+struct _ThriftBufferedTransportFactoryClass
+{
+  ThriftTransportFactoryClass parent;
+
+  /* vtable: wraps the given transport and returns the wrapper */
+  ThriftTransport *(*get_transport) (ThriftTransportFactory *factory,
+                                     ThriftTransport *transport);
+};
+
+/* used by THRIFT_TYPE_BUFFERED_TRANSPORT_FACTORY */
+GType thrift_buffered_transport_factory_get_type (void);
+
+/* virtual public methods */
+ThriftTransport *
+thrift_buffered_transport_factory_get_transport (ThriftTransportFactory *factory,
+                                                 ThriftTransport *transport);
+
+G_END_DECLS
+
+#endif /* _THRIFT_BUFFERED_TRANSPORT_FACTORY_H */
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport.c b/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport.c
index 9810aa6..47a7960 100644
--- a/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport.c
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport.c
@@ -49,6 +49,14 @@
   return THRIFT_TRANSPORT_GET_CLASS (t->transport)->is_open (t->transport);
 }
 
+/* overrides thrift_transport_peek; a non-empty read buffer skips the inner peek */
+gboolean
+thrift_framed_transport_peek (ThriftTransport *transport, GError **error)
+{
+  ThriftFramedTransport *t = THRIFT_FRAMED_TRANSPORT (transport);
+  return (t->r_buf->len > 0) || thrift_transport_peek (t->transport, error);
+}
+
 /* implements thrift_transport_open */
 gboolean
 thrift_framed_transport_open (ThriftTransport *transport, GError **error)
@@ -361,6 +369,7 @@
 
   gobject_class->finalize = thrift_framed_transport_finalize;
   ttc->is_open = thrift_framed_transport_is_open;
+  ttc->peek = thrift_framed_transport_peek;
   ttc->open = thrift_framed_transport_open;
   ttc->close = thrift_framed_transport_close;
   ttc->read = thrift_framed_transport_read;
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport_factory.c b/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport_factory.c
new file mode 100644
index 0000000..e68fe0a
--- /dev/null
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport_factory.c
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/c_glib/thrift.h>
+#include <thrift/c_glib/transport/thrift_framed_transport.h>
+#include <thrift/c_glib/transport/thrift_framed_transport_factory.h>
+
+G_DEFINE_TYPE (ThriftFramedTransportFactory,
+               thrift_framed_transport_factory,
+               THRIFT_TYPE_TRANSPORT_FACTORY)
+
+/* Builds a new ThriftFramedTransport around TRANSPORT; FACTORY is unused. */
+ThriftTransport *
+thrift_framed_transport_factory_get_transport (ThriftTransportFactory *factory,
+                                               ThriftTransport *transport)
+{
+  gpointer wrapped = g_object_new (THRIFT_TYPE_FRAMED_TRANSPORT,
+                                   "transport", transport, NULL);
+
+  THRIFT_UNUSED_VAR (factory);
+  return THRIFT_TRANSPORT (wrapped);
+}
+
+static void
+thrift_framed_transport_factory_init (ThriftFramedTransportFactory *self)
+{
+  THRIFT_UNUSED_VAR (self); /* instance initializer: performs no setup */
+}
+
+/* Class initializer: installs get_transport in both the base-class slot and
+   the slot this class declares itself. */
+static void
+thrift_framed_transport_factory_class_init (ThriftFramedTransportFactoryClass *klass)
+{
+  /* The subclass slot is assigned first, then mirrored into the base class */
+  klass->get_transport = thrift_framed_transport_factory_get_transport;
+
+  THRIFT_TRANSPORT_FACTORY_CLASS (klass)->get_transport = klass->get_transport;
+}
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport_factory.h b/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport_factory.h
new file mode 100644
index 0000000..c3e9496
--- /dev/null
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_framed_transport_factory.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_FRAMED_TRANSPORT_FACTORY_H
+#define _THRIFT_FRAMED_TRANSPORT_FACTORY_H
+
+#include <glib-object.h>
+
+#include <thrift/c_glib/transport/thrift_transport.h>
+#include <thrift/c_glib/transport/thrift_transport_factory.h>
+
+G_BEGIN_DECLS
+
+/*! \file thrift_framed_transport_factory.h
+ *  \brief Wraps a transport with a ThriftFramedTransport.
+ */
+
+/* GObject type macros: type id, instance/class casts and type checks */
+#define THRIFT_TYPE_FRAMED_TRANSPORT_FACTORY    \
+  (thrift_framed_transport_factory_get_type ())
+#define THRIFT_FRAMED_TRANSPORT_FACTORY(obj)                            \
+  (G_TYPE_CHECK_INSTANCE_CAST ((obj),                                   \
+                               THRIFT_TYPE_FRAMED_TRANSPORT_FACTORY,    \
+                               ThriftFramedTransportFactory))
+#define THRIFT_IS_FRAMED_TRANSPORT_FACTORY(obj)                         \
+  (G_TYPE_CHECK_INSTANCE_TYPE ((obj),                                   \
+                               THRIFT_TYPE_FRAMED_TRANSPORT_FACTORY))
+#define THRIFT_FRAMED_TRANSPORT_FACTORY_CLASS(c)                        \
+  (G_TYPE_CHECK_CLASS_CAST ((c),                                        \
+                            THRIFT_TYPE_FRAMED_TRANSPORT_FACTORY,       \
+                            ThriftFramedTransportFactoryClass))
+#define THRIFT_IS_FRAMED_TRANSPORT_FACTORY_CLASS(c)                     \
+  (G_TYPE_CHECK_CLASS_TYPE ((c),                                        \
+                            THRIFT_TYPE_FRAMED_TRANSPORT_FACTORY))
+#define THRIFT_FRAMED_TRANSPORT_FACTORY_GET_CLASS(obj)                  \
+  (G_TYPE_INSTANCE_GET_CLASS ((obj),                                    \
+                              THRIFT_TYPE_FRAMED_TRANSPORT_FACTORY,     \
+                              ThriftFramedTransportFactoryClass))
+
+typedef struct _ThriftFramedTransportFactory ThriftFramedTransportFactory;
+
+/* Thrift Framed-Transport Factory instance; adds no fields to its parent */
+struct _ThriftFramedTransportFactory
+{
+  ThriftTransportFactory parent;
+};
+
+typedef struct _ThriftFramedTransportFactoryClass ThriftFramedTransportFactoryClass;
+
+/* Thrift Framed-Transport Factory class: parent class plus one method slot */
+struct _ThriftFramedTransportFactoryClass
+{
+  ThriftTransportFactoryClass parent;
+
+  /* vtable: takes a factory and a transport, returns a ThriftTransport */
+  ThriftTransport *(*get_transport) (ThriftTransportFactory *factory,
+                                     ThriftTransport *transport);
+};
+
+/* used by THRIFT_TYPE_FRAMED_TRANSPORT_FACTORY */
+GType thrift_framed_transport_factory_get_type (void);
+
+/* virtual public methods: wraps "transport" in a ThriftFramedTransport */
+ThriftTransport *
+thrift_framed_transport_factory_get_transport (ThriftTransportFactory *factory,
+                                               ThriftTransport *transport);
+
+G_END_DECLS
+
+#endif /* _THRIFT_FRAMED_TRANSPORT_FACTORY_H */
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_socket.c b/lib/c_glib/src/thrift/c_glib/transport/thrift_socket.c
index 1313577..6bc9af3 100644
--- a/lib/c_glib/src/thrift/c_glib/transport/thrift_socket.c
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_socket.c
@@ -50,6 +50,54 @@
   return socket->sd != THRIFT_INVALID_SOCKET;
 }
 
+/* overrides thrift_transport_peek; TRUE only when a byte can be read */
+gboolean
+thrift_socket_peek (ThriftTransport *transport, GError **error)
+{
+  ThriftSocket *socket = THRIFT_SOCKET (transport);
+  guint8 probe;
+  int received;
+  int saved_errno;
+
+  /* a socket that is not open has nothing to offer */
+  if (!thrift_socket_is_open (transport))
+  {
+    return FALSE;
+  }
+
+  /* MSG_PEEK looks at the next byte but leaves it queued for later reads */
+  received = recv (socket->sd, &probe, 1, MSG_PEEK);
+  if (received > 0)
+  {
+    return TRUE;
+  }
+
+  if (received == -1)
+  {
+    /* take a copy of errno before another call can overwrite it */
+    saved_errno = errno;
+
+#if defined __FreeBSD__ || defined __MACH__
+    /* FreeBSD returns -1 and ECONNRESET if the socket was closed by the other
+       side; close our end rather than reporting a peek failure */
+    if (saved_errno == ECONNRESET)
+    {
+      thrift_socket_close (transport, error);
+      return FALSE;
+    }
+#endif
+
+    g_set_error (error,
+                 THRIFT_TRANSPORT_ERROR,
+                 THRIFT_TRANSPORT_ERROR_SOCKET,
+                 "failed to peek at socket - %s",
+                 strerror (saved_errno));
+  }
+
+  /* an error, or a return of 0 from recv, means there is nothing to read */
+  return FALSE;
+}
+
 /* implements thrift_transport_open */
 gboolean
 thrift_socket_open (ThriftTransport *transport, GError **error)
@@ -311,6 +359,7 @@
 
   gobject_class->finalize = thrift_socket_finalize;
   ttc->is_open = thrift_socket_is_open;
+  ttc->peek = thrift_socket_peek;
   ttc->open = thrift_socket_open;
   ttc->close = thrift_socket_close;
   ttc->read = thrift_socket_read;
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.c b/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.c
index 59d464c..5533437 100644
--- a/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.c
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.c
@@ -32,6 +32,12 @@
 }
 
 gboolean
+thrift_transport_peek (ThriftTransport *transport, GError **error)
+{
+  return THRIFT_TRANSPORT_GET_CLASS (transport)->peek (transport, error);
+}
+
+gboolean
 thrift_transport_open (ThriftTransport *transport, GError **error)
 {
   return THRIFT_TRANSPORT_GET_CLASS (transport)->open (transport, error);
@@ -79,6 +85,15 @@
   return THRIFT_TRANSPORT_GET_CLASS (transport)->flush (transport, error);
 }
 
+/* default peek implementation: TRUE exactly when the transport is open */
+static gboolean
+thrift_transport_real_peek (ThriftTransport *transport, GError **error)
+{
+  gboolean live = THRIFT_TRANSPORT_GET_CLASS (transport)->is_open (transport);
+  THRIFT_UNUSED_VAR (error); /* this fallback never sets an error */
+  return live;
+}
+
 /* define the GError domain for Thrift transports */
 GQuark
 thrift_transport_error_quark (void)
@@ -99,6 +114,9 @@
   cls->write = thrift_transport_write;
   cls->write_end = thrift_transport_write_end;
   cls->flush = thrift_transport_flush;
+
+  /* provide a default implementation for the peek method */
+  cls->peek = thrift_transport_real_peek;
 }
 
 static void
diff --git a/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.h b/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.h
index 65b5763..5555a5e 100644
--- a/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.h
+++ b/lib/c_glib/src/thrift/c_glib/transport/thrift_transport.h
@@ -63,6 +63,7 @@
 
   /* vtable */
   gboolean (*is_open) (ThriftTransport *transport);
+  gboolean (*peek) (ThriftTransport *transport, GError **error);
   gboolean (*open) (ThriftTransport *transport, GError **error);
   gboolean (*close) (ThriftTransport *transport, GError **error);
   gint32 (*read) (ThriftTransport *transport, gpointer buf,
@@ -92,6 +93,17 @@
 gboolean thrift_transport_open (ThriftTransport *transport, GError **error);
 
 /*!
+ * Tests whether there is more data to read or whether the remote side is
+ * still open. By default this is true whenever the transport is open, but
+ * implementations should add logic to test for this condition where possible
+ * (e.g. on a socket).
+ *
+ * This is used by a server to check if it should listen for another request.
+ * \public \memberof ThriftTransportInterface
+ */
+gboolean thrift_transport_peek (ThriftTransport *transport, GError **error);
+
+/*!
  * Close the transport.
  * \public \memberof ThriftTransportInterface
  */
diff --git a/lib/c_glib/test/Makefile.am b/lib/c_glib/test/Makefile.am
index 25f474a..72d0f64 100755
--- a/lib/c_glib/test/Makefile.am
+++ b/lib/c_glib/test/Makefile.am
@@ -52,6 +52,7 @@
 testtransportsocket_SOURCES = testtransportsocket.c
 testtransportsocket_LDADD = \
     ../libthrift_c_glib_la-thrift_transport.o \
+    ../libthrift_c_glib_la-thrift_buffered_transport.o \
     ../libthrift_c_glib_la-thrift_server_transport.o \
     ../libthrift_c_glib_la-thrift_server_socket.o
 
diff --git a/lib/c_glib/test/testsimpleserver.c b/lib/c_glib/test/testsimpleserver.c
index fca2dcd..92629b4 100755
--- a/lib/c_glib/test/testsimpleserver.c
+++ b/lib/c_glib/test/testsimpleserver.c
@@ -51,7 +51,7 @@
 
 gboolean
 test_processor_process (ThriftProcessor *processor, ThriftProtocol *in,
-                        ThriftProtocol *out)
+                        ThriftProtocol *out, GError **error)
 {
   return FALSE;
 }
@@ -88,7 +88,8 @@
 
   if (pid == 0)
   {
-    THRIFT_SERVER_GET_CLASS (THRIFT_SERVER (ss))->serve (THRIFT_SERVER (ss));
+    THRIFT_SERVER_GET_CLASS (THRIFT_SERVER (ss))->serve (THRIFT_SERVER (ss),
+                                                         NULL);
     exit (0);
   } else {
     sleep (5);
diff --git a/lib/c_glib/test/testtransportsocket.c b/lib/c_glib/test/testtransportsocket.c
index 836ddd0..08cad1c 100755
--- a/lib/c_glib/test/testtransportsocket.c
+++ b/lib/c_glib/test/testtransportsocket.c
@@ -19,8 +19,10 @@
 
 #include <assert.h>
 #include <netdb.h>
+#include <sys/wait.h>
 
 #include <thrift/c_glib/transport/thrift_transport.h>
+#include <thrift/c_glib/transport/thrift_buffered_transport.h>
 #include <thrift/c_glib/transport/thrift_server_transport.h>
 #include <thrift/c_glib/transport/thrift_server_socket.h>
 
@@ -173,6 +175,105 @@
   }
 }
 
+/* test ThriftSocket's peek() against a forked server that sends one byte */
+static void
+test_peek(void)
+{
+  gint status;
+  pid_t pid;
+  guint port = 51199;
+  gchar data = 'A';
+  ThriftTransport *client_transport;
+  GError *error = NULL;
+
+  client_transport = g_object_new (THRIFT_TYPE_SOCKET,
+                                   "hostname", "localhost",
+                                   "port",     port,
+                                   NULL);
+
+  /* thrift_transport_peek returns FALSE when the socket is closed */
+  g_assert (thrift_transport_is_open (client_transport) == FALSE);
+  g_assert (thrift_transport_peek (client_transport, &error) == FALSE);
+  g_assert (error == NULL);
+
+  pid = fork ();
+  g_assert (pid >= 0);
+
+  if (pid == 0)
+  {
+    ThriftServerTransport *server_transport = NULL;
+
+    g_object_unref (client_transport); /* not needed in the child */
+
+    /* child plays the server: listen, then accept the parent's connection */
+    server_transport = g_object_new (THRIFT_TYPE_SERVER_SOCKET,
+                                     "port", port,
+                                     NULL);
+    g_assert (server_transport != NULL);
+
+    thrift_server_transport_listen (server_transport, &error);
+    g_assert (error == NULL);
+
+    client_transport = g_object_new
+      (THRIFT_TYPE_BUFFERED_TRANSPORT,
+       "transport",  thrift_server_transport_accept (server_transport, &error),
+       "r_buf_size", 0,
+       "w_buf_size", sizeof data,
+       NULL);
+    g_assert (error == NULL);
+    g_assert (client_transport != NULL);
+
+    /* write exactly one character to the client */
+    g_assert (thrift_transport_write (client_transport,
+                                      &data,
+                                      sizeof data,
+                                      &error) == TRUE);
+
+    thrift_transport_flush (client_transport, &error);
+    thrift_transport_write_end (client_transport, &error);
+    thrift_transport_close (client_transport, &error);
+
+    g_object_unref (client_transport);
+    g_object_unref (server_transport);
+
+    exit (0);
+  }
+  else {
+    /* parent connects; sleep first so the child has time to start listening */
+    sleep (1);
+
+    /* connect to the child */
+    thrift_transport_open (client_transport, &error);
+    g_assert (error == NULL);
+    g_assert (thrift_transport_is_open (client_transport) == TRUE);
+
+    /* thrift_transport_peek returns TRUE when the socket is open and there is
+       data available to be read */
+    g_assert (thrift_transport_peek (client_transport, &error) == TRUE);
+    g_assert (error == NULL);
+
+    /* read exactly one character from the server */
+    g_assert_cmpint (thrift_transport_read (client_transport,
+                                            &data,
+                                            sizeof data,
+                                            &error), ==, sizeof data);
+
+    /* thrift_transport_peek returns FALSE when the socket is open but there is
+       no (more) data available to be read */
+    g_assert (thrift_transport_is_open (client_transport) == TRUE);
+    g_assert (thrift_transport_peek (client_transport, &error) == FALSE);
+    g_assert (error == NULL);
+
+    thrift_transport_read_end (client_transport, &error);
+    thrift_transport_close (client_transport, &error);
+
+    g_object_unref (client_transport);
+
+    g_assert (wait (&status) == pid); /* reap the child */
+    g_assert (status == 0); /* child exited normally with code 0 */
+  }
+}
+
 static void
 thrift_socket_server (const int port)
 {
@@ -215,6 +316,7 @@
   g_test_add_func ("/testtransportsocket/CreateAndDestroy", test_create_and_destroy);
   g_test_add_func ("/testtransportsocket/OpenAndClose", test_open_and_close);
   g_test_add_func ("/testtransportsocket/ReadAndWrite", test_read_and_write);
+  g_test_add_func ("/testtransportsocket/Peek", test_peek);
 
   return g_test_run ();
 }