Line data Source code
1 : // Copyright (c) 2015-2020 The Bitcoin Core developers
2 : // Distributed under the MIT software license, see the accompanying
3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 :
5 : #include <chain.h>
6 : #include <chainparams.h>
7 : #include <streams.h>
8 : #include <zmq/zmqpublishnotifier.h>
9 : #include <validation.h>
10 : #include <util/system.h>
11 : #include <rpc/server.h>
12 :
13 640 : static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
14 :
15 : static const char *MSG_HASHBLOCK = "hashblock";
16 : static const char *MSG_HASHTX = "hashtx";
17 : static const char *MSG_RAWBLOCK = "rawblock";
18 : static const char *MSG_RAWTX = "rawtx";
19 :
20 : // Internal function to send multipart message
21 43 : static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 : {
23 43 : va_list args;
24 43 : va_start(args, size);
25 :
26 43 : while (1)
27 : {
28 129 : zmq_msg_t msg;
29 :
30 129 : int rc = zmq_msg_init_size(&msg, size);
31 129 : if (rc != 0)
32 : {
33 0 : zmqError("Unable to initialize ZMQ msg");
34 0 : va_end(args);
35 0 : return -1;
36 : }
37 :
38 129 : void *buf = zmq_msg_data(&msg);
39 129 : memcpy(buf, data, size);
40 :
41 129 : data = va_arg(args, const void*);
42 :
43 129 : rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
44 129 : if (rc == -1)
45 : {
46 0 : zmqError("Unable to send ZMQ msg");
47 0 : zmq_msg_close(&msg);
48 0 : va_end(args);
49 0 : return -1;
50 : }
51 :
52 129 : zmq_msg_close(&msg);
53 :
54 129 : if (!data)
55 43 : break;
56 :
57 86 : size = va_arg(args, size_t);
58 215 : }
59 43 : va_end(args);
60 43 : return 0;
61 43 : }
62 :
63 7 : bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
64 : {
65 7 : assert(!psocket);
66 :
67 : // check if address is being used by other publish notifier
68 7 : std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
69 :
70 7 : if (i==mapPublishNotifiers.end())
71 : {
72 3 : psocket = zmq_socket(pcontext, ZMQ_PUB);
73 3 : if (!psocket)
74 : {
75 0 : zmqError("Failed to create socket");
76 0 : return false;
77 : }
78 :
79 3 : LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
80 :
81 3 : int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
82 3 : if (rc != 0)
83 : {
84 0 : zmqError("Failed to set outbound message high water mark");
85 0 : zmq_close(psocket);
86 0 : return false;
87 : }
88 :
89 3 : const int so_keepalive_option {1};
90 3 : rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
91 3 : if (rc != 0) {
92 0 : zmqError("Failed to set SO_KEEPALIVE");
93 0 : zmq_close(psocket);
94 0 : return false;
95 : }
96 :
97 3 : rc = zmq_bind(psocket, address.c_str());
98 3 : if (rc != 0)
99 : {
100 1 : zmqError("Failed to bind address");
101 1 : zmq_close(psocket);
102 1 : return false;
103 : }
104 :
105 : // register this notifier for the address, so it can be reused for other publish notifier
106 2 : mapPublishNotifiers.insert(std::make_pair(address, this));
107 2 : return true;
108 3 : }
109 : else
110 : {
111 4 : LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
112 4 : LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
113 :
114 4 : psocket = i->second->psocket;
115 4 : mapPublishNotifiers.insert(std::make_pair(address, this));
116 :
117 4 : return true;
118 : }
119 7 : }
120 :
121 8 : void CZMQAbstractPublishNotifier::Shutdown()
122 : {
123 : // Early return if Initialize was not called
124 8 : if (!psocket) return;
125 :
126 7 : int count = mapPublishNotifiers.count(address);
127 :
128 : // remove this notifier from the list of publishers using this address
129 : typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
130 7 : std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
131 :
132 13 : for (iterator it = iterpair.first; it != iterpair.second; ++it)
133 : {
134 6 : if (it->second==this)
135 : {
136 6 : mapPublishNotifiers.erase(it);
137 6 : break;
138 : }
139 : }
140 :
141 7 : if (count == 1)
142 : {
143 2 : LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
144 2 : int linger = 0;
145 2 : zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
146 2 : zmq_close(psocket);
147 2 : }
148 :
149 7 : psocket = nullptr;
150 8 : }
151 :
152 43 : bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
153 : {
154 43 : assert(psocket);
155 :
156 : /* send three parts, command & data & a LE 4byte sequence number */
157 43 : unsigned char msgseq[sizeof(uint32_t)];
158 43 : WriteLE32(&msgseq[0], nSequence);
159 43 : int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
160 43 : if (rc == -1)
161 0 : return false;
162 :
163 : /* increment memory only sequence number after sending */
164 43 : nSequence++;
165 :
166 43 : return true;
167 43 : }
168 :
169 9 : bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
170 : {
171 9 : uint256 hash = pindex->GetBlockHash();
172 9 : LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
173 9 : char data[32];
174 297 : for (unsigned int i = 0; i < 32; i++)
175 288 : data[31 - i] = hash.begin()[i];
176 18 : return SendMessage(MSG_HASHBLOCK, data, 32);
177 9 : }
178 :
179 20 : bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
180 : {
181 20 : uint256 hash = transaction.GetHash();
182 20 : LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
183 20 : char data[32];
184 660 : for (unsigned int i = 0; i < 32; i++)
185 640 : data[31 - i] = hash.begin()[i];
186 40 : return SendMessage(MSG_HASHTX, data, 32);
187 20 : }
188 :
189 6 : bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
190 : {
191 6 : LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
192 :
193 6 : const Consensus::Params& consensusParams = Params().GetConsensus();
194 6 : CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
195 : {
196 6 : LOCK(cs_main);
197 6 : CBlock block;
198 6 : if(!ReadBlockFromDisk(block, pindex, consensusParams))
199 : {
200 0 : zmqError("Can't read block from disk");
201 0 : return false;
202 : }
203 :
204 6 : ss << block;
205 6 : }
206 :
207 6 : return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
208 6 : }
209 :
210 8 : bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
211 : {
212 8 : uint256 hash = transaction.GetHash();
213 8 : LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
214 8 : CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
215 8 : ss << transaction;
216 8 : return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
217 8 : }
|