/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/**
 * Event loop implementation for Windows.
 *
 * Copyright: Eugene Wissner 2016-2020.
 * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
 *                  Mozilla Public License, v. 2.0).
 * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
 * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/iocp.d,
 *                 tanya/async/event/iocp.d)
 */
module tanya.async.event.iocp;

version (D_Ddoc)
{
}
else version (Windows):

import core.sys.windows.mswsock;
import core.sys.windows.winsock2;
import tanya.async.loop;
import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
import tanya.memory.allocator;
import tanya.network.socket;
import tanya.sys.windows.winbase;

/**
 * Transport for stream sockets.
 */
final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
    // Set by IOCPLoop.kill; handed to the protocol and disposed in invoke().
    private SocketException exception;

    // Overlapped receive operations are started on this buffer; its content
    // is passed to the protocol in invoke().
    private ReadBuffer!ubyte output;

    // Data queued by write(); overlapped send operations are started on it.
    private WriteBuffer!ubyte input;

    private Protocol protocol_;

    // Set by close(); checked by the loop once the write buffer is drained.
    private bool closing;

    /**
     * Creates new completion port transport.
     *
     * Params:
     *  socket = Socket.
     *
     * Precondition: $(D_INLINECODE socket !is null)
     */
    this(OverlappedConnectedSocket socket) @nogc
    {
        super(socket);
        output = ReadBuffer!ubyte(8192, 1024);
        input = WriteBuffer!ubyte(8192);
        active = true;
    }

    /**
     * Returns: Socket.
     *
     * Postcondition: $(D_INLINECODE socket !is null)
     */
    override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
    out (socket)
    {
        assert(socket !is null);
    }
    do
    {
        return cast(OverlappedConnectedSocket) socket_;
    }

    /**
     * Returns: $(D_KEYWORD true) if the transport is closing or closed.
     */
    bool isClosing() const pure nothrow @safe @nogc
    {
        return closing;
    }

    /**
     * Close the transport.
     *
     * Buffered data will be flushed. No more data will be received.
     */
    void close() pure nothrow @safe @nogc
    {
        closing = true;
    }

    /**
     * Write some data to the transport.
     *
     * The data is only appended to the write buffer here; the send itself is
     * started from $(D_PSYMBOL invoke) or from the event loop.
     *
     * Params:
     *  data = Data to send.
     */
    void write(ubyte[] data) @nogc
    {
        input ~= data;
    }

    /**
     * Returns: Application protocol.
     */
    @property Protocol protocol() pure nothrow @safe @nogc
    {
        return protocol_;
    }

    /**
     * Switches the protocol.
     *
     * The protocol is deallocated by the event loop.
     *
     * Params:
     *  protocol = Application protocol.
     *
     * Precondition: $(D_INLINECODE protocol !is null)
     */
    @property void protocol(Protocol protocol) pure nothrow @safe @nogc
    in
    {
        assert(protocol !is null);
    }
    do
    {
        protocol_ = protocol;
    }

    /**
     * Invokes the watcher callback.
     *
     * If received data is pending, it is passed to the protocol and the read
     * buffer is cleared. Otherwise the protocol is notified about the
     * disconnect, the protocol and the stored exception are disposed and the
     * transport is marked as inactive.
     */
    override void invoke() @nogc
    {
        if (output.length)
        {
            // Remember whether the write buffer was empty before the protocol
            // ran. Presumably a non-empty buffer means a send is already in
            // flight and will be continued by the loop - TODO confirm.
            immutable empty = input.length == 0;
            protocol.received(output[0 .. $]);
            output.clear();
            if (empty)
            {
                // The send is started even if the protocol wrote nothing: the
                // write completion handler in IOCPLoop.poll is the place that
                // starts the next receive once the write buffer is empty.
                SocketState overlapped;
                try
                {
                    overlapped = defaultAllocator.make!SocketState;
                    socket.beginSend(input[], overlapped);
                }
                catch (SocketException e)
                {
                    // NOTE(review): the send failure is dropped silently
                    // here; only the allocations are released.
                    defaultAllocator.dispose(overlapped);
                    defaultAllocator.dispose(e);
                }
            }
        }
        else
        {
            protocol.disconnected(exception);
            defaultAllocator.dispose(protocol_);
            defaultAllocator.dispose(exception);
            active = false;
        }
    }
}

/**
 * Event loop that uses an I/O completion port as its backend.
 */
final class IOCPLoop : Loop
{
    protected HANDLE completionPort;

    protected OVERLAPPED overlap;

    /**
     * Initializes the loop.
     *
     * Throws: $(D_PSYMBOL BadLoopException) if the completion port cannot be
     *         created.
     */
    this() @nogc
    {
        super();

        completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
        if (!completionPort)
        {
            throw make!BadLoopException(defaultAllocator,
                                        "Creating completion port failed");
        }
    }

    /**
     * Should be called if the backend configuration changes.
     *
     * Associates the watcher's socket with the completion port, using the
     * watcher address as the completion key, and starts the first overlapped
     * accept or receive operation for newly requested events.
     *
     * Params:
     *  watcher   = Watcher.
     *  oldEvents = The events were already set.
     *  events    = The events should be set.
     *
     * Returns: $(D_KEYWORD true) if the operation was successful.
     */
    override protected bool reify(SocketWatcher watcher,
                                  EventMask oldEvents,
                                  EventMask events) @nogc
    {
        SocketState overlapped;
        if (!(oldEvents & Event.accept) && (events & Event.accept))
        {
            auto socket = cast(OverlappedStreamSocket) watcher.socket;
            assert(socket !is null);

            // The watcher address is the completion key; poll() casts the key
            // back to the watcher.
            if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
                                       completionPort,
                                       cast(size_t) (cast(void*) watcher),
                                       0) !is completionPort)
            {
                return false;
            }

