1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 /* 6 * This module contains base implementations for reactor event loops. 7 * 8 * Copyright: Eugene Wissner 2016-2020. 9 * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, 10 * Mozilla Public License, v. 2.0). 11 * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) 12 * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/selector.d, 13 * tanya/async/event/selector.d) 14 */ 15 module tanya.async.event.selector; 16 17 version (D_Ddoc) 18 { 19 } 20 else version (Posix): 21 22 import tanya.async.loop; 23 import tanya.async.protocol; 24 import tanya.async.transport; 25 import tanya.async.watcher; 26 import tanya.container.array; 27 import tanya.container.buffer; 28 import tanya.memory.allocator; 29 import tanya.net.socket; 30 31 /** 32 * Transport for stream sockets. 33 */ 34 package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport 35 { 36 private SelectorLoop loop; 37 38 private SocketException exception; 39 40 package ReadBuffer!ubyte output; 41 42 package WriteBuffer!ubyte input; 43 44 private Protocol protocol_; 45 46 private bool closing; 47 48 /// Received notification that the underlying socket is write-ready. 49 package bool writeReady; 50 51 /** 52 * Params: 53 * loop = Event loop. 54 * socket = Socket. 55 * 56 * Precondition: $(D_INLINECODE loop !is null && socket !is null) 57 */ 58 this(SelectorLoop loop, ConnectedSocket socket) @nogc 59 in 60 { 61 assert(loop !is null); 62 } 63 do 64 { 65 super(socket); 66 this.loop = loop; 67 output = ReadBuffer!ubyte(8192, 1024); 68 input = WriteBuffer!ubyte(8192); 69 active = true; 70 } 71 72 /** 73 * Returns: Socket. 74 * 75 * Postcondition: $(D_INLINECODE socket !is null) 76 */ 77 override @property ConnectedSocket socket() pure nothrow @safe @nogc 78 out (socket) 79 { 80 assert(socket !is null); 81 } 82 do 83 { 84 return cast(ConnectedSocket) socket_; 85 } 86 87 private @property void socket(ConnectedSocket socket) 88 pure nothrow @safe @nogc 89 in 90 { 91 assert(socket !is null); 92 } 93 do 94 { 95 socket_ = socket; 96 } 97 98 /** 99 * Returns: Application protocol. 100 */ 101 @property Protocol protocol() pure nothrow @safe @nogc 102 { 103 return protocol_; 104 } 105 106 /** 107 * Switches the protocol. 108 * 109 * The protocol is deallocated by the event loop. 110 * 111 * Params: 112 * protocol = Application protocol. 113 * 114 * Precondition: $(D_INLINECODE protocol !is null) 115 */ 116 @property void protocol(Protocol protocol) pure nothrow @safe @nogc 117 in 118 { 119 assert(protocol !is null); 120 } 121 do 122 { 123 protocol_ = protocol; 124 } 125 126 /** 127 * Returns $(D_PARAM true) if the transport is closing or closed. 128 */ 129 bool isClosing() const pure nothrow @safe @nogc 130 { 131 return closing; 132 } 133 134 /** 135 * Close the transport. 136 * 137 * Buffered data will be flushed. No more data will be received. 138 */ 139 void close() @nogc 140 { 141 closing = true; 142 loop.reify(this, 143 EventMask(Event.read | Event.write), 144 EventMask(Event.write)); 145 } 146 147 /** 148 * Invokes the watcher callback. 149 */ 150 override void invoke() @nogc 151 { 152 if (output.length) 153 { 154 protocol.received(output[0 .. $]); 155 output.clear(); 156 if (isClosing() && input.length == 0) 157 { 158 loop.kill(this); 159 } 160 } 161 else 162 { 163 protocol.disconnected(exception); 164 defaultAllocator.dispose(protocol_); 165 defaultAllocator.dispose(exception); 166 active = false; 167 } 168 } 169 170 /** 171 * Write some data to the transport. 172 * 173 * Params: 174 * data = Data to send. 175 */ 176 void write(ubyte[] data) @nogc 177 { 178 if (!data.length) 179 { 180 return; 181 } 182 // Try to write if the socket is write ready. 183 if (writeReady) 184 { 185 ptrdiff_t sent; 186 SocketException exception; 187 try 188 { 189 sent = socket.send(data); 190 if (sent == 0) 191 { 192 writeReady = false; 193 } 194 } 195 catch (SocketException e) 196 { 197 writeReady = false; 198 exception = e; 199 } 200 if (sent < data.length) 201 { 202 input ~= data[sent..$]; 203 loop.feed(this, exception); 204 } 205 } 206 else 207 { 208 input ~= data; 209 } 210 } 211 } 212 213 abstract class SelectorLoop : Loop 214 { 215 /// Pending connections. 216 protected Array!SocketWatcher connections; 217 218 this() @nogc 219 { 220 super(); 221 this.connections = Array!SocketWatcher(maxEvents); 222 } 223 224 ~this() @nogc 225 { 226 foreach (ref connection; this.connections[]) 227 { 228 // We want to free only the transports. ConnectionWatcher are 229 // created by the user and should be freed by himself. 230 if (cast(StreamTransport) connection !is null) 231 { 232 defaultAllocator.dispose(connection); 233 } 234 } 235 } 236 237 /** 238 * Should be called if the backend configuration changes. 239 * 240 * Params: 241 * watcher = Watcher. 242 * oldEvents = The events were already set. 243 * events = The events should be set. 244 * 245 * Returns: $(D_KEYWORD true) if the operation was successful. 246 */ 247 override abstract protected bool reify(SocketWatcher watcher, 248 EventMask oldEvents, 249 EventMask events) @nogc; 250 251 /** 252 * Kills the watcher and closes the connection. 253 * 254 * Params: 255 * transport = Transport. 256 * exception = Occurred exception. 257 */ 258 protected void kill(StreamTransport transport, 259 SocketException exception = null) @nogc 260 in 261 { 262 assert(transport !is null); 263 } 264 do 265 { 266 transport.socket.shutdown(); 267 defaultAllocator.dispose(transport.socket); 268 transport.exception = exception; 269 pendings.insertBack(transport); 270 } 271 272 /** 273 * If the transport couldn't send the data, the further sending should 274 * be handled by the event loop. 275 * 276 * Params: 277 * transport = Transport. 278 * exception = Exception thrown on sending. 279 * 280 * Returns: $(D_KEYWORD true) if the operation could be successfully 281 * completed or scheduled, $(D_KEYWORD false) otherwise (the 282 * transport will be destroyed then). 283 */ 284 protected bool feed(StreamTransport transport, 285 SocketException exception = null) @nogc 286 in 287 { 288 assert(transport !is null); 289 } 290 do 291 { 292 while (transport.input.length && transport.writeReady) 293 { 294 try 295 { 296 ptrdiff_t sent = transport.socket.send(transport.input[]); 297 if (sent == 0) 298 { 299 transport.writeReady = false; 300 } 301 else 302 { 303 transport.input += sent; 304 } 305 } 306 catch (SocketException e) 307 { 308 exception = e; 309 transport.writeReady = false; 310 } 311 } 312 if (exception !is null) 313 { 314 kill(transport, exception); 315 return false; 316 } 317 if (transport.input.length == 0 && transport.isClosing()) 318 { 319 kill(transport); 320 } 321 return true; 322 } 323 324 /** 325 * Start watching. 326 * 327 * Params: 328 * watcher = Watcher. 329 */ 330 override void start(ConnectionWatcher watcher) @nogc 331 { 332 if (watcher.active) 333 { 334 return; 335 } 336 337 if (connections.length <= watcher.socket) 338 { 339 connections.length = watcher.socket.handle + maxEvents / 2; 340 } 341 connections[watcher.socket.handle] = watcher; 342 343 super.start(watcher); 344 } 345 346 /** 347 * Accept incoming connections. 348 * 349 * Params: 350 * connection = Connection watcher ready to accept. 351 */ 352 package void acceptConnections(ConnectionWatcher connection) @nogc 353 in 354 { 355 assert(connection !is null); 356 } 357 do 358 { 359 while (true) 360 { 361 ConnectedSocket client; 362 try 363 { 364 client = (cast(StreamSocket) connection.socket).accept(); 365 } 366 catch (SocketException e) 367 { 368 defaultAllocator.dispose(e); 369 break; 370 } 371 if (client is null) 372 { 373 break; 374 } 375 376 StreamTransport transport; 377 378 if (connections.length > client.handle) 379 { 380 transport = cast(StreamTransport) connections[client.handle]; 381 } 382 else 383 { 384 connections.length = client.handle + maxEvents / 2; 385 } 386 if (transport is null) 387 { 388 transport = defaultAllocator.make!StreamTransport(this, client); 389 connections[client.handle] = transport; 390 } 391 else 392 { 393 transport.socket = client; 394 } 395 396 reify(transport, 397 EventMask(Event.none), 398 EventMask(Event.read | Event.write)); 399 connection.incoming.insertBack(transport); 400 } 401 402 if (!connection.incoming.empty) 403 { 404 pendings.insertBack(connection); 405 } 406 } 407 }