/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/**
 * Interface for the event loop implementations and the default event loop
 * chooser.
 *
 * ---
 * import tanya.async;
 * import tanya.memory;
 * import tanya.network.socket;
 *
 * class EchoProtocol : TransmissionControlProtocol
 * {
 *     private DuplexTransport transport;
 *
 *     void received(in ubyte[] data) @nogc
 *     {
 *         ubyte[512] buffer;
 *         buffer[0 .. data.length] = data;
 *         // Echo back only the bytes that were actually received.
 *         transport.write(buffer[0 .. data.length]);
 *     }
 *
 *     void connected(DuplexTransport transport) @nogc
 *     {
 *         this.transport = transport;
 *     }
 *
 *     void disconnected(SocketException e) @nogc
 *     {
 *     }
 * }
 *
 * void main()
 * {
 *     auto address = address4("127.0.0.1");
 *     auto endpoint = Endpoint(address.get, cast(ushort) 8192);
 *
 *     version (Windows)
 *     {
 *         auto sock = defaultAllocator.make!OverlappedStreamSocket(AddressFamily.inet);
 *     }
 *     else
 *     {
 *         auto sock = defaultAllocator.make!StreamSocket(AddressFamily.inet);
 *         sock.blocking = false;
 *     }
 *
 *     sock.bind(endpoint);
 *     sock.listen(5);
 *
 *     auto io = defaultAllocator.make!ConnectionWatcher(sock);
 *     io.setProtocol!EchoProtocol;
 *
 *     defaultLoop.start(io);
 *     defaultLoop.run();
 *
 *     sock.shutdown();
 *     defaultAllocator.dispose(io);
 *     defaultAllocator.dispose(sock);
 * }
 * ---
 *
 * Copyright: Eugene Wissner 2016-2020.
 * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
 *                  Mozilla Public License, v. 2.0).
 * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
 * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/loop.d,
 *                 tanya/async/loop.d)
 */
module tanya.async.loop;

import core.time;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.bitmanip;
import tanya.container.buffer;
import tanya.container.list;
import tanya.memory.allocator;
import tanya.net.socket;

version (DisableBackends)
{
}
else version (D_Ddoc)
{
}
else version (linux)
{
    import tanya.async.event.epoll;
    version = Epoll;
}
else version (Windows)
{
    import tanya.async.event.iocp;
    version = IOCP;
}
else version (OSX)
{
    version = Kqueue;
}
else version (iOS)
{
    version = Kqueue;
}
else version (FreeBSD)
{
    version = Kqueue;
}
else version (OpenBSD)
{
    version = Kqueue;
}
else version (DragonFlyBSD)
{
    version = Kqueue;
}

/**
 * Events.
 */
enum Event : uint
{
    none   = 0x00,       /// No events.
    read   = 0x01,       /// Non-blocking read call.
    write  = 0x02,       /// Non-blocking write call.
    accept = 0x04,       /// Connection made.
    error  = 0x80000000, /// Sent when an error occurs.
}

/// Combination of $(D_PSYMBOL Event) values wrapped in $(D_PSYMBOL BitFlags).
alias EventMask = BitFlags!Event;

/**
 * Event loop.
 *
 * Backends derive from this class and implement $(D_PSYMBOL reify) and
 * $(D_PSYMBOL poll).
 */
abstract class Loop
{
    /// Loop termination flag: cleared by $(D_PSYMBOL run), set by
    /// $(D_PSYMBOL unloop).
    protected bool done = true;

    /// Pending watchers.
    protected DList!Watcher pendings;

    /**
     * Returns: The maximal number of events that can be fetched at a time
     *          (should be supported by the backend).
     */
    protected @property uint maxEvents()
    const pure nothrow @safe @nogc
    {
        return 128U;
    }

    /**
     * Initializes the loop.
     */
    this() @nogc
    {
    }

    /**
     * Frees loop internals.
     *
     * Every watcher that is still in the pending list is disposed with
     * $(D_PSYMBOL defaultAllocator).
     */
    ~this() @nogc
    {
        for (; !this.pendings.empty; this.pendings.removeFront())
        {
            defaultAllocator.dispose(this.pendings.front);
        }
    }

    /**
     * Starts the loop.
     *
     * Each iteration polls the backend and then invokes all pending
     * watchers. At least one iteration is executed; the loop ends after the
     * iteration in which $(D_PSYMBOL unloop) was called. Calling
     * $(D_PSYMBOL unloop) before $(D_PSYMBOL run) has no effect, because the
     * termination flag is reset here.
     */
    void run() @nogc
    {
        this.done = false;
        do
        {
            poll();

            // Invoke pendings
            for (; !this.pendings.empty; this.pendings.removeFront())
            {
                this.pendings.front.invoke();
            }
        }
        while (!this.done);
    }

