LCOV - code coverage report
Current view: top level - src - scheduler.h (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 15 15 100.0 %
Date: 2020-09-26 01:30:44 Functions: 10 10 100.0 %

          Line data    Source code
       1             : // Copyright (c) 2015-2020 The Bitcoin Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #ifndef BITCOIN_SCHEDULER_H
       6             : #define BITCOIN_SCHEDULER_H
       7             : 
       8             : #include <condition_variable>
       9             : #include <functional>
      10             : #include <list>
      11             : #include <map>
      12             : 
      13             : #include <sync.h>
      14             : 
      15             : /**
      16             :  * Simple class for background tasks that should be run
      17             :  * periodically or once "after a while".
      18             :  *
      19             :  * Usage:
      20             :  *
      21             :  * CScheduler* s = new CScheduler();
      22             :  * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { }
      23             :  * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
      24             :  * std::thread* t = new std::thread([&] { s->serviceQueue(); });
      25             :  *
      26             :  * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
      27             :  * s->stop();
      28             :  * t->join();
      29             :  * delete t;
      30             :  * delete s; // Must be done after the thread has been stopped and joined.
      31             :  */
      32             : class CScheduler
      33             : {
      34             : public:
      35             :     CScheduler();
      36             :     ~CScheduler();
      37             : 
      38             :     typedef std::function<void()> Function;
      39             : 
      40             :     /** Call f at/after time t */
      41             :     void schedule(Function f, std::chrono::system_clock::time_point t);
      42             : 
      43             :     /** Call f once after the delta has passed */
      44       41827 :     void scheduleFromNow(Function f, std::chrono::milliseconds delta)
      45             :     {
      46       41827 :         schedule(std::move(f), std::chrono::system_clock::now() + delta);
      47       41827 :     }
      48             : 
      49             :     /**
      50             :      * Repeat f until the scheduler is stopped. First run is after delta has passed once.
      51             :      *
      52             :      * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more
      53             :      * accurate scheduling, don't use this method.
      54             :      */
      55             :     void scheduleEvery(Function f, std::chrono::milliseconds delta);
      56             : 
      57             :     /**
      58             :      * Mock the scheduler to fast forward in time.
      59             :      * Iterates through items on taskQueue and reschedules them
      60             :      * to be delta_seconds sooner.
      61             :      */
      62             :     void MockForward(std::chrono::seconds delta_seconds);
      63             : 
      64             :     /**
      65             :      * Services the queue 'forever'. Should be run in a thread,
      66             :      * and stopped by calling stop() or StopWhenDrained().
      67             :      */
      68             :     void serviceQueue();
      69             : 
      70             :     /** Tell any threads running serviceQueue to stop as soon as the current task is done */
      71         627 :     void stop()
      72             :     {
      73        1254 :         WITH_LOCK(newTaskMutex, stopRequested = true);
      74         627 :         newTaskScheduled.notify_all();
      75         627 :     }
      76             :     /** Tell any threads running serviceQueue to stop when there is no work left to be done */
      77           2 :     void StopWhenDrained()
      78             :     {
      79           4 :         WITH_LOCK(newTaskMutex, stopWhenEmpty = true);
      80           2 :         newTaskScheduled.notify_all();
      81           2 :     }
      82             : 
      83             :     /**
      84             :      * Returns number of tasks waiting to be serviced, and reports the
      85             :      * first and last task times through the first and last reference parameters
      86             :      */
      87             :     size_t getQueueInfo(std::chrono::system_clock::time_point& first,
      88             :                         std::chrono::system_clock::time_point& last) const;
      89             : 
      90             :     /** Returns true if there are threads actively running in serviceQueue() */
      91             :     bool AreThreadsServicingQueue() const;
      92             : 
      93             : private:
      94             :     mutable Mutex newTaskMutex;
      95             :     std::condition_variable newTaskScheduled;
      96             :     std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
      97             :     int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0};
      98             :     bool stopRequested GUARDED_BY(newTaskMutex){false};
      99             :     bool stopWhenEmpty GUARDED_BY(newTaskMutex){false};
     100      821130 :     bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
     101             : };
     102             : 
     103             : /**
     104             :  * Class used by CScheduler clients which may schedule multiple jobs
     105             :  * which are required to be run serially. Jobs are not guaranteed to run
     106             :  * on the same thread, but no two jobs will be executed
     107             :  * at the same time and memory will be release-acquire consistent
     108             :  * (the scheduler will internally do an acquire before invoking a callback
     109             :  * as well as a release at the end). In practice this means that a callback
     110             :  * B() will be able to observe all of the effects of callback A() which executed
     111             :  * before it.
     112             :  */
     113        1256 : class SingleThreadedSchedulerClient
     114             : {
     115             : private:
     116             :     CScheduler* m_pscheduler;
     117             : 
     118             :     RecursiveMutex m_cs_callbacks_pending;
     119             :     std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending);
     120         628 :     bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false;
     121             : 
     122             :     void MaybeScheduleProcessQueue();
     123             :     void ProcessQueue();
     124             : 
     125             : public:
     126        1256 :     explicit SingleThreadedSchedulerClient(CScheduler* pschedulerIn) : m_pscheduler(pschedulerIn) {}
     127             : 
     128             :     /**
     129             :      * Add a callback to be executed. Callbacks are executed serially
     130             :      * and memory is release-acquire consistent between callback executions.
     131             :      * Practically, this means that callbacks can behave as if they are executed
     132             :      * in order by a single thread.
     133             :      */
     134             :     void AddToProcessQueue(std::function<void()> func);
     135             : 
     136             :     /**
     137             :      * Processes all remaining queue members on the calling thread, blocking until queue is empty.
     138             :      * Must be called after the CScheduler has no remaining processing threads!
     139             :      */
     140             :     void EmptyQueue();
     141             : 
     142             :     size_t CallbacksPending(); // NOTE(review): presumably the number of callbacks still queued - confirm against the definition
     143             : };
     144             : 
     145             : #endif

Generated by: LCOV version 1.15