Line data Source code
1 : // Copyright (c) 2015-2020 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include <scheduler.h> 6 : 7 : #include <random.h> 8 : 9 : #include <assert.h> 10 : #include <utility> 11 : 12 1258 : CScheduler::CScheduler() 13 629 : { 14 1258 : } 15 : 16 1258 : CScheduler::~CScheduler() 17 629 : { 18 629 : assert(nThreadsServicingQueue == 0); 19 629 : if (stopWhenEmpty) assert(taskQueue.empty()); 20 1258 : } 21 : 22 : 23 642 : void CScheduler::serviceQueue() 24 : { 25 642 : WAIT_LOCK(newTaskMutex, lock); 26 642 : ++nThreadsServicingQueue; 27 : 28 : // newTaskMutex is locked throughout this loop EXCEPT 29 : // when the thread is waiting or when the user's function 30 : // is called. 31 187706 : while (!shouldStop()) { 32 : try { 33 187713 : while (!shouldStop() && taskQueue.empty()) { 34 : // Wait until there is something to do. 35 649 : newTaskScheduled.wait(lock); 36 : } 37 : 38 : // Wait until either there is a new task, or until 39 : // the time of the first item on the queue: 40 : 41 258647 : while (!shouldStop() && !taskQueue.empty()) { 42 258033 : std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; 43 258033 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { 44 186450 : break; // Exit loop after timeout, it means we reached the time of the event 45 : } 46 258033 : } 47 : 48 : // If there are multiple threads, the queue can empty while we're waiting (another 49 : // thread may service the task we were waiting on). 50 187064 : if (shouldStop() || taskQueue.empty()) 51 620 : continue; 52 : 53 186444 : Function f = taskQueue.begin()->second; 54 186444 : taskQueue.erase(taskQueue.begin()); 55 : 56 : { 57 : // Unlock before calling f, so it can reschedule itself or another task 58 : // without deadlocking: 59 186444 : REVERSE_LOCK(lock); 60 186444 : f(); 61 186444 : } 62 186444 : } catch (...) { 63 0 : --nThreadsServicingQueue; 64 0 : throw; 65 0 : } 66 : } 67 642 : --nThreadsServicingQueue; 68 642 : newTaskScheduled.notify_one(); 69 642 : } 70 : 71 193824 : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) 72 : { 73 : { 74 193824 : LOCK(newTaskMutex); 75 193824 : taskQueue.insert(std::make_pair(t, f)); 76 193824 : } 77 193824 : newTaskScheduled.notify_one(); 78 193824 : } 79 : 80 4 : void CScheduler::MockForward(std::chrono::seconds delta_seconds) 81 : { 82 4 : assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1}); 83 : 84 : { 85 4 : LOCK(newTaskMutex); 86 : 87 : // use temp_queue to maintain updated schedule 88 4 : std::multimap<std::chrono::system_clock::time_point, Function> temp_queue; 89 : 90 26 : for (const auto& element : taskQueue) { 91 22 : temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); 92 0 : } 93 : 94 : // point taskQueue to temp_queue 95 4 : taskQueue = std::move(temp_queue); 96 4 : } 97 629 : 98 629 : // notify that the taskQueue needs to be processed 99 633 : newTaskScheduled.notify_one(); 100 4 : } 101 : 102 38115 : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta) 103 : { 104 38115 : f(); 105 605069 : s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta); 106 38115 : } 107 : 108 3095 : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta) 109 : { 110 38880 : scheduleFromNow([=] { Repeat(*this, f, delta); }, delta); 111 3095 : } 112 : 113 4 : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first, 114 : std::chrono::system_clock::time_point& last) const 115 : { 116 4 : LOCK(newTaskMutex); 117 4 : size_t result = taskQueue.size(); 118 4 : if (!taskQueue.empty()) { 119 3 : first = taskQueue.begin()->first; 120 3 : last = taskQueue.rbegin()->first; 121 3 : } 122 : return result; 123 4 : } 124 : 125 626 : bool CScheduler::AreThreadsServicingQueue() const 126 : { 127 626 : LOCK(newTaskMutex); 128 626 : return nThreadsServicingQueue; 129 626 : } 130 : 131 : 132 267518 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() 133 : { 134 : { 135 267518 : LOCK(m_cs_callbacks_pending); 136 : // Try to avoid scheduling too many copies here, but if we 137 : // accidentally have two ProcessQueue's scheduled at once its 138 : // not a big deal. 139 267518 : if (m_are_callbacks_running) return; 140 221355 : if (m_callbacks_pending.empty()) return; 141 267518 : } 142 151597 : m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now()); 143 267518 : } 144 : 145 149458 : void SingleThreadedSchedulerClient::ProcessQueue() 146 : { 147 149458 : std::function<void()> callback; 148 : { 149 149458 : LOCK(m_cs_callbacks_pending); 150 149458 : if (m_are_callbacks_running) return; 151 149458 : if (m_callbacks_pending.empty()) return; 152 133510 : m_are_callbacks_running = true; 153 : 154 133510 : callback = std::move(m_callbacks_pending.front()); 155 133510 : m_callbacks_pending.pop_front(); 156 149458 : } 157 : 158 : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue 159 : // to ensure both happen safely even if callback() throws. 160 133510 : struct RAIICallbacksRunning { 161 : SingleThreadedSchedulerClient* instance; 162 267020 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} 163 267020 : ~RAIICallbacksRunning() 164 133510 : { 165 : { 166 133510 : LOCK(instance->m_cs_callbacks_pending); 167 133510 : instance->m_are_callbacks_running = false; 168 133510 : } 169 133510 : instance->MaybeScheduleProcessQueue(); 170 267020 : } 171 133510 : } raiicallbacksrunning(this); 172 : 173 133510 : callback(); 174 149458 : } 175 : 176 134008 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func) 177 : { 178 134008 : assert(m_pscheduler); 179 : 180 : { 181 134008 : LOCK(m_cs_callbacks_pending); 182 134008 : m_callbacks_pending.emplace_back(std::move(func)); 183 134008 : } 184 134008 : MaybeScheduleProcessQueue(); 185 134008 : } 186 : 187 626 : void SingleThreadedSchedulerClient::EmptyQueue() 188 : { 189 626 : assert(!m_pscheduler->AreThreadsServicingQueue()); 190 : bool should_continue = true; 191 2161 : while (should_continue) { 192 1535 : ProcessQueue(); 193 1535 : LOCK(m_cs_callbacks_pending); 194 1535 : should_continue = !m_callbacks_pending.empty(); 195 1535 : } 196 626 : } 197 : 198 58987 : size_t SingleThreadedSchedulerClient::CallbacksPending() 199 : { 200 58987 : LOCK(m_cs_callbacks_pending); 201 58987 : return m_callbacks_pending.size(); 202 58987 : }