/tmp/bitcoin/src/util/threadpool.h
Line | Count | Source |
1 | | // Copyright (c) The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or https://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #ifndef BITCOIN_UTIL_THREADPOOL_H |
6 | | #define BITCOIN_UTIL_THREADPOOL_H |
7 | | |
8 | | #include <sync.h> |
9 | | #include <tinyformat.h> |
10 | | #include <util/check.h> |
11 | | #include <util/expected.h> |
12 | | #include <util/thread.h> |
13 | | |
14 | | #include <algorithm> |
15 | | #include <condition_variable> |
16 | | #include <functional> |
17 | | #include <future> |
18 | | #include <queue> |
19 | | #include <ranges> |
20 | | #include <thread> |
21 | | #include <type_traits> |
22 | | #include <utility> |
23 | | #include <vector> |
24 | | |
25 | | /** |
26 | | * @brief Fixed-size thread pool for running arbitrary tasks concurrently. |
27 | | * |
28 | | * The thread pool maintains a set of worker threads that consume and execute |
29 | | * tasks submitted through Submit(). Once started, tasks can be queued and |
30 | | * processed asynchronously until Stop() is called. |
31 | | * |
32 | | * ### Thread-safety and lifecycle |
33 | | * - `Start()` and `Stop()` must be called from a controller (non-worker) thread. |
34 | | * Calling `Stop()` from a worker thread will deadlock, as it waits for all |
35 | | * workers to join, including the current one. |
36 | | * |
37 | | * - `Submit()` can be called from any thread, including workers. It safely |
38 | | * enqueues new work for execution as long as the pool has active workers. |
39 | | * |
40 | | * - `Interrupt()` stops new task submission and lets queued ones drain |
41 | | * in the background. Callers can continue other shutdown steps and call |
42 | | * Stop() at the end to ensure no remaining tasks are left to execute. |
43 | | * |
44 | | * - `Stop()` prevents further task submission and blocks until all the |
45 | | * queued ones are completed. |
46 | | */ |
47 | | class ThreadPool |
48 | | { |
49 | | private: |
50 | | std::string m_name; |
51 | | Mutex m_mutex; |
52 | | std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex); |
53 | | std::condition_variable m_cv; |
54 | | // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool. |
55 | | // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals. |
56 | | // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable |
57 | | bool m_interrupt GUARDED_BY(m_mutex){false}; |
58 | | std::vector<std::thread> m_workers GUARDED_BY(m_mutex); |
59 | | |
60 | | void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
61 | 2.25k | { |
62 | 2.25k | WAIT_LOCK(m_mutex, wait_lock); |
63 | 173k | for (;;) { |
64 | 173k | std::packaged_task<void()> task; |
65 | 173k | { |
66 | | // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one. |
67 | 173k | if (!m_interrupt && m_work_queue.empty()) { |
68 | | // Block until the pool is interrupted or a task is available. |
69 | 337k | m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); }); |
70 | 167k | } |
71 | | |
72 | | // If stopped and no work left, exit worker |
73 | 173k | if (m_interrupt && m_work_queue.empty()) { |
74 | 2.25k | return; |
75 | 2.25k | } |
76 | | |
77 | 171k | task = std::move(m_work_queue.front()); |
78 | 171k | m_work_queue.pop(); |
79 | 171k | } |
80 | | |
81 | 0 | { |
82 | | // Execute the task without the lock |
83 | 171k | REVERSE_LOCK(wait_lock, m_mutex); |
84 | 171k | task(); |
85 | 171k | } |
86 | 171k | } |
87 | 2.25k | } |
88 | | |
89 | | public: |
90 | 1.34k | explicit ThreadPool(const std::string& name) : m_name(name) {} |
91 | | |
92 | | ~ThreadPool() |
93 | 15 | { |
94 | 15 | Stop(); // In case it hasn't been stopped. |
95 | 15 | } |
96 | | |
97 | | /** |
98 | | * @brief Start worker threads. |
99 | | * |
100 | | * Creates and launches `num_workers` threads that begin executing tasks |
101 | | * from the queue. If the pool is already started, throws. |
102 | | * |
103 | | * Must be called from a controller (non-worker) thread. |
104 | | */ |
105 | | void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
106 | 1.10k | { |
107 | 1.10k | assert(num_workers > 0); |
108 | 1.10k | LOCK(m_mutex); |
109 | 1.10k | if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping"); |
110 | 1.09k | if (!m_workers.empty()) throw std::runtime_error("Thread pool already started"); |
111 | | |
112 | | // Create workers |
113 | 1.09k | m_workers.reserve(num_workers); |
114 | 3.35k | for (int i = 0; i < num_workers; i++) { |
115 | 2.25k | m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); }); |
116 | 2.25k | } |
117 | 1.09k | } |
118 | | |
119 | | /** |
120 | | * @brief Stop all worker threads and wait for them to exit. |
121 | | * |
122 | | * Sets the interrupt flag, wakes all waiting workers, and joins them. |
123 | | * Any remaining tasks in the queue will be processed before returning. |
124 | | * |
125 | | * Must be called from a controller (non-worker) thread. |
126 | | * Concurrent calls to Start() will be rejected while Stop() is in progress. |
127 | | */ |
128 | | void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
129 | 1.16k | { |
130 | | // Notify workers and join them |
131 | 1.16k | std::vector<std::thread> threads_to_join; |
132 | 1.16k | { |
133 | 1.16k | LOCK(m_mutex); |
134 | | // Ensure Stop() is not called from a worker thread while workers are still registered, |
135 | | // otherwise a self-join deadlock would occur. |
136 | 1.16k | auto id = std::this_thread::get_id(); |
137 | 1.16k | for (const auto& worker : m_workers) assert(worker.get_id() != id); |
138 | | // Early shutdown to return right away on any concurrent Submit() call |
139 | 1.16k | m_interrupt = true; |
140 | 1.16k | threads_to_join.swap(m_workers); |
141 | 1.16k | } |
142 | 0 | m_cv.notify_all(); |
143 | | // Help draining queue |
144 | 1.18k | while (ProcessTask()) {} |
145 | | // Free resources |
146 | 2.25k | for (auto& worker : threads_to_join) worker.join(); |
147 | | |
148 | | // Since we currently wait for tasks completion, sanity-check empty queue |
149 | 1.16k | LOCK(m_mutex); |
150 | 1.16k | Assume(m_work_queue.empty()); |
151 | | // Re-allow Start() now that all workers have exited |
152 | 1.16k | m_interrupt = false; |
153 | 1.16k | } |
154 | | |
155 | | enum class SubmitError { |
156 | | Inactive, |
157 | | Interrupted, |
158 | | }; |
159 | | |
160 | | template <class F> |
161 | | using Future = std::future<std::invoke_result_t<F>>; |
162 | | |
163 | | template <class R> |
164 | | using RangeFuture = Future<std::ranges::range_reference_t<R>>; |
165 | | |
166 | | template <class F> |
167 | | using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>; |
168 | | |
169 | | /** |
170 | | * @brief Enqueues a new task for asynchronous execution. |
171 | | * |
172 | | * @param fn Callable to execute asynchronously. |
173 | | * @return On success, a future containing fn's result. |
174 | | * On failure, an error indicating why the task was rejected: |
175 | | * - SubmitError::Inactive: Pool has no workers (never started or already stopped). |
176 | | * - SubmitError::Interrupted: Pool task acceptance has been interrupted. |
177 | | * |
178 | | * Thread-safe: Can be called from any thread, including within the provided 'fn' callable. |
179 | | * |
180 | | * @warning Ignoring the returned future requires guarding the task against |
181 | | * uncaught exceptions, as they would otherwise be silently discarded. |
182 | | */ |
183 | | template <class F> |
184 | | [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
185 | 171k | { |
186 | 171k | PackagedTask<F> task{std::forward<F>(fn)}; |
187 | 171k | auto future{task.get_future()}; |
188 | 171k | { |
189 | 171k | LOCK(m_mutex); |
190 | 171k | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; |
191 | 171k | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; |
192 | | |
193 | 171k | m_work_queue.emplace(std::move(task)); |
194 | 171k | } |
195 | 0 | m_cv.notify_one(); |
196 | 171k | return {std::move(future)}; |
197 | 171k | } threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::BlockWorkers(ThreadPool&, std::counting_semaphore<2147483647l>&, unsigned long)::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::BlockWorkers(ThreadPool&, std::counting_semaphore<2147483647l>&, unsigned long)::$_0>(threadpool_tests::BlockWorkers(ThreadPool&, std::counting_semaphore<2147483647l>&, unsigned long)::$_0&&) Line | Count | Source | 185 | 37 | { | 186 | 37 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 37 | auto future{task.get_future()}; | 188 | 37 | { | 189 | 37 | LOCK(m_mutex); | 190 | 37 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 37 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 37 | m_work_queue.emplace(std::move(task)); | 194 | 37 | } | 195 | 0 | m_cv.notify_one(); | 196 | 37 | return {std::move(future)}; | 197 | 37 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::submit_tasks_complete_successfully::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::submit_tasks_complete_successfully::test_method()::$_0>(threadpool_tests::submit_tasks_complete_successfully::test_method()::$_0&&) Line | Count | Source | 185 | 50 | { | 186 | 50 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 50 | auto future{task.get_future()}; | 188 | 50 | { | 189 | 50 | LOCK(m_mutex); | 190 | 50 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 50 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 50 | m_work_queue.emplace(std::move(task)); | 194 | 50 | } | 195 | 0 | m_cv.notify_one(); | 196 | 50 | return {std::move(future)}; | 197 | 50 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::single_available_worker_executes_all_tasks::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::single_available_worker_executes_all_tasks::test_method()::$_0>(threadpool_tests::single_available_worker_executes_all_tasks::test_method()::$_0&&) Line | Count | Source | 185 | 15 | { | 186 | 15 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 15 | auto future{task.get_future()}; | 188 | 15 | { | 189 | 15 | LOCK(m_mutex); | 190 | 15 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 15 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 15 | m_work_queue.emplace(std::move(task)); | 194 | 15 | } | 195 | 0 | m_cv.notify_one(); | 196 | 15 | return {std::move(future)}; | 197 | 15 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::wait_for_task_to_finish::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::wait_for_task_to_finish::test_method()::$_0>(threadpool_tests::wait_for_task_to_finish::test_method()::$_0&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 1 | m_work_queue.emplace(std::move(task)); | 194 | 1 | } | 195 | 0 | m_cv.notify_one(); | 196 | 1 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::get_result_from_completed_task::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::get_result_from_completed_task::test_method()::$_0>(threadpool_tests::get_result_from_completed_task::test_method()::$_0&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 1 | m_work_queue.emplace(std::move(task)); | 194 | 1 | } | 195 | 0 | m_cv.notify_one(); | 196 | 1 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::get_result_from_completed_task::test_method()::$_1>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::get_result_from_completed_task::test_method()::$_1>(threadpool_tests::get_result_from_completed_task::test_method()::$_1&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 1 | m_work_queue.emplace(std::move(task)); | 194 | 1 | } | 195 | 0 | m_cv.notify_one(); | 196 | 1 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::task_exception_propagates_to_future::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::task_exception_propagates_to_future::test_method()::$_0>(threadpool_tests::task_exception_propagates_to_future::test_method()::$_0&&) Line | Count | Source | 185 | 5 | { | 186 | 5 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 5 | auto future{task.get_future()}; | 188 | 5 | { | 189 | 5 | LOCK(m_mutex); | 190 | 5 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 5 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 5 | m_work_queue.emplace(std::move(task)); | 194 | 5 | } | 195 | 0 | m_cv.notify_one(); | 196 | 5 | return {std::move(future)}; | 197 | 5 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::process_tasks_manually_when_workers_busy::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::process_tasks_manually_when_workers_busy::test_method()::$_0>(threadpool_tests::process_tasks_manually_when_workers_busy::test_method()::$_0&&) Line | Count | Source | 185 | 20 | { | 186 | 20 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 20 | auto future{task.get_future()}; | 188 | 20 | { | 189 | 20 | LOCK(m_mutex); | 190 | 20 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 20 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 20 | m_work_queue.emplace(std::move(task)); | 194 | 20 | } | 195 | 0 | m_cv.notify_one(); | 196 | 20 | return {std::move(future)}; | 197 | 20 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::recursive_task_submission::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::recursive_task_submission::test_method()::$_0>(threadpool_tests::recursive_task_submission::test_method()::$_0&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 1 | m_work_queue.emplace(std::move(task)); | 194 | 1 | } | 195 | 0 | m_cv.notify_one(); | 196 | 1 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::recursive_task_submission::test_method()::$_0::operator()() const::'lambda'()>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::recursive_task_submission::test_method()::$_0::operator()() const::'lambda'()>(threadpool_tests::recursive_task_submission::test_method()::$_0::operator()() const::'lambda'()&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 1 | m_work_queue.emplace(std::move(task)); | 194 | 1 | } | 195 | 0 | m_cv.notify_one(); | 196 | 1 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::task_submitted_while_busy_completes::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::task_submitted_while_busy_completes::test_method()::$_0>(threadpool_tests::task_submitted_while_busy_completes::test_method()::$_0&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 1 | m_work_queue.emplace(std::move(task)); | 194 | 1 | } | 195 | 0 | m_cv.notify_one(); | 196 | 1 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::congestion_more_workers_than_cores::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::congestion_more_workers_than_cores::test_method()::$_0>(threadpool_tests::congestion_more_workers_than_cores::test_method()::$_0&&) Line | Count | Source | 185 | 200 | { | 186 | 200 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 200 | auto future{task.get_future()}; | 188 | 200 | { | 189 | 200 | LOCK(m_mutex); | 190 | 200 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 200 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 200 | m_work_queue.emplace(std::move(task)); | 194 | 200 | } | 195 | 0 | m_cv.notify_one(); | 196 | 200 | return {std::move(future)}; | 197 | 200 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_1>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_1>(threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_1&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 1 | m_work_queue.emplace(std::move(task)); | 194 | 1 | } | 195 | 0 | m_cv.notify_one(); | 196 | 1 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::queued_tasks_complete_after_interrupt::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::queued_tasks_complete_after_interrupt::test_method()::$_0>(threadpool_tests::queued_tasks_complete_after_interrupt::test_method()::$_0&&) Line | Count | Source | 185 | 10 | { | 186 | 10 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 10 | auto future{task.get_future()}; | 188 | 10 | { | 189 | 10 | LOCK(m_mutex); | 190 | 10 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 10 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 10 | m_work_queue.emplace(std::move(task)); | 194 | 10 | } | 195 | 0 | m_cv.notify_one(); | 196 | 10 | return {std::move(future)}; | 197 | 10 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::stop_active_wait_drains_queue::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::stop_active_wait_drains_queue::test_method()::$_0>(threadpool_tests::stop_active_wait_drains_queue::test_method()::$_0&&) Line | Count | Source | 185 | 20 | { | 186 | 20 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 20 | auto future{task.get_future()}; | 188 | 20 | { | 189 | 20 | LOCK(m_mutex); | 190 | 20 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 20 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 20 | m_work_queue.emplace(std::move(task)); | 194 | 20 | } | 195 | 0 | m_cv.notify_one(); | 196 | 20 | return {std::move(future)}; | 197 | 20 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::submit_fails_with_correct_error::test_method()::$_0 const&>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::submit_fails_with_correct_error::test_method()::$_0 const&>(threadpool_tests::submit_fails_with_correct_error::test_method()::$_0 const&) Line | Count | Source | 185 | 4 | { | 186 | 4 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 4 | auto future{task.get_future()}; | 188 | 4 | { | 189 | 4 | LOCK(m_mutex); | 190 | 4 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 0 | m_work_queue.emplace(std::move(task)); | 194 | 0 | } | 195 | 0 | m_cv.notify_one(); | 196 | 0 | return {std::move(future)}; | 197 | 1 | } |
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_0>(threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_0&&) Line | Count | Source | 185 | 1 | { | 186 | 1 | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 1 | auto future{task.get_future()}; | 188 | 1 | { | 189 | 1 | LOCK(m_mutex); | 190 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 0 | m_work_queue.emplace(std::move(task)); | 194 | 0 | } | 195 | 0 | m_cv.notify_one(); | 196 | 0 | return {std::move(future)}; | 197 | 1 | } |
httpserver.cpp:util::Expected<std::future<std::invoke_result<http_request_cb(evhttp_request*, void*)::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<http_request_cb(evhttp_request*, void*)::$_0>(http_request_cb(evhttp_request*, void*)::$_0&&) Line | Count | Source | 185 | 171k | { | 186 | 171k | PackagedTask<F> task{std::forward<F>(fn)}; | 187 | 171k | auto future{task.get_future()}; | 188 | 171k | { | 189 | 171k | LOCK(m_mutex); | 190 | 171k | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 191 | 171k | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 192 | | | 193 | 171k | m_work_queue.emplace(std::move(task)); | 194 | 171k | } | 195 | 0 | m_cv.notify_one(); | 196 | 171k | return {std::move(future)}; | 197 | 171k | } |
|
198 | | |
199 | | /** |
200 | | * @brief Enqueues a range of tasks for asynchronous execution. |
201 | | * |
202 | | * @param fns Callables to execute asynchronously. |
203 | | * @return On success, a vector of futures containing each element of fns's result in order. |
204 | | * On failure, an error indicating why the range was rejected: |
205 | | * - SubmitError::Inactive: Pool has no workers (never started or already stopped). |
206 | | * - SubmitError::Interrupted: Pool task acceptance has been interrupted. |
207 | | * |
208 | | * This is more efficient when submitting many tasks at once, since |
209 | | * the queue lock is only taken once internally and all worker threads are |
210 | | * notified. For single tasks, Submit() is preferred since only one worker |
211 | | * thread is notified. |
212 | | * |
213 | | * Thread-safe: Can be called from any thread, including within submitted callables. |
214 | | * |
215 | | * @warning Ignoring the returned futures requires guarding tasks against |
216 | | * uncaught exceptions, as they would otherwise be silently discarded. |
217 | | */ |
218 | | template <std::ranges::sized_range R> |
219 | | requires(!std::is_lvalue_reference_v<R>) |
220 | | [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
221 | 4 | { |
222 | 4 | std::vector<RangeFuture<R>> futures; |
223 | 4 | futures.reserve(std::ranges::size(fns)); |
224 | | |
225 | 4 | { |
226 | 4 | LOCK(m_mutex); |
227 | 4 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; |
228 | 3 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; |
229 | 100 | for (auto&& fn : fns) { |
230 | 100 | PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)}; |
231 | 100 | futures.emplace_back(task.get_future()); |
232 | 100 | m_work_queue.emplace(std::move(task)); |
233 | 100 | } |
234 | 2 | } |
235 | 0 | m_cv.notify_all(); |
236 | 2 | return {std::move(futures)}; |
237 | 3 | } util::Expected<std::vector<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>, std::allocator<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>>>, ThreadPool::SubmitError> ThreadPool::Submit<std::vector<std::function<void ()>, std::allocator<std::function<void ()>>>>(T&&) Line | Count | Source | 221 | 2 | { | 222 | 2 | std::vector<RangeFuture<R>> futures; | 223 | 2 | futures.reserve(std::ranges::size(fns)); | 224 | | | 225 | 2 | { | 226 | 2 | LOCK(m_mutex); | 227 | 2 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 228 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 229 | 0 | for (auto&& fn : fns) { | 230 | 0 | PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)}; | 231 | 0 | futures.emplace_back(task.get_future()); | 232 | 0 | m_work_queue.emplace(std::move(task)); | 233 | 0 | } | 234 | 0 | } | 235 | 0 | m_cv.notify_all(); | 236 | 0 | return {std::move(futures)}; | 237 | 1 | } |
util::Expected<std::vector<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>, std::allocator<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>>>, ThreadPool::SubmitError> ThreadPool::Submit<std::array<std::function<int ()>, 50ul>>(T&&) Line | Count | Source | 221 | 1 | { | 222 | 1 | std::vector<RangeFuture<R>> futures; | 223 | 1 | futures.reserve(std::ranges::size(fns)); | 224 | | | 225 | 1 | { | 226 | 1 | LOCK(m_mutex); | 227 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 228 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 229 | 50 | for (auto&& fn : fns) { | 230 | 50 | PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)}; | 231 | 50 | futures.emplace_back(task.get_future()); | 232 | 50 | m_work_queue.emplace(std::move(task)); | 233 | 50 | } | 234 | 1 | } | 235 | 0 | m_cv.notify_all(); | 236 | 1 | return {std::move(futures)}; | 237 | 1 | } |
util::Expected<std::vector<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>, std::allocator<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>>>, ThreadPool::SubmitError> ThreadPool::Submit<std::vector<std::function<int ()>, std::allocator<std::function<int ()>>>>(T&&) Line | Count | Source | 221 | 1 | { | 222 | 1 | std::vector<RangeFuture<R>> futures; | 223 | 1 | futures.reserve(std::ranges::size(fns)); | 224 | | | 225 | 1 | { | 226 | 1 | LOCK(m_mutex); | 227 | 1 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; | 228 | 1 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; | 229 | 50 | for (auto&& fn : fns) { | 230 | 50 | PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)}; | 231 | 50 | futures.emplace_back(task.get_future()); | 232 | 50 | m_work_queue.emplace(std::move(task)); | 233 | 50 | } | 234 | 1 | } | 235 | 0 | m_cv.notify_all(); | 236 | 1 | return {std::move(futures)}; | 237 | 1 | } |
|
238 | | |
239 | | /** |
240 | | * @brief Execute a single queued task synchronously. |
241 | | * Removes one task from the queue and executes it on the calling thread. |
242 | | */ |
243 | | bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
244 | 1.20k | { |
245 | 1.20k | std::packaged_task<void()> task; |
246 | 1.20k | { |
247 | 1.20k | LOCK(m_mutex); |
248 | 1.20k | if (m_work_queue.empty()) return false; |
249 | | |
250 | | // Pop the task |
251 | 41 | task = std::move(m_work_queue.front()); |
252 | 41 | m_work_queue.pop(); |
253 | 41 | } |
254 | 0 | task(); |
255 | 41 | return true; |
256 | 1.20k | } |
257 | | |
258 | | /** |
259 | | * @brief Stop accepting new tasks and begin asynchronous shutdown. |
260 | | * |
261 | | * Wakes all worker threads so they can drain the queue and exit. |
262 | | * Unlike Stop(), this function does not wait for threads to finish. |
263 | | * |
264 | | * Note: The next step in the pool lifecycle is calling Stop(), which |
265 | | * releases any dangling resources and resets the pool state |
266 | | * for shutdown or restart. |
267 | | */ |
268 | | void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
269 | 1.14k | { |
270 | 1.14k | WITH_LOCK(m_mutex, m_interrupt = true); |
271 | 1.14k | m_cv.notify_all(); |
272 | 1.14k | } |
273 | | |
274 | | size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
275 | 171k | { |
276 | 171k | return WITH_LOCK(m_mutex, return m_work_queue.size()); |
277 | 171k | } |
278 | | |
279 | | size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
280 | 4.96k | { |
281 | 4.96k | return WITH_LOCK(m_mutex, return m_workers.size()); |
282 | 4.96k | } |
283 | | }; |
284 | | |
285 | 7 | constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept { |
286 | 7 | switch (err) { |
287 | 4 | case ThreadPool::SubmitError::Inactive: |
288 | 4 | return "No active workers"; |
289 | 3 | case ThreadPool::SubmitError::Interrupted: |
290 | 3 | return "Interrupted"; |
291 | 7 | } |
292 | 0 | Assume(false); // Unreachable |
293 | 0 | return "Unknown error"; |
294 | 7 | } |
295 | | |
296 | | #endif // BITCOIN_UTIL_THREADPOOL_H |