/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/*
 * Event loop implementation for *BSD.
 *
 * Copyright: Eugene Wissner 2016-2020.
 * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
 *                  Mozilla Public License, v. 2.0).
 * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
 * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/kqueue.d,
 *                 tanya/async/event/kqueue.d)
 */
module tanya.async.event.kqueue;

// Collapse every supported kqueue platform into the single MacBSD version
// identifier. Nothing is selected while generating documentation.
version (D_Ddoc)
{
}
else version (OSX)
{
    version = MacBSD;
}
else version (iOS)
{
    version = MacBSD;
}
else version (TVOS)
{
    version = MacBSD;
}
else version (WatchOS)
{
    version = MacBSD;
}
else version (FreeBSD)
{
    version = MacBSD;
}
else version (OpenBSD)
{
    version = MacBSD;
}
else version (DragonFlyBSD)
{
    version = MacBSD;
}

// Everything below is compiled only on the platforms selected above.
version (MacBSD):

import core.stdc.errno;
import core.sys.posix.time; // timespec
import core.sys.posix.unistd;
import core.time;
import tanya.algorithm.comparison;
import tanya.async.event.selector;
import tanya.async.loop;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.array;
import tanya.memory.allocator;
import tanya.network.socket;

/**
 * Fills a $(D_PSYMBOL kevent_t) in place.
 *
 * Params:
 *  kevp = Structure to overwrite.
 *  args = Values for all fields of $(D_PSYMBOL kevent_t), in declaration
 *         order.
 */
void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc
{
    *kevp = kevent_t(args);
}

// Filter identifiers. Only EVFILT_READ and EVFILT_WRITE are used in this
// module.
// NOTE(review): the values after EVFILT_TIMER look like the Darwin header;
// other BSDs number these filters differently - verify before using them.
enum : short
{
    EVFILT_READ     = -1,
    EVFILT_WRITE    = -2,
    EVFILT_AIO      = -3, /* attached to aio requests */
    EVFILT_VNODE    = -4, /* attached to vnodes */
    EVFILT_PROC     = -5, /* attached to struct proc */
    EVFILT_SIGNAL   = -6, /* attached to struct proc */
    EVFILT_TIMER    = -7, /* timers */
    EVFILT_MACHPORT = -8, /* Mach portsets */
    EVFILT_FS       = -9, /* filesystem events */
    EVFILT_USER     = -10, /* User events */
    EVFILT_VM       = -12, /* virtual memory events */
    EVFILT_SYSCOUNT = 11
}

// Event structure exchanged with kevent().
// NOTE(review): newer FreeBSD releases appear to use a larger struct kevent
// (additional ext fields) - confirm this layout matches the target ABI.
struct kevent_t
{
    uintptr_t ident; // Identifier for this event
    short filter; // Filter for event
    ushort flags;
    uint fflags;
    intptr_t data;
    void* udata; // Opaque user data identifier
}

enum
{
    /* actions */
    EV_ADD      = 0x0001, /* add event to kq (implies enable) */
    EV_DELETE   = 0x0002, /* delete event from kq */
    EV_ENABLE   = 0x0004, /* enable event */
    EV_DISABLE  = 0x0008, /* disable event (not reported) */

    /* flags */
    EV_ONESHOT  = 0x0010, /* only report one occurrence */
    EV_CLEAR    = 0x0020, /* clear event state after reporting */
    EV_RECEIPT  = 0x0040, /* force EV_ERROR on success, data=0 */
    EV_DISPATCH = 0x0080, /* disable event after reporting */

    EV_SYSFLAGS = 0xF000, /* reserved by system */
    EV_FLAG1    = 0x2000, /* filter-specific flag */

    /* returned values */
    EV_EOF      = 0x8000, /* EOF detected */
    EV_ERROR    = 0x4000, /* error, data contains errno */
}

extern(C) int kqueue() nothrow @nogc;
extern(C) int kevent(int kq, const kevent_t *changelist, int nchanges,
                     kevent_t *eventlist, int nevents, const timespec *timeout)
                     nothrow @nogc;

/**
 * Selector loop that uses kqueue()/kevent() as its backend.
 */
final class KqueueLoop : SelectorLoop
{
    /// Descriptor returned by kqueue().
    protected int fd;

    /// Buffer kevent() reports events into.
    private Array!kevent_t events;

    /// Changes queued by set() and submitted on the next poll().
    private Array!kevent_t changes;

    /// Number of queued entries at the front of changes.
    private size_t changeCount;

    /**
     * Returns: Maximal event count can be got at a time
     *          (should be supported by the backend).
     */
    override protected @property uint maxEvents()
    const pure nothrow @safe @nogc
    {
        return cast(uint) events.length;
    }

    /**
     * Creates the kernel queue and allocates the event and change buffers
     * (64 entries each).
     *
     * Throws: $(D_PSYMBOL BadLoopException) if kqueue() fails.
     */
    this() @nogc
    {
        super();

        if ((fd = kqueue()) == -1)
        {
            throw make!BadLoopException(defaultAllocator,
                                        "kqueue initialization failed");
        }
        events = Array!kevent_t(64);
        changes = Array!kevent_t(64);
    }

