1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 /**
6  * Event loop implementation for Windows.
7  *
8  * Copyright: Eugene Wissner 2016-2020.
9  * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
10  *                  Mozilla Public License, v. 2.0).
11  * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
12  * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/iocp.d,
13  *                 tanya/async/event/iocp.d)
14  */
15 module tanya.async.event.iocp;
16 
17 version (D_Ddoc)
18 {
19 }
20 else version (Windows):
21 
22 import core.sys.windows.mswsock;
23 import core.sys.windows.winsock2;
24 import tanya.async.loop;
25 import tanya.async.protocol;
26 import tanya.async.transport;
27 import tanya.async.watcher;
28 import tanya.container.buffer;
29 import tanya.memory.allocator;
30 import tanya.network.socket;
31 import tanya.sys.windows.winbase;
32 
33 /**
34  * Transport for stream sockets.
35  */
36 final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
37 {
38     private SocketException exception;
39 
40     private ReadBuffer!ubyte output;
41 
42     private WriteBuffer!ubyte input;
43 
44     private Protocol protocol_;
45 
46     private bool closing;
47 
48     /**
49      * Creates new completion port transport.
50      *
51      * Params:
52      *  socket = Socket.
53      *
54      * Precondition: $(D_INLINECODE socket !is null)
55      */
56     this(OverlappedConnectedSocket socket) @nogc
57     {
58         super(socket);
59         output = ReadBuffer!ubyte(8192, 1024);
60         input = WriteBuffer!ubyte(8192);
61         active = true;
62     }
63 
64     /**
65      * Returns: Socket.
66      *
67      * Postcondition: $(D_INLINECODE socket !is null)
68      */
69     override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
70     out (socket)
71     {
72         assert(socket !is null);
73     }
74     do
75     {
76         return cast(OverlappedConnectedSocket) socket_;
77     }
78 
79     /**
80      * Returns $(D_PARAM true) if the transport is closing or closed.
81      */
82     bool isClosing() const pure nothrow @safe @nogc
83     {
84         return closing;
85     }
86 
87     /**
88      * Close the transport.
89      *
90      * Buffered data will be flushed.  No more data will be received.
91      */
92     void close() pure nothrow @safe @nogc
93     {
94         closing = true;
95     }
96 
97     /**
98      * Write some data to the transport.
99      *
100      * Params:
101      *  data = Data to send.
102      */
103     void write(ubyte[] data) @nogc
104     {
105         input ~= data;
106     }
107 
108     /**
109      * Returns: Application protocol.
110      */
111     @property Protocol protocol() pure nothrow @safe @nogc
112     {
113         return protocol_;
114     }
115 
116     /**
117      * Switches the protocol.
118      *
119      * The protocol is deallocated by the event loop.
120      *
121      * Params:
122      *  protocol = Application protocol.
123      *
124      * Precondition: $(D_INLINECODE protocol !is null)
125      */
126     @property void protocol(Protocol protocol) pure nothrow @safe @nogc
127     in
128     {
129         assert(protocol !is null);
130     }
131     do
132     {
133         protocol_ = protocol;
134     }
135 
136     /**
137      * Invokes the watcher callback.
138      */
139     override void invoke() @nogc
140     {
141         if (output.length)
142         {
143             immutable empty = input.length == 0;
144             protocol.received(output[0 .. $]);
145             output.clear();
146             if (empty)
147             {
148                 SocketState overlapped;
149                 try
150                 {
151                     overlapped = defaultAllocator.make!SocketState;
152                     socket.beginSend(input[], overlapped);
153                 }
154                 catch (SocketException e)
155                 {
156                     defaultAllocator.dispose(overlapped);
157                     defaultAllocator.dispose(e);
158                 }
159             }
160         }
161         else
162         {
163             protocol.disconnected(exception);
164             defaultAllocator.dispose(protocol_);
165             defaultAllocator.dispose(exception);
166             active = false;
167         }
168     }
169 }
170 
171 final class IOCPLoop : Loop
172 {
173     protected HANDLE completionPort;
174 
175     protected OVERLAPPED overlap;
176 
177     /**
178      * Initializes the loop.
179      */
180     this() @nogc
181     {
182         super();
183 
184         completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
185         if (!completionPort)
186         {
187             throw make!BadLoopException(defaultAllocator,
188                                         "Creating completion port failed");
189         }
190     }
191 
192     /**
193      * Should be called if the backend configuration changes.
194      *
195      * Params:
196      *  watcher   = Watcher.
197      *  oldEvents = The events were already set.
198      *  events    = The events should be set.
199      *
200      * Returns: $(D_KEYWORD true) if the operation was successful.
201      */
202     override protected bool reify(SocketWatcher watcher,
203                                   EventMask oldEvents,
204                                   EventMask events) @nogc
205     {
206         SocketState overlapped;
207         if (!(oldEvents & Event.accept) && (events & Event.accept))
208         {
209             auto socket = cast(OverlappedStreamSocket) watcher.socket;
210             assert(socket !is null);
211 
212             if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
213                                        completionPort,
214                                        cast(size_t) (cast(void*) watcher),
215                                        0) !is completionPort)
216             {
217                 return false;
218             }
219 
220             try
221             {
222                 overlapped = defaultAllocator.make!SocketState;
223                 socket.beginAccept(overlapped);
224             }
225             catch (SocketException e)
226             {
227                 defaultAllocator.dispose(overlapped);
228                 defaultAllocator.dispose(e);
229                 return false;
230             }
231         }
232         if ((!(oldEvents & Event.read) && (events & Event.read))
233             || (!(oldEvents & Event.write) && (events & Event.write)))
234         {
235             auto transport = cast(StreamTransport) watcher;
236             assert(transport !is null);
237 
238             if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
239                                        completionPort,
240                                        cast(size_t) (cast(void*) watcher),
241                                        0) !is completionPort)
242             {
243                 return false;
244             }
245 
246             // Begin to read
247             if (!(oldEvents & Event.read) && (events & Event.read))
248             {
249                 try
250                 {
251                     overlapped = defaultAllocator.make!SocketState;
252                     transport.socket.beginReceive(transport.output[], overlapped);
253                 }
254                 catch (SocketException e)
255                 {
256                     defaultAllocator.dispose(overlapped);
257                     defaultAllocator.dispose(e);
258                     return false;
259                 }
260             }
261         }
262         return true;
263     }
264 
265     private void kill(StreamTransport transport,
266         SocketException exception = null) @nogc
267     in
268     {
269         assert(transport !is null);
270     }
271     do
272     {
273         transport.socket.shutdown();
274         defaultAllocator.dispose(transport.socket);
275         transport.exception = exception;
276         pendings.insertBack(transport);
277     }
278 
279     /**
280      * Does the actual polling.
281      */
282     override protected void poll() @nogc
283     {
284         DWORD lpNumberOfBytes;
285         size_t key;
286         OVERLAPPED* overlap;
287         immutable timeout = cast(immutable int) blockTime.total!"msecs";
288 
289         auto result = GetQueuedCompletionStatus(completionPort,
290                                                 &lpNumberOfBytes,
291                                                 &key,
292                                                 &overlap,
293                                                 timeout);
294         if (result == FALSE && overlap is null)
295         {
296             return; // Timeout
297         }
298 
299         enum size_t offset = size_t.sizeof * 2;
300         auto overlapped = cast(SocketState) ((cast(void*) overlap) - offset);
301         assert(overlapped !is null);
302         scope (failure)
303         {
304             defaultAllocator.dispose(overlapped);
305         }
306 
307         switch (overlapped.event)
308         {
309             case OverlappedSocketEvent.accept:
310                 auto connection = cast(ConnectionWatcher) (cast(void*) key);
311                 assert(connection !is null);
312 
313                 auto listener = cast(OverlappedStreamSocket) connection.socket;
314                 assert(listener !is null);
315 
316                 auto socket = listener.endAccept(overlapped);
317                 auto transport = defaultAllocator.make!StreamTransport(socket);
318 
319                 connection.incoming.insertBack(transport);
320 
321                 reify(transport,
322                       EventMask(Event.none),
323                       EventMask(Event.read | Event.write));
324 
325                 pendings.insertBack(connection);
326                 listener.beginAccept(overlapped);
327                 break;
328             case OverlappedSocketEvent.read:
329                 auto transport = cast(StreamTransport) (cast(void*) key);
330                 assert(transport !is null);
331 
332                 if (!transport.active)
333                 {
334                     defaultAllocator.dispose(transport);
335                     defaultAllocator.dispose(overlapped);
336                     return;
337                 }
338 
339                 int received;
340                 SocketException exception;
341                 try
342                 {
343                     received = transport.socket.endReceive(overlapped);
344                 }
345                 catch (SocketException e)
346                 {
347                     exception = e;
348                 }
349                 if (transport.socket.disconnected)
350                 {
351                     // We want to get one last notification to destroy the watcher.
352                     transport.socket.beginReceive(transport.output[], overlapped);
353                     kill(transport, exception);
354                 }
355                 else if (received > 0)
356                 {
357                     immutable full = transport.output.free == received;
358 
359                     transport.output += received;
360                     // Receive was interrupted because the buffer is full. We have to continue.
361                     if (full)
362                     {
363                         transport.socket.beginReceive(transport.output[], overlapped);
364                     }
365                     pendings.insertBack(transport);
366                 }
367                 break;
368             case OverlappedSocketEvent.write:
369                 auto transport = cast(StreamTransport) (cast(void*) key);
370                 assert(transport !is null);
371 
372                 transport.input += transport.socket.endSend(overlapped);
373                 if (transport.input.length > 0)
374                 {
375                     transport.socket.beginSend(transport.input[], overlapped);
376                 }
377                 else
378                 {
379                     transport.socket.beginReceive(transport.output[], overlapped);
380                     if (transport.isClosing())
381                     {
382                         kill(transport);
383                     }
384                 }
385                 break;
386             default:
387                 assert(false, "Unknown event");
388         }
389     }
390 }