/tmp/bitcoin/src/checkqueue.h
Line | Count | Source |
1 | | // Copyright (c) 2012-present The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or http://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #ifndef BITCOIN_CHECKQUEUE_H |
6 | | #define BITCOIN_CHECKQUEUE_H |
7 | | |
8 | | #include <sync.h> |
9 | | #include <tinyformat.h> |
10 | | #include <util/log.h> |
11 | | #include <util/threadnames.h> |
12 | | |
13 | | #include <algorithm> |
14 | | #include <iterator> |
15 | | #include <optional> |
16 | | #include <vector> |
17 | | |
18 | | /** |
19 | | * Queue for verifications that have to be performed. |
20 | | * The verifications are represented by a type T, which must provide an |
21 | | * operator(), returning an std::optional<R>. |
22 | | * |
23 | | * The overall result of the computation is std::nullopt if all invocations |
24 | | * return std::nullopt, or one of the other results otherwise. |
25 | | * |
26 | | * One thread (the master) is assumed to push batches of verifications |
27 | | * onto the queue, where they are processed by N-1 worker threads. When |
28 | | * the master is done adding work, it temporarily joins the worker pool |
29 | | * as an N'th worker, until all jobs are done. |
30 | | * |
31 | | */ |
32 | | template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>> |
33 | | class CCheckQueue |
34 | | { |
35 | | private: |
36 | | //! Mutex to protect the inner state |
37 | | Mutex m_mutex; |
38 | | |
39 | | //! Worker threads block on this when out of work |
40 | | std::condition_variable m_worker_cv; |
41 | | |
42 | | //! Master thread blocks on this when out of work |
43 | | std::condition_variable m_master_cv; |
44 | | |
45 | | //! The queue of elements to be processed. |
46 | | //! As the order in which the checks are evaluated doesn't matter, it is used as a LIFO (stack) |
47 | | std::vector<T> queue GUARDED_BY(m_mutex); |
48 | | |
49 | | //! The number of workers (including the master) that are idle. |
50 | | int nIdle GUARDED_BY(m_mutex){0}; |
51 | | |
52 | | //! The total number of workers (including the master). |
53 | | int nTotal GUARDED_BY(m_mutex){0}; |
54 | | |
55 | | //! The temporary evaluation result. |
56 | | std::optional<R> m_result GUARDED_BY(m_mutex); |
57 | | |
58 | | /** |
59 | | * Number of verifications that haven't completed yet. |
60 | | * This includes elements that are no longer queued, but still in the |
61 | | * worker's own batches. |
62 | | */ |
63 | | unsigned int nTodo GUARDED_BY(m_mutex){0}; |
64 | | |
65 | | //! The maximum number of elements to be processed in one batch |
66 | | const unsigned int nBatchSize; |
67 | | |
68 | | std::vector<std::thread> m_worker_threads; |
69 | | bool m_request_stop GUARDED_BY(m_mutex){false}; |
70 | | |
71 | | /// \anchor checkqueue |
72 | | /** Internal function that does bulk of the verification work. If fMaster, return the final result. */ |
73 | | std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
74 | 158k | { |
75 | 158k | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; |
76 | 158k | std::vector<T> vChecks; |
77 | 158k | vChecks.reserve(nBatchSize); |
78 | 158k | unsigned int nNow = 0; |
79 | 158k | std::optional<R> local_result; |
80 | 158k | bool do_work; |
81 | 6.36M | do { |
82 | 6.36M | { |
83 | 6.36M | WAIT_LOCK(m_mutex, lock); |
84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) |
85 | 6.36M | if (nNow) { |
86 | 6.20M | if (local_result.has_value() && !m_result.has_value()) { |
87 | 3.63k | std::swap(local_result, m_result); |
88 | 3.63k | } |
89 | 6.20M | nTodo -= nNow; |
90 | 6.20M | if (nTodo == 0 && !fMaster) { |
91 | | // We processed the last element; inform the master it can exit and return the result |
92 | 1.12M | m_master_cv.notify_one(); |
93 | 1.12M | } |
94 | 6.20M | } else { |
95 | | // first iteration |
96 | 158k | nTotal++; |
97 | 158k | } |
98 | | // logically, the do loop starts here |
99 | 9.72M | while (queue.empty() && !m_request_stop) { |
100 | 3.51M | if (fMaster && nTodo == 0) { |
101 | 157k | nTotal--; |
102 | 157k | std::optional<R> to_return = std::move(m_result); |
103 | | // reset the status for new work later |
104 | 157k | m_result = std::nullopt; |
105 | | // return the current status |
106 | 157k | return to_return; |
107 | 157k | } |
108 | 3.36M | nIdle++; |
109 | 3.36M | cond.wait(lock); // wait |
110 | 3.36M | nIdle--; |
111 | 3.36M | } |
112 | 6.20M | if (m_request_stop) { |
113 | | // return value does not matter, because m_request_stop is only set in the destructor. |
114 | 1.42k | return std::nullopt; |
115 | 1.42k | } |
116 | | |
117 | | // Decide how many work units to process now. |
118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so |
119 | | // all workers finish approximately simultaneously. |
120 | | // * Try to account for idle jobs which will instantly start helping. |
121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. |
122 | 6.20M | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); |
123 | 6.20M | auto start_it = queue.end() - nNow; |
124 | 6.20M | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); |
125 | 6.20M | queue.erase(start_it, queue.end()); |
126 | | // Check whether we need to do work at all |
127 | 6.20M | do_work = !m_result.has_value(); |
128 | 6.20M | } |
129 | | // execute work |
130 | 6.20M | if (do_work) { |
131 | 11.8M | for (T& check : vChecks) { |
132 | 11.8M | local_result = check(); |
133 | 11.8M | if (local_result.has_value()) break; |
134 | 11.8M | } |
135 | 6.19M | } |
136 | 6.20M | vChecks.clear(); |
137 | 6.20M | } while (true); |
138 | 158k | } CCheckQueue<FakeCheck, int>::Loop(bool) Line | Count | Source | 74 | 7 | { | 75 | 7 | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; | 76 | 7 | std::vector<T> vChecks; | 77 | 7 | vChecks.reserve(nBatchSize); | 78 | 7 | unsigned int nNow = 0; | 79 | 7 | std::optional<R> local_result; | 80 | 7 | bool do_work; | 81 | 7 | do { | 82 | 7 | { | 83 | 7 | WAIT_LOCK(m_mutex, lock); | 84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | 85 | 7 | if (nNow) { | 86 | 0 | if (local_result.has_value() && !m_result.has_value()) { | 87 | 0 | std::swap(local_result, m_result); | 88 | 0 | } | 89 | 0 | nTodo -= nNow; | 90 | 0 | if (nTodo == 0 && !fMaster) { | 91 | | // We processed the last element; inform the master it can exit and return the result | 92 | 0 | m_master_cv.notify_one(); | 93 | 0 | } | 94 | 7 | } else { | 95 | | // first iteration | 96 | 7 | nTotal++; | 97 | 7 | } | 98 | | // logically, the do loop starts here | 99 | 10 | while (queue.empty() && !m_request_stop) { | 100 | 7 | if (fMaster && nTodo == 0) { | 101 | 4 | nTotal--; | 102 | 4 | std::optional<R> to_return = std::move(m_result); | 103 | | // reset the status for new work later | 104 | 4 | m_result = std::nullopt; | 105 | | // return the current status | 106 | 4 | return to_return; | 107 | 4 | } | 108 | 3 | nIdle++; | 109 | 3 | cond.wait(lock); // wait | 110 | 3 | nIdle--; | 111 | 3 | } | 112 | 3 | if (m_request_stop) { | 113 | | // return value does not matter, because m_request_stop is only set in the destructor. | 114 | 3 | return std::nullopt; | 115 | 3 | } | 116 | | | 117 | | // Decide how many work units to process now. | 118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so | 119 | | // all workers finish approximately simultaneously. | 120 | | // * Try to account for idle jobs which will instantly start helping. 
| 121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | 122 | 0 | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | 123 | 0 | auto start_it = queue.end() - nNow; | 124 | 0 | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); | 125 | 0 | queue.erase(start_it, queue.end()); | 126 | | // Check whether we need to do work at all | 127 | 0 | do_work = !m_result.has_value(); | 128 | 0 | } | 129 | | // execute work | 130 | 0 | if (do_work) { | 131 | 0 | for (T& check : vChecks) { | 132 | 0 | local_result = check(); | 133 | 0 | if (local_result.has_value()) break; | 134 | 0 | } | 135 | 0 | } | 136 | 0 | vChecks.clear(); | 137 | 0 | } while (true); | 138 | 7 | } |
CCheckQueue<FakeCheckCheckCompletion, int>::Loop(bool) Line | Count | Source | 74 | 230 | { | 75 | 230 | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; | 76 | 230 | std::vector<T> vChecks; | 77 | 230 | vChecks.reserve(nBatchSize); | 78 | 230 | unsigned int nNow = 0; | 79 | 230 | std::optional<R> local_result; | 80 | 230 | bool do_work; | 81 | 5.35M | do { | 82 | 5.35M | { | 83 | 5.35M | WAIT_LOCK(m_mutex, lock); | 84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | 85 | 5.35M | if (nNow) { | 86 | 5.35M | if (local_result.has_value() && !m_result.has_value()) { | 87 | 0 | std::swap(local_result, m_result); | 88 | 0 | } | 89 | 5.35M | nTodo -= nNow; | 90 | 5.35M | if (nTodo == 0 && !fMaster) { | 91 | | // We processed the last element; inform the master it can exit and return the result | 92 | 951k | m_master_cv.notify_one(); | 93 | 951k | } | 94 | 18.4E | } else { | 95 | | // first iteration | 96 | 18.4E | nTotal++; | 97 | 18.4E | } | 98 | | // logically, the do loop starts here | 99 | 8.23M | while (queue.empty() && !m_request_stop) { | 100 | 2.88M | if (fMaster && nTodo == 0) { | 101 | 218 | nTotal--; | 102 | 218 | std::optional<R> to_return = std::move(m_result); | 103 | | // reset the status for new work later | 104 | 218 | m_result = std::nullopt; | 105 | | // return the current status | 106 | 218 | return to_return; | 107 | 218 | } | 108 | 2.88M | nIdle++; | 109 | 2.88M | cond.wait(lock); // wait | 110 | 2.88M | nIdle--; | 111 | 2.88M | } | 112 | 5.35M | if (m_request_stop) { | 113 | | // return value does not matter, because m_request_stop is only set in the destructor. | 114 | 12 | return std::nullopt; | 115 | 12 | } | 116 | | | 117 | | // Decide how many work units to process now. | 118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so | 119 | | // all workers finish approximately simultaneously. 
| 120 | | // * Try to account for idle jobs which will instantly start helping. | 121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | 122 | 5.35M | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | 123 | 5.35M | auto start_it = queue.end() - nNow; | 124 | 5.35M | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); | 125 | 5.35M | queue.erase(start_it, queue.end()); | 126 | | // Check whether we need to do work at all | 127 | 5.35M | do_work = !m_result.has_value(); | 128 | 5.35M | } | 129 | | // execute work | 130 | 5.35M | if (do_work) { | 131 | 10.7M | for (T& check : vChecks) { | 132 | 10.7M | local_result = check(); | 133 | 10.7M | if (local_result.has_value()) break; | 134 | 10.7M | } | 135 | 5.35M | } | 136 | 5.35M | vChecks.clear(); | 137 | 5.35M | } while (true); | 138 | 230 | } |
CCheckQueue<FixedCheck, int>::Loop(bool) Line | Count | Source | 74 | 1.02k | { | 75 | 1.02k | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; | 76 | 1.02k | std::vector<T> vChecks; | 77 | 1.02k | vChecks.reserve(nBatchSize); | 78 | 1.02k | unsigned int nNow = 0; | 79 | 1.02k | std::optional<R> local_result; | 80 | 1.02k | bool do_work; | 81 | 371k | do { | 82 | 371k | { | 83 | 371k | WAIT_LOCK(m_mutex, lock); | 84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | 85 | 371k | if (nNow) { | 86 | 370k | if (local_result.has_value() && !m_result.has_value()) { | 87 | 1.01k | std::swap(local_result, m_result); | 88 | 1.01k | } | 89 | 370k | nTodo -= nNow; | 90 | 370k | if (nTodo == 0 && !fMaster) { | 91 | | // We processed the last element; inform the master it can exit and return the result | 92 | 69.3k | m_master_cv.notify_one(); | 93 | 69.3k | } | 94 | 370k | } else { | 95 | | // first iteration | 96 | 1.00k | nTotal++; | 97 | 1.00k | } | 98 | | // logically, the do loop starts here | 99 | 576k | while (queue.empty() && !m_request_stop) { | 100 | 205k | if (fMaster && nTodo == 0) { | 101 | 1.02k | nTotal--; | 102 | 1.02k | std::optional<R> to_return = std::move(m_result); | 103 | | // reset the status for new work later | 104 | 1.02k | m_result = std::nullopt; | 105 | | // return the current status | 106 | 1.02k | return to_return; | 107 | 1.02k | } | 108 | 204k | nIdle++; | 109 | 204k | cond.wait(lock); // wait | 110 | 204k | nIdle--; | 111 | 204k | } | 112 | 370k | if (m_request_stop) { | 113 | | // return value does not matter, because m_request_stop is only set in the destructor. | 114 | 6 | return std::nullopt; | 115 | 6 | } | 116 | | | 117 | | // Decide how many work units to process now. | 118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so | 119 | | // all workers finish approximately simultaneously. 
| 120 | | // * Try to account for idle jobs which will instantly start helping. | 121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | 122 | 370k | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | 123 | 370k | auto start_it = queue.end() - nNow; | 124 | 370k | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); | 125 | 370k | queue.erase(start_it, queue.end()); | 126 | | // Check whether we need to do work at all | 127 | 370k | do_work = !m_result.has_value(); | 128 | 370k | } | 129 | | // execute work | 130 | 370k | if (do_work) { | 131 | 395k | for (T& check : vChecks) { | 132 | 395k | local_result = check(); | 133 | 395k | if (local_result.has_value()) break; | 134 | 395k | } | 135 | 358k | } | 136 | 370k | vChecks.clear(); | 137 | 370k | } while (true); | 138 | 1.02k | } |
CCheckQueue<UniqueCheck, int>::Loop(bool) Line | Count | Source | 74 | 4 | { | 75 | 4 | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; | 76 | 4 | std::vector<T> vChecks; | 77 | 4 | vChecks.reserve(nBatchSize); | 78 | 4 | unsigned int nNow = 0; | 79 | 4 | std::optional<R> local_result; | 80 | 4 | bool do_work; | 81 | 809 | do { | 82 | 809 | { | 83 | 809 | WAIT_LOCK(m_mutex, lock); | 84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | 85 | 809 | if (nNow) { | 86 | 805 | if (local_result.has_value() && !m_result.has_value()) { | 87 | 0 | std::swap(local_result, m_result); | 88 | 0 | } | 89 | 805 | nTodo -= nNow; | 90 | 805 | if (nTodo == 0 && !fMaster) { | 91 | | // We processed the last element; inform the master it can exit and return the result | 92 | 1 | m_master_cv.notify_one(); | 93 | 1 | } | 94 | 805 | } else { | 95 | | // first iteration | 96 | 4 | nTotal++; | 97 | 4 | } | 98 | | // logically, the do loop starts here | 99 | 813 | while (queue.empty() && !m_request_stop) { | 100 | 5 | if (fMaster && nTodo == 0) { | 101 | 1 | nTotal--; | 102 | 1 | std::optional<R> to_return = std::move(m_result); | 103 | | // reset the status for new work later | 104 | 1 | m_result = std::nullopt; | 105 | | // return the current status | 106 | 1 | return to_return; | 107 | 1 | } | 108 | 4 | nIdle++; | 109 | 4 | cond.wait(lock); // wait | 110 | 4 | nIdle--; | 111 | 4 | } | 112 | 808 | if (m_request_stop) { | 113 | | // return value does not matter, because m_request_stop is only set in the destructor. | 114 | 3 | return std::nullopt; | 115 | 3 | } | 116 | | | 117 | | // Decide how many work units to process now. | 118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so | 119 | | // all workers finish approximately simultaneously. | 120 | | // * Try to account for idle jobs which will instantly start helping. 
| 121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | 122 | 805 | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | 123 | 805 | auto start_it = queue.end() - nNow; | 124 | 805 | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); | 125 | 805 | queue.erase(start_it, queue.end()); | 126 | | // Check whether we need to do work at all | 127 | 805 | do_work = !m_result.has_value(); | 128 | 805 | } | 129 | | // execute work | 130 | 805 | if (do_work) { | 131 | 99.9k | for (T& check : vChecks) { | 132 | 99.9k | local_result = check(); | 133 | 99.9k | if (local_result.has_value()) break; | 134 | 99.9k | } | 135 | 805 | } | 136 | 805 | vChecks.clear(); | 137 | 805 | } while (true); | 138 | 4 | } |
CCheckQueue<MemoryCheck, int>::Loop(bool) Line | Count | Source | 74 | 1.00k | { | 75 | 1.00k | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; | 76 | 1.00k | std::vector<T> vChecks; | 77 | 1.00k | vChecks.reserve(nBatchSize); | 78 | 1.00k | unsigned int nNow = 0; | 79 | 1.00k | std::optional<R> local_result; | 80 | 1.00k | bool do_work; | 81 | 451k | do { | 82 | 451k | { | 83 | 451k | WAIT_LOCK(m_mutex, lock); | 84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | 85 | 451k | if (nNow) { | 86 | 450k | if (local_result.has_value() && !m_result.has_value()) { | 87 | 0 | std::swap(local_result, m_result); | 88 | 0 | } | 89 | 450k | nTodo -= nNow; | 90 | 450k | if (nTodo == 0 && !fMaster) { | 91 | | // We processed the last element; inform the master it can exit and return the result | 92 | 85.0k | m_master_cv.notify_one(); | 93 | 85.0k | } | 94 | 450k | } else { | 95 | | // first iteration | 96 | 990 | nTotal++; | 97 | 990 | } | 98 | | // logically, the do loop starts here | 99 | 704k | while (queue.empty() && !m_request_stop) { | 100 | 253k | if (fMaster && nTodo == 0) { | 101 | 1.00k | nTotal--; | 102 | 1.00k | std::optional<R> to_return = std::move(m_result); | 103 | | // reset the status for new work later | 104 | 1.00k | m_result = std::nullopt; | 105 | | // return the current status | 106 | 1.00k | return to_return; | 107 | 1.00k | } | 108 | 252k | nIdle++; | 109 | 252k | cond.wait(lock); // wait | 110 | 252k | nIdle--; | 111 | 252k | } | 112 | 450k | if (m_request_stop) { | 113 | | // return value does not matter, because m_request_stop is only set in the destructor. | 114 | 3 | return std::nullopt; | 115 | 3 | } | 116 | | | 117 | | // Decide how many work units to process now. | 118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so | 119 | | // all workers finish approximately simultaneously. 
| 120 | | // * Try to account for idle jobs which will instantly start helping. | 121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | 122 | 450k | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | 123 | 450k | auto start_it = queue.end() - nNow; | 124 | 450k | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); | 125 | 450k | queue.erase(start_it, queue.end()); | 126 | | // Check whether we need to do work at all | 127 | 450k | do_work = !m_result.has_value(); | 128 | 450k | } | 129 | | // execute work | 130 | 450k | if (do_work) { | 131 | 499k | for (T& check : vChecks) { | 132 | 499k | local_result = check(); | 133 | 499k | if (local_result.has_value()) break; | 134 | 499k | } | 135 | 450k | } | 136 | 450k | vChecks.clear(); | 137 | 450k | } while (true); | 138 | 1.00k | } |
CCheckQueue<FrozenCleanupCheck, int>::Loop(bool) Line | Count | Source | 74 | 4 | { | 75 | 4 | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; | 76 | 4 | std::vector<T> vChecks; | 77 | 4 | vChecks.reserve(nBatchSize); | 78 | 4 | unsigned int nNow = 0; | 79 | 4 | std::optional<R> local_result; | 80 | 4 | bool do_work; | 81 | 5 | do { | 82 | 5 | { | 83 | 5 | WAIT_LOCK(m_mutex, lock); | 84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | 85 | 5 | if (nNow) { | 86 | 1 | if (local_result.has_value() && !m_result.has_value()) { | 87 | 0 | std::swap(local_result, m_result); | 88 | 0 | } | 89 | 1 | nTodo -= nNow; | 90 | 1 | if (nTodo == 0 && !fMaster) { | 91 | | // We processed the last element; inform the master it can exit and return the result | 92 | 1 | m_master_cv.notify_one(); | 93 | 1 | } | 94 | 4 | } else { | 95 | | // first iteration | 96 | 4 | nTotal++; | 97 | 4 | } | 98 | | // logically, the do loop starts here | 99 | 8 | while (queue.empty() && !m_request_stop) { | 100 | 4 | if (fMaster && nTodo == 0) { | 101 | 1 | nTotal--; | 102 | 1 | std::optional<R> to_return = std::move(m_result); | 103 | | // reset the status for new work later | 104 | 1 | m_result = std::nullopt; | 105 | | // return the current status | 106 | 1 | return to_return; | 107 | 1 | } | 108 | 3 | nIdle++; | 109 | 3 | cond.wait(lock); // wait | 110 | 3 | nIdle--; | 111 | 3 | } | 112 | 4 | if (m_request_stop) { | 113 | | // return value does not matter, because m_request_stop is only set in the destructor. | 114 | 3 | return std::nullopt; | 115 | 3 | } | 116 | | | 117 | | // Decide how many work units to process now. | 118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so | 119 | | // all workers finish approximately simultaneously. | 120 | | // * Try to account for idle jobs which will instantly start helping. 
| 121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | 122 | 1 | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | 123 | 1 | auto start_it = queue.end() - nNow; | 124 | 1 | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); | 125 | 1 | queue.erase(start_it, queue.end()); | 126 | | // Check whether we need to do work at all | 127 | 1 | do_work = !m_result.has_value(); | 128 | 1 | } | 129 | | // execute work | 130 | 1 | if (do_work) { | 131 | 1 | for (T& check : vChecks) { | 132 | 1 | local_result = check(); | 133 | 1 | if (local_result.has_value()) break; | 134 | 1 | } | 135 | 1 | } | 136 | 1 | vChecks.clear(); | 137 | 1 | } while (true); | 138 | 4 | } |
CCheckQueue<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::Loop(bool) Line | Count | Source | 74 | 156k | { | 75 | 156k | std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; | 76 | 156k | std::vector<T> vChecks; | 77 | 156k | vChecks.reserve(nBatchSize); | 78 | 156k | unsigned int nNow = 0; | 79 | 156k | std::optional<R> local_result; | 80 | 156k | bool do_work; | 81 | 182k | do { | 82 | 182k | { | 83 | 182k | WAIT_LOCK(m_mutex, lock); | 84 | | // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) | 85 | 182k | if (nNow) { | 86 | 25.6k | if (local_result.has_value() && !m_result.has_value()) { | 87 | 2.62k | std::swap(local_result, m_result); | 88 | 2.62k | } | 89 | 25.6k | nTodo -= nNow; | 90 | 25.6k | if (nTodo == 0 && !fMaster) { | 91 | | // We processed the last element; inform the master it can exit and return the result | 92 | 18.2k | m_master_cv.notify_one(); | 93 | 18.2k | } | 94 | 156k | } else { | 95 | | // first iteration | 96 | 156k | nTotal++; | 97 | 156k | } | 98 | | // logically, the do loop starts here | 99 | 204k | while (queue.empty() && !m_request_stop) { | 100 | 177k | if (fMaster && nTodo == 0) { | 101 | 155k | nTotal--; | 102 | 155k | std::optional<R> to_return = std::move(m_result); | 103 | | // reset the status for new work later | 104 | 155k | m_result = std::nullopt; | 105 | | // return the current status | 106 | 155k | return to_return; | 107 | 155k | } | 108 | 21.9k | nIdle++; | 109 | 21.9k | cond.wait(lock); // wait | 110 | 21.9k | nIdle--; | 111 | 21.9k | } | 112 | 27.0k | if (m_request_stop) { | 113 | | // return value does not matter, because m_request_stop is only set in the destructor. | 114 | 1.39k | return std::nullopt; | 115 | 1.39k | } | 116 | | | 117 | | // Decide how many work units to process now. 
| 118 | | // * Do not try to do everything at once, but aim for increasingly smaller batches so | 119 | | // all workers finish approximately simultaneously. | 120 | | // * Try to account for idle jobs which will instantly start helping. | 121 | | // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. | 122 | 25.6k | nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); | 123 | 25.6k | auto start_it = queue.end() - nNow; | 124 | 25.6k | vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); | 125 | 25.6k | queue.erase(start_it, queue.end()); | 126 | | // Check whether we need to do work at all | 127 | 25.6k | do_work = !m_result.has_value(); | 128 | 25.6k | } | 129 | | // execute work | 130 | 25.6k | if (do_work) { | 131 | 48.0k | for (T& check : vChecks) { | 132 | 48.0k | local_result = check(); | 133 | 48.0k | if (local_result.has_value()) break; | 134 | 48.0k | } | 135 | 24.6k | } | 136 | 25.6k | vChecks.clear(); | 137 | 25.6k | } while (true); | 138 | 156k | } |
|
139 | | |
140 | | public: |
141 | | //! Mutex to ensure only one concurrent CCheckQueueControl |
142 | | Mutex m_control_mutex; |
143 | | |
144 | | //! Create a new check queue |
145 | | explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) |
146 | 1.22k | : nBatchSize(batch_size) |
147 | 1.22k | { |
148 | 1.22k | LogInfo("Script verification uses %d additional threads", worker_threads_num); |
149 | 1.22k | m_worker_threads.reserve(worker_threads_num); |
150 | 2.64k | for (int n = 0; n < worker_threads_num; ++n) { |
151 | 1.42k | m_worker_threads.emplace_back([this, n]() { |
152 | 1.42k | util::ThreadRename(strprintf("scriptch.%02i", n)); |
153 | 1.42k | Loop(false /* worker thread */); |
154 | 1.42k | }); CCheckQueue<FakeCheckCheckCompletion, int>::CCheckQueue(unsigned int, int)::'lambda'()::operator()() const Line | Count | Source | 151 | 12 | m_worker_threads.emplace_back([this, n]() { | 152 | 12 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 12 | Loop(false /* worker thread */); | 154 | 12 | }); |
CCheckQueue<FixedCheck, int>::CCheckQueue(unsigned int, int)::'lambda'()::operator()() const Line | Count | Source | 151 | 6 | m_worker_threads.emplace_back([this, n]() { | 152 | 6 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 6 | Loop(false /* worker thread */); | 154 | 6 | }); |
CCheckQueue<UniqueCheck, int>::CCheckQueue(unsigned int, int)::'lambda'()::operator()() const Line | Count | Source | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); |
CCheckQueue<MemoryCheck, int>::CCheckQueue(unsigned int, int)::'lambda'()::operator()() const Line | Count | Source | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); |
CCheckQueue<FrozenCleanupCheck, int>::CCheckQueue(unsigned int, int)::'lambda'()::operator()() const Line | Count | Source | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); |
CCheckQueue<FakeCheck, int>::CCheckQueue(unsigned int, int)::'lambda'()::operator()() const Line | Count | Source | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); |
CCheckQueue<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::CCheckQueue(unsigned int, int)::'lambda'()::operator()() const Line | Count | Source | 151 | 1.39k | m_worker_threads.emplace_back([this, n]() { | 152 | 1.39k | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 1.39k | Loop(false /* worker thread */); | 154 | 1.39k | }); |
|
155 | 1.42k | } |
156 | 1.22k | } CCheckQueue<FakeCheckCheckCompletion, int>::CCheckQueue(unsigned int, int) Line | Count | Source | 146 | 4 | : nBatchSize(batch_size) | 147 | 4 | { | 148 | 4 | LogInfo("Script verification uses %d additional threads", worker_threads_num); | 149 | 4 | m_worker_threads.reserve(worker_threads_num); | 150 | 16 | for (int n = 0; n < worker_threads_num; ++n) { | 151 | 12 | m_worker_threads.emplace_back([this, n]() { | 152 | 12 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 12 | Loop(false /* worker thread */); | 154 | 12 | }); | 155 | 12 | } | 156 | 4 | } |
CCheckQueue<FixedCheck, int>::CCheckQueue(unsigned int, int) Line | Count | Source | 146 | 2 | : nBatchSize(batch_size) | 147 | 2 | { | 148 | 2 | LogInfo("Script verification uses %d additional threads", worker_threads_num); | 149 | 2 | m_worker_threads.reserve(worker_threads_num); | 150 | 8 | for (int n = 0; n < worker_threads_num; ++n) { | 151 | 6 | m_worker_threads.emplace_back([this, n]() { | 152 | 6 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 6 | Loop(false /* worker thread */); | 154 | 6 | }); | 155 | 6 | } | 156 | 2 | } |
CCheckQueue<UniqueCheck, int>::CCheckQueue(unsigned int, int) Line | Count | Source | 146 | 1 | : nBatchSize(batch_size) | 147 | 1 | { | 148 | 1 | LogInfo("Script verification uses %d additional threads", worker_threads_num); | 149 | 1 | m_worker_threads.reserve(worker_threads_num); | 150 | 4 | for (int n = 0; n < worker_threads_num; ++n) { | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); | 155 | 3 | } | 156 | 1 | } |
CCheckQueue<MemoryCheck, int>::CCheckQueue(unsigned int, int) Line | Count | Source | 146 | 1 | : nBatchSize(batch_size) | 147 | 1 | { | 148 | 1 | LogInfo("Script verification uses %d additional threads", worker_threads_num); | 149 | 1 | m_worker_threads.reserve(worker_threads_num); | 150 | 4 | for (int n = 0; n < worker_threads_num; ++n) { | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); | 155 | 3 | } | 156 | 1 | } |
CCheckQueue<FrozenCleanupCheck, int>::CCheckQueue(unsigned int, int) Line | Count | Source | 146 | 1 | : nBatchSize(batch_size) | 147 | 1 | { | 148 | 1 | LogInfo("Script verification uses %d additional threads", worker_threads_num); | 149 | 1 | m_worker_threads.reserve(worker_threads_num); | 150 | 4 | for (int n = 0; n < worker_threads_num; ++n) { | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); | 155 | 3 | } | 156 | 1 | } |
CCheckQueue<FakeCheck, int>::CCheckQueue(unsigned int, int) Line | Count | Source | 146 | 1 | : nBatchSize(batch_size) | 147 | 1 | { | 148 | 1 | LogInfo("Script verification uses %d additional threads", worker_threads_num); | 149 | 1 | m_worker_threads.reserve(worker_threads_num); | 150 | 4 | for (int n = 0; n < worker_threads_num; ++n) { | 151 | 3 | m_worker_threads.emplace_back([this, n]() { | 152 | 3 | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 3 | Loop(false /* worker thread */); | 154 | 3 | }); | 155 | 3 | } | 156 | 1 | } |
CCheckQueue<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::CCheckQueue(unsigned int, int) Line | Count | Source | 146 | 1.21k | : nBatchSize(batch_size) | 147 | 1.21k | { | 148 | 1.21k | LogInfo("Script verification uses %d additional threads", worker_threads_num); | 149 | 1.21k | m_worker_threads.reserve(worker_threads_num); | 150 | 2.60k | for (int n = 0; n < worker_threads_num; ++n) { | 151 | 1.39k | m_worker_threads.emplace_back([this, n]() { | 152 | 1.39k | util::ThreadRename(strprintf("scriptch.%02i", n)); | 153 | 1.39k | Loop(false /* worker thread */); | 154 | 1.39k | }); | 155 | 1.39k | } | 156 | 1.21k | } |
|
157 | | |
158 | | // Since this class manages its own resources, which is a thread |
159 | | // pool `m_worker_threads`, copy and move operations are not appropriate. |
160 | | CCheckQueue(const CCheckQueue&) = delete; |
161 | | CCheckQueue& operator=(const CCheckQueue&) = delete; |
162 | | CCheckQueue(CCheckQueue&&) = delete; |
163 | | CCheckQueue& operator=(CCheckQueue&&) = delete; |
164 | | |
165 | | //! Join the execution until completion. If at least one evaluation wasn't successful, return |
166 | | //! its error. |
167 | | std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
168 | 157k | { |
169 | 157k | return Loop(true /* master thread */); |
170 | 157k | } CCheckQueue<FakeCheck, int>::Complete() Line | Count | Source | 168 | 4 | { | 169 | 4 | return Loop(true /* master thread */); | 170 | 4 | } |
CCheckQueue<FakeCheckCheckCompletion, int>::Complete() Line | Count | Source | 168 | 218 | { | 169 | 218 | return Loop(true /* master thread */); | 170 | 218 | } |
CCheckQueue<FixedCheck, int>::Complete() Line | Count | Source | 168 | 1.02k | { | 169 | 1.02k | return Loop(true /* master thread */); | 170 | 1.02k | } |
CCheckQueue<UniqueCheck, int>::Complete() Line | Count | Source | 168 | 1 | { | 169 | 1 | return Loop(true /* master thread */); | 170 | 1 | } |
CCheckQueue<MemoryCheck, int>::Complete() Line | Count | Source | 168 | 1.00k | { | 169 | 1.00k | return Loop(true /* master thread */); | 170 | 1.00k | } |
CCheckQueue<FrozenCleanupCheck, int>::Complete() Line | Count | Source | 168 | 1 | { | 169 | 1 | return Loop(true /* master thread */); | 170 | 1 | } |
CCheckQueue<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::Complete() Line | Count | Source | 168 | 155k | { | 169 | 155k | return Loop(true /* master thread */); | 170 | 155k | } |
|
171 | | |
172 | | //! Add a batch of checks to the queue |
173 | | void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
174 | 2.70M | { |
175 | 2.70M | if (vChecks.empty()) { |
176 | 295k | return; |
177 | 295k | } |
178 | | |
179 | 2.40M | { |
180 | 2.40M | LOCK(m_mutex); |
181 | 2.40M | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); |
182 | 2.40M | nTodo += vChecks.size(); |
183 | 2.40M | } |
184 | | |
185 | 2.40M | if (vChecks.size() == 1) { |
186 | 286k | m_worker_cv.notify_one(); |
187 | 2.12M | } else { |
188 | 2.12M | m_worker_cv.notify_all(); |
189 | 2.12M | } |
190 | 2.40M | } CCheckQueue<FakeCheckCheckCompletion, int>::Add(std::vector<FakeCheckCheckCompletion, std::allocator<FakeCheckCheckCompletion>>&&) Line | Count | Source | 174 | 2.39M | { | 175 | 2.39M | if (vChecks.empty()) { | 176 | 238k | return; | 177 | 238k | } | 178 | | | 179 | 2.15M | { | 180 | 2.15M | LOCK(m_mutex); | 181 | 2.15M | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); | 182 | 2.15M | nTodo += vChecks.size(); | 183 | 2.15M | } | 184 | | | 185 | 2.15M | if (vChecks.size() == 1) { | 186 | 239k | m_worker_cv.notify_one(); | 187 | 1.91M | } else { | 188 | 1.91M | m_worker_cv.notify_all(); | 189 | 1.91M | } | 190 | 2.15M | } |
CCheckQueue<FixedCheck, int>::Add(std::vector<FixedCheck, std::allocator<FixedCheck>>&&) Line | Count | Source | 174 | 111k | { | 175 | 111k | if (vChecks.empty()) { | 176 | 10.9k | return; | 177 | 10.9k | } | 178 | | | 179 | 100k | { | 180 | 100k | LOCK(m_mutex); | 181 | 100k | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); | 182 | 100k | nTodo += vChecks.size(); | 183 | 100k | } | 184 | | | 185 | 100k | if (vChecks.size() == 1) { | 186 | 11.3k | m_worker_cv.notify_one(); | 187 | 89.4k | } else { | 188 | 89.4k | m_worker_cv.notify_all(); | 189 | 89.4k | } | 190 | 100k | } |
CCheckQueue<UniqueCheck, int>::Add(std::vector<UniqueCheck, std::allocator<UniqueCheck>>&&) Line | Count | Source | 174 | 22.0k | { | 175 | 22.0k | if (vChecks.empty()) { | 176 | 2.14k | return; | 177 | 2.14k | } | 178 | | | 179 | 19.9k | { | 180 | 19.9k | LOCK(m_mutex); | 181 | 19.9k | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); | 182 | 19.9k | nTodo += vChecks.size(); | 183 | 19.9k | } | 184 | | | 185 | 19.9k | if (vChecks.size() == 1) { | 186 | 2.18k | m_worker_cv.notify_one(); | 187 | 17.7k | } else { | 188 | 17.7k | m_worker_cv.notify_all(); | 189 | 17.7k | } | 190 | 19.9k | } |
CCheckQueue<MemoryCheck, int>::Add(std::vector<MemoryCheck, std::allocator<MemoryCheck>>&&) Line | Count | Source | 174 | 111k | { | 175 | 111k | if (vChecks.empty()) { | 176 | 10.9k | return; | 177 | 10.9k | } | 178 | | | 179 | 100k | { | 180 | 100k | LOCK(m_mutex); | 181 | 100k | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); | 182 | 100k | nTodo += vChecks.size(); | 183 | 100k | } | 184 | | | 185 | 100k | if (vChecks.size() == 1) { | 186 | 11.3k | m_worker_cv.notify_one(); | 187 | 89.2k | } else { | 188 | 89.2k | m_worker_cv.notify_all(); | 189 | 89.2k | } | 190 | 100k | } |
CCheckQueue<FrozenCleanupCheck, int>::Add(std::vector<FrozenCleanupCheck, std::allocator<FrozenCleanupCheck>>&&) Line | Count | Source | 174 | 1 | { | 175 | 1 | if (vChecks.empty()) { | 176 | 0 | return; | 177 | 0 | } | 178 | | | 179 | 1 | { | 180 | 1 | LOCK(m_mutex); | 181 | 1 | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); | 182 | 1 | nTodo += vChecks.size(); | 183 | 1 | } | 184 | | | 185 | 1 | if (vChecks.size() == 1) { | 186 | 1 | m_worker_cv.notify_one(); | 187 | 1 | } else { | 188 | 0 | m_worker_cv.notify_all(); | 189 | 0 | } | 190 | 1 | } |
CCheckQueue<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::Add(std::vector<CScriptCheck, std::allocator<CScriptCheck>>&&) Line | Count | Source | 174 | 66.2k | { | 175 | 66.2k | if (vChecks.empty()) { | 176 | 32.9k | return; | 177 | 32.9k | } | 178 | | | 179 | 33.3k | { | 180 | 33.3k | LOCK(m_mutex); | 181 | 33.3k | queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); | 182 | 33.3k | nTodo += vChecks.size(); | 183 | 33.3k | } | 184 | | | 185 | 33.3k | if (vChecks.size() == 1) { | 186 | 22.7k | m_worker_cv.notify_one(); | 187 | 22.7k | } else { | 188 | 10.6k | m_worker_cv.notify_all(); | 189 | 10.6k | } | 190 | 33.3k | } |
|
191 | | |
192 | | ~CCheckQueue() |
193 | 1.22k | { |
194 | 1.22k | WITH_LOCK(m_mutex, m_request_stop = true); |
195 | 1.22k | m_worker_cv.notify_all(); |
196 | 1.42k | for (std::thread& t : m_worker_threads) { |
197 | 1.42k | t.join(); |
198 | 1.42k | } |
199 | 1.22k | } CCheckQueue<FakeCheckCheckCompletion, int>::~CCheckQueue() Line | Count | Source | 193 | 4 | { | 194 | 4 | WITH_LOCK(m_mutex, m_request_stop = true); | 195 | 4 | m_worker_cv.notify_all(); | 196 | 12 | for (std::thread& t : m_worker_threads) { | 197 | 12 | t.join(); | 198 | 12 | } | 199 | 4 | } |
CCheckQueue<FixedCheck, int>::~CCheckQueue() Line | Count | Source | 193 | 2 | { | 194 | 2 | WITH_LOCK(m_mutex, m_request_stop = true); | 195 | 2 | m_worker_cv.notify_all(); | 196 | 6 | for (std::thread& t : m_worker_threads) { | 197 | 6 | t.join(); | 198 | 6 | } | 199 | 2 | } |
CCheckQueue<UniqueCheck, int>::~CCheckQueue() Line | Count | Source | 193 | 1 | { | 194 | 1 | WITH_LOCK(m_mutex, m_request_stop = true); | 195 | 1 | m_worker_cv.notify_all(); | 196 | 3 | for (std::thread& t : m_worker_threads) { | 197 | 3 | t.join(); | 198 | 3 | } | 199 | 1 | } |
CCheckQueue<MemoryCheck, int>::~CCheckQueue() Line | Count | Source | 193 | 1 | { | 194 | 1 | WITH_LOCK(m_mutex, m_request_stop = true); | 195 | 1 | m_worker_cv.notify_all(); | 196 | 3 | for (std::thread& t : m_worker_threads) { | 197 | 3 | t.join(); | 198 | 3 | } | 199 | 1 | } |
CCheckQueue<FrozenCleanupCheck, int>::~CCheckQueue() Line | Count | Source | 193 | 1 | { | 194 | 1 | WITH_LOCK(m_mutex, m_request_stop = true); | 195 | 1 | m_worker_cv.notify_all(); | 196 | 3 | for (std::thread& t : m_worker_threads) { | 197 | 3 | t.join(); | 198 | 3 | } | 199 | 1 | } |
CCheckQueue<FakeCheck, int>::~CCheckQueue() Line | Count | Source | 193 | 1 | { | 194 | 1 | WITH_LOCK(m_mutex, m_request_stop = true); | 195 | 1 | m_worker_cv.notify_all(); | 196 | 3 | for (std::thread& t : m_worker_threads) { | 197 | 3 | t.join(); | 198 | 3 | } | 199 | 1 | } |
CCheckQueue<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::~CCheckQueue() Line | Count | Source | 193 | 1.21k | { | 194 | 1.21k | WITH_LOCK(m_mutex, m_request_stop = true); | 195 | 1.21k | m_worker_cv.notify_all(); | 196 | 1.39k | for (std::thread& t : m_worker_threads) { | 197 | 1.39k | t.join(); | 198 | 1.39k | } | 199 | 1.21k | } |
|
200 | | |
201 | 155k | bool HasThreads() const { return !m_worker_threads.empty(); } |
202 | | }; |
203 | | |
204 | | /** |
205 | | * RAII-style controller object for a CCheckQueue that guarantees the passed |
206 | | * queue is finished before continuing. |
207 | | */ |
208 | | template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>> |
209 | | class SCOPED_LOCKABLE CCheckQueueControl |
210 | | { |
211 | | private: |
212 | | CCheckQueue<T, R>& m_queue; |
213 | | UniqueLock<Mutex> m_lock; |
214 | | bool fDone; |
215 | | |
216 | | public: |
217 | | CCheckQueueControl() = delete; |
218 | | CCheckQueueControl(const CCheckQueueControl&) = delete; |
219 | | CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; |
220 | 157k | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {}CCheckQueueControl<FakeCheck, int>::CCheckQueueControl(CCheckQueue<FakeCheck, int>&) Line | Count | Source | 220 | 4 | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} |
CCheckQueueControl<FakeCheckCheckCompletion, int>::CCheckQueueControl(CCheckQueue<FakeCheckCheckCompletion, int>&) Line | Count | Source | 220 | 218 | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} |
CCheckQueueControl<FixedCheck, int>::CCheckQueueControl(CCheckQueue<FixedCheck, int>&) Line | Count | Source | 220 | 1.02k | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} |
CCheckQueueControl<UniqueCheck, int>::CCheckQueueControl(CCheckQueue<UniqueCheck, int>&) Line | Count | Source | 220 | 1 | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} |
CCheckQueueControl<MemoryCheck, int>::CCheckQueueControl(CCheckQueue<MemoryCheck, int>&) Line | Count | Source | 220 | 1.00k | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} |
CCheckQueueControl<FrozenCleanupCheck, int>::CCheckQueueControl(CCheckQueue<FrozenCleanupCheck, int>&) Line | Count | Source | 220 | 1 | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} |
CCheckQueueControl<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::CCheckQueueControl(CCheckQueue<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>&) Line | Count | Source | 220 | 155k | explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} |
|
221 | | |
222 | | std::optional<R> Complete() |
223 | 157k | { |
224 | 157k | auto ret = m_queue.Complete(); |
225 | 157k | fDone = true; |
226 | 157k | return ret; |
227 | 157k | } CCheckQueueControl<FakeCheck, int>::Complete() Line | Count | Source | 223 | 4 | { | 224 | 4 | auto ret = m_queue.Complete(); | 225 | 4 | fDone = true; | 226 | 4 | return ret; | 227 | 4 | } |
CCheckQueueControl<FakeCheckCheckCompletion, int>::Complete() Line | Count | Source | 223 | 218 | { | 224 | 218 | auto ret = m_queue.Complete(); | 225 | 218 | fDone = true; | 226 | 218 | return ret; | 227 | 218 | } |
CCheckQueueControl<FixedCheck, int>::Complete() Line | Count | Source | 223 | 1.02k | { | 224 | 1.02k | auto ret = m_queue.Complete(); | 225 | 1.02k | fDone = true; | 226 | 1.02k | return ret; | 227 | 1.02k | } |
CCheckQueueControl<UniqueCheck, int>::Complete() Line | Count | Source | 223 | 1 | { | 224 | 1 | auto ret = m_queue.Complete(); | 225 | 1 | fDone = true; | 226 | 1 | return ret; | 227 | 1 | } |
CCheckQueueControl<MemoryCheck, int>::Complete() Line | Count | Source | 223 | 1.00k | { | 224 | 1.00k | auto ret = m_queue.Complete(); | 225 | 1.00k | fDone = true; | 226 | 1.00k | return ret; | 227 | 1.00k | } |
CCheckQueueControl<FrozenCleanupCheck, int>::Complete() Line | Count | Source | 223 | 1 | { | 224 | 1 | auto ret = m_queue.Complete(); | 225 | 1 | fDone = true; | 226 | 1 | return ret; | 227 | 1 | } |
CCheckQueueControl<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::Complete() Line | Count | Source | 223 | 155k | { | 224 | 155k | auto ret = m_queue.Complete(); | 225 | 155k | fDone = true; | 226 | 155k | return ret; | 227 | 155k | } |
|
228 | | |
229 | | void Add(std::vector<T>&& vChecks) |
230 | 2.70M | { |
231 | 2.70M | m_queue.Add(std::move(vChecks)); |
232 | 2.70M | } CCheckQueueControl<FakeCheckCheckCompletion, int>::Add(std::vector<FakeCheckCheckCompletion, std::allocator<FakeCheckCheckCompletion>>&&) Line | Count | Source | 230 | 2.39M | { | 231 | 2.39M | m_queue.Add(std::move(vChecks)); | 232 | 2.39M | } |
CCheckQueueControl<FixedCheck, int>::Add(std::vector<FixedCheck, std::allocator<FixedCheck>>&&) Line | Count | Source | 230 | 111k | { | 231 | 111k | m_queue.Add(std::move(vChecks)); | 232 | 111k | } |
CCheckQueueControl<UniqueCheck, int>::Add(std::vector<UniqueCheck, std::allocator<UniqueCheck>>&&) Line | Count | Source | 230 | 22.0k | { | 231 | 22.0k | m_queue.Add(std::move(vChecks)); | 232 | 22.0k | } |
CCheckQueueControl<MemoryCheck, int>::Add(std::vector<MemoryCheck, std::allocator<MemoryCheck>>&&) Line | Count | Source | 230 | 111k | { | 231 | 111k | m_queue.Add(std::move(vChecks)); | 232 | 111k | } |
CCheckQueueControl<FrozenCleanupCheck, int>::Add(std::vector<FrozenCleanupCheck, std::allocator<FrozenCleanupCheck>>&&) Line | Count | Source | 230 | 1 | { | 231 | 1 | m_queue.Add(std::move(vChecks)); | 232 | 1 | } |
CCheckQueueControl<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::Add(std::vector<CScriptCheck, std::allocator<CScriptCheck>>&&) Line | Count | Source | 230 | 66.2k | { | 231 | 66.2k | m_queue.Add(std::move(vChecks)); | 232 | 66.2k | } |
|
233 | | |
234 | | ~CCheckQueueControl() UNLOCK_FUNCTION() |
235 | 157k | { |
236 | 157k | if (!fDone) |
237 | 1.00k | Complete(); |
238 | 157k | } CCheckQueueControl<FakeCheck, int>::~CCheckQueueControl() Line | Count | Source | 235 | 4 | { | 236 | 4 | if (!fDone) | 237 | 4 | Complete(); | 238 | 4 | } |
CCheckQueueControl<FakeCheckCheckCompletion, int>::~CCheckQueueControl() Line | Count | Source | 235 | 218 | { | 236 | 218 | if (!fDone) | 237 | 0 | Complete(); | 238 | 218 | } |
CCheckQueueControl<FixedCheck, int>::~CCheckQueueControl() Line | Count | Source | 235 | 1.02k | { | 236 | 1.02k | if (!fDone) | 237 | 0 | Complete(); | 238 | 1.02k | } |
CCheckQueueControl<UniqueCheck, int>::~CCheckQueueControl() Line | Count | Source | 235 | 1 | { | 236 | 1 | if (!fDone) | 237 | 1 | Complete(); | 238 | 1 | } |
CCheckQueueControl<MemoryCheck, int>::~CCheckQueueControl() Line | Count | Source | 235 | 1.00k | { | 236 | 1.00k | if (!fDone) | 237 | 1.00k | Complete(); | 238 | 1.00k | } |
CCheckQueueControl<FrozenCleanupCheck, int>::~CCheckQueueControl() Line | Count | Source | 235 | 1 | { | 236 | 1 | if (!fDone) | 237 | 0 | Complete(); | 238 | 1 | } |
CCheckQueueControl<CScriptCheck, std::pair<ScriptError_t, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>>>::~CCheckQueueControl() Line | Count | Source | 235 | 155k | { | 236 | 155k | if (!fDone) | 237 | 0 | Complete(); | 238 | 155k | } |
|
239 | | }; |
240 | | |
241 | | #endif // BITCOIN_CHECKQUEUE_H |