1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 /*
6  * Event loop implementation for *BSD.
7  *
8  * Copyright: Eugene Wissner 2016-2020.
9  * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
10  *                  Mozilla Public License, v. 2.0).
11  * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
12  * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/kqueue.d,
13  *                 tanya/async/event/kqueue.d)
14  */
15 module tanya.async.event.kqueue;
16 
// Platform selection. Every operating system listed below sets the internal
// version identifier MacBSD. Nothing is set when documentation is generated
// (D_Ddoc) or on any other platform.
version (D_Ddoc)
{
}
else version (OSX)
{
    version = MacBSD;
}
else version (iOS)
{
    version = MacBSD;
}
else version (TVOS)
{
    version = MacBSD;
}
else version (WatchOS)
{
    version = MacBSD;
}
else version (FreeBSD)
{
    version = MacBSD;
}
else version (OpenBSD)
{
    version = MacBSD;
}
else version (DragonFlyBSD)
{
    version = MacBSD;
}

// The remainder of the module is compiled only if MacBSD was set above.
version (MacBSD):
50 
51 import core.stdc.errno;
52 import core.sys.posix.time; // timespec
53 import core.sys.posix.unistd;
54 import core.time;
55 import tanya.algorithm.comparison;
56 import tanya.async.event.selector;
57 import tanya.async.loop;
58 import tanya.async.transport;
59 import tanya.async.watcher;
60 import tanya.container.array;
61 import tanya.memory.allocator;
62 import tanya.network.socket;
63 
/**
 * Initializes a kevent structure in place.
 *
 * Params:
 * 	kevp = Structure to be overwritten.
 * 	args = Values for all fields of kevent_t, in declaration order.
 */
void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc
{
    // Build the complete value first, then store it through the pointer.
    auto filled = kevent_t(args);
    *kevp = filled;
}
68 
/*
 * Filter identifiers stored in kevent_t.filter. The event loop in this
 * module registers only EVFILT_READ and EVFILT_WRITE.
 *
 * NOTE(review): the list contains Mach-specific entries, so it was presumably
 * copied from the macOS <sys/event.h>; values other than EVFILT_READ and
 * EVFILT_WRITE may differ on the other supported systems - confirm against
 * the platform header before using them.
 */
enum : short
{
    EVFILT_READ     =  -1,
    EVFILT_WRITE    =  -2,
    EVFILT_AIO      =  -3, /* attached to aio requests */
    EVFILT_VNODE    =  -4, /* attached to vnodes */
    EVFILT_PROC     =  -5, /* attached to struct proc */
    EVFILT_SIGNAL   =  -6, /* attached to struct proc */
    EVFILT_TIMER    =  -7, /* timers */
    EVFILT_MACHPORT =  -8, /* Mach portsets */
    EVFILT_FS       =  -9, /* filesystem events */
    EVFILT_USER     = -10, /* User events */
    EVFILT_VM       = -12, /* virtual memory events */
    EVFILT_SYSCOUNT =  11
}
84 
/*
 * D declaration of the structure exchanged with the C function kevent().
 *
 * NOTE(review): the field order and sizes have to match the C `struct kevent`
 * of every platform this module is compiled for - confirm against the
 * respective <sys/event.h>.
 */
struct kevent_t
{
    uintptr_t ident; // Identifier for this event
    short filter; // Filter for event
    ushort flags; // EV_* action and flag bits
    uint fflags; // Always set to 0 by this module
    intptr_t data; // Contains errno if EV_ERROR is set in flags
    void* udata; // Opaque user data identifier
}
94 
/*
 * Bits for kevent_t.flags. "actions" and "flags" are passed to kevent() in the
 * change list; "returned values" are set by the kernel in the event list.
 */
enum
{
    /* actions */
    EV_ADD      = 0x0001, /* add event to kq (implies enable) */
    EV_DELETE   = 0x0002, /* delete event from kq */
    EV_ENABLE   = 0x0004, /* enable event */
    EV_DISABLE  = 0x0008, /* disable event (not reported) */

