1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 /**
6  * Interface for the event loop implementations and the default event loop
7  * chooser.
8  *
9  * ---
10  * import tanya.async;
11  * import tanya.memory;
12  * import tanya.network.socket;
13  *
14  * class EchoProtocol : TransmissionControlProtocol
15  * {
16  *     private DuplexTransport transport;
17  *
18  *     void received(in ubyte[] data) @nogc
19  *     {
20  *         ubyte[512] buffer;
21  *         buffer[0 .. data.length] = data;
22  *         transport.write(buffer[]);
23  *     }
24  *
25  *     void connected(DuplexTransport transport) @nogc
26  *     {
27  *         this.transport = transport;
28  *     }
29  *
30  *     void disconnected(SocketException e) @nogc
31  *     {
32  *     }
33  * }
34  *
35  * void main()
36  * {
37  *     auto address = address4("127.0.0.1");
38  *     auto endpoint = Endpoint(address.get, cast(ushort) 8192);
39  *    
40  *     version (Windows)
41  *     {
42  *         auto sock = defaultAllocator.make!OverlappedStreamSocket(AddressFamily.inet);
43  *     }
44  *     else
45  *     {
46  *         auto sock = defaultAllocator.make!StreamSocket(AddressFamily.inet);
47  *         sock.blocking = false;
48  *     }
49  *
50  *     sock.bind(endpoint);
51  *     sock.listen(5);
52  *    
53  *     auto io = defaultAllocator.make!ConnectionWatcher(sock);
54  *     io.setProtocol!EchoProtocol;
55  *    
56  *     defaultLoop.start(io);
57  *     defaultLoop.run();
58  *    
59  *     sock.shutdown();
60  *     defaultAllocator.dispose(io);
61  *     defaultAllocator.dispose(sock);
62  * }
63  * ---
64  *
65  * Copyright: Eugene Wissner 2016-2020.
66  * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
67  *                  Mozilla Public License, v. 2.0).
68  * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
69  * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/loop.d,
70  *                 tanya/async/loop.d)
71  */
72 module tanya.async.loop;
73 
74 import core.time;
75 import tanya.async.transport;
76 import tanya.async.watcher;
77 import tanya.bitmanip;
78 import tanya.container.buffer;
79 import tanya.container.list;
80 import tanya.memory.allocator;
81 import tanya.net.socket;
82 
83 version (DisableBackends)
84 {
85 }
86 else version (D_Ddoc)
87 {
88 }
89 else version (linux)
90 {
91     import tanya.async.event.epoll;
92     version = Epoll;
93 }
94 else version (Windows)
95 {
96     import tanya.async.event.iocp;
97     version = IOCP;
98 }
99 else version (OSX)
100 {
101     version = Kqueue;
102 }
103 else version (iOS)
104 {
105     version = Kqueue;
106 }
107 else version (FreeBSD)
108 {
109     version = Kqueue;
110 }
111 else version (OpenBSD)
112 {
113     version = Kqueue;
114 }
115 else version (DragonFlyBSD)
116 {
117     version = Kqueue;
118 }
119 
120 /**
121  * Events.
122  */
123 enum Event : uint
124 {
125     none   = 0x00,       /// No events.
126     read   = 0x01,       /// Non-blocking read call.
127     write  = 0x02,       /// Non-blocking write call.
128     accept = 0x04,       /// Connection made.
129     error  = 0x80000000, /// Sent when an error occurs.
130 }
131 
132 alias EventMask = BitFlags!Event;
133 
134 /**
135  * Event loop.
136  */
137 abstract class Loop
138 {
139     protected bool done = true;
140 
141     /// Pending watchers.
142     protected DList!Watcher pendings;
143 
144     /**
145      * Returns: Maximal event count can be got at a time
146      *          (should be supported by the backend).
147      */
148     protected @property uint maxEvents()
149     const pure nothrow @safe @nogc
150     {
151         return 128U;
152     }
153 
154     /**
155      * Initializes the loop.
156      */
157     this() @nogc
158     {
159     }
160 
161     /**
162      * Frees loop internals.
163      */
164     ~this() @nogc
165     {
166         for (; !this.pendings.empty; this.pendings.removeFront())
167         {
168             defaultAllocator.dispose(this.pendings.front);
169         }
170     }
171 
172     /**
173      * Starts the loop.
174      */
175     void run() @nogc
176     {
177         this.done = false;
178         do
179         {
180             poll();
181 
182             // Invoke pendings
183             for (; !this.pendings.empty; this.pendings.removeFront())
184             {
185                 this.pendings.front.invoke();
186             }
187         }
188         while (!this.done);
189     }
190 
191     /**
192      * Break out of the loop.
193      */
194     void unloop() @safe pure nothrow @nogc
195     {
196         this.done = true;
197     }
198 
199     /**
200      * Start watching.
201      *
202      * Params:
203      *  watcher = Watcher.
204      */
205     void start(ConnectionWatcher watcher) @nogc
206     {
207         if (watcher.active)
208         {
209             return;
210         }
211         watcher.active = true;
212 
213         reify(watcher, EventMask(Event.none), EventMask(Event.accept));
214     }
215 
216     /**
217      * Stop watching.
218      *
219      * Params:
220      *  watcher = Watcher.
221      */
222     void stop(ConnectionWatcher watcher) @nogc
223     {
224         if (!watcher.active)
225         {
226             return;
227         }
228         watcher.active = false;
229 
230         reify(watcher, EventMask(Event.accept), EventMask(Event.none));
231     }
232 
233     /**
234      * Should be called if the backend configuration changes.
235      *
236      * Params:
237      *  watcher   = Watcher.
238      *  oldEvents = The events were already set.
239      *  events    = The events should be set.
240      *
241      * Returns: $(D_KEYWORD true) if the operation was successful.
242      */
243     abstract protected bool reify(SocketWatcher watcher,
244                                   EventMask oldEvents,
245                                   EventMask events) @nogc;
246 
247     /**
248      * Returns: The blocking time.
249      */
250     protected @property inout(Duration) blockTime()
251     inout @safe pure nothrow @nogc
252     {
253         // Don't block if we have to do.
254         return pendings.empty ? blockTime_ : Duration.zero;
255     }
256 
257     /**
258      * Sets the blocking time for IO watchers.
259      *
260      * Params:
261      *  blockTime = The blocking time. Cannot be larger than
262      *              $(D_PSYMBOL maxBlockTime).
263      */
264     protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc
265     in
266     {
267         assert(blockTime <= 1.dur!"hours", "Too long to wait.");
268         assert(!blockTime.isNegative);
269     }
270     do
271     {
272         blockTime_ = blockTime;
273     }
274 
275     /**
276      * Does the actual polling.
277      */
278     abstract protected void poll() @nogc;
279 
280     /// Maximal block time.
281     protected Duration blockTime_ = 1.dur!"minutes";
282 }
283 
284 /**
285  * Exception thrown on errors in the event loop.
286  */
287 class BadLoopException : Exception
288 {
289     /**
290      * Params:
291      *  file = The file where the exception occurred.
292      *  line = The line number where the exception occurred.
293      *  next = The previous exception in the chain of exceptions, if any.
294      */
295     this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
296     pure nothrow const @safe @nogc
297     {
298         super("Event loop cannot be initialized.", file, line, next);
299     }
300 }
301 
302 /**
303  * Returns the event loop used by default. If an event loop wasn't set with
304  * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL defaultLoop) will try to
305  * choose an event loop supported on the system.
306  *
307  * Returns: The default event loop.
308  */
309 @property Loop defaultLoop() @nogc
310 {
311     if (defaultLoop_ !is null)
312     {
313         return defaultLoop_;
314     }
315     version (Epoll)
316     {
317         defaultLoop_ = defaultAllocator.make!EpollLoop;
318     }
319     else version (IOCP)
320     {
321         defaultLoop_ = defaultAllocator.make!IOCPLoop;
322     }
323     else version (Kqueue)
324     {
325         import tanya.async.event.kqueue;
326         defaultLoop_ = defaultAllocator.make!KqueueLoop;
327     }
328     return defaultLoop_;
329 }
330 
331 /**
332  * Sets the default event loop.
333  *
334  * This property makes it possible to implement your own backends or event
335  * loops, for example, if the system is not supported or if you want to
336  * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass
337  * your implementation to this property.
338  *
339  * Params:
340  *  loop = The event loop.
341  */
342 @property void defaultLoop(Loop loop) @nogc
343 in
344 {
345     assert(loop !is null);
346 }
347 do
348 {
349     defaultLoop_ = loop;
350 }
351 
352 private Loop defaultLoop_;