Line data Source code
1 : // Copyright (c) 2015-2019 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include <zmq/zmqnotificationinterface.h> 6 : #include <zmq/zmqpublishnotifier.h> 7 : 8 : #include <validation.h> 9 : #include <util/system.h> 10 : 11 1 : void zmqError(const char *str) 12 : { 13 1 : LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); 14 1 : } 15 : 16 6 : CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) 17 6 : { 18 6 : } 19 : 20 6 : CZMQNotificationInterface::~CZMQNotificationInterface() 21 6 : { 22 3 : Shutdown(); 23 : 24 11 : for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) 25 : { 26 8 : delete *i; 27 : } 28 6 : } 29 : 30 1 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const 31 : { 32 1 : std::list<const CZMQAbstractNotifier*> result; 33 5 : for (const auto* n : notifiers) { 34 4 : result.push_back(n); 35 4 : } 36 : return result; 37 1 : } 38 : 39 498 : CZMQNotificationInterface* CZMQNotificationInterface::Create() 40 : { 41 : CZMQNotificationInterface* notificationInterface = nullptr; 42 498 : std::map<std::string, CZMQNotifierFactory> factories; 43 498 : std::list<CZMQAbstractNotifier*> notifiers; 44 : 45 498 : factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; 46 498 : factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; 47 498 : factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; 48 498 : factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; 49 : 50 2490 : for (const auto& entry : factories) 51 : { 52 1992 : std::string arg("-zmq" + entry.first); 53 1992 : if (gArgs.IsArgSet(arg)) 54 : { 55 8 : CZMQNotifierFactory factory = entry.second; 56 8 : std::string address = gArgs.GetArg(arg, ""); 57 8 : CZMQAbstractNotifier *notifier = factory(); 58 8 : notifier->SetType(entry.first); 59 8 : notifier->SetAddress(address); 60 8 : notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); 61 8 : notifiers.push_back(notifier); 62 8 : } 63 1992 : } 64 : 65 498 : if (!notifiers.empty()) 66 : { 67 3 : notificationInterface = new CZMQNotificationInterface(); 68 3 : notificationInterface->notifiers = notifiers; 69 : 70 3 : if (!notificationInterface->Initialize()) 71 : { 72 1 : delete notificationInterface; 73 : notificationInterface = nullptr; 74 1 : } 75 : } 76 : 77 : return notificationInterface; 78 498 : } 79 : 80 : // Called at startup to conditionally set up ZMQ socket(s) 81 3 : bool CZMQNotificationInterface::Initialize() 82 : { 83 3 : int major = 0, minor = 0, patch = 0; 84 3 : zmq_version(&major, &minor, &patch); 85 3 : LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch); 86 : 87 3 : LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); 88 3 : assert(!pcontext); 89 : 90 3 : pcontext = zmq_ctx_new(); 91 : 92 3 : if (!pcontext) 93 : { 94 0 : zmqError("Unable to initialize context"); 95 0 : return false; 96 : } 97 : 98 3 : std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); 99 9 : for (; i!=notifiers.end(); ++i) 100 : { 101 7 : CZMQAbstractNotifier *notifier = *i; 102 7 : if (notifier->Initialize(pcontext)) 103 : { 104 6 : LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); 105 : } 106 : else 107 : { 108 1 : LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); 109 1 : break; 110 : } 111 6 : } 112 : 113 3 : if (i!=notifiers.end()) 114 : { 115 1 : return false; 116 : } 117 : 118 2 : return true; 119 3 : } 120 : 121 : // Called during shutdown sequence 122 3 : void CZMQNotificationInterface::Shutdown() 123 : { 124 3 : LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); 125 3 : if (pcontext) 126 : { 127 11 : for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) 128 : { 129 8 : CZMQAbstractNotifier *notifier = *i; 130 8 : LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); 131 8 : notifier->Shutdown(); 132 : } 133 3 : zmq_ctx_term(pcontext); 134 : 135 3 : pcontext = nullptr; 136 3 : } 137 3 : } 138 : 139 9 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) 140 : { 141 9 : if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones 142 : return; 143 : 144 39 : for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) 145 : { 146 30 : CZMQAbstractNotifier *notifier = *i; 147 30 : if (notifier->NotifyBlock(pindexNew)) 148 : { 149 30 : i++; 150 30 : } 151 : else 152 : { 153 0 : notifier->Shutdown(); 154 0 : i = notifiers.erase(i); 155 : } 156 : } 157 9 : } 158 : 159 20 : void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx) 160 : { 161 : // Used by BlockConnected and BlockDisconnected as well, because they're 162 : // all the same external callback. 163 20 : const CTransaction& tx = *ptx; 164 : 165 76 : for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) 166 : { 167 56 : CZMQAbstractNotifier *notifier = *i; 168 56 : if (notifier->NotifyTransaction(tx)) 169 : { 170 56 : i++; 171 56 : } 172 : else 173 : { 174 0 : notifier->Shutdown(); 175 0 : i = notifiers.erase(i); 176 : } 177 : } 178 20 : } 179 : 180 10 : void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) 181 : { 182 23 : for (const CTransactionRef& ptx : pblock->vtx) { 183 : // Do a normal notify for each transaction added in the block 184 13 : TransactionAddedToMempool(ptx); 185 : } 186 10 : } 187 : 188 3 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) 189 : { 190 7 : for (const CTransactionRef& ptx : pblock->vtx) { 191 : // Do a normal notify for each transaction removed in block disconnection 192 4 : TransactionAddedToMempool(ptx); 193 : } 194 3 : } 195 : 196 : CZMQNotificationInterface* g_zmq_notification_interface = nullptr;