1/*
2 * LegacyClonk
3 *
4 * Copyright (c) RedWolf Design
5 * Copyright (c) 2017-2022, The LegacyClonk Team and contributors
6 *
7 * Distributed under the terms of the ISC license; see accompanying file
8 * "COPYING" for details.
9 *
10 * "Clonk" is a registered trademark of Matthes Bender, used with permission.
11 * See accompanying file "TRADEMARK" for details.
12 *
13 * To redistribute this file separately, substitute the full license texts
14 * for the above references.
15 */
16
17#include "C4Thread.h"
18#include "StdScheduler.h"
19
20#include <cstring>
21
22#include <stdio.h>
23
24#include <assert.h>
25#include <errno.h>
26#include <fcntl.h>
27
28#ifdef _WIN32
29
30#include <process.h>
31#include <mmsystem.h>
32
33#endif
34
35#ifndef _WIN32
36#include <ranges>
37#include <unordered_map>
38
39// For pipe()
40#include <unistd.h>
41#endif
42
43// *** StdSchedulerProc
44
45// *** StdScheduler
46
47void StdScheduler::Clear()
48{
49 procs.clear();
50#ifdef _WIN32
51 eventHandles.clear();
52 eventProcs.clear();
53#endif
54}
55
56void StdScheduler::Add(StdSchedulerProc *const proc)
57{
58 procs.insert(x: proc);
59}
60
61void StdScheduler::Remove(StdSchedulerProc *const proc)
62{
63 procs.erase(x: proc);
64}
65
66bool StdScheduler::Execute(int iTimeout)
67{
68 // Needs at least one process to work properly
69 if (!procs.size()) return false;
70
71 // Get timeout
72 for (auto *const proc : procs)
73 {
74 if (const int procTimeout{proc->GetTimeout()}; procTimeout >= 0)
75 {
76 if (iTimeout == StdSync::Infinite || iTimeout > procTimeout)
77 {
78 iTimeout = procTimeout;
79 }
80 }
81 }
82
83#ifdef _WIN32
84 eventHandles.clear();
85 eventProcs.clear();
86
87 // Collect event handles
88 for (auto *const proc : procs)
89 {
90 if (const HANDLE event{proc->GetEvent()}; event)
91 {
92 eventHandles.emplace_back(event);
93 eventProcs.emplace_back(proc);
94 }
95 }
96
97 // Add Unblocker
98 eventHandles.emplace_back(unblocker.GetEvent());
99
100 // Wait for something to happen
101 const DWORD ret{WaitForMultipleObjects(eventHandles.size(), eventHandles.data(), false, iTimeout < 0 ? INFINITE : iTimeout)};
102
103 bool success{false};
104
105 if (ret != WAIT_TIMEOUT)
106 {
107 // Which event?
108 const auto eventNumber = ret - WAIT_OBJECT_0;
109
110 // Execute the signaled proces
111 if (eventNumber < eventHandles.size() - 1)
112 {
113 if (!eventProcs[eventNumber]->Execute())
114 {
115 OnError(eventProcs[eventNumber]);
116 success = false;
117 }
118 }
119 }
120
121#else
122 fds.resize(new_size: 1);
123
124 struct FdRange
125 {
126 std::size_t Offset;
127 std::size_t Size;
128 };
129
130 std::unordered_map<StdSchedulerProc *, FdRange> fdMap;
131
132 for (auto *const proc : procs)
133 {
134 const std::size_t oldSize{fds.size()};
135 proc->GetFDs(fds);
136
137 assert(fds.size() >= oldSize);
138
139 if (fds.size() != oldSize)
140 {
141 fdMap.emplace(args: std::piecewise_construct, args: std::forward_as_tuple(args: proc), args: std::forward_as_tuple(args: oldSize, args: fds.size() - oldSize));
142 }
143 }
144
145 // Wait for something to happen
146 const int cnt{StdSync::Poll(fds, timeout: iTimeout)};
147
148 bool success{true};
149
150 if (cnt > 0)
151 {
152 // Unblocker? Flush
153 if (fds[0].revents & POLLIN)
154 {
155 unblocker.Reset();
156 }
157
158 const std::span<pollfd> fdSpan{fds};
159
160 for (const auto &[proc, range] : fdMap)
161 {
162 if (std::ranges::any_of(fdSpan.subspan(offset: range.Offset, count: range.Size), std::identity{}, &pollfd::revents))
163 {
164 if (!proc->Execute(iTimeout: 0))
165 {
166 OnError(pProc: proc);
167 success = false;
168 }
169 }
170 }
171 }
172 else if (cnt < 0)
173 {
174 printf(format: "StdScheduler::Execute: poll failed %s\n", strerror(errno));
175 }
176
177#endif
178
179 for (auto *const proc : procs)
180 {
181 if (proc->GetTimeout() == 0)
182 {
183 if (!proc->Execute())
184 {
185 OnError(pProc: proc);
186 success = false;
187 }
188 }
189 }
190
191 return success;
192}
193
194void StdScheduler::UnBlock()
195{
196 unblocker.Set();
197}
198
199// *** StdSchedulerThread
200
201StdSchedulerThread::~StdSchedulerThread()
202{
203 // Stop thread
204 if (fThread) Stop();
205}
206
207void StdSchedulerThread::Add(StdSchedulerProc *pProc)
208{
209 // Thread is running? Stop it first
210 bool fGotThread = fThread;
211 if (fGotThread) Stop();
212 // Set
213 StdScheduler::Add(proc: pProc);
214 // Restart
215 if (fGotThread) Start();
216}
217
218void StdSchedulerThread::Remove(StdSchedulerProc *pProc)
219{
220 // Thread is running? Stop it first
221 bool fGotThread = fThread;
222 if (fGotThread) Stop();
223 // Set
224 StdScheduler::Remove(proc: pProc);
225 // Restart
226 if (fGotThread) Start();
227}
228
229bool StdSchedulerThread::Start()
230{
231 // already running? stop
232 if (fThread) Stop();
233 // begin thread
234 runThreadRun.store(i: true, m: std::memory_order_release);
235 thread = C4Thread::Create(options: {.Name: "StdSchedulerThread"}, func: [this]
236 {
237 while (runThreadRun.load(m: std::memory_order_acquire))
238 {
239 Execute();
240 }
241 });
242 fThread = true;
243 // success?
244 return true;
245}
246
247void StdSchedulerThread::Stop()
248{
249 // Not running?
250 if (!fThread) return;
251 // Set flag
252 runThreadRun.store(i: false, m: std::memory_order_release);
253 // Unblock
254 UnBlock();
255 thread.join();
256 fThread = false;
257 // ok
258 return;
259}
260