/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/**
 * Event loop implementation for Linux.
 *
 * Copyright: Eugene Wissner 2016-2020.
 * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
 *                  Mozilla Public License, v. 2.0).
 * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
 * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/epoll.d,
 *                 tanya/async/event/epoll.d)
 */
module tanya.async.event.epoll;

version (D_Ddoc)
{
}
else version (linux):

import core.stdc.errno;
public import core.sys.linux.epoll;
import core.sys.posix.unistd;
import core.time;
import std.algorithm.comparison;
import tanya.async.event.selector;
import tanya.async.loop;
import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.array;
import tanya.memory.allocator;
import tanya.net.socket;

// Prototypes of the epoll system call wrappers, declared nothrow @nogc so
// that they can be called from the @nogc members below.
extern (C) nothrow @nogc
{
    int epoll_create1(int flags);
    int epoll_ctl (int epfd, int op, int fd, epoll_event *event);
    int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout);
}

/**
 * Selector loop that uses an epoll instance as its backend.
 *
 * All descriptors are registered with $(D_PSYMBOL EPOLLET) (see
 * $(D_PSYMBOL reify)).
 */
final class EpollLoop : SelectorLoop
{
    /// Descriptor of the epoll instance created in the constructor.
    protected int fd;

    /// Buffer that $(D_PSYMBOL epoll_wait) fills with the reported events.
    private Array!epoll_event events;

    /**
     * Initializes the loop.
     *
     * Creates the epoll instance with $(D_PSYMBOL EPOLL_CLOEXEC) and
     * constructs the event buffer from $(D_PSYMBOL maxEvents).
     *
     * Throws: $(D_PSYMBOL BadLoopException) if the epoll instance cannot be
     *         created.
     */
    this() @nogc
    {
        if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
        {
            throw defaultAllocator.make!BadLoopException("epoll initialization failed");
        }
        super();
        events = Array!epoll_event(maxEvents);
    }

    /**
     * Frees loop internals.
     *
     * Closes the epoll descriptor.
     */
    ~this() @nogc
    {
        close(fd);
    }

    /**
     * Should be called if the backend configuration changes.
     *
     * Translates the requested event mask into an $(D_PSYMBOL epoll_ctl)
     * call: the descriptor is added if it had no events before, modified if
     * it had, and deleted if $(D_PARAM events) is empty. Nothing is done if
     * both masks are equal.
     *
     * Params:
     *  watcher   = Watcher.
     *  oldEvents = The events that were already set.
     *  events    = The events that should be set.
     *
     * Returns: $(D_KEYWORD true) if the operation was successful.
     */
    protected override bool reify(SocketWatcher watcher,
                                  EventMask oldEvents,
                                  EventMask events) @nogc
    {
        // Deletion is the fallback for the case that no events are left.
        int op = EPOLL_CTL_DEL;
        epoll_event ev;

        if (events == oldEvents)
        {
            return true;
        }
        if (events && oldEvents)
        {
            op = EPOLL_CTL_MOD;
        }
        else if (events && !oldEvents)
        {
            op = EPOLL_CTL_ADD;
        }

        // The descriptor is stored in the event data so that poll() can look
        // the watcher up in "connections".
        ev.data.fd = watcher.socket.handle;
        ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
                  | (events & Event.write ? EPOLLOUT : 0)
                  | EPOLLET;

        return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
    }

    /**
     * Does the actual polling.
     *
     * Waits for events for at most $(D_PSYMBOL blockTime) and dispatches
     * them: new connections are accepted on connection watchers, readable
     * transports are drained into their output buffer and queued in
     * $(D_PSYMBOL pendings), writable transports are marked as ready and fed
     * if they have pending input. Transports that report an error or got
     * disconnected are killed.
     *
     * An interrupted wait ($(D_PSYMBOL EINTR)) is not an error, the method
     * just returns.
     *
     * Throws: $(D_PSYMBOL BadLoopException) if $(D_PSYMBOL epoll_wait) fails
     *         for another reason than $(D_PSYMBOL EINTR).
     */
    protected override void poll() @nogc
    {
        // Block for at most blockTime, converted to milliseconds as
        // epoll_wait expects.
        immutable timeout = cast(immutable int) blockTime.total!"msecs";
        auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout);

        if (eventCount < 0)
        {
            if (errno != EINTR)
            {
                throw defaultAllocator.make!BadLoopException();
            }
            return;
        }

        for (auto i = 0; i < eventCount; ++i)
        {
            auto transport = cast(StreamTransport) connections[events[i].data.fd];

            if (transport is null)
            {
                auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
                assert(connection !is null);

                acceptConnections(connection);
                // There is no transport for this descriptor, so the write
                // handling below must not be reached: it would dereference
                // the null transport.
                continue;
            }
            else if (events[i].events & EPOLLERR)
            {
                kill(transport);
                continue;
            }
            else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
            {
                SocketException exception;
                try
                {
                    // Read until receive() reports that nothing more was
                    // received.
                    ptrdiff_t received;
                    do
                    {
                        received = transport.socket.receive(transport.output[]);
                        transport.output += received;
                    }
                    while (received);
                }
                catch (SocketException e)
                {
                    // Remembered and passed to kill() if the socket turns
                    // out to be disconnected.
                    exception = e;
                }
                if (transport.socket.disconnected)
                {
                    kill(transport, exception);
                    continue;
                }
                else if (transport.output.length)
                {
                    pendings.insertBack(transport);
                }
            }
            if (events[i].events & EPOLLOUT)
            {
                transport.writeReady = true;
                if (transport.input.length)
                {
                    feed(transport);
                }
            }
        }
    }

    /**
     * Returns: The blocking time, which is the blocking time of the base
     *          class limited to one second.
     */
    override protected @property inout(Duration) blockTime()
    inout @safe pure nothrow
    {
        return min(super.blockTime, 1.dur!"seconds");
    }
}