blob: d90f6728a5c1bf7ad71fdae5e761c569839ef389 [file] [log] [blame]
Chris Piro20c81ad2013-03-07 11:32:48 -05001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20from cStringIO import StringIO
21import logging
22import socket
23import struct
24
25from thrift.transport import TTransport
26from thrift.transport.TTransport import TTransportException
27
28from tornado import gen
29from tornado import iostream
henrique3e25e5e2013-11-08 19:06:21 +010030from tornado import tcpserver
Chris Piro20c81ad2013-03-07 11:32:48 -050031
32
33class TTornadoStreamTransport(TTransport.TTransportBase):
34 """a framed, buffered transport over a Tornado stream"""
35 def __init__(self, host, port, stream=None):
36 self.host = host
37 self.port = port
38 self.is_queuing_reads = False
39 self.read_queue = []
40 self.__wbuf = StringIO()
41
42 # servers provide a ready-to-go stream
43 self.stream = stream
44 if self.stream is not None:
45 self._set_close_callback()
46
47 # not the same number of parameters as TTransportBase.open
48 def open(self, callback):
49 logging.debug('socket connecting')
50 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
51 self.stream = iostream.IOStream(sock)
52
53 def on_close_in_connect(*_):
54 message = 'could not connect to {}:{}'.format(self.host, self.port)
55 raise TTransportException(
56 type=TTransportException.NOT_OPEN,
57 message=message)
58 self.stream.set_close_callback(on_close_in_connect)
59
60 def finish(*_):
61 self._set_close_callback()
62 callback()
63
64 self.stream.connect((self.host, self.port), callback=finish)
65
66 def _set_close_callback(self):
67 def on_close():
68 raise TTransportException(
69 type=TTransportException.END_OF_FILE,
70 message='socket closed')
71 self.stream.set_close_callback(self.close)
72
73 def close(self):
74 # don't raise if we intend to close
75 self.stream.set_close_callback(None)
76 self.stream.close()
77
78 def read(self, _):
79 # The generated code for Tornado shouldn't do individual reads -- only
80 # frames at a time
81 assert "you're doing it wrong" is True
82
83 @gen.engine
84 def readFrame(self, callback):
85 self.read_queue.append(callback)
86 logging.debug('read queue: %s', self.read_queue)
87
88 if self.is_queuing_reads:
89 # If a read is already in flight, then the while loop below should
90 # pull it from self.read_queue
91 return
92
93 self.is_queuing_reads = True
94 while self.read_queue:
95 next_callback = self.read_queue.pop()
96 result = yield gen.Task(self._readFrameFromStream)
97 next_callback(result)
98 self.is_queuing_reads = False
99
100 @gen.engine
101 def _readFrameFromStream(self, callback):
102 logging.debug('_readFrameFromStream')
103 frame_header = yield gen.Task(self.stream.read_bytes, 4)
104 frame_length, = struct.unpack('!i', frame_header)
105 logging.debug('received frame header, frame length = %i', frame_length)
106 frame = yield gen.Task(self.stream.read_bytes, frame_length)
107 logging.debug('received frame payload')
108 callback(frame)
109
110 def write(self, buf):
111 self.__wbuf.write(buf)
112
113 def flush(self, callback=None):
114 wout = self.__wbuf.getvalue()
115 wsz = len(wout)
116 # reset wbuf before write/flush to preserve state on underlying failure
117 self.__wbuf = StringIO()
118 # N.B.: Doing this string concatenation is WAY cheaper than making
119 # two separate calls to the underlying socket object. Socket writes in
120 # Python turn out to be REALLY expensive, but it seems to do a pretty
121 # good job of managing string buffer operations without excessive copies
122 buf = struct.pack("!i", wsz) + wout
123
124 logging.debug('writing frame length = %i', wsz)
125 self.stream.write(buf, callback)
126
127
henrique3e25e5e2013-11-08 19:06:21 +0100128class TTornadoServer(tcpserver.TCPServer):
Chris Piro20c81ad2013-03-07 11:32:48 -0500129 def __init__(self, processor, iprot_factory, oprot_factory=None,
130 *args, **kwargs):
131 super(TTornadoServer, self).__init__(*args, **kwargs)
132
133 self._processor = processor
134 self._iprot_factory = iprot_factory
135 self._oprot_factory = (oprot_factory if oprot_factory is not None
136 else iprot_factory)
137
138 def handle_stream(self, stream, address):
139 try:
140 host, port = address
141 trans = TTornadoStreamTransport(host=host, port=port, stream=stream)
142 oprot = self._oprot_factory.getProtocol(trans)
143
144 def next_pass():
145 if not trans.stream.closed():
146 self._processor.process(trans, self._iprot_factory, oprot,
147 callback=next_pass)
148
149 next_pass()
150
151 except Exception:
152 logging.exception('thrift exception in handle_stream')
153 trans.close()