mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-29 23:45:51 +00:00
Compare commits
16 Commits
ximinez/on
...
ximinez/ac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fde000f3eb | ||
|
|
d0a62229da | ||
|
|
d5932cc7d4 | ||
|
|
0b534da781 | ||
|
|
71a70d343b | ||
|
|
0899e65030 | ||
|
|
31ba529761 | ||
|
|
e2c6e5ebb6 | ||
|
|
9d807fce48 | ||
|
|
9ef160765c | ||
|
|
d6c0eb243b | ||
|
|
84c9fc123c | ||
|
|
00a2a58cfa | ||
|
|
bb2098d873 | ||
|
|
46a5bc74db | ||
|
|
7b72b9cc82 |
@@ -1051,11 +1051,10 @@
|
||||
# The online delete process checks periodically
|
||||
# that rippled is still in sync with the network,
|
||||
# and that the validated ledger is less than
|
||||
# 'age_threshold_seconds' old, and that all
|
||||
# recent ledgers are available. If not, then continue
|
||||
# 'age_threshold_seconds' old. If not, then continue
|
||||
# sleeping for this number of seconds and
|
||||
# checking until healthy.
|
||||
# Default is 1.
|
||||
# Default is 5.
|
||||
#
|
||||
# Notes:
|
||||
# The 'node_db' entry configures the primary, persistent storage.
|
||||
|
||||
139
include/xrpl/basics/CanProcess.h
Normal file
139
include/xrpl/basics/CanProcess.h
Normal file
@@ -0,0 +1,139 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2024 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
|
||||
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
|
||||
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
|
||||
/** RAII class to check if an Item is already being processed on another thread,
|
||||
* as indicated by it's presence in a Collection.
|
||||
*
|
||||
* If the Item is not in the Collection, it will be added under lock in the
|
||||
* ctor, and removed under lock in the dtor. The object will be considered
|
||||
* "usable" and evaluate to `true`.
|
||||
*
|
||||
* If the Item is in the Collection, no changes will be made to the collection,
|
||||
* and the CanProcess object will be considered "unusable".
|
||||
*
|
||||
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
|
||||
* Process or skip a block of code, or set a flag.)
|
||||
*
|
||||
* The current use is to avoid lock contention that would be involved in
|
||||
* processing something associated with the Item.
|
||||
*
|
||||
* Examples:
|
||||
*
|
||||
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
|
||||
* {
|
||||
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
|
||||
* {
|
||||
* acquire(hash, ...);
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* bool
|
||||
* NetworkOPsImp::recvValidation(
|
||||
* std::shared_ptr<STValidation> const& val,
|
||||
* std::string const& source)
|
||||
* {
|
||||
* CanProcess check(
|
||||
* validationsMutex_, pendingValidations_, val->getLedgerHash());
|
||||
* BypassAccept bypassAccept =
|
||||
* check ? BypassAccept::no : BypassAccept::yes;
|
||||
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
|
||||
* }
|
||||
*
|
||||
*/
|
||||
class CanProcess
|
||||
{
|
||||
public:
|
||||
template <class Mutex, class Collection, class Item>
|
||||
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
|
||||
: cleanup_(insert(mtx, collection, item))
|
||||
{
|
||||
}
|
||||
|
||||
~CanProcess()
|
||||
{
|
||||
if (cleanup_)
|
||||
cleanup_();
|
||||
}
|
||||
|
||||
CanProcess(CanProcess const&) = delete;
|
||||
|
||||
CanProcess&
|
||||
operator=(CanProcess const&) = delete;
|
||||
|
||||
explicit
|
||||
operator bool() const
|
||||
{
|
||||
return static_cast<bool>(cleanup_);
|
||||
}
|
||||
|
||||
private:
|
||||
template <bool useIterator, class Mutex, class Collection, class Item>
|
||||
std::function<void()>
|
||||
doInsert(Mutex& mtx, Collection& collection, Item const& item)
|
||||
{
|
||||
std::unique_lock<Mutex> lock(mtx);
|
||||
// TODO: Use structured binding once LLVM 16 is the minimum supported
|
||||
// version. See also: https://github.com/llvm/llvm-project/issues/48582
|
||||
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
|
||||
auto const insertResult = collection.insert(item);
|
||||
auto const it = insertResult.first;
|
||||
if (!insertResult.second)
|
||||
return {};
|
||||
if constexpr (useIterator)
|
||||
return [&, it]() {
|
||||
std::unique_lock<Mutex> lock(mtx);
|
||||
collection.erase(it);
|
||||
};
|
||||
else
|
||||
return [&]() {
|
||||
std::unique_lock<Mutex> lock(mtx);
|
||||
collection.erase(item);
|
||||
};
|
||||
}
|
||||
|
||||
// Generic insert() function doesn't use iterators because they may get
|
||||
// invalidated
|
||||
template <class Mutex, class Collection, class Item>
|
||||
std::function<void()>
|
||||
insert(Mutex& mtx, Collection& collection, Item const& item)
|
||||
{
|
||||
return doInsert<false>(mtx, collection, item);
|
||||
}
|
||||
|
||||
// Specialize insert() for std::set, which does not invalidate iterators for
|
||||
// insert and erase
|
||||
template <class Mutex, class Item>
|
||||
std::function<void()>
|
||||
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
|
||||
{
|
||||
return doInsert<true>(mtx, collection, item);
|
||||
}
|
||||
|
||||
// If set, then the item is "usable"
|
||||
std::function<void()> cleanup_;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -36,6 +36,8 @@ struct LedgerHeader
|
||||
|
||||
// If validated is false, it means "not yet validated."
|
||||
// Once validated is true, it will never be set false at a later time.
|
||||
// NOTE: If you are accessing this directly, you are probably doing it
|
||||
// wrong. Use LedgerMaster::isValidated().
|
||||
// VFALCO TODO Make this not mutable
|
||||
bool mutable validated = false;
|
||||
bool accepted = false;
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
#include <test/jtx/Env.h>
|
||||
|
||||
#include <xrpld/app/ledger/LedgerMaster.h>
|
||||
#include <xrpld/app/misc/SHAMapStore.h>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
@@ -101,88 +100,6 @@ class LedgerMaster_test : public beast::unit_test::suite
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
testCompleteLedgerRange(FeatureBitset features)
|
||||
{
|
||||
// Note that this test is intentionally very similar to
|
||||
// SHAMapStore_test::testLedgerGaps, but has a different
|
||||
// focus.
|
||||
|
||||
testcase("Complete Ledger operations");
|
||||
|
||||
using namespace test::jtx;
|
||||
|
||||
auto const deleteInterval = 8;
|
||||
|
||||
Env env{*this, envconfig([](auto cfg) {
|
||||
return online_delete(std::move(cfg), deleteInterval);
|
||||
})};
|
||||
|
||||
auto const alice = Account("alice");
|
||||
env.fund(XRP(1000), alice);
|
||||
env.close();
|
||||
|
||||
auto& lm = env.app().getLedgerMaster();
|
||||
LedgerIndex minSeq = 2;
|
||||
LedgerIndex maxSeq = env.closed()->info().seq;
|
||||
auto& store = env.app().getSHAMapStore();
|
||||
store.rendezvous();
|
||||
LedgerIndex lastRotated = store.getLastRotated();
|
||||
BEAST_EXPECTS(maxSeq == 3, to_string(maxSeq));
|
||||
BEAST_EXPECTS(
|
||||
lm.getCompleteLedgers() == "2-3", lm.getCompleteLedgers());
|
||||
BEAST_EXPECTS(lastRotated == 3, to_string(lastRotated));
|
||||
BEAST_EXPECT(lm.missingFromCompleteLedgerRange(minSeq, maxSeq) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 1, maxSeq - 1) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 1, maxSeq + 1) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 2, maxSeq - 2) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 2, maxSeq + 2) == 2);
|
||||
|
||||
// Close enough ledgers to rotate a few times
|
||||
for (int i = 0; i < 24; ++i)
|
||||
{
|
||||
for (int t = 0; t < 3; ++t)
|
||||
{
|
||||
env(noop(alice));
|
||||
}
|
||||
env.close();
|
||||
store.rendezvous();
|
||||
|
||||
++maxSeq;
|
||||
|
||||
if (maxSeq == lastRotated + deleteInterval)
|
||||
{
|
||||
minSeq = lastRotated;
|
||||
lastRotated = maxSeq;
|
||||
}
|
||||
BEAST_EXPECTS(
|
||||
env.closed()->info().seq == maxSeq,
|
||||
to_string(env.closed()->info().seq));
|
||||
BEAST_EXPECTS(
|
||||
store.getLastRotated() == lastRotated,
|
||||
to_string(store.getLastRotated()));
|
||||
std::stringstream expectedRange;
|
||||
expectedRange << minSeq << "-" << maxSeq;
|
||||
BEAST_EXPECTS(
|
||||
lm.getCompleteLedgers() == expectedRange.str(),
|
||||
lm.getCompleteLedgers());
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq, maxSeq) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 1, maxSeq - 1) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 1, maxSeq + 1) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 2, maxSeq - 2) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 2, maxSeq + 2) == 2);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
@@ -196,7 +113,6 @@ public:
|
||||
testWithFeats(FeatureBitset features)
|
||||
{
|
||||
testTxnIdFromIndex(features);
|
||||
testCompleteLedgerRange(features);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -91,6 +91,8 @@ public:
|
||||
|
||||
virtual void
|
||||
acquireAsync(
|
||||
JobType type,
|
||||
std::string const& name,
|
||||
uint256 const& hash,
|
||||
std::uint32_t seq,
|
||||
InboundLedger::Reason reason) override
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#include <test/jtx.h>
|
||||
#include <test/jtx/envconfig.h>
|
||||
|
||||
#include <xrpld/app/ledger/LedgerMaster.h>
|
||||
#include <xrpld/app/main/Application.h>
|
||||
#include <xrpld/app/main/NodeStoreScheduler.h>
|
||||
#include <xrpld/app/misc/SHAMapStore.h>
|
||||
@@ -11,8 +10,6 @@
|
||||
#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
|
||||
#include <thread>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
@@ -23,8 +20,10 @@ class SHAMapStore_test : public beast::unit_test::suite
|
||||
static auto
|
||||
onlineDelete(std::unique_ptr<Config> cfg)
|
||||
{
|
||||
using namespace jtx;
|
||||
return online_delete(std::move(cfg), deleteInterval);
|
||||
cfg->LEDGER_HISTORY = deleteInterval;
|
||||
auto& section = cfg->section(ConfigSection::nodeDatabase());
|
||||
section.set("online_delete", std::to_string(deleteInterval));
|
||||
return cfg;
|
||||
}
|
||||
|
||||
static auto
|
||||
@@ -627,184 +626,6 @@ public:
|
||||
BEAST_EXPECT(dbr->getName() == "3");
|
||||
}
|
||||
|
||||
void
|
||||
testLedgerGaps()
|
||||
{
|
||||
// Note that this test is intentionally very similar to
|
||||
// LedgerMaster_test::testCompleteLedgerRange, but has a different
|
||||
// focus.
|
||||
|
||||
testcase("Wait for ledger gaps to fill in");
|
||||
|
||||
using namespace test::jtx;
|
||||
|
||||
Env env{*this, envconfig(onlineDelete)};
|
||||
|
||||
std::map<LedgerIndex, uint256> hashes;
|
||||
|
||||
auto failureMessage = [&](char const* label,
|
||||
auto expected,
|
||||
auto actual) {
|
||||
std::stringstream ss;
|
||||
ss << label << ": Expected: " << expected << ", Got: " << actual;
|
||||
return ss.str();
|
||||
};
|
||||
|
||||
auto const alice = Account("alice");
|
||||
env.fund(XRP(1000), alice);
|
||||
env.close();
|
||||
|
||||
auto& lm = env.app().getLedgerMaster();
|
||||
LedgerIndex minSeq = 2;
|
||||
LedgerIndex maxSeq = env.closed()->info().seq;
|
||||
auto& store = env.app().getSHAMapStore();
|
||||
store.rendezvous();
|
||||
LedgerIndex lastRotated = store.getLastRotated();
|
||||
BEAST_EXPECTS(maxSeq == 3, to_string(maxSeq));
|
||||
BEAST_EXPECTS(
|
||||
lm.getCompleteLedgers() == "2-3", lm.getCompleteLedgers());
|
||||
BEAST_EXPECTS(lastRotated == 3, to_string(lastRotated));
|
||||
BEAST_EXPECT(lm.missingFromCompleteLedgerRange(minSeq, maxSeq) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 1, maxSeq - 1) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 1, maxSeq + 1) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 2, maxSeq - 2) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 2, maxSeq + 2) == 2);
|
||||
|
||||
// Close enough ledgers to rotate a few times
|
||||
while (maxSeq < 20)
|
||||
{
|
||||
for (int t = 0; t < 3; ++t)
|
||||
{
|
||||
env(noop(alice));
|
||||
}
|
||||
env.close();
|
||||
store.rendezvous();
|
||||
|
||||
++maxSeq;
|
||||
|
||||
if (maxSeq + 1 == lastRotated + deleteInterval)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
// The next ledger will trigger a rotation. Delete the
|
||||
// current ledger from LedgerMaster.
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LedgerIndex const deleteSeq = maxSeq;
|
||||
while (!lm.haveLedger(deleteSeq))
|
||||
{
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
lm.clearLedger(deleteSeq);
|
||||
|
||||
auto expectedRange =
|
||||
[](auto minSeq, auto deleteSeq, auto maxSeq) {
|
||||
std::stringstream expectedRange;
|
||||
expectedRange << minSeq << "-" << (deleteSeq - 1);
|
||||
if (deleteSeq + 1 == maxSeq)
|
||||
expectedRange << "," << maxSeq;
|
||||
else if (deleteSeq < maxSeq)
|
||||
expectedRange << "," << (deleteSeq + 1) << "-"
|
||||
<< maxSeq;
|
||||
return expectedRange.str();
|
||||
};
|
||||
BEAST_EXPECTS(
|
||||
lm.getCompleteLedgers() ==
|
||||
expectedRange(minSeq, deleteSeq, maxSeq),
|
||||
failureMessage(
|
||||
"Complete ledgers",
|
||||
expectedRange(minSeq, deleteSeq, maxSeq),
|
||||
lm.getCompleteLedgers()));
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq, maxSeq) == 1);
|
||||
|
||||
// Close another ledger, which will trigger a rotation, but the
|
||||
// rotation will be stuck until the missing ledger is filled in.
|
||||
env.close();
|
||||
// DO NOT CALL rendezvous()! You'll end up with a deadlock.
|
||||
++maxSeq;
|
||||
|
||||
// Nothing has changed
|
||||
BEAST_EXPECTS(
|
||||
store.getLastRotated() == lastRotated,
|
||||
failureMessage(
|
||||
"lastRotated", lastRotated, store.getLastRotated()));
|
||||
BEAST_EXPECTS(
|
||||
lm.getCompleteLedgers() ==
|
||||
expectedRange(minSeq, deleteSeq, maxSeq),
|
||||
failureMessage(
|
||||
"Complete ledgers",
|
||||
expectedRange(minSeq, deleteSeq, maxSeq),
|
||||
lm.getCompleteLedgers()));
|
||||
|
||||
// Close 5 more ledgers, waiting one second in between to
|
||||
// simulate the ledger making progress while online delete waits
|
||||
// for the missing ledger to be filled in.
|
||||
// This ensures the healthWait check has time to run and
|
||||
// detect the gap.
|
||||
for (int l = 0; l < 5; ++l)
|
||||
{
|
||||
env.close();
|
||||
// DO NOT CALL rendezvous()! You'll end up with a deadlock.
|
||||
++maxSeq;
|
||||
// Nothing has changed
|
||||
BEAST_EXPECTS(
|
||||
store.getLastRotated() == lastRotated,
|
||||
failureMessage(
|
||||
"lastRotated",
|
||||
lastRotated,
|
||||
store.getLastRotated()));
|
||||
BEAST_EXPECTS(
|
||||
lm.getCompleteLedgers() ==
|
||||
expectedRange(minSeq, deleteSeq, maxSeq),
|
||||
failureMessage(
|
||||
"Complete Ledgers",
|
||||
expectedRange(minSeq, deleteSeq, maxSeq),
|
||||
lm.getCompleteLedgers()));
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
|
||||
// Put the missing ledger back in LedgerMaster
|
||||
lm.setLedgerRangePresent(deleteSeq, deleteSeq);
|
||||
|
||||
// Wait for the rotation to finish
|
||||
store.rendezvous();
|
||||
|
||||
minSeq = lastRotated;
|
||||
lastRotated = deleteSeq + 1;
|
||||
}
|
||||
BEAST_EXPECT(maxSeq != lastRotated + deleteInterval);
|
||||
BEAST_EXPECTS(
|
||||
env.closed()->info().seq == maxSeq,
|
||||
failureMessage("maxSeq", maxSeq, env.closed()->info().seq));
|
||||
BEAST_EXPECTS(
|
||||
store.getLastRotated() == lastRotated,
|
||||
failureMessage(
|
||||
"lastRotated", lastRotated, store.getLastRotated()));
|
||||
std::stringstream expectedRange;
|
||||
expectedRange << minSeq << "-" << maxSeq;
|
||||
BEAST_EXPECTS(
|
||||
lm.getCompleteLedgers() == expectedRange.str(),
|
||||
failureMessage(
|
||||
"CompleteLedgers",
|
||||
expectedRange.str(),
|
||||
lm.getCompleteLedgers()));
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq, maxSeq) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 1, maxSeq - 1) == 0);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 1, maxSeq + 1) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq - 2, maxSeq - 2) == 2);
|
||||
BEAST_EXPECT(
|
||||
lm.missingFromCompleteLedgerRange(minSeq + 2, maxSeq + 2) == 2);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
@@ -812,7 +633,6 @@ public:
|
||||
testAutomatic();
|
||||
testCanDelete();
|
||||
testRotate();
|
||||
testLedgerGaps();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
166
src/test/basics/CanProcess_test.cpp
Normal file
166
src/test/basics/CanProcess_test.cpp
Normal file
@@ -0,0 +1,166 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012-2016 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpl/basics/CanProcess.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
struct CanProcess_test : beast::unit_test::suite
|
||||
{
|
||||
template <class Mutex, class Collection, class Item>
|
||||
void
|
||||
test(
|
||||
std::string const& name,
|
||||
Mutex& mtx,
|
||||
Collection& collection,
|
||||
std::vector<Item> const& items)
|
||||
{
|
||||
testcase(name);
|
||||
|
||||
if (!BEAST_EXPECT(!items.empty()))
|
||||
return;
|
||||
if (!BEAST_EXPECT(collection.empty()))
|
||||
return;
|
||||
|
||||
// CanProcess objects can't be copied or moved. To make that easier,
|
||||
// store shared_ptrs
|
||||
std::vector<std::shared_ptr<CanProcess>> trackers;
|
||||
// Fill up the vector with two CanProcess for each Item. The first
|
||||
// inserts the item into the collection and is "good". The second does
|
||||
// not and is "bad".
|
||||
for (int i = 0; i < items.size(); ++i)
|
||||
{
|
||||
{
|
||||
auto const& good = trackers.emplace_back(
|
||||
std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(*good);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
|
||||
BEAST_EXPECT(collection.size() == i + 1);
|
||||
{
|
||||
auto const& bad = trackers.emplace_back(
|
||||
std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(!*bad);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * (i + 1));
|
||||
BEAST_EXPECT(collection.size() == i + 1);
|
||||
}
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
// Now remove the items from the vector<CanProcess> two at a time, and
|
||||
// try to get another CanProcess for that item.
|
||||
for (int i = 0; i < items.size(); ++i)
|
||||
{
|
||||
// Remove the "bad" one in the second position
|
||||
// This will have no effect on the collection
|
||||
{
|
||||
auto const iter = trackers.begin() + 1;
|
||||
BEAST_EXPECT(!**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
{
|
||||
// Append a new "bad" one
|
||||
auto const& bad = trackers.emplace_back(
|
||||
std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(!*bad);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * items.size());
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
|
||||
// Remove the "good" one from the front
|
||||
{
|
||||
auto const iter = trackers.begin();
|
||||
BEAST_EXPECT(**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
|
||||
BEAST_EXPECT(collection.size() == items.size() - 1);
|
||||
{
|
||||
// Append a new "good" one
|
||||
auto const& good = trackers.emplace_back(
|
||||
std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(*good);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * items.size());
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
}
|
||||
// Now remove them all two at a time
|
||||
for (int i = items.size() - 1; i >= 0; --i)
|
||||
{
|
||||
// Remove the "bad" one from the front
|
||||
{
|
||||
auto const iter = trackers.begin();
|
||||
BEAST_EXPECT(!**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
|
||||
BEAST_EXPECT(collection.size() == i + 1);
|
||||
// Remove the "good" one now in front
|
||||
{
|
||||
auto const iter = trackers.begin();
|
||||
BEAST_EXPECT(**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * i);
|
||||
BEAST_EXPECT(collection.size() == i);
|
||||
}
|
||||
BEAST_EXPECT(trackers.empty());
|
||||
BEAST_EXPECT(collection.empty());
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
{
|
||||
std::mutex m;
|
||||
std::set<int> collection;
|
||||
std::vector<int> const items{1, 2, 3, 4, 5};
|
||||
test("set of int", m, collection, items);
|
||||
}
|
||||
{
|
||||
std::mutex m;
|
||||
std::set<std::string> collection;
|
||||
std::vector<std::string> const items{
|
||||
"one", "two", "three", "four", "five"};
|
||||
test("set of string", m, collection, items);
|
||||
}
|
||||
{
|
||||
std::mutex m;
|
||||
std::unordered_set<char> collection;
|
||||
std::vector<char> const items{'1', '2', '3', '4', '5'};
|
||||
test("unorderd_set of char", m, collection, items);
|
||||
}
|
||||
{
|
||||
std::mutex m;
|
||||
std::unordered_set<std::uint64_t> collection;
|
||||
std::vector<std::uint64_t> const items{100u, 1000u, 150u, 4u, 0u};
|
||||
test("unordered_set of uint64_t", m, collection, items);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(CanProcess, ripple_basics, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
@@ -58,17 +58,6 @@ envconfig(F&& modfunc, Args&&... args)
|
||||
return modfunc(envconfig(), std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
/// @brief adjust config to enable online_delete
|
||||
///
|
||||
/// @param cfg config instance to be modified
|
||||
///
|
||||
/// @param deleteInterval how many new ledgers should be available before
|
||||
/// rotating. Defaults to 8, because the standalone minimum is 8.
|
||||
///
|
||||
/// @return unique_ptr to Config instance
|
||||
std::unique_ptr<Config>
|
||||
online_delete(std::unique_ptr<Config> cfg, std::uint32_t deleteInterval = 8);
|
||||
|
||||
/// @brief adjust config so no admin ports are enabled
|
||||
///
|
||||
/// this is intended for use with envconfig, as in
|
||||
|
||||
@@ -53,15 +53,6 @@ setupConfigForUnitTests(Config& cfg)
|
||||
|
||||
namespace jtx {
|
||||
|
||||
std::unique_ptr<Config>
|
||||
online_delete(std::unique_ptr<Config> cfg, std::uint32_t deleteInterval)
|
||||
{
|
||||
cfg->LEDGER_HISTORY = deleteInterval;
|
||||
auto& section = cfg->section(ConfigSection::nodeDatabase());
|
||||
section.set("online_delete", std::to_string(deleteInterval));
|
||||
return cfg;
|
||||
}
|
||||
|
||||
std::unique_ptr<Config>
|
||||
no_admin(std::unique_ptr<Config> cfg)
|
||||
{
|
||||
|
||||
@@ -118,15 +118,12 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
|
||||
// Tell the ledger acquire system that we need the consensus ledger
|
||||
acquiringLedger_ = hash;
|
||||
|
||||
app_.getJobQueue().addJob(
|
||||
app_.getInboundLedgers().acquireAsync(
|
||||
jtADVANCE,
|
||||
"getConsensusLedger1",
|
||||
[id = hash, &app = app_, this]() {
|
||||
JLOG(j_.debug())
|
||||
<< "JOB advanceLedger getConsensusLedger1 started";
|
||||
app.getInboundLedgers().acquireAsync(
|
||||
id, 0, InboundLedger::Reason::CONSENSUS);
|
||||
});
|
||||
hash,
|
||||
0,
|
||||
InboundLedger::Reason::CONSENSUS);
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
@@ -1056,7 +1053,8 @@ void
|
||||
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
|
||||
{
|
||||
if (!positions && app_.getOPs().isFull())
|
||||
app_.getOPs().setMode(OperatingMode::CONNECTED);
|
||||
app_.getOPs().setMode(
|
||||
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -120,15 +120,12 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
|
||||
JLOG(j_.warn())
|
||||
<< "Need validated ledger for preferred ledger analysis " << hash;
|
||||
|
||||
Application* pApp = &app_;
|
||||
|
||||
app_.getJobQueue().addJob(
|
||||
jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() {
|
||||
JLOG(j_.debug())
|
||||
<< "JOB advanceLedger getConsensusLedger2 started";
|
||||
pApp->getInboundLedgers().acquireAsync(
|
||||
hash, 0, InboundLedger::Reason::CONSENSUS);
|
||||
});
|
||||
app_.getInboundLedgers().acquireAsync(
|
||||
jtADVANCE,
|
||||
"getConsensusLedger2",
|
||||
hash,
|
||||
0,
|
||||
InboundLedger::Reason::CONSENSUS);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ public:
|
||||
// instead. Inbound ledger acquisition is asynchronous anyway.
|
||||
virtual void
|
||||
acquireAsync(
|
||||
JobType type,
|
||||
std::string const& name,
|
||||
uint256 const& hash,
|
||||
std::uint32_t seq,
|
||||
InboundLedger::Reason reason) = 0;
|
||||
|
||||
@@ -108,10 +108,7 @@ public:
|
||||
failedSave(std::uint32_t seq, uint256 const& hash);
|
||||
|
||||
std::string
|
||||
getCompleteLedgers() const;
|
||||
|
||||
std::size_t
|
||||
missingFromCompleteLedgerRange(LedgerIndex first, LedgerIndex last) const;
|
||||
getCompleteLedgers();
|
||||
|
||||
/** Apply held transactions to the open ledger
|
||||
This is normally called as we close the ledger.
|
||||
@@ -328,7 +325,7 @@ private:
|
||||
// A set of transactions to replay during the next close
|
||||
std::unique_ptr<LedgerReplay> replayData;
|
||||
|
||||
std::recursive_mutex mutable mCompleteLock;
|
||||
std::recursive_mutex mCompleteLock;
|
||||
RangeSet<std::uint32_t> mCompleteLedgers;
|
||||
|
||||
// Publish thread is running.
|
||||
|
||||
@@ -373,7 +373,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
|
||||
|
||||
if (!wasProgress)
|
||||
{
|
||||
checkLocal();
|
||||
if (checkLocal())
|
||||
{
|
||||
// Done. Something else (probably consensus) built the ledger
|
||||
// locally while waiting for data (or possibly before requesting)
|
||||
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
|
||||
JLOG(journal_.info()) << "Finished while waiting " << hash_;
|
||||
return;
|
||||
}
|
||||
|
||||
mByHash = true;
|
||||
|
||||
|
||||
@@ -5,9 +5,9 @@
|
||||
#include <xrpld/core/JobQueue.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
|
||||
#include <xrpl/basics/CanProcess.h>
|
||||
#include <xrpl/basics/DecayingSample.h>
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/scope.h>
|
||||
#include <xrpl/beast/container/aged_map.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
|
||||
@@ -64,12 +64,15 @@ public:
|
||||
(reason != InboundLedger::Reason::CONSENSUS))
|
||||
return {};
|
||||
|
||||
std::stringstream ss;
|
||||
|
||||
bool isNew = true;
|
||||
std::shared_ptr<InboundLedger> inbound;
|
||||
{
|
||||
ScopedLockType sl(mLock);
|
||||
if (stopping_)
|
||||
{
|
||||
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -93,53 +96,66 @@ public:
|
||||
++mCounter;
|
||||
}
|
||||
}
|
||||
ss << " IsNew: " << (isNew ? "true" : "false");
|
||||
|
||||
if (inbound->isFailed())
|
||||
{
|
||||
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!isNew)
|
||||
inbound->update(seq);
|
||||
|
||||
if (!inbound->isComplete())
|
||||
{
|
||||
JLOG(j_.debug()) << "InProgress: " << ss.str();
|
||||
return {};
|
||||
}
|
||||
|
||||
JLOG(j_.debug()) << "Complete: " << ss.str();
|
||||
return inbound->getLedger();
|
||||
};
|
||||
using namespace std::chrono_literals;
|
||||
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog(
|
||||
return perf::measureDurationAndLog(
|
||||
doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
|
||||
|
||||
return ledger;
|
||||
}
|
||||
|
||||
void
|
||||
acquireAsync(
|
||||
JobType type,
|
||||
std::string const& name,
|
||||
uint256 const& hash,
|
||||
std::uint32_t seq,
|
||||
InboundLedger::Reason reason) override
|
||||
{
|
||||
std::unique_lock lock(acquiresMutex_);
|
||||
try
|
||||
if (auto check = std::make_shared<CanProcess const>(
|
||||
acquiresMutex_, pendingAcquires_, hash);
|
||||
*check)
|
||||
{
|
||||
if (pendingAcquires_.contains(hash))
|
||||
return;
|
||||
pendingAcquires_.insert(hash);
|
||||
scope_unlock unlock(lock);
|
||||
acquire(hash, seq, reason);
|
||||
app_.getJobQueue().addJob(
|
||||
type, name, [check, name, hash, seq, reason, this]() {
|
||||
JLOG(j_.debug())
|
||||
<< "JOB acquireAsync " << name << " started ";
|
||||
try
|
||||
{
|
||||
acquire(hash, seq, reason);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(j_.warn()) << "Exception thrown for acquiring new "
|
||||
"inbound ledger "
|
||||
<< hash << ": " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "Unknown exception thrown for acquiring new "
|
||||
"inbound ledger "
|
||||
<< hash;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "Exception thrown for acquiring new inbound ledger " << hash
|
||||
<< ": " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "Unknown exception thrown for acquiring new inbound ledger "
|
||||
<< hash;
|
||||
}
|
||||
pendingAcquires_.erase(hash);
|
||||
}
|
||||
|
||||
std::shared_ptr<InboundLedger>
|
||||
|
||||
@@ -942,8 +942,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
|
||||
}
|
||||
|
||||
JLOG(m_journal.info()) << "Advancing accepted ledger to "
|
||||
<< ledger->info().seq << " with >= " << minVal
|
||||
<< " validations";
|
||||
<< ledger->info().seq << " ("
|
||||
<< to_short_string(ledger->info().hash)
|
||||
<< ") with >= " << minVal << " validations";
|
||||
|
||||
ledger->setValidated();
|
||||
ledger->setFull();
|
||||
@@ -1571,36 +1572,12 @@ LedgerMaster::getPublishedLedger()
|
||||
}
|
||||
|
||||
std::string
|
||||
LedgerMaster::getCompleteLedgers() const
|
||||
LedgerMaster::getCompleteLedgers()
|
||||
{
|
||||
std::lock_guard sl(mCompleteLock);
|
||||
return to_string(mCompleteLedgers);
|
||||
}
|
||||
|
||||
std::size_t
|
||||
LedgerMaster::missingFromCompleteLedgerRange(
|
||||
LedgerIndex first,
|
||||
LedgerIndex last) const
|
||||
{
|
||||
// Make a copy of the range to avoid holding the lock
|
||||
auto const range = [&] {
|
||||
std::lock_guard sl(mCompleteLock);
|
||||
return mCompleteLedgers;
|
||||
}();
|
||||
|
||||
std::size_t missing = 0;
|
||||
|
||||
for (LedgerIndex idx = first; idx <= last; ++idx)
|
||||
{
|
||||
if (!boost::icl::contains(range, idx))
|
||||
{
|
||||
++missing;
|
||||
}
|
||||
}
|
||||
|
||||
return missing;
|
||||
}
|
||||
|
||||
std::optional<NetClock::time_point>
|
||||
LedgerMaster::getCloseTimeBySeq(LedgerIndex ledgerIndex)
|
||||
{
|
||||
|
||||
@@ -12,7 +12,8 @@ TimeoutCounter::TimeoutCounter(
|
||||
QueueJobParameter&& jobParameter,
|
||||
beast::Journal journal)
|
||||
: app_(app)
|
||||
, journal_(journal)
|
||||
, sink_(journal, to_short_string(hash) + " ")
|
||||
, journal_(sink_)
|
||||
, hash_(hash)
|
||||
, timeouts_(0)
|
||||
, complete_(false)
|
||||
@@ -32,6 +33,8 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
|
||||
{
|
||||
if (isDone())
|
||||
return;
|
||||
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
|
||||
<< "ms";
|
||||
timer_.expires_after(timerInterval_);
|
||||
timer_.async_wait(
|
||||
[wptr = pmDowncast()](boost::system::error_code const& ec) {
|
||||
@@ -40,6 +43,12 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
|
||||
|
||||
if (auto ptr = wptr.lock())
|
||||
{
|
||||
JLOG(ptr->journal_.debug())
|
||||
<< "timer: ec: " << ec << " (operation_aborted: "
|
||||
<< boost::asio::error::operation_aborted << " - "
|
||||
<< (ec == boost::asio::error::operation_aborted ? "aborted"
|
||||
: "other")
|
||||
<< ")";
|
||||
ScopedLockType sl(ptr->mtx_);
|
||||
ptr->queueJob(sl);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#include <xrpld/core/Job.h>
|
||||
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/beast/utility/WrappedSink.h>
|
||||
|
||||
#include <boost/asio/basic_waitable_timer.hpp>
|
||||
|
||||
@@ -104,6 +105,7 @@ protected:
|
||||
// Used in this class for access to boost::asio::io_context and
|
||||
// ripple::Overlay. Used in subtypes for the kitchen sink.
|
||||
Application& app_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::Journal journal_;
|
||||
mutable std::recursive_mutex mtx_;
|
||||
|
||||
|
||||
@@ -203,7 +203,7 @@ public:
|
||||
|
||||
/** Add a suppression peer and get message's relay status.
|
||||
* Return pair:
|
||||
* element 1: true if the peer is added.
|
||||
* element 1: true if the key is added.
|
||||
* element 2: optional is seated to the relay time point or
|
||||
* is unseated if has not relayed yet. */
|
||||
std::pair<bool, std::optional<Stopwatch::time_point>>
|
||||
|
||||
@@ -34,10 +34,10 @@
|
||||
#include <xrpld/rpc/MPTokenIssuanceID.h>
|
||||
#include <xrpld/rpc/ServerHandler.h>
|
||||
|
||||
#include <xrpl/basics/CanProcess.h>
|
||||
#include <xrpl/basics/UptimeClock.h>
|
||||
#include <xrpl/basics/mulDiv.h>
|
||||
#include <xrpl/basics/safe_cast.h>
|
||||
#include <xrpl/basics/scope.h>
|
||||
#include <xrpl/beast/utility/rngfill.h>
|
||||
#include <xrpl/crypto/RFC1751.h>
|
||||
#include <xrpl/crypto/csprng.h>
|
||||
@@ -408,7 +408,7 @@ public:
|
||||
isFull() override;
|
||||
|
||||
void
|
||||
setMode(OperatingMode om) override;
|
||||
setMode(OperatingMode om, char const* reason) override;
|
||||
|
||||
bool
|
||||
isBlocked() override;
|
||||
@@ -886,7 +886,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
|
||||
inline void
|
||||
NetworkOPsImp::setStandAlone()
|
||||
{
|
||||
setMode(OperatingMode::FULL);
|
||||
setMode(OperatingMode::FULL, "setStandAlone");
|
||||
}
|
||||
|
||||
inline void
|
||||
@@ -1036,7 +1036,9 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
{
|
||||
if (mMode != OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::DISCONNECTED);
|
||||
setMode(
|
||||
OperatingMode::DISCONNECTED,
|
||||
"Heartbeat: insufficient peers");
|
||||
std::stringstream ss;
|
||||
ss << "Node count (" << numPeers << ") has fallen "
|
||||
<< "below required minimum (" << minPeerCount_ << ").";
|
||||
@@ -1061,7 +1063,7 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
|
||||
if (mMode == OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
|
||||
JLOG(m_journal.info())
|
||||
<< "Node count (" << numPeers << ") is sufficient.";
|
||||
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
|
||||
@@ -1073,9 +1075,9 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
auto origMode = mMode.load();
|
||||
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
|
||||
if (mMode == OperatingMode::SYNCING)
|
||||
setMode(OperatingMode::SYNCING);
|
||||
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
|
||||
else if (mMode == OperatingMode::CONNECTED)
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
|
||||
auto newMode = mMode.load();
|
||||
if (origMode != newMode)
|
||||
{
|
||||
@@ -1824,7 +1826,7 @@ void
|
||||
NetworkOPsImp::setAmendmentBlocked()
|
||||
{
|
||||
amendmentBlocked_ = true;
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
|
||||
}
|
||||
|
||||
inline bool
|
||||
@@ -1855,7 +1857,7 @@ void
|
||||
NetworkOPsImp::setUNLBlocked()
|
||||
{
|
||||
unlBlocked_ = true;
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
|
||||
}
|
||||
|
||||
inline void
|
||||
@@ -1956,7 +1958,7 @@ NetworkOPsImp::checkLastClosedLedger(
|
||||
|
||||
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
|
||||
}
|
||||
|
||||
if (consensus)
|
||||
@@ -2045,8 +2047,9 @@ NetworkOPsImp::beginConsensus(
|
||||
// this shouldn't happen unless we jump ledgers
|
||||
if (mMode == OperatingMode::FULL)
|
||||
{
|
||||
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
|
||||
setMode(OperatingMode::TRACKING);
|
||||
JLOG(m_journal.warn())
|
||||
<< "beginConsensus Don't have LCL, going to tracking";
|
||||
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
|
||||
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
|
||||
}
|
||||
|
||||
@@ -2181,7 +2184,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
|
||||
// validations we have for LCL. If the ledger is good enough, go to
|
||||
// TRACKING - TODO
|
||||
if (!needNetworkLedger_)
|
||||
setMode(OperatingMode::TRACKING);
|
||||
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
|
||||
}
|
||||
|
||||
if (((mMode == OperatingMode::CONNECTED) ||
|
||||
@@ -2195,7 +2198,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
|
||||
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
|
||||
2 * current->info().closeTimeResolution))
|
||||
{
|
||||
setMode(OperatingMode::FULL);
|
||||
setMode(OperatingMode::FULL, "endConsensus: check full");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2207,7 +2210,7 @@ NetworkOPsImp::consensusViewChange()
|
||||
{
|
||||
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "consensusViewChange");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2526,7 +2529,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::setMode(OperatingMode om)
|
||||
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
if (om == OperatingMode::CONNECTED)
|
||||
@@ -2546,11 +2549,12 @@ NetworkOPsImp::setMode(OperatingMode om)
|
||||
if (mMode == om)
|
||||
return;
|
||||
|
||||
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
|
||||
mMode = om;
|
||||
|
||||
accounting_.mode(om);
|
||||
|
||||
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
|
||||
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
|
||||
pubServer();
|
||||
}
|
||||
|
||||
@@ -2562,34 +2566,28 @@ NetworkOPsImp::recvValidation(
|
||||
JLOG(m_journal.trace())
|
||||
<< "recvValidation " << val->getLedgerHash() << " from " << source;
|
||||
|
||||
std::unique_lock lock(validationsMutex_);
|
||||
BypassAccept bypassAccept = BypassAccept::no;
|
||||
try
|
||||
{
|
||||
if (pendingValidations_.contains(val->getLedgerHash()))
|
||||
bypassAccept = BypassAccept::yes;
|
||||
else
|
||||
pendingValidations_.insert(val->getLedgerHash());
|
||||
scope_unlock unlock(lock);
|
||||
handleNewValidation(app_, val, source, bypassAccept, m_journal);
|
||||
CanProcess const check(
|
||||
validationsMutex_, pendingValidations_, val->getLedgerHash());
|
||||
try
|
||||
{
|
||||
BypassAccept bypassAccept =
|
||||
check ? BypassAccept::no : BypassAccept::yes;
|
||||
handleNewValidation(app_, val, source, bypassAccept, m_journal);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(m_journal.warn())
|
||||
<< "Exception thrown for handling new validation "
|
||||
<< val->getLedgerHash() << ": " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(m_journal.warn())
|
||||
<< "Unknown exception thrown for handling new validation "
|
||||
<< val->getLedgerHash();
|
||||
}
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(m_journal.warn())
|
||||
<< "Exception thrown for handling new validation "
|
||||
<< val->getLedgerHash() << ": " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(m_journal.warn())
|
||||
<< "Unknown exception thrown for handling new validation "
|
||||
<< val->getLedgerHash();
|
||||
}
|
||||
if (bypassAccept == BypassAccept::no)
|
||||
{
|
||||
pendingValidations_.erase(val->getLedgerHash());
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
pubValidation(val);
|
||||
|
||||
|
||||
@@ -191,7 +191,7 @@ public:
|
||||
virtual bool
|
||||
isFull() = 0;
|
||||
virtual void
|
||||
setMode(OperatingMode om) = 0;
|
||||
setMode(OperatingMode om, char const* reason) = 0;
|
||||
virtual bool
|
||||
isBlocked() = 0;
|
||||
virtual bool
|
||||
|
||||
@@ -289,18 +289,6 @@ SHAMapStoreImp::run()
|
||||
validatedSeq >= lastRotated + deleteInterval_ &&
|
||||
canDelete_ >= lastRotated - 1 && healthWait() == keepGoing;
|
||||
|
||||
JLOG(journal_.debug())
|
||||
<< "run: Setting lastGoodValidatedLedger_ to " << validatedSeq;
|
||||
|
||||
{
|
||||
// Note that this is set after the healthWait() check, so that we
|
||||
// don't start the rotation until the validated ledger is fully
|
||||
// processed. It is not guaranteed to be done at this point. It also
|
||||
// allows the testLedgerGaps unit test to work.
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
lastGoodValidatedLedger_ = validatedSeq;
|
||||
}
|
||||
|
||||
// will delete up to (not including) lastRotated
|
||||
if (readyToRotate)
|
||||
{
|
||||
@@ -309,9 +297,7 @@ SHAMapStoreImp::run()
|
||||
<< lastRotated << " deleteInterval " << deleteInterval_
|
||||
<< " canDelete_ " << canDelete_ << " state "
|
||||
<< app_.getOPs().strOperatingMode(false) << " age "
|
||||
<< ledgerMaster_->getValidatedLedgerAge().count()
|
||||
<< "s. Complete ledgers: "
|
||||
<< ledgerMaster_->getCompleteLedgers();
|
||||
<< ledgerMaster_->getValidatedLedgerAge().count() << 's';
|
||||
|
||||
clearPrior(lastRotated);
|
||||
if (healthWait() == stopping)
|
||||
@@ -374,10 +360,7 @@ SHAMapStoreImp::run()
|
||||
clearCaches(validatedSeq);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn())
|
||||
<< "finished rotation. validatedSeq: " << validatedSeq
|
||||
<< ", lastRotated: " << lastRotated << ". Complete ledgers: "
|
||||
<< ledgerMaster_->getCompleteLedgers();
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -632,47 +615,22 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
|
||||
SHAMapStoreImp::HealthResult
|
||||
SHAMapStoreImp::healthWait()
|
||||
{
|
||||
auto index = ledgerMaster_->getValidLedgerIndex();
|
||||
auto age = ledgerMaster_->getValidatedLedgerAge();
|
||||
OperatingMode mode = netOPs_->getOperatingMode();
|
||||
std::unique_lock lock(mutex_);
|
||||
|
||||
auto numMissing = ledgerMaster_->missingFromCompleteLedgerRange(
|
||||
lastGoodValidatedLedger_, index);
|
||||
while (
|
||||
!stop_ &&
|
||||
(mode != OperatingMode::FULL || age > ageThreshold_ || numMissing > 0))
|
||||
while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_))
|
||||
{
|
||||
// this value shouldn't change, so grab it while we have the
|
||||
// lock
|
||||
auto const lastGood = lastGoodValidatedLedger_;
|
||||
|
||||
lock.unlock();
|
||||
|
||||
auto const stream = mode != OperatingMode::FULL || age > ageThreshold_
|
||||
? journal_.warn()
|
||||
: journal_.info();
|
||||
JLOG(stream) << "Waiting " << recoveryWaitTime_.count()
|
||||
<< "s for node to stabilize. state: "
|
||||
<< app_.getOPs().strOperatingMode(mode, false) << ". age "
|
||||
<< age.count() << "s. Missing ledgers: " << numMissing
|
||||
<< ". Expect: " << lastGood << "-" << index
|
||||
<< ". Complete ledgers: "
|
||||
<< ledgerMaster_->getCompleteLedgers();
|
||||
JLOG(journal_.warn()) << "Waiting " << recoveryWaitTime_.count()
|
||||
<< "s for node to stabilize. state: "
|
||||
<< app_.getOPs().strOperatingMode(mode, false)
|
||||
<< ". age " << age.count() << 's';
|
||||
std::this_thread::sleep_for(recoveryWaitTime_);
|
||||
index = ledgerMaster_->getValidLedgerIndex();
|
||||
age = ledgerMaster_->getValidatedLedgerAge();
|
||||
mode = netOPs_->getOperatingMode();
|
||||
numMissing =
|
||||
ledgerMaster_->missingFromCompleteLedgerRange(lastGood, index);
|
||||
|
||||
lock.lock();
|
||||
}
|
||||
|
||||
JLOG(journal_.debug()) << "healthWait: Setting lastGoodValidatedLedger_ to "
|
||||
<< index;
|
||||
lastGoodValidatedLedger_ = index;
|
||||
|
||||
return stop_ ? stopping : keepGoing;
|
||||
}
|
||||
|
||||
|
||||
@@ -71,11 +71,6 @@ private:
|
||||
std::thread thread_;
|
||||
bool stop_ = false;
|
||||
bool healthy_ = true;
|
||||
// Used to prevent ledger gaps from forming during online deletion. Keeps
|
||||
// track of the last validated ledger that was processed without gaps. There
|
||||
// are no guarantees about gaps while online delete is not running. For
|
||||
// that, use advisory_delete and check for gaps externally.
|
||||
LedgerIndex lastGoodValidatedLedger_ = 0;
|
||||
mutable std::condition_variable cond_;
|
||||
mutable std::condition_variable rendezvous_;
|
||||
mutable std::mutex mutex_;
|
||||
@@ -89,11 +84,11 @@ private:
|
||||
std::uint32_t deleteBatch_ = 100;
|
||||
std::chrono::milliseconds backOff_{100};
|
||||
std::chrono::seconds ageThreshold_{60};
|
||||
/// If the node is out of sync, or any recent ledgers are not
|
||||
/// available during an online_delete healthWait() call, sleep
|
||||
/// the thread for this time, and continue checking until recovery.
|
||||
/// If the node is out of sync during an online_delete healthWait()
|
||||
/// call, sleep the thread for this time, and continue checking until
|
||||
/// recovery.
|
||||
/// See also: "recovery_wait_seconds" in rippled-example.cfg
|
||||
std::chrono::seconds recoveryWaitTime_{1};
|
||||
std::chrono::seconds recoveryWaitTime_{5};
|
||||
|
||||
// these do not exist upon SHAMapStore creation, but do exist
|
||||
// as of run() or before
|
||||
@@ -217,8 +212,6 @@ private:
|
||||
enum HealthResult { stopping, keepGoing };
|
||||
[[nodiscard]] HealthResult
|
||||
healthWait();
|
||||
bool
|
||||
hasCompleteRange(LedgerIndex first, LedgerIndex last);
|
||||
|
||||
public:
|
||||
void
|
||||
|
||||
Reference in New Issue
Block a user