LCOV - code coverage report
Current view: top level - src - scheduler.cpp (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 114 118 96.6 %
Date: 2020-09-26 01:30:44 Functions: 34 34 100.0 %

          Line data    Source code
       1             : // Copyright (c) 2015-2020 The Bitcoin Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #include <scheduler.h>
       6             : 
       7             : #include <random.h>
       8             : 
       9             : #include <assert.h>
      10             : #include <utility>
      11             : 
      12        1258 : CScheduler::CScheduler()
      13         629 : {
      14        1258 : }
      15             : 
      16        1258 : CScheduler::~CScheduler()
      17         629 : {
      18         629 :     assert(nThreadsServicingQueue == 0);
      19         629 :     if (stopWhenEmpty) assert(taskQueue.empty());
      20        1258 : }
      21             : 
      22             : 
      23         642 : void CScheduler::serviceQueue()
      24             : {
      25         642 :     WAIT_LOCK(newTaskMutex, lock);
      26         642 :     ++nThreadsServicingQueue;
      27             : 
      28             :     // newTaskMutex is locked throughout this loop EXCEPT
      29             :     // when the thread is waiting or when the user's function
      30             :     // is called.
      31      187706 :     while (!shouldStop()) {
      32             :         try {
      33      187713 :             while (!shouldStop() && taskQueue.empty()) {
      34             :                 // Wait until there is something to do.
      35         649 :                 newTaskScheduled.wait(lock);
      36             :             }
      37             : 
      38             :             // Wait until either there is a new task, or until
      39             :             // the time of the first item on the queue:
      40             : 
      41      258647 :             while (!shouldStop() && !taskQueue.empty()) {
      42      258033 :                 std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
      43      258033 :                 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
      44      186450 :                     break; // Exit loop after timeout, it means we reached the time of the event
      45             :                 }
      46      258033 :             }
      47             : 
      48             :             // If there are multiple threads, the queue can empty while we're waiting (another
      49             :             // thread may service the task we were waiting on).
      50      187064 :             if (shouldStop() || taskQueue.empty())
      51         620 :                 continue;
      52             : 
      53      186444 :             Function f = taskQueue.begin()->second;
      54      186444 :             taskQueue.erase(taskQueue.begin());
      55             : 
      56             :             {
      57             :                 // Unlock before calling f, so it can reschedule itself or another task
      58             :                 // without deadlocking:
      59      186444 :                 REVERSE_LOCK(lock);
      60      186444 :                 f();
      61      186444 :             }
      62      186444 :         } catch (...) {
      63           0 :             --nThreadsServicingQueue;
      64           0 :             throw;
      65           0 :         }
      66             :     }
      67         642 :     --nThreadsServicingQueue;
      68         642 :     newTaskScheduled.notify_one();
      69         642 : }
      70             : 
      71      193824 : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
      72             : {
      73             :     {
      74      193824 :         LOCK(newTaskMutex);
      75      193824 :         taskQueue.insert(std::make_pair(t, f));
      76      193824 :     }
      77      193824 :     newTaskScheduled.notify_one();
      78      193824 : }
      79             : 
      80           4 : void CScheduler::MockForward(std::chrono::seconds delta_seconds)
      81             : {
      82           4 :     assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
      83             : 
      84             :     {
      85           4 :         LOCK(newTaskMutex);
      86             : 
      87             :         // use temp_queue to maintain updated schedule
      88           4 :         std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
      89             : 
      90          26 :         for (const auto& element : taskQueue) {
      91          22 :             temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
      92           0 :         }
      93             : 
      94             :         // point taskQueue to temp_queue
      95           4 :         taskQueue = std::move(temp_queue);
      96           4 :     }
      97         629 : 
      98         629 :     // notify that the taskQueue needs to be processed
      99         633 :     newTaskScheduled.notify_one();
     100           4 : }
     101             : 
     102       38115 : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
     103             : {
     104       38115 :     f();
     105      605069 :     s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
     106       38115 : }
     107             : 
     108        3095 : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
     109             : {
     110       38880 :     scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
     111        3095 : }
     112             : 
     113           4 : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
     114             :                                 std::chrono::system_clock::time_point& last) const
     115             : {
     116           4 :     LOCK(newTaskMutex);
     117           4 :     size_t result = taskQueue.size();
     118           4 :     if (!taskQueue.empty()) {
     119           3 :         first = taskQueue.begin()->first;
     120           3 :         last = taskQueue.rbegin()->first;
     121           3 :     }
     122             :     return result;
     123           4 : }
     124             : 
     125         626 : bool CScheduler::AreThreadsServicingQueue() const
     126             : {
     127         626 :     LOCK(newTaskMutex);
     128         626 :     return nThreadsServicingQueue;
     129         626 : }
     130             : 
     131             : 
     132      267518 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
     133             : {
     134             :     {
     135      267518 :         LOCK(m_cs_callbacks_pending);
     136             :         // Try to avoid scheduling too many copies here, but if we
     137             :         // accidentally have two ProcessQueue's scheduled at once its
     138             :         // not a big deal.
     139      267518 :         if (m_are_callbacks_running) return;
     140      221355 :         if (m_callbacks_pending.empty()) return;
     141      267518 :     }
     142      151597 :     m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
     143      267518 : }
     144             : 
     145      149458 : void SingleThreadedSchedulerClient::ProcessQueue()
     146             : {
     147      149458 :     std::function<void()> callback;
     148             :     {
     149      149458 :         LOCK(m_cs_callbacks_pending);
     150      149458 :         if (m_are_callbacks_running) return;
     151      149458 :         if (m_callbacks_pending.empty()) return;
     152      133510 :         m_are_callbacks_running = true;
     153             : 
     154      133510 :         callback = std::move(m_callbacks_pending.front());
     155      133510 :         m_callbacks_pending.pop_front();
     156      149458 :     }
     157             : 
     158             :     // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
     159             :     // to ensure both happen safely even if callback() throws.
     160      133510 :     struct RAIICallbacksRunning {
     161             :         SingleThreadedSchedulerClient* instance;
     162      267020 :         explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
     163      267020 :         ~RAIICallbacksRunning()
     164      133510 :         {
     165             :             {
     166      133510 :                 LOCK(instance->m_cs_callbacks_pending);
     167      133510 :                 instance->m_are_callbacks_running = false;
     168      133510 :             }
     169      133510 :             instance->MaybeScheduleProcessQueue();
     170      267020 :         }
     171      133510 :     } raiicallbacksrunning(this);
     172             : 
     173      133510 :     callback();
     174      149458 : }
     175             : 
     176      134008 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
     177             : {
     178      134008 :     assert(m_pscheduler);
     179             : 
     180             :     {
     181      134008 :         LOCK(m_cs_callbacks_pending);
     182      134008 :         m_callbacks_pending.emplace_back(std::move(func));
     183      134008 :     }
     184      134008 :     MaybeScheduleProcessQueue();
     185      134008 : }
     186             : 
     187         626 : void SingleThreadedSchedulerClient::EmptyQueue()
     188             : {
     189         626 :     assert(!m_pscheduler->AreThreadsServicingQueue());
     190             :     bool should_continue = true;
     191        2161 :     while (should_continue) {
     192        1535 :         ProcessQueue();
     193        1535 :         LOCK(m_cs_callbacks_pending);
     194        1535 :         should_continue = !m_callbacks_pending.empty();
     195        1535 :     }
     196         626 : }
     197             : 
     198       58987 : size_t SingleThreadedSchedulerClient::CallbacksPending()
     199             : {
     200       58987 :     LOCK(m_cs_callbacks_pending);
     201       58987 :     return m_callbacks_pending.size();
     202       58987 : }

Generated by: LCOV version 1.15