1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 /*
6  * This module contains base implementations for reactor event loops.
7  *
8  * Copyright: Eugene Wissner 2016-2020.
9  * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
10  *                  Mozilla Public License, v. 2.0).
11  * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
12  * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/selector.d,
13  *                 tanya/async/event/selector.d)
14  */
15 module tanya.async.event.selector;
16 
17 version (D_Ddoc)
18 {
19 }
20 else version (Posix):
21 
22 import tanya.async.loop;
23 import tanya.async.protocol;
24 import tanya.async.transport;
25 import tanya.async.watcher;
26 import tanya.container.array;
27 import tanya.container.buffer;
28 import tanya.memory.allocator;
29 import tanya.net.socket;
30 
31 /**
32  * Transport for stream sockets.
33  */
package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
    /// Event loop this transport was created by and reports back to.
    private SelectorLoop loop;

    /// Exception passed to the protocol on disconnect; set by the loop
    /// when the transport is killed (may be $(D_KEYWORD null)).
    private SocketException exception;

    /// Received data that is handed over to the protocol in
    /// $(D_PSYMBOL invoke).
    package ReadBuffer!ubyte output;

    /// Data queued by $(D_PSYMBOL write) that could not be sent yet.
    package WriteBuffer!ubyte input;

    /// Application protocol that is notified about received data and
    /// disconnects.
    private Protocol protocol_;

    /// Set once $(D_PSYMBOL close) was called.
    private bool closing;

    /// Received notification that the underlying socket is write-ready.
    package bool writeReady;

    /**
     * Params:
     *  loop   = Event loop.
     *  socket = Socket.
     *
     * Precondition: $(D_INLINECODE loop !is null && socket !is null)
     */
    this(SelectorLoop loop, ConnectedSocket socket) @nogc
    in
    {
        assert(loop !is null);
        // Enforce the second half of the documented precondition as well.
        assert(socket !is null);
    }
    do
    {
        super(socket);
        this.loop = loop;
        output = ReadBuffer!ubyte(8192, 1024);
        input = WriteBuffer!ubyte(8192);
        active = true;
    }

    /**
     * Returns: Socket.
     *
     * Postcondition: $(D_INLINECODE socket !is null)
     */
    override @property ConnectedSocket socket() pure nothrow @safe @nogc
    out (socket)
    {
        assert(socket !is null);
    }
    do
    {
        return cast(ConnectedSocket) socket_;
    }

    /**
     * Replaces the underlying socket.
     *
     * Params:
     *  socket = New socket.
     *
     * Precondition: $(D_INLINECODE socket !is null)
     */
    private @property void socket(ConnectedSocket socket)
    pure nothrow @safe @nogc
    in
    {
        assert(socket !is null);
    }
    do
    {
        socket_ = socket;
    }

    /**
     * Returns: Application protocol.
     */
    @property Protocol protocol() pure nothrow @safe @nogc
    {
        return protocol_;
    }

    /**
     * Switches the protocol.
     *
     * The protocol is deallocated by the event loop.
     *
     * Params:
     *  protocol = Application protocol.
     *
     * Precondition: $(D_INLINECODE protocol !is null)
     */
    @property void protocol(Protocol protocol) pure nothrow @safe @nogc
    in
    {
        assert(protocol !is null);
    }
    do
    {
        protocol_ = protocol;
    }

    /**
     * Returns: $(D_KEYWORD true) if the transport is closing or closed.
     */
    bool isClosing() const pure nothrow @safe @nogc
    {
        return closing;
    }

    /**
     * Close the transport.
     *
     * Buffered data will be flushed.  No more data will be received.
     */
    void close() @nogc
    {
        closing = true;
        // Switch the registration from read + write to write only, so
        // that pending output can still be sent.
        loop.reify(this,
                   EventMask(Event.read | Event.write),
                   EventMask(Event.write));
    }

    /**
     * Invokes the watcher callback.
     *
     * If there is received data, it is passed to the protocol and the read
     * buffer is cleared; a closing transport without pending output is
     * then killed. Without received data the call is treated as a
     * disconnect: the protocol is notified, the protocol and the stored
     * exception are disposed and the transport is marked inactive.
     */
    override void invoke() @nogc
    {
        if (output.length)
        {
            protocol.received(output[0 .. $]);
            output.clear();
            if (isClosing() && input.length == 0)
            {
                loop.kill(this);
            }
        }
        else
        {
            protocol.disconnected(exception);
            defaultAllocator.dispose(protocol_);
            defaultAllocator.dispose(exception);
            active = false;
        }
    }

    /**
     * Write some data to the transport.
     *
     * If the socket is known to be write-ready, the data is sent
     * immediately; whatever could not be sent is buffered and handed to
     * the event loop. Otherwise the data is only buffered.
     *
     * Params:
     *  data = Data to send.
     */
    void write(ubyte[] data) @nogc
    {
        if (!data.length)
        {
            return;
        }
        // Try to write if the socket is write ready.
        if (writeReady)
        {
            ptrdiff_t sent;
            // Named differently from the `exception` member on purpose.
            SocketException sendError;
            try
            {
                sent = socket.send(data);
                if (sent == 0)
                {
                    writeReady = false;
                }
            }
            catch (SocketException e)
            {
                writeReady = false;
                sendError = e;
            }
            if (sent < data.length)
            {
                // Keep the unsent remainder and let the loop either
                // continue sending or kill the transport on error.
                input ~= data[sent..$];
                loop.feed(this, sendError);
            }
        }
        else
        {
            input ~= data;
        }
    }
}
212 
/**
 * Base class for event loops that keep their socket watchers in a table
 * indexed by the socket handle.
 */
