Line data Source code
1 : // Copyright (c) 2015-2020 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #ifndef BITCOIN_SCHEDULER_H 6 : #define BITCOIN_SCHEDULER_H 7 : 8 : #include <condition_variable> 9 : #include <functional> 10 : #include <list> 11 : #include <map> 12 : 13 : #include <sync.h> 14 : 15 : /** 16 : * Simple class for background tasks that should be run 17 : * periodically or once "after a while" 18 : * 19 : * Usage: 20 : * 21 : * CScheduler* s = new CScheduler(); 22 : * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } 23 : * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); 24 : * std::thread* t = new std::thread([&] { s->serviceQueue(); }); 25 : * 26 : * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: 27 : * s->stop(); 28 : * t->join(); 29 : * delete t; 30 : * delete s; // Must be done after thread is interrupted/joined. 31 : */ 32 : class CScheduler 33 : { 34 : public: 35 : CScheduler(); 36 : ~CScheduler(); 37 : 38 : typedef std::function<void()> Function; 39 : 40 : /** Call func at/after time t */ 41 : void schedule(Function f, std::chrono::system_clock::time_point t); 42 : 43 : /** Call f once after the delta has passed */ 44 41827 : void scheduleFromNow(Function f, std::chrono::milliseconds delta) 45 : { 46 41827 : schedule(std::move(f), std::chrono::system_clock::now() + delta); 47 41827 : } 48 : 49 : /** 50 : * Repeat f until the scheduler is stopped. First run is after delta has passed once. 51 : * 52 : * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more 53 : * accurate scheduling, don't use this method. 54 : */ 55 : void scheduleEvery(Function f, std::chrono::milliseconds delta); 56 : 57 : /** 58 : * Mock the scheduler to fast forward in time. 59 : * Iterates through items on taskQueue and reschedules them 60 : * to be delta_seconds sooner. 61 : */ 62 : void MockForward(std::chrono::seconds delta_seconds); 63 : 64 : /** 65 : * Services the queue 'forever'. Should be run in a thread, 66 : * and interrupted using boost::interrupt_thread 67 : */ 68 : void serviceQueue(); 69 : 70 : /** Tell any threads running serviceQueue to stop as soon as the current task is done */ 71 627 : void stop() 72 : { 73 1254 : WITH_LOCK(newTaskMutex, stopRequested = true); 74 627 : newTaskScheduled.notify_all(); 75 627 : } 76 : /** Tell any threads running serviceQueue to stop when there is no work left to be done */ 77 2 : void StopWhenDrained() 78 : { 79 4 : WITH_LOCK(newTaskMutex, stopWhenEmpty = true); 80 2 : newTaskScheduled.notify_all(); 81 2 : } 82 : 83 : /** 84 : * Returns number of tasks waiting to be serviced, 85 : * and first and last task times 86 : */ 87 : size_t getQueueInfo(std::chrono::system_clock::time_point& first, 88 : std::chrono::system_clock::time_point& last) const; 89 : 90 : /** Returns true if there are threads actively running in serviceQueue() */ 91 : bool AreThreadsServicingQueue() const; 92 : 93 : private: 94 : mutable Mutex newTaskMutex; 95 : std::condition_variable newTaskScheduled; 96 : std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); 97 : int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; 98 : bool stopRequested GUARDED_BY(newTaskMutex){false}; 99 : bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; 100 821130 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } 101 : }; 102 : 103 : /** 104 : * Class used by CScheduler clients which may schedule multiple jobs 105 : * which are required to be run serially. Jobs may not be run on the 106 : * same thread, but no two jobs will be executed 107 : * at the same time and memory will be release-acquire consistent 108 : * (the scheduler will internally do an acquire before invoking a callback 109 : * as well as a release at the end). In practice this means that a callback 110 : * B() will be able to observe all of the effects of callback A() which executed 111 : * before it. 112 : */ 113 1256 : class SingleThreadedSchedulerClient 114 : { 115 : private: 116 : CScheduler* m_pscheduler; 117 : 118 : RecursiveMutex m_cs_callbacks_pending; 119 : std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending); 120 628 : bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false; 121 : 122 : void MaybeScheduleProcessQueue(); 123 : void ProcessQueue(); 124 : 125 : public: 126 1256 : explicit SingleThreadedSchedulerClient(CScheduler* pschedulerIn) : m_pscheduler(pschedulerIn) {} 127 : 128 : /** 129 : * Add a callback to be executed. Callbacks are executed serially 130 : * and memory is release-acquire consistent between callback executions. 131 : * Practically, this means that callbacks can behave as if they are executed 132 : * in order by a single thread. 133 : */ 134 : void AddToProcessQueue(std::function<void()> func); 135 : 136 : /** 137 : * Processes all remaining queue members on the calling thread, blocking until queue is empty 138 : * Must be called after the CScheduler has no remaining processing threads! 139 : */ 140 : void EmptyQueue(); 141 : 142 : size_t CallbacksPending(); 143 : }; 144 : 145 : #endif