blob: 6de13d9167a53426be0a08dbe800cdf4d011ea44 [file] [log] [blame]
Jake Farrellb95b0ff2012-03-22 21:49:10 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19module thrift.async.socket;
20
21import core.thread : Fiber;
22import core.time : dur, Duration;
23import std.array : empty;
24import std.conv : to;
25import std.exception : enforce;
26import std.socket;
27import thrift.base;
28import thrift.async.base;
29import thrift.transport.base;
30import thrift.transport.socket : TSocketBase;
31import thrift.internal.endian;
32import thrift.internal.socket;
33
34version (Windows) {
35 import std.c.windows.winsock : connect;
36} else version (Posix) {
37 import core.sys.posix.sys.socket : connect;
38} else static assert(0, "Don't know connect on this platform.");
39
40/**
41 * Non-blocking socket implementation of the TTransport interface.
42 *
43 * Whenever a socket operation would block, TAsyncSocket registers a callback
44 * with the specified TAsyncSocketManager and yields.
45 *
46 * As for thrift.transport.socket, due to the limitations of std.socket,
47 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
48 * not).
49 */
50class TAsyncSocket : TSocketBase, TAsyncTransport {
51 /**
52 * Constructor that takes an already created, connected (!) socket.
53 *
54 * Params:
55 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
56 * socket = Already created, connected socket object. Will be switched to
57 * non-blocking mode if it isn't already.
58 */
59 this(TAsyncSocketManager asyncManager, Socket socket) {
60 asyncManager_ = asyncManager;
61 socket.blocking = false;
62 super(socket);
63 }
64
65 /**
66 * Creates a new unconnected socket that will connect to the given host
67 * on the given port.
68 *
69 * Params:
70 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
71 * host = Remote host.
72 * port = Remote port.
73 */
74 this(TAsyncSocketManager asyncManager, string host, ushort port) {
75 asyncManager_ = asyncManager;
76 super(host, port);
77 }
78
79 override TAsyncManager asyncManager() @property {
80 return asyncManager_;
81 }
82
83 /**
84 * Asynchronously connects the socket.
85 *
86 * Completes without blocking and defers further operations on the socket
87 * until the connection is established. If connecting fails, this is
88 * currently not indicated in any way other than every call to read/write
89 * failing.
90 */
91 override void open() {
92 if (isOpen) return;
93
94 enforce(!host_.empty, new TTransportException(
95 "Cannot open null host.", TTransportException.Type.NOT_OPEN));
96 enforce(port_ != 0, new TTransportException(
97 "Cannot open with null port.", TTransportException.Type.NOT_OPEN));
98
99
100 // Cannot use std.socket.Socket.connect here because it hides away
101 // EINPROGRESS/WSAWOULDBLOCK.
102 Address addr;
103 try {
104 // Currently, we just go with the first address returned, could be made
105 // more intelligent though – IPv6?
106 addr = getAddress(host_, port_)[0];
107 } catch (Exception e) {
108 throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
109 TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
110 }
111
112 socket_ = new TcpSocket(addr.addressFamily);
113 socket_.blocking = false;
114 setSocketOpts();
115
116 auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
117 if (errorCode == 0) {
118 // If the connection could be established immediately, just return. I
119 // don't know if this ever happens.
120 return;
121 }
122
123 auto errno = getSocketErrno();
124 if (errno != CONNECT_INPROGRESS_ERRNO) {
125 throw new TTransportException(`Could not establish connection to "` ~
126 host_ ~ `": ` ~ socketErrnoString(errno),
127 TTransportException.Type.NOT_OPEN);
128 }
129
130 // This is the expected case: connect() signalled that the connection
131 // is being established in the background. Queue up a work item with the
132 // async manager which just defers any other operations on this
133 // TAsyncSocket instance until the socket is ready.
134 asyncManager_.execute(this,
135 {
136 auto fiber = Fiber.getThis();
137 TAsyncEventReason reason = void;
138 asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
139 connectTimeout,
140 scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
141 );
142 Fiber.yield();
143
144 if (reason == TAsyncEventReason.TIMED_OUT) {
145 // Close the connection, so that subsequent work items fail immediately.
146 closeImmediately();
147 return;
148 }
149
150 int errorCode = void;
151 socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
152 errorCode);
153
154 if (errorCode) {
155 logInfo("Could not connect TAsyncSocket: %s",
156 socketErrnoString(errorCode));
157
158 // Close the connection, so that subsequent work items fail immediately.
159 closeImmediately();
160 return;
161 }
162
163 }
164 );
165 }
166
167 /**
168 * Closes the socket.
169 *
170 * Will block until all currently active operations are finished before the
171 * socket is closed.
172 */
173 override void close() {
174 if (!isOpen) return;
175
176 import core.sync.condition;
177 import core.sync.mutex;
178
179 auto doneMutex = new Mutex;
180 auto doneCond = new Condition(doneMutex);
181 synchronized (doneMutex) {
182 asyncManager_.execute(this,
183 scopedDelegate(
184 {
185 closeImmediately();
186 synchronized (doneMutex) doneCond.notifyAll();
187 }
188 )
189 );
190 doneCond.wait();
191 }
192 }
193
194 override bool peek() {
195 if (!isOpen) return false;
196
197 ubyte buf;
198 auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
199 if (r == Socket.ERROR) {
200 auto lastErrno = getSocketErrno();
201 static if (connresetOnPeerShutdown) {
202 if (lastErrno == ECONNRESET) {
203 closeImmediately();
204 return false;
205 }
206 }
207 throw new TTransportException("Peeking into socket failed: " ~
208 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
209 }
210 return (r > 0);
211 }
212
213 override size_t read(ubyte[] buf) {
214 enforce(isOpen, new TTransportException(
215 "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));
216
217 typeof(getSocketErrno()) lastErrno;
218
219 auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
220 TAsyncEventType.READ);
221
222 // If recv went fine, immediately return.
223 if (r >= 0) return r;
224
225 // Something went wrong, find out how to handle it.
226 lastErrno = getSocketErrno();
227
228 static if (connresetOnPeerShutdown) {
229 // See top comment.
230 if (lastErrno == ECONNRESET) {
231 return 0;
232 }
233 }
234
235 throw new TTransportException("Receiving from socket failed: " ~
236 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
237 }
238
239 override void write(in ubyte[] buf) {
240 size_t sent;
241 while (sent < buf.length) {
242 sent += writeSome(buf[sent .. $]);
243 }
244 assert(sent == buf.length);
245 }
246
247 override size_t writeSome(in ubyte[] buf) {
248 enforce(isOpen, new TTransportException(
249 "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));
250
251 auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);
252
253 // Everything went well, just return the number of bytes written.
254 if (r > 0) return r;
255
256 // Handle error conditions.
257 if (r < 0) {
258 auto lastErrno = getSocketErrno();
259
260 auto type = TTransportException.Type.UNKNOWN;
261 if (isSocketCloseErrno(lastErrno)) {
262 type = TTransportException.Type.NOT_OPEN;
263 closeImmediately();
264 }
265
266 throw new TTransportException("Sending to socket failed: " ~
267 socketErrnoString(lastErrno), type);
268 }
269
270 // send() should never return 0.
271 throw new TTransportException("Sending to socket failed (0 bytes written).",
272 TTransportException.Type.UNKNOWN);
273 }
274
275 /// The amount of time in which a conncetion must be established before the
276 /// open() call times out.
277 Duration connectTimeout = dur!"seconds"(5);
278
279private:
280 void closeImmediately() {
281 socket_.close();
282 socket_ = null;
283 }
284
285 T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
286 while (true) {
287 auto result = call();
288 if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;
289
290 // We got an EAGAIN result, register a callback to return here once some
291 // event happens and yield.
292
293 Duration timeout = void;
294 final switch (eventType) {
295 case TAsyncEventType.READ:
296 timeout = recvTimeout_;
297 break;
298 case TAsyncEventType.WRITE:
299 timeout = sendTimeout_;
300 break;
301 }
302
303 auto fiber = Fiber.getThis();
304 assert(fiber, "Current fiber null – not running in TAsyncManager?");
305 TAsyncEventReason eventReason = void;
306 asyncManager_.addOneshotListener(socket_, eventType, timeout,
307 scopedDelegate((TAsyncEventReason reason) {
308 eventReason = reason;
309 fiber.call();
310 })
311 );
312
313 // Yields execution back to the async manager, will return back here once
314 // the above listener is called.
315 Fiber.yield();
316
317 if (eventReason == TAsyncEventReason.TIMED_OUT) {
318 // If we are cancelling the request due to a timed out operation, the
319 // connection is in an undefined state, because the server could decide
320 // to send the requested data later, or we could have already been half-
321 // way into writing a request. Thus, we close the connection to make any
322 // possibly queued up work items fail immediately. Besides, the server
323 // is not very likely to immediately recover after a socket-level
324 // timeout has expired anyway.
325 closeImmediately();
326
327 throw new TTransportException("Timed out while waiting for socket " ~
328 "to get ready to " ~ to!string(eventType) ~ ".",
329 TTransportException.Type.TIMED_OUT);
330 }
331 }
332 }
333
334 /// The TAsyncSocketManager to use for non-blocking I/O.
335 TAsyncSocketManager asyncManager_;
336}
337
338private {
339 // std.socket doesn't include SO_ERROR for reasons unknown.
340 version (linux) {
341 enum SO_ERROR = 4;
342 } else version (OSX) {
343 enum SO_ERROR = 0x1007;
344 } else version (FreeBSD) {
345 enum SO_ERROR = 0x1007;
346 } else version (Win32) {
347 import std.c.windows.winsock : SO_ERROR;
348 } else static assert(false, "Don't know SO_ERROR on this platform.");
349
350 // This hack forces a delegate literal to be scoped, even if it is passed to
351 // a function accepting normal delegates as well. DMD likes to allocate the
352 // context on the heap anyway, but it seems to work for LDC.
353 import std.traits : isDelegate;
354 auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
355 return d;
356 }
357}