    /* flags */
    EV_ONESHOT  = 0x0010, /* only report one occurrence */
    EV_CLEAR    = 0x0020, /* clear event state after reporting */
    EV_RECEIPT  = 0x0040, /* force EV_ERROR on success, data=0 */
    EV_DISPATCH = 0x0080, /* disable event after reporting */

    EV_SYSFLAGS = 0xF000, /* reserved by system */
    EV_FLAG1    = 0x2000, /* filter-specific flag */

    /* returned values */
    EV_EOF      = 0x8000, /* EOF detected */
    EV_ERROR    = 0x4000, /* error, data contains errno */
}
116 
/*
 * Creates a new kernel event queue. The caller in this module treats a
 * return value of -1 as failure and any other value as the queue descriptor.
 */
extern(C) int kqueue() nothrow @nogc;
/*
 * Submits `nchanges` entries of `changelist` to the queue `kq` and stores up
 * to `nevents` pending events in `eventlist`, waiting at most `timeout`.
 * The caller in this module treats a negative return value as an error
 * (reported through errno) and any other value as the number of entries
 * written to `eventlist`.
 */
extern(C) int kevent(int kq, const kevent_t *changelist, int nchanges,
                     kevent_t *eventlist, int nevents, const timespec *timeout)
                     nothrow @nogc;
121 
/**
 * Event loop that uses kqueue()/kevent() as its backend.
 *
 * Filter changes are not sent to the kernel immediately. They are collected
 * in an internal change list and submitted together with the next kevent()
 * call made by poll().
 */
final class KqueueLoop : SelectorLoop
{
    /// Descriptor returned by kqueue().
    protected int fd;
    /// Buffer the kernel fills with triggered events.
    private Array!kevent_t events;
    /// Pending changes; only the first changeCount entries are valid.
    private Array!kevent_t changes;
    /// Number of pending entries in changes.
    private size_t changeCount;

    /**
     * Returns: Maximal number of events that can be fetched at a time, which
     *          is the current length of the event buffer.
     */
    override protected @property uint maxEvents()
    const pure nothrow @safe @nogc
    {
        return cast(uint) events.length;
    }

    /**
     * Creates the kernel queue and allocates the event and change buffers
     * with 64 entries each.
     *
     * Throws: BadLoopException if kqueue() fails.
     */
    this() @nogc
    {
        super();

        if ((fd = kqueue()) == -1)
        {
            throw make!BadLoopException(defaultAllocator,
                                        "kqueue initialization failed");
        }
        events = Array!kevent_t(64);
        changes = Array!kevent_t(64);
    }

    /**
     * Frees loop internals.
     */
    ~this() @nogc
    {
        close(fd);
    }

    /*
     * Appends a change for the given socket and filter to the change list.
     * The list grows by maxEvents entries when it is full.
     */
    private void set(SocketType socket, short filter, ushort flags) @nogc
    {
        if (changes.length <= changeCount)
        {
            changes.length = changeCount + maxEvents;
        }
        // kevent_t.ident is an uintptr_t. Casting to ulong does not compile
        // on targets where uintptr_t is narrower than 64 bits.
        EV_SET(&changes[changeCount],
               cast(uintptr_t) socket,
               filter,
               flags,
               0U,
               0,
               null);
        ++changeCount;
    }

    /**
     * Should be called if the backend configuration changes.
     *
     * If the masks differ, the filters belonging to the old mask are
     * scheduled for deletion first. The filters for the new mask are then
     * scheduled for (re-)registration in any case.
     *
     * Params:
     * 	watcher   = Watcher.
     * 	oldEvents = The events that were already set.
     * 	events    = The events that should be set.
     *
     * Returns: $(D_KEYWORD true) if the operation was successful.
     */
    override protected bool reify(SocketWatcher watcher,
                                  EventMask oldEvents,
                                  EventMask events) @nogc
    {
        if (events != oldEvents)
        {
            if ((oldEvents & Event.read) || (oldEvents & Event.accept))
            {
                set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
            }
            if (oldEvents & Event.write)
            {
                set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
            }
        }
        // Reading and accepting both use the read filter.
        if ((events & Event.read) || (events & Event.accept))
        {
            set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
        }
        if (events & Event.write)
        {
            set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
        }
        return true;
    }