abstract class SelectorLoop : Loop
{
    /// Pending connections.
    protected Array!SocketWatcher connections;

    /**
     * Initializes the loop and preallocates the connection table with
     * $(D_PSYMBOL maxEvents) slots.
     */
    this() @nogc
    {
        super();
        this.connections = Array!SocketWatcher(maxEvents);
    }

    /**
     * Frees the transports stored in the connection table.
     */
    ~this() @nogc
    {
        foreach (ref connection; this.connections[])
        {
            // We want to free only the transports. ConnectionWatchers are
            // created by the user and should be freed by them.
            if (cast(StreamTransport) connection !is null)
            {
                defaultAllocator.dispose(connection);
            }
        }
    }

    /**
     * Should be called if the backend configuration changes.
     *
     * Params:
     *  watcher   = Watcher.
     *  oldEvents = The events were already set.
     *  events    = The events should be set.
     *
     * Returns: $(D_KEYWORD true) if the operation was successful.
     */
    override abstract protected bool reify(SocketWatcher watcher,
                                           EventMask oldEvents,
                                           EventMask events) @nogc;

    /**
     * Kills the watcher and closes the connection.
     *
     * The socket is shut down and disposed, the exception is stored in the
     * transport and the transport is queued in the pending watchers.
     *
     * Params:
     *  transport = Transport.
     *  exception = Occurred exception.
     *
     * Precondition: $(D_INLINECODE transport !is null)
     */
    protected void kill(StreamTransport transport,
                        SocketException exception = null) @nogc
    in
    {
        assert(transport !is null);
    }
    do
    {
        transport.socket.shutdown();
        defaultAllocator.dispose(transport.socket);
        transport.exception = exception;
        pendings.insertBack(transport);
    }

    /**
     * If the transport couldn't send the data, the further sending should
     * be handled by the event loop.
     *
     * Params:
     *  transport = Transport.
     *  exception = Exception thrown on sending.
     *
     * Returns: $(D_KEYWORD true) if the operation could be successfully
     *          completed or scheduled, $(D_KEYWORD false) otherwise (the
     *          transport will be destroyed then).
     *
     * Precondition: $(D_INLINECODE transport !is null)
     */
    protected bool feed(StreamTransport transport,
                        SocketException exception = null) @nogc
    in
    {
        assert(transport !is null);
    }
    do
    {
        // Send until the buffer is drained, the socket stops accepting
        // data or an error occurs.
        while (transport.input.length && transport.writeReady)
        {
            try
            {
                ptrdiff_t sent = transport.socket.send(transport.input[]);
                if (sent == 0)
                {
                    transport.writeReady = false;
                }
                else
                {
                    // Drop the bytes that were sent from the buffer.
                    transport.input += sent;
                }
            }
            catch (SocketException e)
            {
                exception = e;
                transport.writeReady = false;
            }
        }
        if (exception !is null)
        {
            kill(transport, exception);
            return false;
        }
        // Everything was flushed and the user asked to close.
        if (transport.input.length == 0 && transport.isClosing())
        {
            kill(transport);
        }
        return true;
    }

    /**
     * Start watching.
     *
     * Registers the watcher in the connection table under its socket
     * handle, growing the table if necessary. Does nothing if the watcher
     * is already active.
     *
     * Params:
     *  watcher = Watcher.
     */
    override void start(ConnectionWatcher watcher) @nogc
    {
        if (watcher.active)
        {
            return;
        }

        // Compare against the handle, which is also the index used below.
        if (connections.length <= watcher.socket.handle)
        {
            connections.length = watcher.socket.handle + maxEvents / 2;
        }
        connections[watcher.socket.handle] = watcher;

        super.start(watcher);
    }

    /**
     * Accept incoming connections.
     *
     * Accepts until no more clients are returned or accepting throws. Each
     * client gets a transport (a new one, or the one already stored under
     * the same handle), is registered for read and write events and is
     * appended to the incoming queue of the connection watcher.
     *
     * Params:
     *  connection = Connection watcher ready to accept.
     *
     * Precondition: $(D_INLINECODE connection !is null)
     */
    package void acceptConnections(ConnectionWatcher connection) @nogc
    in
    {
        assert(connection !is null);
    }
    do
    {
        while (true)
        {
            ConnectedSocket client;
            try
            {
                client = (cast(StreamSocket) connection.socket).accept();
            }
            catch (SocketException e)
            {
                defaultAllocator.dispose(e);
                break;
            }
            if (client is null)
            {
                break;
            }

            StreamTransport transport;

            if (connections.length > client.handle)
            {
                transport = cast(StreamTransport) connections[client.handle];
            }
            else
            {
                connections.length = client.handle + maxEvents / 2;
            }
            if (transport is null)
            {
                transport = defaultAllocator.make!StreamTransport(this, client);
                connections[client.handle] = transport;
            }
            else
            {
                // NOTE(review): only the socket is replaced on a recycled
                // transport; its closing/writeReady flags and buffers are
                // not reset here - confirm this happens elsewhere.
                transport.socket = client;
            }

            // NOTE(review): the result of reify is not checked - confirm
            // that a failed registration is handled by the backend.
            reify(transport,
                  EventMask(Event.none),
                  EventMask(Event.read | Event.write));
            connection.incoming.insertBack(transport);
        }

        // Schedule the connection watcher only if something was accepted.
        if (!connection.incoming.empty)
        {
            pendings.insertBack(connection);
        }
    }
}