1/*
2 * LegacyClonk
3 *
4 * Copyright (c) 2023, The LegacyClonk Team and contributors
5 *
6 * Distributed under the terms of the ISC license; see accompanying file
7 * "COPYING" for details.
8 *
9 * "Clonk" is a registered trademark of Matthes Bender, used with permission.
10 * See accompanying file "TRADEMARK" for details.
11 *
12 * To redistribute this file separately, substitute the full license texts
13 * for the above references.
14 */
15
16#include "C4Awaiter.h"
17#include "C4CurlSystem.h"
18#include "C4Log.h"
19#include "C4ThreadPool.h"
20#include "StdApp.h"
21#include "C4ResStrTable.h"
22
23#include <format>
24#include <ranges>
25#include <utility>
26
27#define CURL_STRICTER
28#include <curl/curl.h>
29
/// Throws a C4CurlSystem::Exception when `result` converts to false; otherwise
/// returns `result` to the caller with its value category preserved.
///
/// With exactly one extra argument, that argument is passed straight to the
/// exception constructor. With more than one, the first extra argument is used
/// as a std::format-style format string and the remaining ones as its arguments.
///
/// @param result Value that is tested with `!result`.
/// @param args   Exception message, or format string followed by format arguments.
/// @throws C4CurlSystem::Exception if `result` is falsy.
template<typename T, typename... Args> requires (sizeof...(Args) >= 1)
static decltype(auto) ThrowIfFailed(T &&result, Args &&...args)
{
	if (!result)
	{
		if constexpr (sizeof...(Args) == 1)
		{
			throw C4CurlSystem::Exception{std::forward<Args>(args)...};
		}
		else
		{
			// The format string arrives as an ordinary function argument, so it is not a
			// constant expression; std::format checks its format string at compile time and
			// would reject it. std::vformat takes the format string at runtime instead.
			// std::make_format_args wants lvalues, which the helper's parameters provide.
			throw C4CurlSystem::Exception{[](const auto &format, const auto &...formatArgs)
			{
				return std::vformat(format, std::make_format_args(formatArgs...));
			}(args...)};
		}
	}

	return std::forward<T>(result);
}
47
48void C4CurlSystem::CURLMultiDeleter::operator()(CURLM *const multi)
49{
50 curl_multi_cleanup(multi_handle: multi);
51}
52
53void C4CurlSystem::CURLEasyDeleter::operator()(CURL *const easy)
54{
55 curl_easy_cleanup(curl: easy);
56}
57
/// Performs libcurl's global initialisation (CURL_GLOBAL_ALL).
///
/// @throws CStdApp::StartupException with the formatted IDS_ERR_CURLGLOBALINIT text
///         (which is also logged) if curl_global_init() does not return CURLE_OK.
C4CurlSystem::GlobalInit::GlobalInit()
{
	const auto initResult = curl_global_init(CURL_GLOBAL_ALL);
	if (initResult == CURLE_OK)
	{
		return;
	}

	// Turn the libcurl error code into text and embed it in the resource string.
	const char *const reason{curl_easy_strerror(initResult)};
	std::string text{std::vformat(LoadResStr(C4ResStrTableKey::IDS_ERR_CURLGLOBALINIT), std::make_format_args(reason))};
	LogNTr(text);
	throw CStdApp::StartupException{std::move(text)};
}
68
69C4CurlSystem::GlobalInit::~GlobalInit()
70{
71 curl_global_cleanup();
72}
73
74C4CurlSystem::AddedEasyHandle::AddedEasyHandle(C4CurlSystem &system, EasyHandle &&easyHandle)
75 : system{system}, easyHandle{std::move(easyHandle)}
76{
77}
78
79C4CurlSystem::AddedEasyHandle::~AddedEasyHandle()
80{
81 if (get())
82 {
83 system.get().RemoveHandle(handle: get());
84 }
85}
86
87C4CurlSystem::MultiHandleWithCallbacks::MultiHandleWithCallbacks(
88 MultiHandle multiHandle,
89 C4CurlSystem &system,
90 SocketFunction *const socketFunction,
91 TimerFunction *const timerFunction)
92 : multiHandle{std::move(multiHandle)}
93{
94 if (this->multiHandle)
95 {
96 curl_multi_setopt(this->multiHandle.get(), CURLMOPT_SOCKETFUNCTION, socketFunction);
97 curl_multi_setopt(this->multiHandle.get(), CURLMOPT_SOCKETDATA, &system);
98
99 curl_multi_setopt(this->multiHandle.get(), CURLMOPT_TIMERFUNCTION, timerFunction);
100 curl_multi_setopt(this->multiHandle.get(), CURLMOPT_TIMERDATA, &system);
101 }
102}
103
104C4CurlSystem::MultiHandleWithCallbacks::~MultiHandleWithCallbacks()
105{
106 if (multiHandle)
107 {
108 curl_multi_setopt(multiHandle.get(), CURLMOPT_SOCKETFUNCTION, nullptr);
109 curl_multi_setopt(multiHandle.get(), CURLMOPT_SOCKETDATA, nullptr);
110
111 curl_multi_setopt(multiHandle.get(), CURLMOPT_TIMERFUNCTION, nullptr);
112 curl_multi_setopt(multiHandle.get(), CURLMOPT_TIMERDATA, nullptr);
113 }
114}
115
116C4CurlSystem::Awaiter::Awaiter(C4CurlSystem &system, EasyHandle &&easyHandle)
117 : system{system},
118 easyHandle{std::move(easyHandle)},
119 result{std::unexpected{std::string(static_cast<std::size_t>(CURL_ERROR_SIZE), '\0')}}
120{
121 curl_easy_setopt(std::get<0>(this->easyHandle).get(), CURLOPT_ERRORBUFFER, result.error().data());
122 curl_easy_setopt(std::get<0>(this->easyHandle).get(), CURLOPT_PRIVATE, this);
123}
124
125void C4CurlSystem::Awaiter::SetResult(C4NetIO::addr_t &&result)
126{
127 const std::lock_guard lock{resultMutex};
128 this->result = std::move(result);
129}
130
131void C4CurlSystem::Awaiter::SetErrorMessage(const char *const message)
132{
133 const std::lock_guard lock{resultMutex};
134 result = std::unexpected{message};
135}
136
137C4NetIO::addr_t C4CurlSystem::Awaiter::await_resume()
138{
139 if (cancelled.load(m: std::memory_order_acquire))
140 {
141 throw C4Task::CancelledException{};
142 }
143
144 const std::lock_guard lock{resultMutex};
145
146 if (result.has_value())
147 {
148 return std::move(result.value());
149 }
150 else
151 {
152 throw C4CurlSystem::Exception{std::move(result.error())};
153 }
154}
155
156void C4CurlSystem::Awaiter::SetupCancellation(C4Task::CancellablePromise *const promise)
157{
158 promise->SetCancellationCallback(callback: [](void *const argument)
159 {
160 auto &that = *reinterpret_cast<Awaiter *>(argument);
161 {
162 const std::lock_guard lock{that.resultMutex};
163 that.easyHandle = {};
164 that.cancelled.store(i: true, m: std::memory_order_release);
165 }
166
167 that.Resume();
168 }, argument: this);
169}
170
171void C4CurlSystem::Awaiter::Resume()
172{
173 C4ThreadPool::Global->SubmitCallback(callback: coroutineHandle.load(m: std::memory_order_acquire));
174}
175
/// Creates the multi handle with this object's socket/timer callbacks installed and
/// starts the Execute() task that drives it.
///
/// @throws CStdApp::StartupException (after logging the same text) if
///         curl_multi_init() did not yield a handle.
C4CurlSystem::C4CurlSystem()
	: multiHandle{MultiHandle{curl_multi_init()}, *this, &C4CurlSystem::SocketFunction, &C4CurlSystem::TimerFunction}
{
	if (!multiHandle)
	{
		// Reuses the global-init resource string, with a fixed reason in place of a libcurl error text.
		std::string errorText{std::vformat(LoadResStr(C4ResStrTableKey::IDS_ERR_CURLGLOBALINIT), std::make_format_args("curl_multi_init failed"))};
		LogNTr(errorText);
		throw CStdApp::StartupException{std::move(errorText)};
	}

	multiTask = Execute();
}
188
189C4CurlSystem::AddedEasyHandle C4CurlSystem::AddHandle(EasyHandle &&easyHandle)
190{
191 AddedEasyHandle addedEasyHandle{*this, std::move(easyHandle)};
192
193 {
194 const std::lock_guard lock{socketMapMutex};
195 ThrowIfFailed(result: curl_multi_add_handle(multi_handle: multiHandle.get(), curl_handle: addedEasyHandle.get()) == CURLM_OK, args: "curl_multi_add_handle");
196 }
197
198 CancelWait();
199
200 return addedEasyHandle;
201}
202
203void C4CurlSystem::RemoveHandle(CURL *const handle)
204{
205 {
206 const std::lock_guard lock{socketMapMutex};
207 curl_multi_remove_handle(multi_handle: multiHandle.get(), curl_handle: handle);
208 std::erase_if(cont&: sockets, pred: [handle](const auto &pair) { return pair.first.first == handle; });
209 }
210
211 CancelWait();
212}
213
214C4CurlSystem::TaskType C4CurlSystem::Execute()
215{
216 int running{0};
217
218 {
219 const std::lock_guard lock{socketMapMutex};
220 curl_multi_socket_action(multi_handle: multiHandle.get(), CURL_SOCKET_TIMEOUT, ev_bitmask: 0, running_handles: &running);
221 }
222
223 const auto cancellationToken = co_await C4Task::GetCancellationToken();
224
225 for (;;)
226 {
227 WaitReturnType result{};
228
229 try
230 {
231 result = co_await Wait();
232 }
233 catch (const C4Task::CancelledException &)
234 {
235 if (cancellationToken())
236 {
237 co_return;
238 }
239 }
240
241 {
242 const std::lock_guard lock{socketMapMutex};
243
244 #ifdef _WIN32
245 if (result)
246 {
247 bool mapEmpty{sockets.empty()};
248 if (!mapEmpty)
249 {
250
251 // copy map to prevent crashes
252 const auto localSockets = sockets;
253
254 auto range = localSockets | std::views::keys | std::views::elements<1>;
255
256 if (!range.empty())
257 {
258 mapEmpty = false;
259
260 for (const auto socket : localSockets | std::views::keys | std::views::elements<1>)
261 {
262 if (WSANETWORKEVENTS networkEvents; !WSAEnumNetworkEvents(socket, event.GetEvent(), &networkEvents))
263 {
264 int eventBitmask{0};
265 if (networkEvents.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_CLOSE))
266 {
267 eventBitmask |= CURL_CSELECT_IN;
268 }
269
270 if (networkEvents.lNetworkEvents & (FD_WRITE | FD_CONNECT))
271 {
272 eventBitmask |= CURL_CSELECT_OUT;
273 }
274
275 curl_multi_socket_action(multiHandle.get(), socket, eventBitmask, &running);
276 }
277 else
278 {
279 curl_multi_socket_action(multiHandle.get(), socket, CURL_CSELECT_ERR, &running);
280 }
281 }
282 }
283 }
284
285 if (mapEmpty)
286 {
287 // Normally, WSAEnumNetworkEvents will reset the event as it needs to. However, if the map is empty,
288 // there is no WSAEnumNetworkEvents call to reset the event, so we do it manually here.
289 event.Reset();
290 curl_multi_socket_action(multiHandle.get(), CURL_SOCKET_TIMEOUT, 0, &running);
291 }
292 }
293 #else
294 if (!result.empty())
295 {
296 for (const auto event : result)
297 {
298 int eventBitmask{0};
299 if (event.revents & POLLIN)
300 {
301 eventBitmask |= CURL_CSELECT_IN;
302 }
303
304 if (event.revents & POLLOUT)
305 {
306 eventBitmask |= CURL_CSELECT_OUT;
307 }
308
309 if (event.revents & POLLERR)
310 {
311 eventBitmask |= CURL_CSELECT_ERR;
312 }
313
314 curl_multi_socket_action(multi_handle: multiHandle.get(), s: event.fd, ev_bitmask: eventBitmask, running_handles: &running);
315 }
316 }
317 #endif
318 else
319 {
320 curl_multi_socket_action(multi_handle: multiHandle.get(), CURL_SOCKET_TIMEOUT, ev_bitmask: 0, running_handles: &running);
321 }
322 }
323
324 // release the mutex for a short time to allow cancellation to remove easy
325 const std::lock_guard lock{socketMapMutex};
326 ProcessMessages();
327 }
328}
329
330C4Task::Cold<C4CurlSystem::WaitReturnType> C4CurlSystem::Wait()
331{
332 const struct Cleanup
333 {
334 Cleanup(C4Task::CancellablePromise &promise, std::atomic<C4Task::CancellablePromise *> &wait)
335 : promise{promise}, wait{wait}
336 {
337 wait.store(p: &promise, m: std::memory_order_release);
338 }
339
340 ~Cleanup()
341 {
342 C4Task::CancellablePromise *expected;
343
344 do
345 {
346 expected = &promise;
347 }
348 while (!wait.compare_exchange_strong(p1&: expected, p2: nullptr, m: std::memory_order_acq_rel));
349 }
350
351 C4Task::CancellablePromise &promise;
352 std::atomic<C4Task::CancellablePromise *> &wait;
353 } cleanup{co_await C4Task::GetPromise(), wait};
354
355#ifdef _WIN32
356 co_return co_await C4Awaiter::ResumeOnSignal(event.GetEvent(), timeout.load(std::memory_order_acquire));
357#else
358 co_return co_await C4Awaiter::ResumeOnSignals(
359 fds: GetSocketMapCopy()
360 | std::views::transform([](const auto &pair) { return pollfd{.fd = pair.first.second, .events = static_cast<short>(pair.second)}; }),
361 timeout: timeout.load(m: std::memory_order_acquire)
362 );
363#endif
364}
365
366void C4CurlSystem::ProcessMessages()
367{
368 CURLMsg *message;
369 do
370 {
371 int messagesInQueue{0};
372 message = curl_multi_info_read(multi_handle: multiHandle.get(), msgs_in_queue: &messagesInQueue);
373
374 if (message && message->msg == CURLMSG_DONE)
375 {
376 char *awaiterPtr{nullptr};
377 ThrowIfFailed(curl_easy_getinfo(message->easy_handle, CURLINFO_PRIVATE, &awaiterPtr) == CURLE_OK, args: "curl_easy_getinfo(CURLOPT_PRIVATE) failed");
378
379 auto &awaiter = *reinterpret_cast<Awaiter *>(awaiterPtr);
380
381 if (message->data.result == CURLE_OK)
382 {
383 char *ip;
384 if (curl_easy_getinfo(message->easy_handle, CURLINFO_PRIMARY_IP, &ip) == CURLE_OK)
385 {
386 C4NetIO::addr_t serverAddress;
387 serverAddress.SetHost(host: StdStrBuf{ip});
388 awaiter.SetResult(std::move(serverAddress));
389 }
390 else
391 {
392 awaiter.SetErrorMessage("curl_easy_getinfo(CURLINFO_PRIMARY_IP) failed");
393 }
394 }
395 else
396 {
397 awaiter.SetErrorMessage(curl_easy_strerror(message->data.result));
398 }
399
400 awaiter.Resume();
401 }
402 }
403 while (message);
404}
405
406void C4CurlSystem::CancelWait()
407{
408 if (C4Task::CancellablePromise *const promise{wait.exchange(p: nullptr, m: std::memory_order_acq_rel)}; promise)
409 {
410 const struct Cleanup
411 {
412 ~Cleanup()
413 {
414 wait.store(p: promise, m: std::memory_order_release);
415 }
416
417 C4Task::CancellablePromise *promise;
418 std::atomic<C4Task::CancellablePromise *> &wait;
419 } cleanup{.promise: promise, .wait: wait};
420
421 promise->Cancel();
422 }
423}
424
425int C4CurlSystem::SocketFunction(CURL *const curl, const curl_socket_t s, const int what, void *const userData) noexcept
426{
427 auto &that = *reinterpret_cast<C4CurlSystem *>(userData);
428
429 std::int32_t networkEvents;
430#ifdef _WIN32
431 static constexpr long NetworkEventsIn{FD_READ | FD_ACCEPT | FD_CLOSE};
432 static constexpr long NetworkEventsOut{FD_WRITE | FD_CONNECT};
433#else
434 static constexpr std::int32_t NetworkEventsIn{POLLIN};
435 static constexpr std::int32_t NetworkEventsOut{POLLOUT};
436#endif
437
438 switch (what)
439 {
440 case CURL_POLL_IN:
441 networkEvents = NetworkEventsIn;
442 break;
443
444 case CURL_POLL_OUT:
445 networkEvents = NetworkEventsOut;
446 break;
447
448 case CURL_POLL_INOUT:
449 networkEvents = NetworkEventsIn | NetworkEventsOut;
450 break;
451
452 default:
453 networkEvents = 0;
454 break;
455 }
456
457#ifdef _WIN32
458 if (WSAEventSelect(s, that.event.GetEvent(), networkEvents) == SOCKET_ERROR)
459 {
460 return CURL_SOCKOPT_ERROR;
461 }
462#endif
463
464 if (what == CURL_POLL_REMOVE)
465 {
466 that.sockets.erase(x: std::pair{curl, s});
467 }
468 else
469 {
470 that.sockets.insert_or_assign(k: {curl, s}, obj&: networkEvents);
471 }
472
473 return 0;
474}
475
476int C4CurlSystem::TimerFunction(CURLM *, const long timeout, void *const userData) noexcept
477{
478 reinterpret_cast<C4CurlSystem *>(userData)->timeout.store(i: static_cast<std::uint32_t>(std::clamp(val: timeout, lo: -1L, hi: static_cast<long>(std::numeric_limits<std::int32_t>::max()))), m: std::memory_order_release);
479 return 0;
480}
481