    /**
     * Does the actual polling.
     *
     * Submits the pending changes, waits up to blockTime for events and
     * handles every returned event.
     *
     * Throws: BadLoopException if kevent() fails with an error other than
     *         EINTR.
     */
    protected override void poll() @nogc
    {
        timespec ts;
        blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);

        // Make the event buffer as large as the change list if more changes
        // are pending than events can be received.
        if (changeCount > maxEvents)
        {
            events.length = changes.length;
        }

        auto eventCount = kevent(fd,
                                 changes.get().ptr,
                                 cast(int) changeCount,
                                 events.get().ptr,
                                 maxEvents,
                                 &ts);
        // The change list is considered consumed regardless of the result.
        changeCount = 0;

        if (eventCount < 0)
        {
            if (errno != EINTR)
            {
                throw defaultAllocator.make!BadLoopException();
            }
            return;
        }

        for (int i; i < eventCount; ++i)
        {
            // The event identifier is used as index into the watcher table.
            assert(connections.length > events[i].ident);

            auto transport = cast(StreamTransport) connections[events[i].ident];
            // If it is a ConnectionWatcher. Accept connections.
            if (transport is null)
            {
                auto connection = cast(ConnectionWatcher) connections[events[i].ident];
                assert(connection !is null);

                acceptConnections(connection);
            }
            else if (events[i].flags & EV_ERROR)
            {
                kill(transport);
            }
            else if (events[i].filter == EVFILT_READ)
            {
                SocketException exception;
                try
                {
                    // Read until receive() reports that nothing was read.
                    ptrdiff_t received;
                    do
                    {
                        received = transport.socket.receive(transport.output[]);
                        transport.output += received;
                    }
                    while (received);
                }
                catch (SocketException e)
                {
                    exception = e;
                }
                if (transport.socket.disconnected)
                {
                    kill(transport, exception);
                }
                else if (transport.output.length)
                {
                    pendings.insertBack(transport);
                }
            }
            else if (events[i].filter == EVFILT_WRITE)
            {
                transport.writeReady = true;
                if (transport.input.length)
                {
                    feed(transport);
                }
            }
        }
    }

    /**
     * Returns: The blocking time, limited to one second.
     */
    override protected @property inout(Duration) blockTime()
    inout @nogc @safe pure nothrow
    {
        return min(super.blockTime, 1.dur!"seconds");
    }

    /**
     * If the transport couldn't send the data, the further sending should
     * be handled by the event loop.
     *
     * Params:
     * 	transport = Transport.
     * 	exception = Exception thrown on sending.
     *
     * Returns: $(D_KEYWORD false) if the inherited implementation returned
     *          $(D_KEYWORD false) or the transport is still ready for
     *          writing, $(D_KEYWORD true) if the write filter was scheduled
     *          to be armed again.
     */
    protected override bool feed(StreamTransport transport,
                                 SocketException exception = null) @nogc
    {
        if (!super.feed(transport, exception))
        {
            return false;
        }
        if (!transport.writeReady)
        {
            // The write filter is registered with EV_DISPATCH, so the kernel
            // disables it after every delivery. EV_ENABLE is required to get
            // notified again; EV_ADD registers the filter if it is missing.
            set(transport.socket.handle,
                EVFILT_WRITE,
                EV_ADD | EV_ENABLE | EV_DISPATCH);
            return true;
        }
        // NOTE(review): false is also returned after a successful send that
        // left the transport writable - confirm that callers expect this.
        return false;
    }
}