Line data Source code
1 : // Copyright (c) 2012-2020 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include <checkqueue.h> 6 : #include <sync.h> 7 : #include <test/util/setup_common.h> 8 : #include <util/memory.h> 9 : #include <util/system.h> 10 : #include <util/time.h> 11 : 12 : #include <boost/test/unit_test.hpp> 13 : #include <boost/thread/thread.hpp> 14 : 15 : #include <atomic> 16 : #include <condition_variable> 17 : #include <mutex> 18 : #include <thread> 19 : #include <unordered_set> 20 : #include <utility> 21 : #include <vector> 22 : 23 89 : BOOST_FIXTURE_TEST_SUITE(checkqueue_tests, TestingSetup) 24 : 25 : static const unsigned int QUEUE_BATCH_SIZE = 128; 26 : static const int SCRIPT_CHECK_THREADS = 3; 27 : 28 : struct FakeCheck { 29 0 : bool operator()() 30 : { 31 0 : return true; 32 : } 33 0 : void swap(FakeCheck& x){}; 34 : }; 35 : 36 : struct FakeCheckCheckCompletion { 37 : static std::atomic<size_t> n_calls; 38 10815917 : bool operator()() 39 : { 40 10815917 : n_calls.fetch_add(1, std::memory_order_relaxed); 41 10815917 : return true; 42 : } 43 21731774 : void swap(FakeCheckCheckCompletion& x){}; 44 : }; 45 : 46 : struct FailingCheck { 47 : bool fails; 48 1001080 : FailingCheck(bool _fails) : fails(_fails){}; 49 2010000 : FailingCheck() : fails(true){}; 50 487490 : bool operator()() 51 : { 52 487490 : return !fails; 53 : } 54 1005000 : void swap(FailingCheck& x) 55 : { 56 1005000 : std::swap(fails, x.fails); 57 1005000 : }; 58 : }; 59 : 60 : struct UniqueCheck { 61 : static Mutex m; 62 : static std::unordered_multiset<size_t> results GUARDED_BY(m); 63 : size_t check_id; 64 200000 : UniqueCheck(size_t check_id_in) : check_id(check_id_in){}; 65 400000 : UniqueCheck() : check_id(0){}; 66 99766 : bool operator()() 67 : { 68 99766 : LOCK(m); 69 99766 : results.insert(check_id); 70 : return true; 71 100000 : } 72 200000 : void swap(UniqueCheck& x) { std::swap(x.check_id, check_id); }; 73 : }; 74 : 75 : 76 : struct MemoryCheck { 77 : static std::atomic<size_t> fake_allocated_memory; 78 2053910 : bool b {false}; 79 497411 : bool operator()() 80 : { 81 497411 : return true; 82 : } 83 1998000 : MemoryCheck(){}; 84 2109820 : MemoryCheck(const MemoryCheck& x) 85 1054910 : { 86 : // We have to do this to make sure that destructor calls are paired 87 : // 88 : // Really, copy constructor should be deletable, but CCheckQueue breaks 89 : // if it is deleted because of internal push_back. 90 1054910 : fake_allocated_memory.fetch_add(b, std::memory_order_relaxed); 91 2109820 : }; 92 999000 : MemoryCheck(bool b_) : b(b_) 93 499500 : { 94 499500 : fake_allocated_memory.fetch_add(b, std::memory_order_relaxed); 95 999000 : }; 96 5048341 : ~MemoryCheck() 97 2524313 : { 98 2524028 : fake_allocated_memory.fetch_sub(b, std::memory_order_relaxed); 99 5048341 : }; 100 999000 : void swap(MemoryCheck& x) { std::swap(b, x.b); }; 101 : }; 102 : 103 : struct FrozenCleanupCheck { 104 : static std::atomic<uint64_t> nFrozen; 105 : static std::condition_variable cv; 106 : static std::mutex m; 107 : // Freezing can't be the default initialized behavior given how the queue 108 : // swaps in default initialized Checks. 109 3 : bool should_freeze {false}; 110 1 : bool operator()() 111 : { 112 1 : return true; 113 : } 114 6 : FrozenCleanupCheck() {} 115 8 : ~FrozenCleanupCheck() 116 4 : { 117 4 : if (should_freeze) { 118 1 : std::unique_lock<std::mutex> l(m); 119 1 : nFrozen.store(1, std::memory_order_relaxed); 120 1 : cv.notify_one(); 121 3 : cv.wait(l, []{ return nFrozen.load(std::memory_order_relaxed) == 0;}); 122 1 : } 123 8 : } 124 2 : void swap(FrozenCleanupCheck& x){std::swap(should_freeze, x.should_freeze);}; 125 : }; 126 : 127 : // Static Allocations 128 89 : std::mutex FrozenCleanupCheck::m{}; 129 : std::atomic<uint64_t> FrozenCleanupCheck::nFrozen{0}; 130 89 : std::condition_variable FrozenCleanupCheck::cv{}; 131 89 : Mutex UniqueCheck::m; 132 89 : std::unordered_multiset<size_t> UniqueCheck::results; 133 : std::atomic<size_t> FakeCheckCheckCompletion::n_calls{0}; 134 : std::atomic<size_t> MemoryCheck::fake_allocated_memory{0}; 135 : 136 : // Queue Typedefs 137 : typedef CCheckQueue<FakeCheckCheckCompletion> Correct_Queue; 138 : typedef CCheckQueue<FakeCheck> Standard_Queue; 139 : typedef CCheckQueue<FailingCheck> Failing_Queue; 140 : typedef CCheckQueue<UniqueCheck> Unique_Queue; 141 : typedef CCheckQueue<MemoryCheck> Memory_Queue; 142 : typedef CCheckQueue<FrozenCleanupCheck> FrozenCleanup_Queue; 143 : 144 : 145 : /** This test case checks that the CCheckQueue works properly 146 : * with each specified size_t Checks pushed. 147 : */ 148 4 : static void Correct_Queue_range(std::vector<size_t> range) 149 : { 150 4 : auto small_queue = MakeUnique<Correct_Queue>(QUEUE_BATCH_SIZE); 151 4 : boost::thread_group tg; 152 16 : for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { 153 23 : tg.create_thread([&]{small_queue->Thread();}); 154 : } 155 : // Make vChecks here to save on malloc (this test can be slow...) 156 4 : std::vector<FakeCheckCheckCompletion> vChecks; 157 211 : for (const size_t i : range) { 158 207 : size_t total = i; 159 207 : FakeCheckCheckCompletion::n_calls = 0; 160 207 : CCheckQueueControl<FakeCheckCheckCompletion> control(small_queue.get()); 161 2416891 : while (total) { 162 2416684 : vChecks.resize(std::min(total, (size_t) InsecureRandRange(10))); 163 2416684 : total -= vChecks.size(); 164 2416684 : control.Add(vChecks); 165 : } 166 207 : BOOST_REQUIRE(control.Wait()); 167 207 : if (FakeCheckCheckCompletion::n_calls != i) { 168 0 : BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); 169 : } 170 207 : } 171 4 : tg.interrupt_all(); 172 4 : tg.join_all(); 173 4 : } 174 : 175 : /** Test that 0 checks is correct 176 : */ 177 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Zero) 178 : { 179 1 : std::vector<size_t> range; 180 1 : range.push_back((size_t)0); 181 1 : Correct_Queue_range(range); 182 1 : } 183 : /** Test that 1 check is correct 184 : */ 185 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_One) 186 : { 187 1 : std::vector<size_t> range; 188 1 : range.push_back((size_t)1); 189 1 : Correct_Queue_range(range); 190 1 : } 191 : /** Test that MAX check is correct 192 : */ 193 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Max) 194 : { 195 1 : std::vector<size_t> range; 196 1 : range.push_back(100000); 197 1 : Correct_Queue_range(range); 198 1 : } 199 : /** Test that random numbers of checks are correct 200 : */ 201 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) 202 : { 203 1 : std::vector<size_t> range; 204 1 : range.reserve(100000/1000); 205 205 : for (size_t i = 2; i < 100000; i += std::max((size_t)1, (size_t)InsecureRandRange(std::min((size_t)1000, ((size_t)100000) - i)))) 206 204 : range.push_back(i); 207 1 : Correct_Queue_range(range); 208 1 : } 209 : 210 : 211 : /** Test that failing checks are caught */ 212 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) 213 : { 214 1 : auto fail_queue = MakeUnique<Failing_Queue>(QUEUE_BATCH_SIZE); 215 : 216 1 : boost::thread_group tg; 217 4 : for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { 218 6 : tg.create_thread([&]{fail_queue->Thread();}); 219 : } 220 : 221 1002 : for (size_t i = 0; i < 1001; ++i) { 222 1001 : CCheckQueueControl<FailingCheck> control(fail_queue.get()); 223 112912 : size_t remaining = i; 224 112912 : while (remaining) { 225 111911 : size_t r = InsecureRandRange(10); 226 : 227 111911 : std::vector<FailingCheck> vChecks; 228 111911 : vChecks.reserve(r); 229 612411 : for (size_t k = 0; k < r && remaining; k++, remaining--) 230 500500 : vChecks.emplace_back(remaining == 1); 231 111911 : control.Add(vChecks); 232 111911 : } 233 1001 : bool success = control.Wait(); 234 1001 : if (i > 0) { 235 1000 : BOOST_REQUIRE(!success); 236 1 : } else if (i == 0) { 237 1 : BOOST_REQUIRE(success); 238 : } 239 1001 : } 240 1 : tg.interrupt_all(); 241 1 : tg.join_all(); 242 1 : } 243 : // Test that a block validation which fails does not interfere with 244 : // future blocks, ie, the bad state is cleared. 245 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) 246 : { 247 1 : auto fail_queue = MakeUnique<Failing_Queue>(QUEUE_BATCH_SIZE); 248 1 : boost::thread_group tg; 249 4 : for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { 250 6 : tg.create_thread([&]{fail_queue->Thread();}); 251 : } 252 : 253 11 : for (auto times = 0; times < 10; ++times) { 254 30 : for (const bool end_fails : {true, false}) { 255 20 : CCheckQueueControl<FailingCheck> control(fail_queue.get()); 256 : { 257 20 : std::vector<FailingCheck> vChecks; 258 20 : vChecks.resize(100, false); 259 20 : vChecks[99] = end_fails; 260 20 : control.Add(vChecks); 261 20 : } 262 20 : bool r =control.Wait(); 263 20 : BOOST_REQUIRE(r != end_fails); 264 20 : } 265 : } 266 1 : tg.interrupt_all(); 267 1 : tg.join_all(); 268 1 : } 269 : 270 : // Test that unique checks are actually all called individually, rather than 271 : // just one check being called repeatedly. Test that checks are not called 272 : // more than once as well 273 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) 274 : { 275 1 : auto queue = MakeUnique<Unique_Queue>(QUEUE_BATCH_SIZE); 276 1 : boost::thread_group tg; 277 4 : for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { 278 5 : tg.create_thread([&]{queue->Thread();}); 279 : 280 : } 281 : 282 1 : size_t COUNT = 100000; 283 1 : size_t total = COUNT; 284 : { 285 1 : CCheckQueueControl<UniqueCheck> control(queue.get()); 286 22181 : while (total) { 287 22180 : size_t r = InsecureRandRange(10); 288 22180 : std::vector<UniqueCheck> vChecks; 289 122180 : for (size_t k = 0; k < r && total; k++) 290 100000 : vChecks.emplace_back(--total); 291 22180 : control.Add(vChecks); 292 22180 : } 293 1 : } 294 : { 295 1 : LOCK(UniqueCheck::m); 296 : bool r = true; 297 1 : BOOST_REQUIRE_EQUAL(UniqueCheck::results.size(), COUNT); 298 100001 : for (size_t i = 0; i < COUNT; ++i) { 299 100000 : r = r && UniqueCheck::results.count(i) == 1; 300 : } 301 1 : BOOST_REQUIRE(r); 302 1 : } 303 1 : tg.interrupt_all(); 304 1 : tg.join_all(); 305 1 : } 306 : 307 : 308 : // Test that blocks which might allocate lots of memory free their memory aggressively. 309 : // 310 : // This test attempts to catch a pathological case where by lazily freeing 311 : // checks might mean leaving a check un-swapped out, and decreasing by 1 each 312 : // time could leave the data hanging across a sequence of blocks. 313 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) 314 : { 315 1 : auto queue = MakeUnique<Memory_Queue>(QUEUE_BATCH_SIZE); 316 1 : boost::thread_group tg; 317 4 : for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { 318 6 : tg.create_thread([&]{queue->Thread();}); 319 : } 320 1001 : for (size_t i = 0; i < 1000; ++i) { 321 112704 : size_t total = i; 322 : { 323 1000 : CCheckQueueControl<MemoryCheck> control(queue.get()); 324 112704 : while (total) { 325 111704 : size_t r = InsecureRandRange(10); 326 111704 : std::vector<MemoryCheck> vChecks; 327 611204 : for (size_t k = 0; k < r && total; k++) { 328 499500 : total--; 329 : // Each iteration leaves data at the front, back, and middle 330 : // to catch any sort of deallocation failure 331 499500 : vChecks.emplace_back(total == 0 || total == i || total == i/2); 332 : } 333 111704 : control.Add(vChecks); 334 111704 : } 335 1000 : } 336 1000 : BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); 337 : } 338 1 : tg.interrupt_all(); 339 1 : tg.join_all(); 340 1 : } 341 : 342 : // Test that a new verification cannot occur until all checks 343 : // have been destructed 344 95 : BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) 345 : { 346 1 : auto queue = MakeUnique<FrozenCleanup_Queue>(QUEUE_BATCH_SIZE); 347 1 : boost::thread_group tg; 348 : bool fails = false; 349 4 : for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { 350 6 : tg.create_thread([&]{queue->Thread();}); 351 : } 352 2 : std::thread t0([&]() { 353 1 : CCheckQueueControl<FrozenCleanupCheck> control(queue.get()); 354 1 : std::vector<FrozenCleanupCheck> vChecks(1); 355 : // Freezing can't be the default initialized behavior given how the queue 356 : // swaps in default initialized Checks (otherwise freezing destructor 357 : // would get called twice). 358 1 : vChecks[0].should_freeze = true; 359 1 : control.Add(vChecks); 360 1 : bool waitResult = control.Wait(); // Hangs here 361 1 : assert(waitResult); 362 1 : }); 363 : { 364 1 : std::unique_lock<std::mutex> l(FrozenCleanupCheck::m); 365 : // Wait until the queue has finished all jobs and frozen 366 3 : FrozenCleanupCheck::cv.wait(l, [](){return FrozenCleanupCheck::nFrozen == 1;}); 367 1 : } 368 : // Try to get control of the queue a bunch of times 369 101 : for (auto x = 0; x < 100 && !fails; ++x) { 370 100 : fails = queue->ControlMutex.try_lock(); 371 : } 372 : { 373 : // Unfreeze (we need lock n case of spurious wakeup) 374 1 : std::unique_lock<std::mutex> l(FrozenCleanupCheck::m); 375 1 : FrozenCleanupCheck::nFrozen = 0; 376 1 : } 377 : // Awaken frozen destructor 378 1 : FrozenCleanupCheck::cv.notify_one(); 379 : // Wait for control to finish 380 1 : t0.join(); 381 1 : tg.interrupt_all(); 382 1 : tg.join_all(); 383 1 : BOOST_REQUIRE(!fails); 384 1 : } 385 : 386 : 387 : /** Test that CCheckQueueControl is threadsafe */ 388 95 : BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) 389 : { 390 1 : auto queue = MakeUnique<Standard_Queue>(QUEUE_BATCH_SIZE); 391 : { 392 1 : boost::thread_group tg; 393 1 : std::atomic<int> nThreads {0}; 394 1 : std::atomic<int> fails {0}; 395 4 : for (size_t i = 0; i < 3; ++i) { 396 3 : tg.create_thread( 397 6 : [&]{ 398 3 : CCheckQueueControl<FakeCheck> control(queue.get()); 399 : // While sleeping, no other thread should execute to this point 400 3 : auto observed = ++nThreads; 401 3 : UninterruptibleSleep(std::chrono::milliseconds{10}); 402 3 : fails += observed != nThreads; 403 3 : }); 404 : } 405 1 : tg.join_all(); 406 1 : BOOST_REQUIRE_EQUAL(fails, 0); 407 1 : } 408 : { 409 1 : boost::thread_group tg; 410 1 : std::mutex m; 411 1 : std::condition_variable cv; 412 1 : bool has_lock{false}; 413 1 : bool has_tried{false}; 414 1 : bool done{false}; 415 1 : bool done_ack{false}; 416 : { 417 1 : std::unique_lock<std::mutex> l(m); 418 2 : tg.create_thread([&]{ 419 1 : CCheckQueueControl<FakeCheck> control(queue.get()); 420 1 : std::unique_lock<std::mutex> ll(m); 421 1 : has_lock = true; 422 1 : cv.notify_one(); 423 3 : cv.wait(ll, [&]{return has_tried;}); 424 1 : done = true; 425 1 : cv.notify_one(); 426 : // Wait until the done is acknowledged 427 : // 428 3 : cv.wait(ll, [&]{return done_ack;}); 429 1 : }); 430 : // Wait for thread to get the lock 431 3 : cv.wait(l, [&](){return has_lock;}); 432 : bool fails = false; 433 101 : for (auto x = 0; x < 100 && !fails; ++x) { 434 100 : fails = queue->ControlMutex.try_lock(); 435 : } 436 1 : has_tried = true; 437 1 : cv.notify_one(); 438 3 : cv.wait(l, [&](){return done;}); 439 : // Acknowledge the done 440 1 : done_ack = true; 441 1 : cv.notify_one(); 442 1 : BOOST_REQUIRE(!fails); 443 1 : } 444 1 : tg.join_all(); 445 1 : } 446 1 : } 447 89 : BOOST_AUTO_TEST_SUITE_END() 448 :