    /**
     * Break out of the loop.
     */
    void unloop() @safe pure nothrow @nogc
    {
        this.done = true;
    }

    /**
     * Start watching.
     *
     * Does nothing if the watcher is already active. If the backend fails to
     * register the watcher ($(D_PSYMBOL reify) returns
     * $(D_KEYWORD false)), the watcher is left inactive, so that a later
     * call can retry the registration.
     *
     * Params:
     *  watcher = Watcher.
     */
    void start(ConnectionWatcher watcher) @nogc
    {
        if (watcher.active)
        {
            return;
        }
        watcher.active = true;

        if (!reify(watcher, EventMask(Event.none), EventMask(Event.accept)))
        {
            // The backend isn't watching the socket; keeping the flag set
            // would turn every further start() into a no-op.
            watcher.active = false;
        }
    }

    /**
     * Stop watching.
     *
     * Does nothing if the watcher isn't active. The watcher is marked as
     * inactive before the backend is asked to drop the registration; the
     * result reported by $(D_PSYMBOL reify) is not checked.
     *
     * Params:
     *  watcher = Watcher.
     */
    void stop(ConnectionWatcher watcher) @nogc
    {
        if (!watcher.active)
        {
            return;
        }
        watcher.active = false;

        reify(watcher, EventMask(Event.accept), EventMask(Event.none));
    }

    /**
     * Should be called if the backend configuration changes.
     *
     * Params:
     *  watcher   = Watcher.
     *  oldEvents = The events that were already set.
     *  events    = The events that should be set.
     *
     * Returns: $(D_KEYWORD true) if the operation was successful.
     */
    abstract protected bool reify(SocketWatcher watcher,
                                  EventMask oldEvents,
                                  EventMask events) @nogc;

    /**
     * Returns: The blocking time. It is zero as long as there are pending
     *          watchers, otherwise the configured blocking time.
     */
    protected @property inout(Duration) blockTime()
    inout @safe pure nothrow @nogc
    {
        // Don't block if there is pending work to do.
        return pendings.empty ? blockTime_ : Duration.zero;
    }

    /**
     * Sets the blocking time for IO watchers.
     *
     * Params:
     *  blockTime = The blocking time. Cannot be negative or larger than one
     *              hour.
     */
    protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc
    in
    {
        assert(blockTime <= 1.dur!"hours", "Too long to wait.");
        assert(!blockTime.isNegative);
    }
    do
    {
        blockTime_ = blockTime;
    }

    /**
     * Does the actual polling.
     */
    abstract protected void poll() @nogc;

    /// Maximal block time.
    protected Duration blockTime_ = 1.dur!"minutes";
}

/**
 * Exception thrown on errors in the event loop.
 */
class BadLoopException : Exception
{
    /**
     * Params:
     *  file = The file where the exception occurred.
     *  line = The line number where the exception occurred.
     *  next = The previous exception in the chain of exceptions, if any.
     */
    this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    pure nothrow const @safe @nogc
    {
        super("Event loop cannot be initialized.", file, line, next);
    }
}

/**
 * Returns the event loop used by default. If an event loop wasn't set with
 * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL defaultLoop) will try to
 * choose an event loop supported on the system.
 *
 * The chosen loop is created once with $(D_PSYMBOL defaultAllocator) and
 * reused by subsequent calls.
 *
 * Returns: The default event loop, or $(D_KEYWORD null) if no loop was set
 *          and no backend is available in this build (for example with
 *          the $(D_KEYWORD version) $(D_PSYMBOL DisableBackends) or on an
 *          unsupported system).
 */
@property Loop defaultLoop() @nogc
{
    if (defaultLoop_ !is null)
    {
        return defaultLoop_;
    }
    version (Epoll)
    {
        defaultLoop_ = defaultAllocator.make!EpollLoop;
    }
    else version (IOCP)
    {
        defaultLoop_ = defaultAllocator.make!IOCPLoop;
    }
    else version (Kqueue)
    {
        import tanya.async.event.kqueue;
        defaultLoop_ = defaultAllocator.make!KqueueLoop;
    }
    return defaultLoop_;
}

/**
 * Sets the default event loop.
 *
 * This property makes it possible to implement your own backends or event
 * loops, for example, if the system is not supported or if you want to
 * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass
 * your implementation to this property.
 *
 * Params:
 *  loop = The event loop. Must not be $(D_KEYWORD null).
 */
@property void defaultLoop(Loop loop) @nogc
in
{
    assert(loop !is null);
}
do
{
    defaultLoop_ = loop;
}

// Loop returned by the defaultLoop getter; null until chosen or set.
private Loop defaultLoop_;