Jake Farrell | b95b0ff | 2012-03-22 21:49:10 +0000 | [diff] [blame] | 1 | /* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | */ |
| 19 | module thrift.async.socket; |
| 20 | |
| 21 | import core.thread : Fiber; |
| 22 | import core.time : dur, Duration; |
| 23 | import std.array : empty; |
| 24 | import std.conv : to; |
| 25 | import std.exception : enforce; |
| 26 | import std.socket; |
| 27 | import thrift.base; |
| 28 | import thrift.async.base; |
| 29 | import thrift.transport.base; |
| 30 | import thrift.transport.socket : TSocketBase; |
| 31 | import thrift.internal.endian; |
| 32 | import thrift.internal.socket; |
| 33 | |
| 34 | version (Windows) { |
| 35 | import std.c.windows.winsock : connect; |
| 36 | } else version (Posix) { |
| 37 | import core.sys.posix.sys.socket : connect; |
| 38 | } else static assert(0, "Don't know connect on this platform."); |
| 39 | |
| 40 | /** |
| 41 | * Non-blocking socket implementation of the TTransport interface. |
| 42 | * |
| 43 | * Whenever a socket operation would block, TAsyncSocket registers a callback |
| 44 | * with the specified TAsyncSocketManager and yields. |
| 45 | * |
| 46 | * As for thrift.transport.socket, due to the limitations of std.socket, |
| 47 | * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are |
| 48 | * not). |
| 49 | */ |
| 50 | class TAsyncSocket : TSocketBase, TAsyncTransport { |
| 51 | /** |
| 52 | * Constructor that takes an already created, connected (!) socket. |
| 53 | * |
| 54 | * Params: |
| 55 | * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. |
| 56 | * socket = Already created, connected socket object. Will be switched to |
| 57 | * non-blocking mode if it isn't already. |
| 58 | */ |
| 59 | this(TAsyncSocketManager asyncManager, Socket socket) { |
| 60 | asyncManager_ = asyncManager; |
| 61 | socket.blocking = false; |
| 62 | super(socket); |
| 63 | } |
| 64 | |
| 65 | /** |
| 66 | * Creates a new unconnected socket that will connect to the given host |
| 67 | * on the given port. |
| 68 | * |
| 69 | * Params: |
| 70 | * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. |
| 71 | * host = Remote host. |
| 72 | * port = Remote port. |
| 73 | */ |
| 74 | this(TAsyncSocketManager asyncManager, string host, ushort port) { |
| 75 | asyncManager_ = asyncManager; |
| 76 | super(host, port); |
| 77 | } |
| 78 | |
| 79 | override TAsyncManager asyncManager() @property { |
| 80 | return asyncManager_; |
| 81 | } |
| 82 | |
| 83 | /** |
| 84 | * Asynchronously connects the socket. |
| 85 | * |
| 86 | * Completes without blocking and defers further operations on the socket |
| 87 | * until the connection is established. If connecting fails, this is |
| 88 | * currently not indicated in any way other than every call to read/write |
| 89 | * failing. |
| 90 | */ |
| 91 | override void open() { |
| 92 | if (isOpen) return; |
| 93 | |
| 94 | enforce(!host_.empty, new TTransportException( |
| 95 | "Cannot open null host.", TTransportException.Type.NOT_OPEN)); |
| 96 | enforce(port_ != 0, new TTransportException( |
| 97 | "Cannot open with null port.", TTransportException.Type.NOT_OPEN)); |
| 98 | |
| 99 | |
| 100 | // Cannot use std.socket.Socket.connect here because it hides away |
| 101 | // EINPROGRESS/WSAWOULDBLOCK. |
| 102 | Address addr; |
| 103 | try { |
| 104 | // Currently, we just go with the first address returned, could be made |
| 105 | // more intelligent though – IPv6? |
| 106 | addr = getAddress(host_, port_)[0]; |
| 107 | } catch (Exception e) { |
| 108 | throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`, |
| 109 | TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e); |
| 110 | } |
| 111 | |
| 112 | socket_ = new TcpSocket(addr.addressFamily); |
| 113 | socket_.blocking = false; |
| 114 | setSocketOpts(); |
| 115 | |
| 116 | auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen()); |
| 117 | if (errorCode == 0) { |
| 118 | // If the connection could be established immediately, just return. I |
| 119 | // don't know if this ever happens. |
| 120 | return; |
| 121 | } |
| 122 | |
| 123 | auto errno = getSocketErrno(); |
| 124 | if (errno != CONNECT_INPROGRESS_ERRNO) { |
| 125 | throw new TTransportException(`Could not establish connection to "` ~ |
| 126 | host_ ~ `": ` ~ socketErrnoString(errno), |
| 127 | TTransportException.Type.NOT_OPEN); |
| 128 | } |
| 129 | |
| 130 | // This is the expected case: connect() signalled that the connection |
| 131 | // is being established in the background. Queue up a work item with the |
| 132 | // async manager which just defers any other operations on this |
| 133 | // TAsyncSocket instance until the socket is ready. |
| 134 | asyncManager_.execute(this, |
| 135 | { |
| 136 | auto fiber = Fiber.getThis(); |
| 137 | TAsyncEventReason reason = void; |
| 138 | asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE, |
| 139 | connectTimeout, |
| 140 | scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); }) |
| 141 | ); |
| 142 | Fiber.yield(); |
| 143 | |
| 144 | if (reason == TAsyncEventReason.TIMED_OUT) { |
| 145 | // Close the connection, so that subsequent work items fail immediately. |
| 146 | closeImmediately(); |
| 147 | return; |
| 148 | } |
| 149 | |
| 150 | int errorCode = void; |
| 151 | socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR, |
| 152 | errorCode); |
| 153 | |
| 154 | if (errorCode) { |
| 155 | logInfo("Could not connect TAsyncSocket: %s", |
| 156 | socketErrnoString(errorCode)); |
| 157 | |
| 158 | // Close the connection, so that subsequent work items fail immediately. |
| 159 | closeImmediately(); |
| 160 | return; |
| 161 | } |
| 162 | |
| 163 | } |
| 164 | ); |
| 165 | } |
| 166 | |
| 167 | /** |
| 168 | * Closes the socket. |
| 169 | * |
| 170 | * Will block until all currently active operations are finished before the |
| 171 | * socket is closed. |
| 172 | */ |
| 173 | override void close() { |
| 174 | if (!isOpen) return; |
| 175 | |
| 176 | import core.sync.condition; |
| 177 | import core.sync.mutex; |
| 178 | |
| 179 | auto doneMutex = new Mutex; |
| 180 | auto doneCond = new Condition(doneMutex); |
| 181 | synchronized (doneMutex) { |
| 182 | asyncManager_.execute(this, |
| 183 | scopedDelegate( |
| 184 | { |
| 185 | closeImmediately(); |
| 186 | synchronized (doneMutex) doneCond.notifyAll(); |
| 187 | } |
| 188 | ) |
| 189 | ); |
| 190 | doneCond.wait(); |
| 191 | } |
| 192 | } |
| 193 | |
| 194 | override bool peek() { |
| 195 | if (!isOpen) return false; |
| 196 | |
| 197 | ubyte buf; |
| 198 | auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK); |
| 199 | if (r == Socket.ERROR) { |
| 200 | auto lastErrno = getSocketErrno(); |
| 201 | static if (connresetOnPeerShutdown) { |
| 202 | if (lastErrno == ECONNRESET) { |
| 203 | closeImmediately(); |
| 204 | return false; |
| 205 | } |
| 206 | } |
| 207 | throw new TTransportException("Peeking into socket failed: " ~ |
| 208 | socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); |
| 209 | } |
| 210 | return (r > 0); |
| 211 | } |
| 212 | |
| 213 | override size_t read(ubyte[] buf) { |
| 214 | enforce(isOpen, new TTransportException( |
| 215 | "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN)); |
| 216 | |
| 217 | typeof(getSocketErrno()) lastErrno; |
| 218 | |
| 219 | auto r = yieldOnBlock(socket_.receive(cast(void[])buf), |
| 220 | TAsyncEventType.READ); |
| 221 | |
| 222 | // If recv went fine, immediately return. |
| 223 | if (r >= 0) return r; |
| 224 | |
| 225 | // Something went wrong, find out how to handle it. |
| 226 | lastErrno = getSocketErrno(); |
| 227 | |
| 228 | static if (connresetOnPeerShutdown) { |
| 229 | // See top comment. |
| 230 | if (lastErrno == ECONNRESET) { |
| 231 | return 0; |
| 232 | } |
| 233 | } |
| 234 | |
| 235 | throw new TTransportException("Receiving from socket failed: " ~ |
| 236 | socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); |
| 237 | } |
| 238 | |
| 239 | override void write(in ubyte[] buf) { |
| 240 | size_t sent; |
| 241 | while (sent < buf.length) { |
| 242 | sent += writeSome(buf[sent .. $]); |
| 243 | } |
| 244 | assert(sent == buf.length); |
| 245 | } |
| 246 | |
| 247 | override size_t writeSome(in ubyte[] buf) { |
| 248 | enforce(isOpen, new TTransportException( |
| 249 | "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN)); |
| 250 | |
| 251 | auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE); |
| 252 | |
| 253 | // Everything went well, just return the number of bytes written. |
| 254 | if (r > 0) return r; |
| 255 | |
| 256 | // Handle error conditions. |
| 257 | if (r < 0) { |
| 258 | auto lastErrno = getSocketErrno(); |
| 259 | |
| 260 | auto type = TTransportException.Type.UNKNOWN; |
| 261 | if (isSocketCloseErrno(lastErrno)) { |
| 262 | type = TTransportException.Type.NOT_OPEN; |
| 263 | closeImmediately(); |
| 264 | } |
| 265 | |
| 266 | throw new TTransportException("Sending to socket failed: " ~ |
| 267 | socketErrnoString(lastErrno), type); |
| 268 | } |
| 269 | |
| 270 | // send() should never return 0. |
| 271 | throw new TTransportException("Sending to socket failed (0 bytes written).", |
| 272 | TTransportException.Type.UNKNOWN); |
| 273 | } |
| 274 | |
| 275 | /// The amount of time in which a conncetion must be established before the |
| 276 | /// open() call times out. |
| 277 | Duration connectTimeout = dur!"seconds"(5); |
| 278 | |
| 279 | private: |
| 280 | void closeImmediately() { |
| 281 | socket_.close(); |
| 282 | socket_ = null; |
| 283 | } |
| 284 | |
| 285 | T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) { |
| 286 | while (true) { |
| 287 | auto result = call(); |
| 288 | if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result; |
| 289 | |
| 290 | // We got an EAGAIN result, register a callback to return here once some |
| 291 | // event happens and yield. |
| 292 | |
| 293 | Duration timeout = void; |
| 294 | final switch (eventType) { |
| 295 | case TAsyncEventType.READ: |
| 296 | timeout = recvTimeout_; |
| 297 | break; |
| 298 | case TAsyncEventType.WRITE: |
| 299 | timeout = sendTimeout_; |
| 300 | break; |
| 301 | } |
| 302 | |
| 303 | auto fiber = Fiber.getThis(); |
| 304 | assert(fiber, "Current fiber null – not running in TAsyncManager?"); |
| 305 | TAsyncEventReason eventReason = void; |
| 306 | asyncManager_.addOneshotListener(socket_, eventType, timeout, |
| 307 | scopedDelegate((TAsyncEventReason reason) { |
| 308 | eventReason = reason; |
| 309 | fiber.call(); |
| 310 | }) |
| 311 | ); |
| 312 | |
| 313 | // Yields execution back to the async manager, will return back here once |
| 314 | // the above listener is called. |
| 315 | Fiber.yield(); |
| 316 | |
| 317 | if (eventReason == TAsyncEventReason.TIMED_OUT) { |
| 318 | // If we are cancelling the request due to a timed out operation, the |
| 319 | // connection is in an undefined state, because the server could decide |
| 320 | // to send the requested data later, or we could have already been half- |
| 321 | // way into writing a request. Thus, we close the connection to make any |
| 322 | // possibly queued up work items fail immediately. Besides, the server |
| 323 | // is not very likely to immediately recover after a socket-level |
| 324 | // timeout has expired anyway. |
| 325 | closeImmediately(); |
| 326 | |
| 327 | throw new TTransportException("Timed out while waiting for socket " ~ |
| 328 | "to get ready to " ~ to!string(eventType) ~ ".", |
| 329 | TTransportException.Type.TIMED_OUT); |
| 330 | } |
| 331 | } |
| 332 | } |
| 333 | |
| 334 | /// The TAsyncSocketManager to use for non-blocking I/O. |
| 335 | TAsyncSocketManager asyncManager_; |
| 336 | } |
| 337 | |
| 338 | private { |
| 339 | // std.socket doesn't include SO_ERROR for reasons unknown. |
| 340 | version (linux) { |
| 341 | enum SO_ERROR = 4; |
| 342 | } else version (OSX) { |
| 343 | enum SO_ERROR = 0x1007; |
| 344 | } else version (FreeBSD) { |
| 345 | enum SO_ERROR = 0x1007; |
| 346 | } else version (Win32) { |
| 347 | import std.c.windows.winsock : SO_ERROR; |
| 348 | } else static assert(false, "Don't know SO_ERROR on this platform."); |
| 349 | |
| 350 | // This hack forces a delegate literal to be scoped, even if it is passed to |
| 351 | // a function accepting normal delegates as well. DMD likes to allocate the |
| 352 | // context on the heap anyway, but it seems to work for LDC. |
| 353 | import std.traits : isDelegate; |
| 354 | auto scopedDelegate(D)(scope D d) if (isDelegate!D) { |
| 355 | return d; |
| 356 | } |
| 357 | } |