            try
            {
                overlapped = defaultAllocator.make!SocketState;
                socket.beginAccept(overlapped);
            }
            catch (SocketException e)
            {
                defaultAllocator.dispose(overlapped);
                defaultAllocator.dispose(e);
                return false;
            }
        }
        if ((!(oldEvents & Event.read) && (events & Event.read))
         || (!(oldEvents & Event.write) && (events & Event.write)))
        {
            auto transport = cast(StreamTransport) watcher;
            assert(transport !is null);

            if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
                                       completionPort,
                                       cast(size_t) (cast(void*) watcher),
                                       0) !is completionPort)
            {
                return false;
            }

            // Begin to read
            if (!(oldEvents & Event.read) && (events & Event.read))
            {
                try
                {
                    overlapped = defaultAllocator.make!SocketState;
                    transport.socket.beginReceive(transport.output[], overlapped);
                }
                catch (SocketException e)
                {
                    defaultAllocator.dispose(overlapped);
                    defaultAllocator.dispose(e);
                    return false;
                }
            }
        }
        return true;
    }

    /*
     * Shuts down and disposes the transport's socket, stores the exception in
     * the transport and queues the transport in the pending watchers, so that
     * StreamTransport.invoke can report the disconnect.
     *
     * Params:
     *  transport = Transport to be killed.
     *  exception = Reason of the disconnect, null if there was no error.
     *
     * Precondition: transport !is null
     */
    private void kill(StreamTransport transport,
                      SocketException exception = null) @nogc
    in
    {
        assert(transport !is null);
    }
    do
    {
        transport.socket.shutdown();
        defaultAllocator.dispose(transport.socket);
        transport.exception = exception;
        pendings.insertBack(transport);
    }

    /**
     * Does the actual polling.
     *
     * Waits for one completion packet for at most the block time and
     * dispatches it according to the event stored in the overlapped state.
     */
    override protected void poll() @nogc
    {
        DWORD lpNumberOfBytes;
        size_t key;
        OVERLAPPED* overlap;
        immutable timeout = cast(immutable int) blockTime.total!"msecs";

        auto result = GetQueuedCompletionStatus(completionPort,
                                                &lpNumberOfBytes,
                                                &key,
                                                &overlap,
                                                timeout);
        if (result == FALSE && overlap is null)
        {
            return; // Timeout
        }

        // Recover the SocketState object from the OVERLAPPED pointer.
        // Presumably the two words skipped are the object header that
        // precedes the OVERLAPPED data in SocketState - TODO confirm against
        // the SocketState definition.
        enum size_t offset = size_t.sizeof * 2;
        auto overlapped = cast(SocketState) ((cast(void*) overlap) - offset);
        assert(overlapped !is null);
        scope (failure)
        {
            // Something thrown below leaves this scope; release the state.
            defaultAllocator.dispose(overlapped);
        }

        switch (overlapped.event)
        {
            case OverlappedSocketEvent.accept:
                auto connection = cast(ConnectionWatcher) (cast(void*) key);
                assert(connection !is null);

                auto listener = cast(OverlappedStreamSocket) connection.socket;
                assert(listener !is null);

                auto socket = listener.endAccept(overlapped);
                auto transport = defaultAllocator.make!StreamTransport(socket);

                connection.incoming.insertBack(transport);

                // NOTE(review): the result of reify is not checked here.
                reify(transport,
                      EventMask(Event.none),
                      EventMask(Event.read | Event.write));

                pendings.insertBack(connection);
                // Start the next accept, reusing the same overlapped state.
                listener.beginAccept(overlapped);
                break;
            case OverlappedSocketEvent.read:
                auto transport = cast(StreamTransport) (cast(void*) key);
                assert(transport !is null);

                if (!transport.active)
                {
                    // StreamTransport.invoke has already reported the
                    // disconnect and marked the transport inactive; this is
                    // the last notification, used to destroy the watcher.
                    defaultAllocator.dispose(transport);
                    defaultAllocator.dispose(overlapped);
                    return;
                }

                int received;
                SocketException exception;
                try
                {
                    received = transport.socket.endReceive(overlapped);
                }
                catch (SocketException e)
                {
                    exception = e;
                }
                if (transport.socket.disconnected)
                {
                    // We want to get one last notification to destroy the watcher.
                    transport.socket.beginReceive(transport.output[], overlapped);
                    // The transport takes over the exception; it is disposed
                    // in StreamTransport.invoke.
                    kill(transport, exception);
                }
                else if (received > 0)
                {
                    immutable full = transport.output.free == received;

                    transport.output += received;
                    // Receive was interrupted because the buffer is full. We have to continue.
                    if (full)
                    {
                        transport.socket.beginReceive(transport.output[], overlapped);
                    }
                    pendings.insertBack(transport);
                }
                else if (exception !is null)
                {
                    // The receive failed, but the socket does not report a
                    // disconnect, so nobody takes ownership of the exception.
                    // Release it here instead of leaking it.
                    defaultAllocator.dispose(exception);
                }
                break;
            case OverlappedSocketEvent.write:
                auto transport = cast(StreamTransport) (cast(void*) key);
                assert(transport !is null);

                // Drop the bytes that were sent from the write buffer.
                transport.input += transport.socket.endSend(overlapped);
                if (transport.input.length > 0)
                {
                    // Not everything was sent yet, continue sending.
                    transport.socket.beginSend(transport.input[], overlapped);
                }
                else
                {
                    // Everything is flushed, go back to reading.
                    transport.socket.beginReceive(transport.output[], overlapped);
                    if (transport.isClosing())
                    {
                        kill(transport);
                    }
                }
                break;
            default:
                assert(false, "Unknown event");
        }
    }
}