Coverage Report

Created: 2026-04-29 19:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/tmp/bitcoin/src/util/threadpool.h
Line
Count
Source
1
// Copyright (c) The Bitcoin Core developers
2
// Distributed under the MIT software license, see the accompanying
3
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
4
5
#ifndef BITCOIN_UTIL_THREADPOOL_H
6
#define BITCOIN_UTIL_THREADPOOL_H
7
8
#include <sync.h>
9
#include <tinyformat.h>
10
#include <util/check.h>
11
#include <util/expected.h>
12
#include <util/thread.h>
13
14
#include <algorithm>
15
#include <condition_variable>
16
#include <functional>
17
#include <future>
18
#include <queue>
19
#include <ranges>
20
#include <thread>
21
#include <type_traits>
22
#include <utility>
23
#include <vector>
24
25
/**
26
 * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
27
 *
28
 * The thread pool maintains a set of worker threads that consume and execute
29
 * tasks submitted through Submit(). Once started, tasks can be queued and
30
 * processed asynchronously until Stop() is called.
31
 *
32
 * ### Thread-safety and lifecycle
33
 * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
34
 *   Calling `Stop()` from a worker thread will deadlock, as it waits for all
35
 *   workers to join, including the current one.
36
 *
37
 * - `Submit()` can be called from any thread, including workers. It safely
38
 *   enqueues new work for execution as long as the pool has active workers.
39
 *
40
 * - `Interrupt()` stops new task submission and lets queued ones drain
41
 *   in the background. Callers can continue other shutdown steps and call
42
 *   Stop() at the end to ensure no remaining tasks are left to execute.
43
 *
44
 * - `Stop()` prevents further task submission and blocks until all the
45
 *   queued ones are completed.
46
 */
