LCOV - code coverage report
Current view: top level - src/zmq - zmqpublishnotifier.cpp (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 106 124 85.5 %
Date: 2020-09-26 01:30:44 Functions: 9 9 100.0 %

          Line data    Source code
       1             : // Copyright (c) 2015-2020 The Bitcoin Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #include <chain.h>
       6             : #include <chainparams.h>
       7             : #include <streams.h>
       8             : #include <zmq/zmqpublishnotifier.h>
       9             : #include <validation.h>
      10             : #include <util/system.h>
      11             : #include <rpc/server.h>
      12             : 
      13         640 : static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
      14             : 
      // Command strings sent as the first part of each published message.
      // Both the characters and the pointers are const so that these
      // file-local constants cannot be re-pointed by accident.
      static const char* const MSG_HASHBLOCK = "hashblock";
      static const char* const MSG_HASHTX    = "hashtx";
      static const char* const MSG_RAWBLOCK  = "rawblock";
      static const char* const MSG_RAWTX     = "rawtx";
      19             : 
      20             : // Internal function to send multipart message
      21          43 : static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
      22             : {
      23          43 :     va_list args;
      24          43 :     va_start(args, size);
      25             : 
      26          43 :     while (1)
      27             :     {
      28         129 :         zmq_msg_t msg;
      29             : 
      30         129 :         int rc = zmq_msg_init_size(&msg, size);
      31         129 :         if (rc != 0)
      32             :         {
      33           0 :             zmqError("Unable to initialize ZMQ msg");
      34           0 :             va_end(args);
      35           0 :             return -1;
      36             :         }
      37             : 
      38         129 :         void *buf = zmq_msg_data(&msg);
      39         129 :         memcpy(buf, data, size);
      40             : 
      41         129 :         data = va_arg(args, const void*);
      42             : 
      43         129 :         rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
      44         129 :         if (rc == -1)
      45             :         {
      46           0 :             zmqError("Unable to send ZMQ msg");
      47           0 :             zmq_msg_close(&msg);
      48           0 :             va_end(args);
      49           0 :             return -1;
      50             :         }
      51             : 
      52         129 :         zmq_msg_close(&msg);
      53             : 
      54         129 :         if (!data)
      55          43 :             break;
      56             : 
      57          86 :         size = va_arg(args, size_t);
      58         215 :     }
      59          43 :     va_end(args);
      60          43 :     return 0;
      61          43 : }
      62             : 
      63           7 : bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
      64             : {
      65           7 :     assert(!psocket);
      66             : 
      67             :     // check if address is being used by other publish notifier
      68           7 :     std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
      69             : 
      70           7 :     if (i==mapPublishNotifiers.end())
      71             :     {
      72           3 :         psocket = zmq_socket(pcontext, ZMQ_PUB);
      73           3 :         if (!psocket)
      74             :         {
      75           0 :             zmqError("Failed to create socket");
      76           0 :             return false;
      77             :         }
      78             : 
      79           3 :         LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
      80             : 
      81           3 :         int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
      82           3 :         if (rc != 0)
      83             :         {
      84           0 :             zmqError("Failed to set outbound message high water mark");
      85           0 :             zmq_close(psocket);
      86           0 :             return false;
      87             :         }
      88             : 
      89           3 :         const int so_keepalive_option {1};
      90           3 :         rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
      91           3 :         if (rc != 0) {
      92           0 :             zmqError("Failed to set SO_KEEPALIVE");
      93           0 :             zmq_close(psocket);
      94           0 :             return false;
      95             :         }
      96             : 
      97           3 :         rc = zmq_bind(psocket, address.c_str());
      98           3 :         if (rc != 0)
      99             :         {
     100           1 :             zmqError("Failed to bind address");
     101           1 :             zmq_close(psocket);
     102           1 :             return false;
     103             :         }
     104             : 
     105             :         // register this notifier for the address, so it can be reused for other publish notifier
     106           2 :         mapPublishNotifiers.insert(std::make_pair(address, this));
     107           2 :         return true;
     108           3 :     }
     109             :     else
     110             :     {
     111           4 :         LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
     112           4 :         LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
     113             : 
     114           4 :         psocket = i->second->psocket;
     115           4 :         mapPublishNotifiers.insert(std::make_pair(address, this));
     116             : 
     117           4 :         return true;
     118             :     }
     119           7 : }
     120             : 
     121           8 : void CZMQAbstractPublishNotifier::Shutdown()
     122             : {
     123             :     // Early return if Initialize was not called
     124           8 :     if (!psocket) return;
     125             : 
     126           7 :     int count = mapPublishNotifiers.count(address);
     127             : 
     128             :     // remove this notifier from the list of publishers using this address
     129             :     typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
     130           7 :     std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
     131             : 
     132          13 :     for (iterator it = iterpair.first; it != iterpair.second; ++it)
     133             :     {
     134           6 :         if (it->second==this)
     135             :         {
     136           6 :             mapPublishNotifiers.erase(it);
     137           6 :             break;
     138             :         }
     139             :     }
     140             : 
     141           7 :     if (count == 1)
     142             :     {
     143           2 :         LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
     144           2 :         int linger = 0;
     145           2 :         zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
     146           2 :         zmq_close(psocket);
     147           2 :     }
     148             : 
     149           7 :     psocket = nullptr;
     150           8 : }
     151             : 
     152          43 : bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
     153             : {
     154          43 :     assert(psocket);
     155             : 
     156             :     /* send three parts, command & data & a LE 4byte sequence number */
     157          43 :     unsigned char msgseq[sizeof(uint32_t)];
     158          43 :     WriteLE32(&msgseq[0], nSequence);
     159          43 :     int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
     160          43 :     if (rc == -1)
     161           0 :         return false;
     162             : 
     163             :     /* increment memory only sequence number after sending */
     164          43 :     nSequence++;
     165             : 
     166          43 :     return true;
     167          43 : }
     168             : 
     169           9 : bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
     170             : {
     171           9 :     uint256 hash = pindex->GetBlockHash();
     172           9 :     LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
     173           9 :     char data[32];
     174         297 :     for (unsigned int i = 0; i < 32; i++)
     175         288 :         data[31 - i] = hash.begin()[i];
     176          18 :     return SendMessage(MSG_HASHBLOCK, data, 32);
     177           9 : }
     178             : 
     179          20 : bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
     180             : {
     181          20 :     uint256 hash = transaction.GetHash();
     182          20 :     LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
     183          20 :     char data[32];
     184         660 :     for (unsigned int i = 0; i < 32; i++)
     185         640 :         data[31 - i] = hash.begin()[i];
     186          40 :     return SendMessage(MSG_HASHTX, data, 32);
     187          20 : }
     188             : 
     189           6 : bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
     190             : {
     191           6 :     LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
     192             : 
     193           6 :     const Consensus::Params& consensusParams = Params().GetConsensus();
     194           6 :     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
     195             :     {
     196           6 :         LOCK(cs_main);
     197           6 :         CBlock block;
     198           6 :         if(!ReadBlockFromDisk(block, pindex, consensusParams))
     199             :         {
     200           0 :             zmqError("Can't read block from disk");
     201           0 :             return false;
     202             :         }
     203             : 
     204           6 :         ss << block;
     205           6 :     }
     206             : 
     207           6 :     return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
     208           6 : }
     209             : 
     210           8 : bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
     211             : {
     212           8 :     uint256 hash = transaction.GetHash();
     213           8 :     LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
     214           8 :     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
     215           8 :     ss << transaction;
     216           8 :     return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
     217           8 : }

Generated by: LCOV version 1.15