    /**
     * Frees loop internals.
     */
    ~this() @nogc
    {
        close(fd);
    }

    /**
     * Queues a change for the given socket. The change is handed over to the
     * kernel by the next call to poll().
     *
     * Params:
     *  socket = Socket handle used as the event identifier.
     *  filter = Filter (EVFILT_*).
     *  flags  = Actions and flags (EV_*).
     */
    private void set(SocketType socket, short filter, ushort flags) @nogc
    {
        // Grow the change list when all of its slots are in use.
        if (changes.length <= changeCount)
        {
            changes.length = changeCount + maxEvents;
        }
        // kevent_t.ident is uintptr_t; casting to that type (instead of
        // ulong) keeps the call valid where pointers are 32 bits wide.
        EV_SET(&changes[changeCount],
               cast(uintptr_t) socket,
               filter,
               flags,
               0U,
               0,
               null);
        ++changeCount;
    }

    /**
     * Should be called if the backend configuration changes.
     *
     * Params:
     *  watcher   = Watcher.
     *  oldEvents = The events were already set.
     *  events    = The events should be set.
     *
     * Returns: $(D_KEYWORD true) if the operation was successful.
     */
    override protected bool reify(SocketWatcher watcher,
                                  EventMask oldEvents,
                                  EventMask events) @nogc
    {
        // Remove the previously registered filters if the mask has changed.
        if (events != oldEvents)
        {
            if (oldEvents & Event.read || oldEvents & Event.accept)
            {
                set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
            }
            if (oldEvents & Event.write)
            {
                set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
            }
        }
        // Reading and accepting share the read filter.
        if (events & Event.read || events & Event.accept)
        {
            set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
        }
        if (events & Event.write)
        {
            set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
        }
        return true;
    }

    /**
     * Does the actual polling.
     *
     * Submits the queued changes, waits for events no longer than
     * $(D_PSYMBOL blockTime) and processes the events returned.
     *
     * Throws: $(D_PSYMBOL BadLoopException) if kevent() fails with an error
     *         other than $(D_PSYMBOL EINTR).
     */
    protected override void poll() @nogc
    {
        timespec ts;
        blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);

        // Make the event buffer as large as the change list if more changes
        // are queued than events can be received at once.
        if (changeCount > maxEvents)
        {
            events.length = changes.length;
        }

        auto eventCount = kevent(fd,
                                 changes.get().ptr,
                                 cast(int) changeCount,
                                 events.get().ptr,
                                 maxEvents,
                                 &ts);
        // The change list is considered consumed whatever kevent() returned.
        changeCount = 0;

        if (eventCount < 0)
        {
            // An interrupted call isn't an error, just return.
            if (errno != EINTR)
            {
                throw defaultAllocator.make!BadLoopException();
            }
            return;
        }

        for (int i; i < eventCount; ++i)
        {
            // The event identifier is used as the index into connections.
            assert(connections.length > events[i].ident);

            auto transport = cast(StreamTransport) connections[events[i].ident];
            // If it is a ConnectionWatcher. Accept connections.
            if (transport is null)
            {
                auto connection = cast(ConnectionWatcher) connections[events[i].ident];
                assert(connection !is null);

                acceptConnections(connection);
            }
            else if (events[i].flags & EV_ERROR)
            {
                kill(transport);
            }
            else if (events[i].filter == EVFILT_READ)
            {
                SocketException exception;
                try
                {
                    // Read until receive() returns 0.
                    ptrdiff_t received;
                    do
                    {
                        received = transport.socket.receive(transport.output[]);
                        transport.output += received;
                    }
                    while (received);
                }
                catch (SocketException e)
                {
                    exception = e;
                }
                if (transport.socket.disconnected)
                {
                    kill(transport, exception);
                }
                else if (transport.output.length)
                {
                    pendings.insertBack(transport);
                }
            }
            else if (events[i].filter == EVFILT_WRITE)
            {
                transport.writeReady = true;
                if (transport.input.length)
                {
                    feed(transport);
                }
            }
        }
    }

    /**
     * Returns: The blocking time, at most one second.
     */
    override protected @property inout(Duration) blockTime()
    inout @nogc @safe pure nothrow
    {
        return min(super.blockTime, 1.dur!"seconds");
    }

    /**
     * If the transport couldn't send the data, the further sending should
     * be handled by the event loop.
     *
     * Params:
     *  transport = Transport.
     *  exception = Exception thrown on sending.
     *
     * Returns: $(D_KEYWORD true) if the operation could be successfully
     *          completed or scheduled, $(D_KEYWORD false) otherwise (the
     *          transport will be destroyed then).
     */
    protected override bool feed(StreamTransport transport,
                                 SocketException exception = null) @nogc
    {
        if (!super.feed(transport, exception))
        {
            return false;
        }
        if (!transport.writeReady)
        {
            // The write filter is registered with EV_DISPATCH and is
            // therefore disabled once it has been reported. EV_ADD enables
            // it again; EV_DISPATCH alone would leave it disabled.
            set(transport.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
            return true;
        }
        // NOTE(review): this path is reached after a successful super.feed()
        // with the transport still write-ready, yet it returns false, which
        // doesn't seem to match the documented contract. Left unchanged -
        // confirm against the callers.
        return false;
    }
}