47
class ThreadPool
48
{
49
private:
50
    std::string m_name;
51
    Mutex m_mutex;
52
    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
53
    std::condition_variable m_cv;
54
    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
55
    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
56
    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
57
    bool m_interrupt GUARDED_BY(m_mutex){false};
58
    std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
59
60
    /**
     * @brief Loop run by every worker thread.
     *
     * Holds m_mutex while inspecting the queue and releases it only while a
     * task is being executed. The loop returns once the interrupt flag is set
     * and the queue is empty.
     */
    void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    {
        WAIT_LOCK(m_mutex, wait_lock);
        while (true) {
            // Only block when there is nothing to do: a task that was queued
            // while the previous one was running is picked up right away.
            const bool must_wait{!m_interrupt && m_work_queue.empty()};
            if (must_wait) {
                // Sleep until the pool is interrupted or a task shows up.
                m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
            }

            // Interrupted and fully drained: this worker is done.
            if (m_work_queue.empty() && m_interrupt) return;

            // Take ownership of the front task while still holding the lock.
            std::packaged_task<void()> next_task{std::move(m_work_queue.front())};
            m_work_queue.pop();

            {
                // Run the task with the mutex released; it is re-acquired
                // when this scope ends.
                REVERSE_LOCK(wait_lock, m_mutex);
                next_task();
            }
        }
    }
88
89
public:
90
1.34k
    /**
     * @brief Create a pool without any worker threads.
     *
     * @param name Stored in m_name; Start() uses it to build the
     *             "<name>_pool_<index>" string handed to each worker thread.
     */
    explicit ThreadPool(const std::string& name)
        : m_name{name}
    {
    }
91
92
    /** Destructor: runs Stop() so the pool is shut down even if the owner never stopped it. */
    ~ThreadPool()
    {
        // Calling Stop() again after an earlier Stop() is fine: there is
        // nothing left to join in that case.
        Stop();
    }
96
97
    /**
98
     * @brief Start worker threads.
99
     *
100
     * Creates and launches `num_workers` threads that begin executing tasks
101
     * from the queue. If the pool is already started, throws.
102
     *
103
     * Must be called from a controller (non-worker) thread.
104
     */
105
    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
106
1.10k
    {
107
1.10k
        assert(num_workers > 0);
108
1.10k
        LOCK(m_mutex);
109
1.10k
        if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
110
1.09k
        if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
111
112
        // Create workers
113
1.09k
        m_workers.reserve(num_workers);
114
3.35k
        for (int i = 0; i < num_workers; i++) {
115
2.25k
            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
116
2.25k
        }
117
1.09k
    }
118
119
    /**
120
     * @brief Stop all worker threads and wait for them to exit.
121
     *
122
     * Sets the interrupt flag, wakes all waiting workers, and joins them.
123
     * Any remaining tasks in the queue will be processed before returning.
124
     *
125
     * Must be called from a controller (non-worker) thread.
126
     * Concurrent calls to Start() will be rejected while Stop() is in progress.
127
     */
128
    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
129
1.16k
    {
130
        // Notify workers and join them
131
1.16k
        std::vector<std::thread> threads_to_join;
132
1.16k
        {
133
1.16k
            LOCK(m_mutex);
134
            // Ensure Stop() is not called from a worker thread while workers are still registered,
135
            // otherwise a self-join deadlock would occur.
136
1.16k
            auto id = std::this_thread::get_id();
137
1.16k
            for (const auto& worker : m_workers) assert(worker.get_id() != id);
138
            // Early shutdown to return right away on any concurrent Submit() call
139
1.16k
            m_interrupt = true;
140
1.16k
            threads_to_join.swap(m_workers);
141
1.16k
        }
142
0
        m_cv.notify_all();
143
        // Help draining queue
144
1.18k
        while (ProcessTask()) {}
145
        // Free resources
146
2.25k
        for (auto& worker : threads_to_join) worker.join();
147
148
        // Since we currently wait for tasks completion, sanity-check empty queue
149
1.16k
        LOCK(m_mutex);
150
1.16k
        Assume(m_work_queue.empty());
151
        // Re-allow Start() now that all workers have exited
152
1.16k
        m_interrupt = false;
153
1.16k
    }
154
155
    /** Reasons for which Submit() rejects work. */
    enum class SubmitError {
        Inactive,    //!< The pool currently has no worker threads.
        Interrupted, //!< The interrupt flag is set; new tasks are refused.
    };
159
160
    template <class F>
161
    using Future = std::future<std::invoke_result_t<F>>;
162
163
    template <class R>
164
    using RangeFuture = Future<std::ranges::range_reference_t<R>>;
165
166
    template <class F>
167
    using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
168
169
    /**
170
     * @brief Enqueues a new task for asynchronous execution.
171
     *
172
     * @param  fn Callable to execute asynchronously.
173
     * @return On success, a future containing fn's result.
174
     *         On failure, an error indicating why the task was rejected:
175
     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
176
     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
177
     *
178
     * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
179
     *
180
     * @warning Ignoring the returned future requires guarding the task against
181
     *          uncaught exceptions, as they would otherwise be silently discarded.
182
     */
183
    template <class F>
184
    [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
185
171k
    {
186
171k
        PackagedTask<F> task{std::forward<F>(fn)};
187
171k
        auto future{task.get_future()};
188
171k
        {
189
171k
            LOCK(m_mutex);
190
171k
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
171k
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
171k
            m_work_queue.emplace(std::move(task));
194
171k
        }
195
0
        m_cv.notify_one();
196
171k
        return {std::move(future)};
197
171k
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::BlockWorkers(ThreadPool&, std::counting_semaphore<2147483647l>&, unsigned long)::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::BlockWorkers(ThreadPool&, std::counting_semaphore<2147483647l>&, unsigned long)::$_0>(threadpool_tests::BlockWorkers(ThreadPool&, std::counting_semaphore<2147483647l>&, unsigned long)::$_0&&)
Line
Count
Source
185
37
    {
186
37
        PackagedTask<F> task{std::forward<F>(fn)};
187
37
        auto future{task.get_future()};
188
37
        {
189
37
            LOCK(m_mutex);
190
37
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
37
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
37
            m_work_queue.emplace(std::move(task));
194
37
        }
195
0
        m_cv.notify_one();
196
37
        return {std::move(future)};
197
37
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::submit_tasks_complete_successfully::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::submit_tasks_complete_successfully::test_method()::$_0>(threadpool_tests::submit_tasks_complete_successfully::test_method()::$_0&&)
Line
Count
Source
185
50
    {
186
50
        PackagedTask<F> task{std::forward<F>(fn)};
187
50
        auto future{task.get_future()};
188
50
        {
189
50
            LOCK(m_mutex);
190
50
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
50
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
50
            m_work_queue.emplace(std::move(task));
194
50
        }
195
0
        m_cv.notify_one();
196
50
        return {std::move(future)};
197
50
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::single_available_worker_executes_all_tasks::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::single_available_worker_executes_all_tasks::test_method()::$_0>(threadpool_tests::single_available_worker_executes_all_tasks::test_method()::$_0&&)
Line
Count
Source
185
15
    {
186
15
        PackagedTask<F> task{std::forward<F>(fn)};
187
15
        auto future{task.get_future()};
188
15
        {
189
15
            LOCK(m_mutex);
190
15
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
15
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
15
            m_work_queue.emplace(std::move(task));
194
15
        }
195
0
        m_cv.notify_one();
196
15
        return {std::move(future)};
197
15
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::wait_for_task_to_finish::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::wait_for_task_to_finish::test_method()::$_0>(threadpool_tests::wait_for_task_to_finish::test_method()::$_0&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
1
            m_work_queue.emplace(std::move(task));
194
1
        }
195
0
        m_cv.notify_one();
196
1
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::get_result_from_completed_task::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::get_result_from_completed_task::test_method()::$_0>(threadpool_tests::get_result_from_completed_task::test_method()::$_0&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
1
            m_work_queue.emplace(std::move(task));
194
1
        }
195
0
        m_cv.notify_one();
196
1
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::get_result_from_completed_task::test_method()::$_1>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::get_result_from_completed_task::test_method()::$_1>(threadpool_tests::get_result_from_completed_task::test_method()::$_1&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
1
            m_work_queue.emplace(std::move(task));
194
1
        }
195
0
        m_cv.notify_one();
196
1
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::task_exception_propagates_to_future::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::task_exception_propagates_to_future::test_method()::$_0>(threadpool_tests::task_exception_propagates_to_future::test_method()::$_0&&)
Line
Count
Source
185
5
    {
186
5
        PackagedTask<F> task{std::forward<F>(fn)};
187
5
        auto future{task.get_future()};
188
5
        {
189
5
            LOCK(m_mutex);
190
5
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
5
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
5
            m_work_queue.emplace(std::move(task));
194
5
        }
195
0
        m_cv.notify_one();
196
5
        return {std::move(future)};
197
5
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::process_tasks_manually_when_workers_busy::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::process_tasks_manually_when_workers_busy::test_method()::$_0>(threadpool_tests::process_tasks_manually_when_workers_busy::test_method()::$_0&&)
Line
Count
Source
185
20
    {
186
20
        PackagedTask<F> task{std::forward<F>(fn)};
187
20
        auto future{task.get_future()};
188
20
        {
189
20
            LOCK(m_mutex);
190
20
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
20
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
20
            m_work_queue.emplace(std::move(task));
194
20
        }
195
0
        m_cv.notify_one();
196
20
        return {std::move(future)};
197
20
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::recursive_task_submission::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::recursive_task_submission::test_method()::$_0>(threadpool_tests::recursive_task_submission::test_method()::$_0&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
1
            m_work_queue.emplace(std::move(task));
194
1
        }
195
0
        m_cv.notify_one();
196
1
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::recursive_task_submission::test_method()::$_0::operator()() const::'lambda'()>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::recursive_task_submission::test_method()::$_0::operator()() const::'lambda'()>(threadpool_tests::recursive_task_submission::test_method()::$_0::operator()() const::'lambda'()&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
1
            m_work_queue.emplace(std::move(task));
194
1
        }
195
0
        m_cv.notify_one();
196
1
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::task_submitted_while_busy_completes::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::task_submitted_while_busy_completes::test_method()::$_0>(threadpool_tests::task_submitted_while_busy_completes::test_method()::$_0&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
1
            m_work_queue.emplace(std::move(task));
194
1
        }
195
0
        m_cv.notify_one();
196
1
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::congestion_more_workers_than_cores::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::congestion_more_workers_than_cores::test_method()::$_0>(threadpool_tests::congestion_more_workers_than_cores::test_method()::$_0&&)
Line
Count
Source
185
200
    {
186
200
        PackagedTask<F> task{std::forward<F>(fn)};
187
200
        auto future{task.get_future()};
188
200
        {
189
200
            LOCK(m_mutex);
190
200
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
200
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
200
            m_work_queue.emplace(std::move(task));
194
200
        }
195
0
        m_cv.notify_one();
196
200
        return {std::move(future)};
197
200
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_1>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_1>(threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_1&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
1
            m_work_queue.emplace(std::move(task));
194
1
        }
195
0
        m_cv.notify_one();
196
1
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::queued_tasks_complete_after_interrupt::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::queued_tasks_complete_after_interrupt::test_method()::$_0>(threadpool_tests::queued_tasks_complete_after_interrupt::test_method()::$_0&&)
Line
Count
Source
185
10
    {
186
10
        PackagedTask<F> task{std::forward<F>(fn)};
187
10
        auto future{task.get_future()};
188
10
        {
189
10
            LOCK(m_mutex);
190
10
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
10
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
10
            m_work_queue.emplace(std::move(task));
194
10
        }
195
0
        m_cv.notify_one();
196
10
        return {std::move(future)};
197
10
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::stop_active_wait_drains_queue::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::stop_active_wait_drains_queue::test_method()::$_0>(threadpool_tests::stop_active_wait_drains_queue::test_method()::$_0&&)
Line
Count
Source
185
20
    {
186
20
        PackagedTask<F> task{std::forward<F>(fn)};
187
20
        auto future{task.get_future()};
188
20
        {
189
20
            LOCK(m_mutex);
190
20
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
20
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
20
            m_work_queue.emplace(std::move(task));
194
20
        }
195
0
        m_cv.notify_one();
196
20
        return {std::move(future)};
197
20
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::submit_fails_with_correct_error::test_method()::$_0 const&>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::submit_fails_with_correct_error::test_method()::$_0 const&>(threadpool_tests::submit_fails_with_correct_error::test_method()::$_0 const&)
Line
Count
Source
185
4
    {
186
4
        PackagedTask<F> task{std::forward<F>(fn)};
187
4
        auto future{task.get_future()};
188
4
        {
189
4
            LOCK(m_mutex);
190
4
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
0
            m_work_queue.emplace(std::move(task));
194
0
        }
195
0
        m_cv.notify_one();
196
0
        return {std::move(future)};
197
1
    }
threadpool_tests.cpp:util::Expected<std::future<std::invoke_result<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_0>(threadpool_tests::interrupt_blocks_new_submissions::test_method()::$_0&&)
Line
Count
Source
185
1
    {
186
1
        PackagedTask<F> task{std::forward<F>(fn)};
187
1
        auto future{task.get_future()};
188
1
        {
189
1
            LOCK(m_mutex);
190
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
0
            m_work_queue.emplace(std::move(task));
194
0
        }
195
0
        m_cv.notify_one();
196
0
        return {std::move(future)};
197
1
    }
httpserver.cpp:util::Expected<std::future<std::invoke_result<http_request_cb(evhttp_request*, void*)::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<http_request_cb(evhttp_request*, void*)::$_0>(http_request_cb(evhttp_request*, void*)::$_0&&)
Line
Count
Source
185
171k
    {
186
171k
        PackagedTask<F> task{std::forward<F>(fn)};
187
171k
        auto future{task.get_future()};
188
171k
        {
189
171k
            LOCK(m_mutex);
190
171k
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
171k
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
171k
            m_work_queue.emplace(std::move(task));
194
171k
        }
195
0
        m_cv.notify_one();
196
171k
        return {std::move(future)};
197
171k
    }
198
199
    /**
200
     * @brief Enqueues a range of tasks for asynchronous execution.
201
     *
202
     * @param  fns Callables to execute asynchronously.
203
     * @return On success, a vector of futures containing each element of fns's result in order.
204
     *         On failure, an error indicating why the range was rejected:
205
     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
206
     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
207
     *
208
     * This is more efficient when submitting many tasks at once, since
209
     * the queue lock is only taken once internally and all worker threads are
210
     * notified. For single tasks, Submit() is preferred since only one worker
211
     * thread is notified.
212
     *
213
     * Thread-safe: Can be called from any thread, including within submitted callables.
214
     *
215
     * @warning Ignoring the returned futures requires guarding tasks against
216
     *          uncaught exceptions, as they would otherwise be silently discarded.
217
     */
218
    template <std::ranges::sized_range R>
219
        requires(!std::is_lvalue_reference_v<R>)
220
    [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
221
4
    {
222
4
        std::vector<RangeFuture<R>> futures;
223
4
        futures.reserve(std::ranges::size(fns));
224
225
4
        {
226
4
            LOCK(m_mutex);
227
4
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228
3
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229
100
            for (auto&& fn : fns) {
230
100
                PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
231
100
                futures.emplace_back(task.get_future());
232
100
                m_work_queue.emplace(std::move(task));
233
100
            }
234
2
        }
235
0
        m_cv.notify_all();
236
2
        return {std::move(futures)};
237
3
    }
util::Expected<std::vector<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>, std::allocator<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>>>, ThreadPool::SubmitError> ThreadPool::Submit<std::vector<std::function<void ()>, std::allocator<std::function<void ()>>>>(T&&)
Line
Count
Source
221
2
    {
222
2
        std::vector<RangeFuture<R>> futures;
223
2
        futures.reserve(std::ranges::size(fns));
224
225
2
        {
226
2
            LOCK(m_mutex);
227
2
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229
0
            for (auto&& fn : fns) {
230
0
                PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
231
0
                futures.emplace_back(task.get_future());
232
0
                m_work_queue.emplace(std::move(task));
233
0
            }
234
0
        }
235
0
        m_cv.notify_all();
236
0
        return {std::move(futures)};
237
1
    }
util::Expected<std::vector<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>, std::allocator<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>>>, ThreadPool::SubmitError> ThreadPool::Submit<std::array<std::function<int ()>, 50ul>>(T&&)
Line
Count
Source
221
1
    {
222
1
        std::vector<RangeFuture<R>> futures;
223
1
        futures.reserve(std::ranges::size(fns));
224
225
1
        {
226
1
            LOCK(m_mutex);
227
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229
50
            for (auto&& fn : fns) {
230
50
                PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
231
50
                futures.emplace_back(task.get_future());
232
50
                m_work_queue.emplace(std::move(task));
233
50
            }
234
1
        }
235
0
        m_cv.notify_all();
236
1
        return {std::move(futures)};
237
1
    }
util::Expected<std::vector<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>, std::allocator<std::future<std::invoke_result<decltype(*std::declval<decltype(ranges::__access::__begin(std::declval<T&>()))&>())>::type>>>, ThreadPool::SubmitError> ThreadPool::Submit<std::vector<std::function<int ()>, std::allocator<std::function<int ()>>>>(T&&)
Line
Count
Source
221
1
    {
222
1
        std::vector<RangeFuture<R>> futures;
223
1
        futures.reserve(std::ranges::size(fns));
224
225
1
        {
226
1
            LOCK(m_mutex);
227
1
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228
1
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229
50
            for (auto&& fn : fns) {
230
50
                PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
231
50
                futures.emplace_back(task.get_future());
232
50
                m_work_queue.emplace(std::move(task));
233
50
            }
234
1
        }
235
0
        m_cv.notify_all();
236
1
        return {std::move(futures)};
237
1
    }
238
239
    /**
240
     * @brief Execute a single queued task synchronously.
241
     * Removes one task from the queue and executes it on the calling thread.
242
     */
243
    bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
244
1.20k
    {
245
1.20k
        std::packaged_task<void()> task;
246
1.20k
        {
247
1.20k
            LOCK(m_mutex);
248
1.20k
            if (m_work_queue.empty()) return false;
249
250
            // Pop the task
251
41
            task = std::move(m_work_queue.front());
252
41
            m_work_queue.pop();
253
41
        }
254
0
        task();
255
41
        return true;
256
1.20k
    }
257
258
    /**
259
     * @brief Stop accepting new tasks and begin asynchronous shutdown.
260
     *
261
     * Wakes all worker threads so they can drain the queue and exit.
262
     * Unlike Stop(), this function does not wait for threads to finish.
263
     *
264
     * Note: The next step in the pool lifecycle is calling Stop(), which
265
     *       releases any dangling resources and resets the pool state
266
     *       for shutdown or restart.
267
     */
268
    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
269
1.14k
    {
270
1.14k
        WITH_LOCK(m_mutex, m_interrupt = true);
271
1.14k
        m_cv.notify_all();
272
1.14k
    }
273
274
    /** @return Number of tasks in the queue at the time of the call (read under the lock). */
    size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    {
        LOCK(m_mutex);
        return m_work_queue.size();
    }
278
279
    /** @return Number of worker threads held by the pool at the time of the call (read under the lock). */
    size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    {
        LOCK(m_mutex);
        return m_workers.size();
    }
283
};
284
285
7
/**
 * @brief Map a ThreadPool::SubmitError to a short description.
 *
 * @param err Error value to describe.
 * @return "No active workers" for Inactive and "Interrupted" for Interrupted.
 *         Any other value trips Assume(false) and yields "Unknown error".
 */
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
{
    // No default label on purpose: every enumerator is listed explicitly.
    switch (err) {
    case ThreadPool::SubmitError::Inactive: return "No active workers";
    case ThreadPool::SubmitError::Interrupted: return "Interrupted";
    }
    Assume(false); // Unreachable
    return "Unknown error";
}
295
296
#endif // BITCOIN_UTIL_THREADPOOL_H