/tmp/bitcoin/src/scheduler.h
Line | Count | Source |
1 | | // Copyright (c) 2015-present The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or http://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #ifndef BITCOIN_SCHEDULER_H |
6 | | #define BITCOIN_SCHEDULER_H |
7 | | |
8 | | #include <attributes.h> |
9 | | #include <sync.h> |
10 | | #include <util/task_runner.h> |
11 | | |
12 | | #include <chrono> |
13 | | #include <condition_variable> |
14 | | #include <cstddef> |
15 | | #include <functional> |
16 | | #include <list> |
17 | | #include <map> |
18 | | #include <thread> |
19 | | #include <utility> |
20 | | |
21 | | /** |
22 | | * Simple class for background tasks that should be run |
23 | | * periodically or once "after a while" |
24 | | * |
25 | | * Usage: |
26 | | * |
27 | | * CScheduler* s = new CScheduler(); |
28 | | * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } |
29 | | * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); |
30 | | * std::thread* t = new std::thread([&] { s->serviceQueue(); }); |
31 | | * |
32 | | * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: |
33 | | * s->stop(); |
34 | | * t->join(); |
35 | | * delete t; |
36 | | * delete s; // Must be done after thread is interrupted/joined. |
37 | | */ |
38 | | class CScheduler |
39 | | { |
40 | | public: |
41 | | CScheduler(); |
42 | | ~CScheduler(); |
43 | | |
44 | | std::thread m_service_thread; /* joined by stop() and StopWhenDrained() when joinable */ |
45 | | |
46 | | typedef std::function<void()> Function; /* a schedulable task: takes no arguments, returns nothing */ |
47 | | |
48 | | /** Call f at/after time point t (a std::chrono::steady_clock time point) */ |
49 | | void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
50 | | |
51 | | /** Call f once after delta has passed; forwards to schedule() with steady_clock::now() + delta */ |
52 | | void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) |
53 | 8.03k | { |
54 | 8.03k | schedule(std::move(f), std::chrono::steady_clock::now() + delta); |
55 | 8.03k | } |
56 | | |
57 | | /** |
58 | | * Repeat f until the scheduler is stopped. First run is after delta has passed once. |
59 | | * |
60 | | * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more |
61 | | * accurate scheduling, don't use this method. |
62 | | */ |
63 | | void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
64 | | |
65 | | /** |
66 | | * Mock the scheduler to fast forward in time. |
67 | | * Iterates through items on taskQueue and reschedules them |
68 | | * to be delta_seconds sooner. |
69 | | */ |
70 | | void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
71 | | |
72 | | /** |
73 | | * Services the queue 'forever'. Should be run in a thread. |
74 | | */ |
75 | | void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
76 | | |
77 | | /** Tell any threads running serviceQueue to stop as soon as the current task is done, then join m_service_thread if it is joinable */ |
78 | | void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) |
79 | 1.28k | { |
80 | 1.28k | WITH_LOCK(newTaskMutex, stopRequested = true); |
81 | 1.28k | newTaskScheduled.notify_all(); |
82 | 1.28k | if (m_service_thread.joinable()) m_service_thread.join(); |
83 | 1.28k | } |
84 | | /** Tell any threads running serviceQueue to stop when there is no work left to be done, then join m_service_thread if it is joinable */ |
85 | | void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) |
86 | 2 | { |
87 | 2 | WITH_LOCK(newTaskMutex, stopWhenEmpty = true); |
88 | 2 | newTaskScheduled.notify_all(); |
89 | 2 | if (m_service_thread.joinable()) m_service_thread.join(); |
90 | 2 | } |
91 | | |
92 | | /** |
93 | | * Returns number of tasks waiting to be serviced, |
94 | | * and first and last task times |
95 | | */ |
96 | | size_t getQueueInfo(std::chrono::steady_clock::time_point& first, |
97 | | std::chrono::steady_clock::time_point& last) const |
98 | | EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
99 | | |
100 | | /** Returns true if there are threads actively running in serviceQueue() */ |
101 | | bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); |
102 | | |
103 | | private: |
104 | | mutable Mutex newTaskMutex; /* guards every data member annotated GUARDED_BY below */ |
105 | | std::condition_variable newTaskScheduled; /* notify_all() is issued by stop() and StopWhenDrained() */ |
106 | | std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); /* tasks keyed by the time point they are scheduled for */ |
107 | | int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; |
108 | | bool stopRequested GUARDED_BY(newTaskMutex){false}; /* set by stop() */ |
109 | | bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; /* set by StopWhenDrained() */ |
110 | 2.02M | bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } /* true once stop was requested, or once draining was requested and taskQueue is empty */ |
111 | | }; |
112 | | |
113 | | /** |
114 | | * Class used by CScheduler clients which may schedule multiple jobs |
115 | | * which are required to be run serially. Jobs are not guaranteed to run on |
116 | | * the same thread, but no two jobs will be executed |
117 | | * at the same time and memory will be release-acquire consistent |
118 | | * (the scheduler will internally do an acquire before invoking a callback |
119 | | * as well as a release at the end). In practice this means that a callback |
120 | | * B() will be able to observe all of the effects of callback A() which executed |
121 | | * before it. |
122 | | */ |
123 | | class SerialTaskRunner : public util::TaskRunnerInterface |
124 | | { |
125 | | private: |
126 | | CScheduler& m_scheduler; /* bound in the constructor to the scheduler passed by the caller */ |
127 | | |
128 | | Mutex m_callbacks_mutex; /* guards m_callbacks_pending and m_are_callbacks_running */ |
129 | | |
130 | | // We are not allowed to assume the scheduler only runs in one thread, |
131 | | // but must ensure all callbacks happen in-order, so we end up keeping |
132 | | // our own queue of pending callbacks here :( |
133 | | std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); |
134 | | bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; |
135 | | |
136 | | void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
137 | | void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
138 | | |
139 | | public: |
140 | 1.27k | explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} /* only stores a reference to scheduler (note the LIFETIMEBOUND annotation) */ |
141 | | |
142 | | /** |
143 | | * Add a callback to be executed. Callbacks are executed serially |
144 | | * and memory is release-acquire consistent between callback executions. |
145 | | * Practically, this means that callbacks can behave as if they are executed |
146 | | * in order by a single thread. |
147 | | */ |
148 | | void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
149 | | |
150 | | /** |
151 | | * Processes all remaining queue members on the calling thread, blocking until the queue is empty. |
152 | | * Must be called after the CScheduler has no remaining processing threads! |
153 | | */ |
154 | | void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); |
155 | | |
156 | | size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); /* NOTE(review): presumably the number of entries in m_callbacks_pending -- confirm against the definition */ |
157 | | }; |
158 | | |
159 | | #endif // BITCOIN_SCHEDULER_H |