blob: c7218301afd60253a090dd51fe6d5016303b3d11 [file] [log] [blame]
Chris Piro20c81ad2013-03-07 11:32:48 -05001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090020import logging
Chris Piro20c81ad2013-03-07 11:32:48 -050021import socket
22import struct
Asjad Syed135b79e2025-03-13 00:38:06 -040023import warnings
Konrad Grochowski3a724e32014-08-12 11:48:29 -040024
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090025from .transport.TTransport import TTransportException, TTransportBase, TMemoryBuffer
Chris Piro20c81ad2013-03-07 11:32:48 -050026
Roger Meierd52edba2014-08-07 17:03:47 +020027from io import BytesIO
28from collections import deque
29from contextlib import contextmanager
30from tornado import gen, iostream, ioloop, tcpserver, concurrent
31
32__all__ = ['TTornadoServer', 'TTornadoStreamTransport']
Chris Piro20c81ad2013-03-07 11:32:48 -050033
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090034logger = logging.getLogger(__name__)
35
Chris Piro20c81ad2013-03-07 11:32:48 -050036
Alexandre Detisted0787212024-12-26 02:44:13 +010037class _Lock:
Roger Meierd52edba2014-08-07 17:03:47 +020038 def __init__(self):
39 self._waiters = deque()
40
41 def acquired(self):
42 return len(self._waiters) > 0
43
44 @gen.coroutine
45 def acquire(self):
46 blocker = self._waiters[-1] if self.acquired() else None
47 future = concurrent.Future()
48 self._waiters.append(future)
49 if blocker:
50 yield blocker
51
52 raise gen.Return(self._lock_context())
53
54 def release(self):
55 assert self.acquired(), 'Lock not aquired'
56 future = self._waiters.popleft()
57 future.set_result(None)
58
59 @contextmanager
60 def _lock_context(self):
61 try:
62 yield
63 finally:
64 self.release()
65
66
67class TTornadoStreamTransport(TTransportBase):
Chris Piro20c81ad2013-03-07 11:32:48 -050068 """a framed, buffered transport over a Tornado stream"""
Roger Meierd52edba2014-08-07 17:03:47 +020069 def __init__(self, host, port, stream=None, io_loop=None):
Asjad Syed135b79e2025-03-13 00:38:06 -040070 if io_loop is not None:
71 warnings.warn(
72 "The `io_loop` parameter is deprecated and unused. Passing "
73 "`io_loop` is unnecessary because Tornado now automatically "
74 "provides the current I/O loop via `IOLoop.current()`. "
75 "Remove the `io_loop` parameter to ensure compatibility - it "
76 "will be removed in a future release.",
77 DeprecationWarning,
78 stacklevel=2,
79 )
Chris Piro20c81ad2013-03-07 11:32:48 -050080 self.host = host
81 self.port = port
Asjad Syed135b79e2025-03-13 00:38:06 -040082 self.io_loop = ioloop.IOLoop.current()
Roger Meierd52edba2014-08-07 17:03:47 +020083 self.__wbuf = BytesIO()
84 self._read_lock = _Lock()
Chris Piro20c81ad2013-03-07 11:32:48 -050085
86 # servers provide a ready-to-go stream
87 self.stream = stream
Chris Piro20c81ad2013-03-07 11:32:48 -050088
Roger Meierd52edba2014-08-07 17:03:47 +020089 def with_timeout(self, timeout, future):
Asjad Syed135b79e2025-03-13 00:38:06 -040090 return gen.with_timeout(timeout, future)
Roger Meierd52edba2014-08-07 17:03:47 +020091
Asjad Syed04147552025-04-08 14:51:30 -040092 def isOpen(self):
93 if self.stream is None:
94 return False
95 return not self.stream.closed()
96
Roger Meierd52edba2014-08-07 17:03:47 +020097 @gen.coroutine
98 def open(self, timeout=None):
Konrad Grochowski3a724e32014-08-12 11:48:29 -040099 logger.debug('socket connecting')
Chris Piro20c81ad2013-03-07 11:32:48 -0500100 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
101 self.stream = iostream.IOStream(sock)
102
Roger Meierd52edba2014-08-07 17:03:47 +0200103 try:
104 connect = self.stream.connect((self.host, self.port))
105 if timeout is not None:
106 yield self.with_timeout(timeout, connect)
107 else:
108 yield connect
109 except (socket.error, IOError, ioloop.TimeoutError) as e:
110 message = 'could not connect to {}:{} ({})'.format(self.host, self.port, e)
Chris Piro20c81ad2013-03-07 11:32:48 -0500111 raise TTransportException(
112 type=TTransportException.NOT_OPEN,
113 message=message)
Chris Piro20c81ad2013-03-07 11:32:48 -0500114
Roger Meierd52edba2014-08-07 17:03:47 +0200115 raise gen.Return(self)
Chris Piro20c81ad2013-03-07 11:32:48 -0500116
Roger Meierd52edba2014-08-07 17:03:47 +0200117 def set_close_callback(self, callback):
118 """
119 Should be called only after open() returns
120 """
121 self.stream.set_close_callback(callback)
Chris Piro20c81ad2013-03-07 11:32:48 -0500122
123 def close(self):
124 # don't raise if we intend to close
125 self.stream.set_close_callback(None)
126 self.stream.close()
127
128 def read(self, _):
129 # The generated code for Tornado shouldn't do individual reads -- only
130 # frames at a time
Roger Meierd52edba2014-08-07 17:03:47 +0200131 assert False, "you're doing it wrong"
Chris Piro20c81ad2013-03-07 11:32:48 -0500132
Roger Meierd52edba2014-08-07 17:03:47 +0200133 @contextmanager
134 def io_exception_context(self):
135 try:
136 yield
137 except (socket.error, IOError) as e:
138 raise TTransportException(
139 type=TTransportException.END_OF_FILE,
140 message=str(e))
141 except iostream.StreamBufferFullError as e:
142 raise TTransportException(
143 type=TTransportException.UNKNOWN,
144 message=str(e))
Chris Piro20c81ad2013-03-07 11:32:48 -0500145
Roger Meierd52edba2014-08-07 17:03:47 +0200146 @gen.coroutine
147 def readFrame(self):
148 # IOStream processes reads one at a time
149 with (yield self._read_lock.acquire()):
150 with self.io_exception_context():
151 frame_header = yield self.stream.read_bytes(4)
152 if len(frame_header) == 0:
153 raise iostream.StreamClosedError('Read zero bytes from stream')
154 frame_length, = struct.unpack('!i', frame_header)
Roger Meierd52edba2014-08-07 17:03:47 +0200155 frame = yield self.stream.read_bytes(frame_length)
Roger Meierd52edba2014-08-07 17:03:47 +0200156 raise gen.Return(frame)
Chris Piro20c81ad2013-03-07 11:32:48 -0500157
158 def write(self, buf):
159 self.__wbuf.write(buf)
160
Roger Meierd52edba2014-08-07 17:03:47 +0200161 def flush(self):
162 frame = self.__wbuf.getvalue()
Chris Piro20c81ad2013-03-07 11:32:48 -0500163 # reset wbuf before write/flush to preserve state on underlying failure
Roger Meierd52edba2014-08-07 17:03:47 +0200164 frame_length = struct.pack('!i', len(frame))
165 self.__wbuf = BytesIO()
166 with self.io_exception_context():
167 return self.stream.write(frame_length + frame)
Chris Piro20c81ad2013-03-07 11:32:48 -0500168
169
henrique3e25e5e2013-11-08 19:06:21 +0100170class TTornadoServer(tcpserver.TCPServer):
Chris Piro20c81ad2013-03-07 11:32:48 -0500171 def __init__(self, processor, iprot_factory, oprot_factory=None,
172 *args, **kwargs):
173 super(TTornadoServer, self).__init__(*args, **kwargs)
174
175 self._processor = processor
176 self._iprot_factory = iprot_factory
177 self._oprot_factory = (oprot_factory if oprot_factory is not None
178 else iprot_factory)
179
Roger Meierd52edba2014-08-07 17:03:47 +0200180 @gen.coroutine
Chris Piro20c81ad2013-03-07 11:32:48 -0500181 def handle_stream(self, stream, address):
Nobuaki Sukegawa5bac5ba2016-03-05 14:47:09 +0900182 host, port = address[:2]
Asjad Syed135b79e2025-03-13 00:38:06 -0400183 trans = TTornadoStreamTransport(host=host, port=port, stream=stream)
Roger Meierd52edba2014-08-07 17:03:47 +0200184 oprot = self._oprot_factory.getProtocol(trans)
185
Chris Piro20c81ad2013-03-07 11:32:48 -0500186 try:
Roger Meierd52edba2014-08-07 17:03:47 +0200187 while not trans.stream.closed():
Jens Geyer145749c2015-10-16 19:21:22 +0200188 try:
189 frame = yield trans.readFrame()
190 except TTransportException as e:
191 if e.type == TTransportException.END_OF_FILE:
192 break
193 else:
194 raise
Roger Meierd52edba2014-08-07 17:03:47 +0200195 tr = TMemoryBuffer(frame)
196 iprot = self._iprot_factory.getProtocol(tr)
197 yield self._processor.process(iprot, oprot)
Chris Piro20c81ad2013-03-07 11:32:48 -0500198 except Exception:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400199 logger.exception('thrift exception in handle_stream')
Chris Piro20c81ad2013-03-07 11:32:48 -0500200 trans.close()
Roger Meierd52edba2014-08-07 17:03:47 +0200201
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400202 logger.info('client disconnected %s:%d', host, port)