blob: a701ec9bc8f46a0b14f4352db0aded5a65f593fe [file] [log] [blame]
Chris Piro20c81ad2013-03-07 11:32:48 -05001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090020import logging
Chris Piro20c81ad2013-03-07 11:32:48 -050021import socket
22import struct
Asjad Syed135b79e2025-03-13 00:38:06 -040023import warnings
Konrad Grochowski3a724e32014-08-12 11:48:29 -040024
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090025from .transport.TTransport import TTransportException, TTransportBase, TMemoryBuffer
Chris Piro20c81ad2013-03-07 11:32:48 -050026
Roger Meierd52edba2014-08-07 17:03:47 +020027from io import BytesIO
28from collections import deque
29from contextlib import contextmanager
30from tornado import gen, iostream, ioloop, tcpserver, concurrent
31
32__all__ = ['TTornadoServer', 'TTornadoStreamTransport']
Chris Piro20c81ad2013-03-07 11:32:48 -050033
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090034logger = logging.getLogger(__name__)
35
Chris Piro20c81ad2013-03-07 11:32:48 -050036
class _Lock:
    """FIFO lock for Tornado coroutines, built from a queue of futures.

    Every caller of ``acquire()`` appends its own future to ``_waiters``.
    The future at the head of the queue belongs to the current holder; each
    later caller waits on the future that was at the tail when it arrived.
    ``release()`` pops the head future and resolves it, which wakes the next
    caller in line.
    """

    def __init__(self):
        # One future per holder/waiter, oldest first.
        self._waiters = deque()

    def acquired(self):
        """Return True while at least one holder or waiter is queued."""
        return bool(self._waiters)

    @gen.coroutine
    def acquire(self):
        """Wait for the lock and return a context manager that releases it.

        Intended use is ``with (yield lock.acquire()): ...``; leaving the
        ``with`` block calls ``release()``.
        """
        # Remember who is directly ahead of us *before* queueing ourselves.
        predecessor = None
        if self.acquired():
            predecessor = self._waiters[-1]

        self._waiters.append(concurrent.Future())

        if predecessor is not None:
            # Resolved when the caller ahead of us releases the lock.
            yield predecessor

        raise gen.Return(self._lock_context())

    def release(self):
        """Drop the current holder and wake the next waiter, if any."""
        assert self.acquired(), 'Lock not aquired'
        self._waiters.popleft().set_result(None)

    @contextmanager
    def _lock_context(self):
        """Context manager that releases the lock on exit, even on error."""
        try:
            yield
        finally:
            self.release()
