LCOV - code coverage report
Current view: top level - src/zmq - zmqnotificationinterface.cpp (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 99 105 94.3 %
Date: 2020-09-26 01:30:44 Functions: 13 14 92.9 %

          Line data    Source code
       1             : // Copyright (c) 2015-2019 The Bitcoin Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #include <zmq/zmqnotificationinterface.h>
       6             : #include <zmq/zmqpublishnotifier.h>
       7             : 
       8             : #include <validation.h>
       9             : #include <util/system.h>
      10             : 
      11           1 : void zmqError(const char *str)
      12             : {
      13           1 :     LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
      14           1 : }
      15             : 
      16           6 : CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr)
      17           6 : {
      18           6 : }
      19             : 
      20           6 : CZMQNotificationInterface::~CZMQNotificationInterface()
      21           6 : {
      22           3 :     Shutdown();
      23             : 
      24          11 :     for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
      25             :     {
      26           8 :         delete *i;
      27             :     }
      28           6 : }
      29             : 
      30           1 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
      31             : {
      32           1 :     std::list<const CZMQAbstractNotifier*> result;
      33           5 :     for (const auto* n : notifiers) {
      34           4 :         result.push_back(n);
      35           4 :     }
      36             :     return result;
      37           1 : }
      38             : 
      39         498 : CZMQNotificationInterface* CZMQNotificationInterface::Create()
      40             : {
      41             :     CZMQNotificationInterface* notificationInterface = nullptr;
      42         498 :     std::map<std::string, CZMQNotifierFactory> factories;
      43         498 :     std::list<CZMQAbstractNotifier*> notifiers;
      44             : 
      45         498 :     factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
      46         498 :     factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
      47         498 :     factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
      48         498 :     factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
      49             : 
      50        2490 :     for (const auto& entry : factories)
      51             :     {
      52        1992 :         std::string arg("-zmq" + entry.first);
      53        1992 :         if (gArgs.IsArgSet(arg))
      54             :         {
      55           8 :             CZMQNotifierFactory factory = entry.second;
      56           8 :             std::string address = gArgs.GetArg(arg, "");
      57           8 :             CZMQAbstractNotifier *notifier = factory();
      58           8 :             notifier->SetType(entry.first);
      59           8 :             notifier->SetAddress(address);
      60           8 :             notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
      61           8 :             notifiers.push_back(notifier);
      62           8 :         }
      63        1992 :     }
      64             : 
      65         498 :     if (!notifiers.empty())
      66             :     {
      67           3 :         notificationInterface = new CZMQNotificationInterface();
      68           3 :         notificationInterface->notifiers = notifiers;
      69             : 
      70           3 :         if (!notificationInterface->Initialize())
      71             :         {
      72           1 :             delete notificationInterface;
      73             :             notificationInterface = nullptr;
      74           1 :         }
      75             :     }
      76             : 
      77             :     return notificationInterface;
      78         498 : }
      79             : 
      80             : // Called at startup to conditionally set up ZMQ socket(s)
      81           3 : bool CZMQNotificationInterface::Initialize()
      82             : {
      83           3 :     int major = 0, minor = 0, patch = 0;
      84           3 :     zmq_version(&major, &minor, &patch);
      85           3 :     LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
      86             : 
      87           3 :     LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
      88           3 :     assert(!pcontext);
      89             : 
      90           3 :     pcontext = zmq_ctx_new();
      91             : 
      92           3 :     if (!pcontext)
      93             :     {
      94           0 :         zmqError("Unable to initialize context");
      95           0 :         return false;
      96             :     }
      97             : 
      98           3 :     std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
      99           9 :     for (; i!=notifiers.end(); ++i)
     100             :     {
     101           7 :         CZMQAbstractNotifier *notifier = *i;
     102           7 :         if (notifier->Initialize(pcontext))
     103             :         {
     104           6 :             LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
     105             :         }
     106             :         else
     107             :         {
     108           1 :             LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
     109           1 :             break;
     110             :         }
     111           6 :     }
     112             : 
     113           3 :     if (i!=notifiers.end())
     114             :     {
     115           1 :         return false;
     116             :     }
     117             : 
     118           2 :     return true;
     119           3 : }
     120             : 
     121             : // Called during shutdown sequence
     122           3 : void CZMQNotificationInterface::Shutdown()
     123             : {
     124           3 :     LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
     125           3 :     if (pcontext)
     126             :     {
     127          11 :         for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
     128             :         {
     129           8 :             CZMQAbstractNotifier *notifier = *i;
     130           8 :             LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
     131           8 :             notifier->Shutdown();
     132             :         }
     133           3 :         zmq_ctx_term(pcontext);
     134             : 
     135           3 :         pcontext = nullptr;
     136           3 :     }
     137           3 : }
     138             : 
     139           9 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
     140             : {
     141           9 :     if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
     142             :         return;
     143             : 
     144          39 :     for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
     145             :     {
     146          30 :         CZMQAbstractNotifier *notifier = *i;
     147          30 :         if (notifier->NotifyBlock(pindexNew))
     148             :         {
     149          30 :             i++;
     150          30 :         }
     151             :         else
     152             :         {
     153           0 :             notifier->Shutdown();
     154           0 :             i = notifiers.erase(i);
     155             :         }
     156             :     }
     157           9 : }
     158             : 
     159          20 : void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx)
     160             : {
     161             :     // Used by BlockConnected and BlockDisconnected as well, because they're
     162             :     // all the same external callback.
     163          20 :     const CTransaction& tx = *ptx;
     164             : 
     165          76 :     for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
     166             :     {
     167          56 :         CZMQAbstractNotifier *notifier = *i;
     168          56 :         if (notifier->NotifyTransaction(tx))
     169             :         {
     170          56 :             i++;
     171          56 :         }
     172             :         else
     173             :         {
     174           0 :             notifier->Shutdown();
     175           0 :             i = notifiers.erase(i);
     176             :         }
     177             :     }
     178          20 : }
     179             : 
     180          10 : void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
     181             : {
     182          23 :     for (const CTransactionRef& ptx : pblock->vtx) {
     183             :         // Do a normal notify for each transaction added in the block
     184          13 :         TransactionAddedToMempool(ptx);
     185             :     }
     186          10 : }
     187             : 
     188           3 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
     189             : {
     190           7 :     for (const CTransactionRef& ptx : pblock->vtx) {
     191             :         // Do a normal notify for each transaction removed in block disconnection
     192           4 :         TransactionAddedToMempool(ptx);
     193             :     }
     194           3 : }
     195             : 
     196             : CZMQNotificationInterface* g_zmq_notification_interface = nullptr;

Generated by: LCOV version 1.15