1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 /**
6  * Event loop implementation for Linux.
7  *
8  * Copyright: Eugene Wissner 2016-2020.
9  * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
10  *                  Mozilla Public License, v. 2.0).
11  * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
12  * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/epoll.d,
13  *                 tanya/async/event/epoll.d)
14  */
15 module tanya.async.event.epoll;
16 
17 version (D_Ddoc)
18 {
19 }
20 else version (linux):
21 
22 import core.stdc.errno;
23 public import core.sys.linux.epoll;
24 import core.sys.posix.unistd;
25 import core.time;
26 import std.algorithm.comparison;
27 import tanya.async.event.selector;
28 import tanya.async.loop;
29 import tanya.async.protocol;
30 import tanya.async.transport;
31 import tanya.async.watcher;
32 import tanya.container.array;
33 import tanya.memory.allocator;
34 import tanya.net.socket;
35 
// C-linkage declarations of the epoll functions used by this module.
// They are declared nothrow @nogc here; the @nogc methods of EpollLoop
// below call them directly. See the epoll_create1(2), epoll_ctl(2) and
// epoll_wait(2) manual pages for the exact semantics of each call.
extern (C) nothrow @nogc
{
    // Creates an epoll instance; the result is treated as an error by the
    // caller when it is negative.
    int epoll_create1(int flags);
    // Adds, modifies or removes (selected by op) the registration of fd on
    // the epoll instance epfd; the caller treats 0 as success.
    int epoll_ctl (int epfd, int op, int fd, epoll_event *event);
    // Waits for events on epfd, writing at most maxevents entries to events;
    // timeout is passed in milliseconds by the caller.
    int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout);
}
42 
/**
 * Selector loop implementation backed by Linux epoll.
 *
 * Sockets are registered edge-triggered ($(D_PSYMBOL EPOLLET)), therefore
 * $(D_PSYMBOL poll) drains a readable socket until
 * $(D_PSYMBOL receive) reports nothing more to read.
 */
final class EpollLoop : SelectorLoop
{
    /// Descriptor of the epoll instance created in the constructor.
    protected int fd;

    /// Buffer handed to $(D_PSYMBOL epoll_wait) to be filled with ready events.
    private Array!epoll_event events;

    /**
     * Initializes the loop.
     *
     * Creates the epoll instance (with $(D_PSYMBOL EPOLL_CLOEXEC)) and
     * allocates the event buffer with $(D_PSYMBOL maxEvents) entries.
     *
     * Throws: $(D_PSYMBOL BadLoopException) if the epoll instance cannot be
     *         created.
     */
    this() @nogc
    {
        if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
        {
            throw defaultAllocator.make!BadLoopException("epoll initialization failed");
        }
        super();
        events = Array!epoll_event(maxEvents);
    }

    /**
     * Frees loop internals by closing the epoll descriptor.
     */
    ~this() @nogc
    {
        close(fd);
    }

    /**
     * Should be called if the backend configuration changes.
     *
     * Translates the difference between $(D_PARAM oldEvents) and
     * $(D_PARAM events) into a single $(D_PSYMBOL epoll_ctl) operation:
     * add (nothing was set before), modify (both masks are non-empty) or
     * delete (nothing should be set anymore).
     *
     * Params:
     *  watcher   = Watcher.
     *  oldEvents = The events were already set.
     *  events    = The events should be set.
     *
     * Returns: $(D_KEYWORD true) if the operation was successful.
     */
    protected override bool reify(SocketWatcher watcher,
                                  EventMask oldEvents,
                                  EventMask events) @nogc
    {
        int op = EPOLL_CTL_DEL;
        epoll_event ev;

        if (events == oldEvents)
        {
            // Nothing changed, no system call required.
            return true;
        }
        if (events && oldEvents)
        {
            op = EPOLL_CTL_MOD;
        }
        else if (events && !oldEvents)
        {
            op = EPOLL_CTL_ADD;
        }

        // The descriptor is stored in the event data so that poll() can look
        // the watcher up in the connections table.
        ev.data.fd = watcher.socket.handle;
        ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
                  | (events & Event.write ? EPOLLOUT : 0)
                  | EPOLLET;

        return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
    }

    /**
     * Does the actual polling.
     *
     * Waits for events for at most $(D_PSYMBOL blockTime) and dispatches
     * every reported event: listening sockets accept new connections,
     * stream transports are read from and/or marked as ready for writing.
     *
     * Throws: $(D_PSYMBOL BadLoopException) if $(D_PSYMBOL epoll_wait) fails
     *         with an error other than $(D_PSYMBOL EINTR).
     */
    protected override void poll() @nogc
    {
        // Block for at most blockTime. blockTime is capped at one second, so
        // the millisecond count is not truncated by the cast from above.
        immutable timeout = cast(immutable int) blockTime.total!"msecs";
        auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout);

        if (eventCount < 0)
        {
            // An interrupted wait is not an error, just try again next time.
            if (errno != EINTR)
            {
                throw defaultAllocator.make!BadLoopException();
            }
            return;
        }

        foreach (i; 0 .. eventCount)
        {
            immutable descriptor = events[i].data.fd;
            immutable triggered = events[i].events;

            auto transport = cast(StreamTransport) connections[descriptor];

            if (transport is null)
            {
                // Not a stream transport, so it has to be a listening socket.
                auto connection = cast(ConnectionWatcher) connections[descriptor];
                assert(connection !is null);

                acceptConnections(connection);
                // There is no transport for a listening socket; the write
                // handling below would dereference null.
                continue;
            }
            else if (triggered & EPOLLERR)
            {
                kill(transport);
                continue;
            }
            else if (triggered & (EPOLLIN | EPOLLPRI | EPOLLHUP))
            {
                SocketException exception;
                try
                {
                    // Edge-triggered mode: read until receive returns 0.
                    ptrdiff_t received;
                    do
                    {
                        received = transport.socket.receive(transport.output[]);
                        transport.output += received;
                    }
                    while (received);
                }
                catch (SocketException e)
                {
                    // Remember the error; it is passed on if the socket
                    // turns out to be disconnected.
                    exception = e;
                }
                if (transport.socket.disconnected)
                {
                    kill(transport, exception);
                    continue;
                }
                else if (transport.output.length)
                {
                    // Received data is waiting to be processed.
                    pendings.insertBack(transport);
                }
            }
            if (triggered & EPOLLOUT)
            {
                transport.writeReady = true;
                if (transport.input.length)
                {
                    // Data is queued for sending, push it out now.
                    feed(transport);
                }
            }
        }
    }

    /**
     * Returns: The blocking time: the value of the base implementation,
     *          but not more than one second.
     */
    override protected @property inout(Duration) blockTime()
    inout @safe pure nothrow
    {
        return min(super.blockTime, 1.dur!"seconds");
    }
}