65
66
class TTornadoStreamTransport(TTransportBase):
    """A framed, buffered transport over a Tornado stream.

    Outgoing data is accumulated with ``write()`` and sent by ``flush()`` as
    one frame: a 4-byte big-endian signed length followed by the payload.
    Incoming data is consumed one whole frame at a time with ``readFrame()``.
    """

    def __init__(self, host, port, stream=None, io_loop=None):
        """Create a transport for ``host``:``port``.

        :param host: peer host name or address.
        :param port: peer TCP port.
        :param stream: an already-connected stream (servers pass one in);
            when None, ``open()`` must be called before any I/O.
        :param io_loop: deprecated and ignored; passing a value only emits a
            ``DeprecationWarning``.
        """
        if io_loop is not None:
            warnings.warn(
                "The `io_loop` parameter is deprecated and unused. Passing "
                "`io_loop` is unnecessary because Tornado now automatically "
                "provides the current I/O loop via `IOLoop.current()`. "
                "Remove the `io_loop` parameter to ensure compatibility - it "
                "will be removed in a future release.",
                DeprecationWarning,
                stacklevel=2,
            )
        self.host = host
        self.port = port
        self.io_loop = ioloop.IOLoop.current()
        self.__wbuf = BytesIO()
        self._read_lock = _Lock()

        # servers provide a ready-to-go stream
        self.stream = stream

    def with_timeout(self, timeout, future):
        """Wrap ``future`` with ``gen.with_timeout`` using ``timeout``."""
        return gen.with_timeout(timeout, future)

    def isOpen(self):
        """Return True when a stream exists and has not been closed."""
        if self.stream is None:
            return False
        return not self.stream.closed()

    @gen.coroutine
    def open(self, timeout=None):
        """Connect a new stream to ``host``:``port`` and return ``self``.

        :param timeout: optional timeout handed to ``with_timeout``; when
            None the connect is awaited without a limit.
        :raises TTransportException: ``NOT_OPEN`` when the connection fails
            or times out.
        """
        logger.debug('socket connecting')
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.stream = iostream.IOStream(sock)

        try:
            connect = self.stream.connect((self.host, self.port))
            if timeout is not None:
                yield self.with_timeout(timeout, connect)
            else:
                yield connect
        except (socket.error, IOError, ioloop.TimeoutError) as e:
            # Don't leave the half-open stream (and its socket) behind: on a
            # timeout nothing else would ever close it.
            self.stream.close()
            message = 'could not connect to {}:{} ({})'.format(self.host, self.port, e)
            raise TTransportException(
                type=TTransportException.NOT_OPEN,
                message=message)

        raise gen.Return(self)

    def set_close_callback(self, callback):
        """
        Should be called only after open() returns
        """
        self.stream.set_close_callback(callback)

    def close(self):
        """Close the underlying stream; a no-op if none was ever attached."""
        if self.stream is None:
            # Never opened and no stream supplied: nothing to close.
            return
        # don't raise if we intend to close
        self.stream.set_close_callback(None)
        self.stream.close()

    def read(self, _):
        """Unsupported: this transport is read one frame at a time.

        :raises AssertionError: always.
        """
        # The generated code for Tornado shouldn't do individual reads -- only
        # frames at a time.  Raised explicitly (rather than ``assert False``)
        # so the guard is not stripped when Python runs with -O.
        raise AssertionError("you're doing it wrong")

    @contextmanager
    def io_exception_context(self):
        """Translate low-level stream errors into ``TTransportException``.

        ``socket.error``/``IOError`` become ``END_OF_FILE``;
        ``iostream.StreamBufferFullError`` becomes ``UNKNOWN``.
        """
        try:
            yield
        except (socket.error, IOError) as e:
            raise TTransportException(
                type=TTransportException.END_OF_FILE,
                message=str(e))
        except iostream.StreamBufferFullError as e:
            raise TTransportException(
                type=TTransportException.UNKNOWN,
                message=str(e))

    @gen.coroutine
    def readFrame(self):
        """Read one length-prefixed frame and return its payload bytes.

        :raises TTransportException: ``END_OF_FILE`` on stream/socket errors,
            ``UNKNOWN`` on a full stream buffer or a negative frame length.
        """
        # IOStream processes reads one at a time
        with (yield self._read_lock.acquire()):
            with self.io_exception_context():
                frame_header = yield self.stream.read_bytes(4)
                if len(frame_header) == 0:
                    raise iostream.StreamClosedError('Read zero bytes from stream')
                frame_length, = struct.unpack('!i', frame_header)
                if frame_length < 0:
                    # The length is a signed value supplied by the peer; never
                    # hand a negative byte count to the stream.
                    raise TTransportException(
                        type=TTransportException.UNKNOWN,
                        message='negative frame length: {}'.format(frame_length))
                frame = yield self.stream.read_bytes(frame_length)
                raise gen.Return(frame)

    def write(self, buf):
        """Append ``buf`` to the pending frame; nothing is sent until flush()."""
        self.__wbuf.write(buf)

    def flush(self):
        """Send everything buffered by ``write()`` as a single frame.

        :returns: whatever ``stream.write`` returns for the framed data.
        :raises TTransportException: see ``io_exception_context``.
        """
        frame = self.__wbuf.getvalue()
        # reset wbuf before write/flush to preserve state on underlying failure
        frame_length = struct.pack('!i', len(frame))
        self.__wbuf = BytesIO()
        with self.io_exception_context():
            return self.stream.write(frame_length + frame)
Chris Piro20c81ad2013-03-07 11:32:48 -0500169
170
class TTornadoServer(tcpserver.TCPServer):
    """Tornado TCP server that feeds framed requests to a Thrift processor."""

    def __init__(self, processor, iprot_factory, oprot_factory=None,
                 *args, **kwargs):
        """Store the processor and protocol factories.

        :param processor: object whose ``process(iprot, oprot)`` handles one
            request.
        :param iprot_factory: factory used to build the input protocol for
            each received frame.
        :param oprot_factory: factory for the output protocol; falls back to
            ``iprot_factory`` when None.

        Remaining positional and keyword arguments are forwarded to
        ``tcpserver.TCPServer``.
        """
        super(TTornadoServer, self).__init__(*args, **kwargs)

        if oprot_factory is None:
            oprot_factory = iprot_factory

        self._processor = processor
        self._iprot_factory = iprot_factory
        self._oprot_factory = oprot_factory

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Serve one client connection until its stream is closed.

        Frames are read one at a time; each is wrapped in a memory buffer,
        given its own input protocol and passed to the processor together
        with the connection-wide output protocol.
        """
        host, port = address[:2]
        transport = TTornadoStreamTransport(host=host, port=port, stream=stream)
        output_protocol = self._oprot_factory.getProtocol(transport)

        try:
            while not transport.stream.closed():
                try:
                    frame = yield transport.readFrame()
                except TTransportException as exc:
                    # Anything other than end-of-file is a real failure and
                    # is handled by the outer ``except`` below.
                    if exc.type != TTransportException.END_OF_FILE:
                        raise
                    break

                input_protocol = self._iprot_factory.getProtocol(
                    TMemoryBuffer(frame))
                yield self._processor.process(input_protocol, output_protocol)
        except Exception:
            logger.exception('thrift exception in handle_stream')
            transport.close()

        logger.info('client disconnected %s:%d', host, port)