LCOV - code coverage report
Current view: top level - src - checkqueue.h (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 79 79 100.0 %
Date: 2020-09-26 01:30:44 Functions: 109 109 100.0 %

          Line data    Source code
       1             : // Copyright (c) 2012-2020 The Bitcoin Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #ifndef BITCOIN_CHECKQUEUE_H
       6             : #define BITCOIN_CHECKQUEUE_H
       7             : 
       8             : #include <sync.h>
       9             : 
      10             : #include <algorithm>
      11             : #include <vector>
      12             : 
      13             : #include <boost/thread/condition_variable.hpp>
      14             : #include <boost/thread/mutex.hpp>
      15             : 
      16             : template <typename T>
      17             : class CCheckQueueControl;
      18             : 
      19             : /**
      20             :  * Queue for verifications that have to be performed.
      21             :   * The verifications are represented by a type T, which must provide an
      22             :   * operator(), returning a bool.
      23             :   *
      24             :   * One thread (the master) is assumed to push batches of verifications
      25             :   * onto the queue, where they are processed by N-1 worker threads. When
      26             :   * the master is done adding work, it temporarily joins the worker pool
      27             :   * as an N'th worker, until all jobs are done.
      28             :   */
      29             : template <typename T>
      30             : class CCheckQueue
      31             : {
      32             : private:
      33             :     //! Mutex to protect the inner state
      34             :     boost::mutex mutex;
      35             : 
      36             :     //! Worker threads block on this when out of work
      37             :     boost::condition_variable condWorker;
      38             : 
      39             :     //! Master thread blocks on this when out of work
      40             :     boost::condition_variable condMaster;
      41             : 
      42             :     //! The queue of elements to be processed.
      43             :     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
      44             :     std::vector<T> queue;
      45             : 
      46             :     //! The number of workers (including the master) that are idle.
      47             :     int nIdle;
      48             : 
      49             :     //! The total number of workers (including the master).
      50             :     int nTotal;
      51             : 
      52             :     //! The temporary evaluation result.
      53             :     bool fAllOk;
      54             : 
      55             :     /**
      56             :      * Number of verifications that haven't completed yet.
      57             :      * This includes elements that are no longer queued, but still in the
      58             :      * worker's own batches.
      59             :      */
      60             :     unsigned int nTodo;
      61             : 
      62             :     //! The maximum number of elements to be processed in one batch
      63             :     unsigned int nBatchSize;
      64             : 
      65             :     /** Internal function that does bulk of the verification work. */
      66       65867 :     bool Loop(bool fMaster = false)
      67             :     {
      68       65867 :         boost::condition_variable& cond = fMaster ? condMaster : condWorker;
      69       65867 :         std::vector<T> vChecks;
      70       65867 :         vChecks.reserve(nBatchSize);
      71     4214109 :         unsigned int nNow = 0;
      72     4214109 :         bool fOk = true;
      73       65848 :         do {
      74             :             {
      75     4214109 :                 boost::unique_lock<boost::mutex> lock(mutex);
      76             :                 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
      77     4225874 :                 if (nNow) {
      78     4159989 :                     fAllOk &= fOk;
      79     4159989 :                     nTodo -= nNow;
      80     4159989 :                     if (nTodo == 0 && !fMaster)
      81             :                         // We processed the last element; inform the master it can exit and return the result
      82      110654 :                         condMaster.notify_one();
      83             :                 } else {
      84             :                     // first iteration
      85       65885 :                     nTotal++;
      86             :                 }
      87             :                 // logically, the do loop starts here
      88     4719471 :                 while (queue.empty()) {
      89      559482 :                     if (fMaster && nTodo == 0) {
      90       64065 :                         nTotal--;
      91       64065 :                         bool fRet = fAllOk;
      92             :                         // reset the status for new work later
      93       64065 :                         fAllOk = true;
      94             :                         // return the current status
      95             :                         return fRet;
      96             :                     }
      97      495417 :                     nIdle++;
      98      495417 :                     cond.wait(lock); // wait
      99      493597 :                     nIdle--;
     100             :                 }
     101             :                 // Decide how many work units to process now.
     102             :                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
     103             :                 //   all workers finish approximately simultaneously.
     104             :                 // * Try to account for idle jobs which will instantly start helping.
     105             :                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
     106     4159989 :                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
     107     4159989 :                 vChecks.resize(nNow);
     108    16551104 :                 for (unsigned int i = 0; i < nNow; i++) {
     109             :                     // We want the lock on the mutex to be as short as possible, so swap jobs from the global
     110             :                     // queue to the local batch vector instead of copying.
     111    12391115 :                     vChecks[i].swap(queue.back());
     112    12391115 :                     queue.pop_back();
     113             :                 }
     114             :                 // Check whether we need to do work at all
     115     4159989 :                 fOk = fAllOk;
     116     4225874 :             }
     117             :             // execute work
     118    16514768 :             for (T& check : vChecks)
     119    12337835 :                 if (fOk)
     120    12322763 :                     fOk = check();
     121     4152866 :             vChecks.clear();
     122     4152866 :         } while (true);
     123       67705 :     }
     124             : 
     125             : public:
     126             :     //! Mutex to ensure only one concurrent CCheckQueueControl
     127             :     boost::mutex ControlMutex;
     128             : 
     129             :     //! Create a new check queue
     130        1304 :     explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
     131             : 
     132             :     //! Worker thread
     133        1812 :     void Thread()
     134             :     {
     135        1812 :         Loop();
     136        1812 :     }
     137             : 
     138             :     //! Wait until execution finishes, and return whether all evaluations were successful.
     139       64065 :     bool Wait()
     140             :     {
     141       64065 :         return Loop(true);
     142             :     }
     143             : 
     144             :     //! Add a batch of checks to the queue
     145     2738185 :     void Add(std::vector<T>& vChecks)
     146             :     {
     147     2738185 :         boost::unique_lock<boost::mutex> lock(mutex);
     148    15129300 :         for (T& check : vChecks) {
     149    12391115 :             queue.push_back(T());
     150    12391115 :             check.swap(queue.back());
     151             :         }
     152     2738185 :         nTodo += vChecks.size();
     153     2738185 :         if (vChecks.size() == 1)
     154      295416 :             condWorker.notify_one();
     155     2442769 :         else if (vChecks.size() > 1)
     156     2149090 :             condWorker.notify_all();
     157     2738185 :     }
     158             : 
     159        1304 :     ~CCheckQueue()
     160         652 :     {
     161        1304 :     }
     162             : 
     163             : };
     164             : 
     165             : /**
     166             :  * RAII-style controller object for a CCheckQueue that guarantees the passed
     167             :  * queue is finished before continuing.
     168             :  */
     169             : template <typename T>
     170             : class CCheckQueueControl
     171             : {
     172             : private:
     173             :     CCheckQueue<T> * const pqueue;
     174             :     bool fDone;
     175             : 
     176             : public:
     177             :     CCheckQueueControl() = delete;
     178             :     CCheckQueueControl(const CCheckQueueControl&) = delete;
     179             :     CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
     180      140012 :     explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
     181       70006 :     {
     182             :         // passed queue is supposed to be unused, or nullptr
     183       70006 :         if (pqueue != nullptr) {
     184       64065 :             ENTER_CRITICAL_SECTION(pqueue->ControlMutex);
     185       64065 :         }
     186      140012 :     }
     187             : 
     188       75841 :     bool Wait()
     189             :     {
     190       75841 :         if (pqueue == nullptr)
     191       11776 :             return true;
     192       64065 :         bool fRet = pqueue->Wait();
     193       64065 :         fDone = true;
     194             :         return fRet;
     195       75841 :     }
     196             : 
     197     2738620 :     void Add(std::vector<T>& vChecks)
     198             :     {
     199     2738620 :         if (pqueue != nullptr)
     200     2738185 :             pqueue->Add(vChecks);
     201     2738620 :     }
     202             : 
     203      140012 :     ~CCheckQueueControl()
     204       70006 :     {
     205       70006 :         if (!fDone)
     206        6981 :             Wait();
     207       70006 :         if (pqueue != nullptr) {
     208       64065 :             LEAVE_CRITICAL_SECTION(pqueue->ControlMutex);
     209       64065 :         }
     210      140012 :     }
     211             : };
     212             : 
     213             : #endif // BITCOIN_CHECKQUEUE_H

Generated by: LCOV version 1.15