-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathThreadPool.mpp
More file actions
152 lines (131 loc) * 4.46 KB
/
ThreadPool.mpp
File metadata and controls
152 lines (131 loc) * 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
export module CppUtils.Thread.ThreadPool;
import std;
import CppUtils.Execution.ScopeGuard;
import CppUtils.Logger;
import CppUtils.Thread.ThreadLoop;
import CppUtils.Thread.UniqueLocker;
export namespace CppUtils::Thread
{
class ThreadPool final
{
using Task = std::function<void()>;
public:
explicit inline ThreadPool(
std::size_t numberThreads = std::max(1uz, static_castsize_t>(std::thread::hardware_concurrency())),
std::function onError = nullptr,
std::function finally = nullptr):
m_numberThreads{std::max(1uz, numberThreads)},
m_onError{std::move(onError)},
m_finally{std::move(finally)}
{
m_workers.reserve(m_numberThreads);
for (auto i = 0uz; i < m_numberThreads; ++i)
m_workers.emplace_back(
[this] { workerThread(); },
[this] { m_startWorkingCondition.notify_all(); },
[this](std::exception_ptr exceptionPointer) { handleError(exceptionPointer); })
.start();
}
inline ThreadPool(const ThreadPool&) = delete;
inline ThreadPool& operator=(const ThreadPool&) = delete;
inline ThreadPool(ThreadPool&&) = delete;
inline ThreadPool& operator=(ThreadPool&&) = delete;
inline ~ThreadPool()
{
m_stopRequested = true;
for (auto& worker : m_workers)
worker.requestStop();
m_startWorkingCondition.notify_all();
for (auto& worker : m_workers)
worker.stop();
}
inline auto call(auto&& function) -> std::future>>
{
using ReturnType = std::invoke_result_tdecay_t<decltype(function)>>;
auto task = std::make_sharedReturnType()>>([this, function = std::forward<decltype(function)>(function)]() mutable -> ReturnType {
auto _ = Execution::ScopeGuard{m_finally};
try
{
return std::invoke(function);
}
catch (...)
{
handleError(std::current_exception());
throw;
}
});
auto future = task->get_future();
{
auto tasksQueueAccessor = m_tasksQueue.access();
tasksQueueAccessor->emplace([task = std::move(task)] { std::invoke(*task); });
}
m_startWorkingCondition.notify_one();
return future;
}
inline auto waitUntilFinished() -> void
{
auto tasksQueueAccessor = m_tasksQueue.access();
if (m_activeWorkers != 0 or not std::empty(tasksQueueAccessor.value()))
m_waitUntilFinishedCondition.wait(tasksQueueAccessor.getLockGuard(), [this, &tasksQueueAccessor] {
return m_activeWorkers == 0 and std::empty(tasksQueueAccessor.value());
});
}
inline auto setOnError(std::function<void(std::exception_ptr)> onError) noexcept -> void
{
auto lockGuard = std::unique_lock{m_onErrorMutex};
m_onError = std::move(onError);
}
private:
inline auto workerThread() -> void
{
auto task = Task{};
{
auto tasksQueueAccessor = m_tasksQueue.access();
m_startWorkingCondition.wait(tasksQueueAccessor.getLockGuard(), [this, &tasksQueueAccessor] {
return not std::empty(tasksQueueAccessor.value()) or m_stopRequested.load(std::memory_order_relaxed);
});
if (std::empty(tasksQueueAccessor.value()))
return;
m_activeWorkers.fetch_add(1, std::memory_order_relaxed);
task = std::move(tasksQueueAccessor->front());
tasksQueueAccessor->pop();
}
task();
{
auto tasksQueueAccessor = m_tasksQueue.access();
m_activeWorkers.fetch_sub(1, std::memory_order_relaxed);
if (m_activeWorkers.load(std::memory_order_relaxed) == 0 and std::empty(tasksQueueAccessor.value()))
m_waitUntilFinishedCondition.notify_all();
}
}
inline auto handleError(std::exception_ptr exceptionPointer) const noexcept -> void
{
if (auto lockGuard = std::shared_lock{m_onErrorMutex}; m_onError)
{
try
{
m_onError(exceptionPointer);
}
catch (const std::exception& exception)
{
Logger<"CppUtils">::print<"error">("ThreadPool: onError threw an exception: {}", exception.what());
}
catch (...)
{
Logger<"CppUtils">::print<"error">("ThreadPool: onError threw a non-std exception");
}
}
}
private:
std::size_t m_numberThreads;
std::atomic_size_t m_activeWorkers = 0;
std::condition_variable m_startWorkingCondition;
std::vector m_workers;
UniqueLocker> m_tasksQueue;
std::condition_variable m_waitUntilFinishedCondition;
mutable std::shared_mutex m_onErrorMutex;
std::function<void(std::exception_ptr)> m_onError;
std::atomic<bool> m_stopRequested = false;
std::function<void()> m_finally;
};
}