Line data Source code
1 : // Copyright (c) 2012-2020 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #ifndef BITCOIN_CHECKQUEUE_H 6 : #define BITCOIN_CHECKQUEUE_H 7 : 8 : #include <sync.h> 9 : 10 : #include <algorithm> 11 : #include <vector> 12 : 13 : #include <boost/thread/condition_variable.hpp> 14 : #include <boost/thread/mutex.hpp> 15 : 16 : template <typename T> 17 : class CCheckQueueControl; 18 : 19 : /** 20 : * Queue for verifications that have to be performed. 21 : * The verifications are represented by a type T, which must provide an 22 : * operator(), returning a bool. 23 : * 24 : * One thread (the master) is assumed to push batches of verifications 25 : * onto the queue, where they are processed by N-1 worker threads. When 26 : * the master is done adding work, it temporarily joins the worker pool 27 : * as an N'th worker, until all jobs are done. 28 : */ 29 : template <typename T> 30 : class CCheckQueue 31 : { 32 : private: 33 : //! Mutex to protect the inner state 34 : boost::mutex mutex; 35 : 36 : //! Worker threads block on this when out of work 37 : boost::condition_variable condWorker; 38 : 39 : //! Master thread blocks on this when out of work 40 : boost::condition_variable condMaster; 41 : 42 : //! The queue of elements to be processed. 43 : //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 44 : std::vector<T> queue; 45 : 46 : //! The number of workers (including the master) that are idle. 47 : int nIdle; 48 : 49 : //! The total number of workers (including the master). 50 : int nTotal; 51 : 52 : //! The temporary evaluation result. 53 : bool fAllOk; 54 : 55 : /** 56 : * Number of verifications that haven't completed yet. 57 : * This includes elements that are no longer queued, but still in the 58 : * worker's own batches. 59 : */ 60 : unsigned int nTodo; 61 : 62 : //! The maximum number of elements to be processed in one batch 63 : unsigned int nBatchSize; 64 : 65 : /** Internal function that does bulk of the verification work. */ 66 65867 : bool Loop(bool fMaster = false) 67 : { 68 65867 : boost::condition_variable& cond = fMaster ? condMaster : condWorker; 69 65867 : std::vector<T> vChecks; 70 65867 : vChecks.reserve(nBatchSize); 71 4214109 : unsigned int nNow = 0; 72 4214109 : bool fOk = true; 73 65848 : do { 74 : { 75 4214109 : boost::unique_lock<boost::mutex> lock(mutex); 76 : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 77 4225874 : if (nNow) { 78 4159989 : fAllOk &= fOk; 79 4159989 : nTodo -= nNow; 80 4159989 : if (nTodo == 0 && !fMaster) 81 : // We processed the last element; inform the master it can exit and return the result 82 110654 : condMaster.notify_one(); 83 : } else { 84 : // first iteration 85 65885 : nTotal++; 86 : } 87 : // logically, the do loop starts here 88 4719471 : while (queue.empty()) { 89 559482 : if (fMaster && nTodo == 0) { 90 64065 : nTotal--; 91 64065 : bool fRet = fAllOk; 92 : // reset the status for new work later 93 64065 : fAllOk = true; 94 : // return the current status 95 : return fRet; 96 : } 97 495417 : nIdle++; 98 495417 : cond.wait(lock); // wait 99 493597 : nIdle--; 100 : } 101 : // Decide how many work units to process now. 102 : // * Do not try to do everything at once, but aim for increasingly smaller batches so 103 : // all workers finish approximately simultaneously. 104 : // * Try to account for idle jobs which will instantly start helping. 105 : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 106 4159989 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 107 4159989 : vChecks.resize(nNow); 108 16551104 : for (unsigned int i = 0; i < nNow; i++) { 109 : // We want the lock on the mutex to be as short as possible, so swap jobs from the global 110 : // queue to the local batch vector instead of copying. 111 12391115 : vChecks[i].swap(queue.back()); 112 12391115 : queue.pop_back(); 113 : } 114 : // Check whether we need to do work at all 115 4159989 : fOk = fAllOk; 116 4225874 : } 117 : // execute work 118 16514768 : for (T& check : vChecks) 119 12337835 : if (fOk) 120 12322763 : fOk = check(); 121 4152866 : vChecks.clear(); 122 4152866 : } while (true); 123 67705 : } 124 : 125 : public: 126 : //! Mutex to ensure only one concurrent CCheckQueueControl 127 : boost::mutex ControlMutex; 128 : 129 : //! Create a new check queue 130 1304 : explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {} 131 : 132 : //! Worker thread 133 1812 : void Thread() 134 : { 135 1812 : Loop(); 136 1812 : } 137 : 138 : //! Wait until execution finishes, and return whether all evaluations were successful. 139 64065 : bool Wait() 140 : { 141 64065 : return Loop(true); 142 : } 143 : 144 : //! Add a batch of checks to the queue 145 2738185 : void Add(std::vector<T>& vChecks) 146 : { 147 2738185 : boost::unique_lock<boost::mutex> lock(mutex); 148 15129300 : for (T& check : vChecks) { 149 12391115 : queue.push_back(T()); 150 12391115 : check.swap(queue.back()); 151 : } 152 2738185 : nTodo += vChecks.size(); 153 2738185 : if (vChecks.size() == 1) 154 295416 : condWorker.notify_one(); 155 2442769 : else if (vChecks.size() > 1) 156 2149090 : condWorker.notify_all(); 157 2738185 : } 158 : 159 1304 : ~CCheckQueue() 160 652 : { 161 1304 : } 162 : 163 : }; 164 : 165 : /** 166 : * RAII-style controller object for a CCheckQueue that guarantees the passed 167 : * queue is finished before continuing. 168 : */ 169 : template <typename T> 170 : class CCheckQueueControl 171 : { 172 : private: 173 : CCheckQueue<T> * const pqueue; 174 : bool fDone; 175 : 176 : public: 177 : CCheckQueueControl() = delete; 178 : CCheckQueueControl(const CCheckQueueControl&) = delete; 179 : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 180 140012 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) 181 70006 : { 182 : // passed queue is supposed to be unused, or nullptr 183 70006 : if (pqueue != nullptr) { 184 64065 : ENTER_CRITICAL_SECTION(pqueue->ControlMutex); 185 64065 : } 186 140012 : } 187 : 188 75841 : bool Wait() 189 : { 190 75841 : if (pqueue == nullptr) 191 11776 : return true; 192 64065 : bool fRet = pqueue->Wait(); 193 64065 : fDone = true; 194 : return fRet; 195 75841 : } 196 : 197 2738620 : void Add(std::vector<T>& vChecks) 198 : { 199 2738620 : if (pqueue != nullptr) 200 2738185 : pqueue->Add(vChecks); 201 2738620 : } 202 : 203 140012 : ~CCheckQueueControl() 204 70006 : { 205 70006 : if (!fDone) 206 6981 : Wait(); 207 70006 : if (pqueue != nullptr) { 208 64065 : LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); 209 64065 : } 210 140012 : } 211 : }; 212 : 213 : #endif // BITCOIN_CHECKQUEUE_H