Compare commits

...

4 Commits

17 changed files with 245 additions and 34 deletions

View File

@@ -953,6 +953,21 @@
#
# Optional keys for NuDB and RocksDB:
#
# cache_size Size of cache for database records. Default is 16384.
# Setting this value to 0 will use the default value.
#
# cache_age Length of time in minutes to keep database records
# cached. Default is 5 minutes. Setting this value to
# 0 will use the default value.
#
# Note: if cache_size or cache_age is not specified,
# default values will be used for the unspecified
# parameter.
#
# Note: the cache will not be created if online_delete
# is specified, because the rotating NodeStore does
# not use this cache).
#
# fast_load Boolean. If set, load the last persisted ledger
# from disk upon process start before syncing to
# the network. This is likely to improve performance

View File

@@ -131,6 +131,10 @@ public:
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback);
/** Remove expired entries from the positive and negative caches. */
virtual void
sweep() = 0;
/** Gather statistics pertaining to read and write activities.
*
* @param obj Json object reference into which to place counters.

View File

@@ -22,6 +22,32 @@ public:
beast::Journal j)
: Database(scheduler, readThreads, config, j), backend_(std::move(backend))
{
std::optional<int> cacheSize, cacheAge;
if (config.exists("cache_size"))
{
cacheSize = get<int>(config, "cache_size");
if (cacheSize.value() < 0)
Throw<std::runtime_error>("Specified negative value for cache_size");
}
if (config.exists("cache_age"))
{
cacheAge = get<int>(config, "cache_age");
if (cacheAge.value() < 0)
Throw<std::runtime_error>("Specified negative value for cache_age");
}
if (cacheSize.has_value() || cacheAge.has_value())
{
cache_ = std::make_shared<TaggedCache<uint256, NodeObject>>(
"DatabaseNodeImp",
cacheSize.value_or(0),
std::chrono::minutes(cacheAge.value_or(0)),
stopwatch(),
j);
}
XRPL_ASSERT(
backend_,
"xrpl::NodeStore::DatabaseNodeImp::DatabaseNodeImp : non-null "
@@ -73,7 +99,13 @@ public:
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback) override;
void
sweep() override;
private:
// Cache for database objects. This cache is not always initialized. Check
// for null before using.
std::shared_ptr<TaggedCache<uint256, NodeObject>> cache_;
// Persistent key/value storage
std::shared_ptr<Backend> backend_;

View File

@@ -55,6 +55,9 @@ public:
void
sync() override;
void
sweep() override;
private:
std::shared_ptr<Backend> writableBackend_;
std::shared_ptr<Backend> archiveBackend_;

View File

@@ -3,6 +3,8 @@
#include <xrpl/basics/Log.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/Zero.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/ledger/helpers/CredentialHelpers.h>
#include <xrpl/protocol/AccountID.h>
@@ -19,6 +21,16 @@ namespace xrpl::permissioned_dex {
bool
accountInDomain(ReadView const& view, AccountID const& account, Domain const& domainID)
{
// Avoid constructing a zero-key PermissionedDomain keylet.
// keylet::permissionedDomain(uint256) uses the DomainID as the ledger key.
if (view.rules().enabled(fixCleanup3_2_0) && domainID == beast::kZero)
{
// LCOV_EXCL_START
UNREACHABLE("xrpl::permissioned_dex::accountInDomain : domainID is zero");
return false;
// LCOV_EXCL_STOP
}
auto const sleDomain = view.read(keylet::permissionedDomain(domainID));
if (!sleDomain)
return false;

View File

@@ -24,6 +24,13 @@ DatabaseNodeImp::store(NodeObjectType type, Blob&& data, uint256 const& hash, st
auto obj = NodeObject::createObject(type, std::move(data), hash);
backend_->store(obj);
if (cache_)
{
// After the store, replace a negative cache entry if there is one
cache_->canonicalize(hash, obj, [](std::shared_ptr<NodeObject> const& n) {
return n->getType() == NodeObjectType::Dummy;
});
}
}
void
@@ -32,9 +39,25 @@ DatabaseNodeImp::asyncFetch(
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
{
if (cache_)
{
std::shared_ptr<NodeObject> const obj = cache_->fetch(hash);
if (obj)
{
callback(obj->getType() == NodeObjectType::Dummy ? nullptr : obj);
return;
}
}
Database::asyncFetch(hash, ledgerSeq, std::move(callback));
}
void
DatabaseNodeImp::sweep()
{
if (cache_)
cache_->sweep();
}
std::shared_ptr<NodeObject>
DatabaseNodeImp::fetchNodeObject(
uint256 const& hash,
@@ -42,32 +65,58 @@ DatabaseNodeImp::fetchNodeObject(
FetchReport& fetchReport,
bool duplicate)
{
std::shared_ptr<NodeObject> nodeObject = nullptr;
Status status = Status::Ok;
std::shared_ptr<NodeObject> nodeObject = cache_ ? cache_->fetch(hash) : nullptr;
if (!nodeObject)
{
JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record not "
<< (cache_ ? "cached" : "found");
try
{
status = backend_->fetch(hash, &nodeObject);
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "fetchNodeObject " << hash
<< ": Exception fetching from backend: " << e.what();
rethrow();
}
Status status = Status::Ok;
try
{
status = backend_->fetch(hash, &nodeObject);
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "fetchNodeObject " << hash
<< ": Exception fetching from backend: " << e.what();
rethrow();
}
switch (status)
switch (status)
{
case Status::Ok:
if (cache_)
{
if (nodeObject)
{
cache_->canonicalizeReplaceClient(hash, nodeObject);
}
else
{
auto notFound = NodeObject::createObject(NodeObjectType::Dummy, {}, hash);
cache_->canonicalizeReplaceClient(hash, notFound);
if (notFound->getType() != NodeObjectType::Dummy)
nodeObject = notFound;
}
}
break;
case Status::NotFound:
break;
case Status::DataCorrupt:
JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": nodestore data is corrupted";
break;
default:
JLOG(j_.warn()) << "fetchNodeObject " << hash << ": backend returns unknown result "
<< static_cast<int>(status);
break;
}
}
else
{
case Status::Ok:
case Status::NotFound:
break;
case Status::DataCorrupt:
JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": nodestore data is corrupted";
break;
default:
JLOG(j_.warn()) << "fetchNodeObject " << hash << ": backend returns unknown result "
<< static_cast<int>(status);
break;
JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record found in cache";
if (nodeObject->getType() == NodeObjectType::Dummy)
nodeObject.reset();
}
if (nodeObject)

View File

@@ -113,6 +113,12 @@ DatabaseRotatingImp::store(NodeObjectType type, Blob&& data, uint256 const& hash
storeStats(1, nObj->getData().size());
}
void
DatabaseRotatingImp::sweep()
{
// Nothing to do.
}
std::shared_ptr<NodeObject>
DatabaseRotatingImp::fetchNodeObject(
uint256 const& hash,

View File

@@ -94,6 +94,12 @@ OfferCreate::preflight(PreflightContext const& ctx)
if (tx.isFlag(tfHybrid) && !tx.isFieldPresent(sfDomainID))
return temINVALID_FLAG;
// A zero DomainID is invalid for a PermissionedDomain ledger entry because
// keylet::permissionedDomain(uint256) uses the DomainID as the ledger key.
if (auto const domainID = tx[~sfDomainID];
ctx.rules.enabled(fixCleanup3_2_0) && domainID && *domainID == beast::kZero)
return temMALFORMED;
bool const bImmediateOrCancel(tx.isFlag(tfImmediateOrCancel));
bool const bFillOrKill(tx.isFlag(tfFillOrKill));

View File

@@ -125,6 +125,12 @@ Payment::preflight(PreflightContext const& ctx)
if (!mpTokensV2 && isDstMPT && ctx.tx.isFieldPresent(sfPaths))
return temMALFORMED;
// A zero DomainID is invalid for a PermissionedDomain ledger entry because
// keylet::permissionedDomain(uint256) uses the DomainID as the ledger key.
if (auto const domainID = tx[~sfDomainID];
ctx.rules.enabled(fixCleanup3_2_0) && domainID && *domainID == beast::kZero)
return temMALFORMED;
bool const partialPaymentAllowed = tx.isFlag(tfPartialPayment);
bool const limitQuality = tx.isFlag(tfLimitQuality);
bool const defaultPathsAllowed = !tx.isFlag(tfNoRippleDirect);

View File

@@ -197,6 +197,20 @@ class PermissionedDEX_test : public beast::unit_test::Suite
env.close();
}
// test preflight - malformed DomainID being zero
// Only test this with fixCleanup3_2_0 enabled. Without the fix,
// an assert-enabled build can crash when Ledger::read() receives
// a zero-key PermissionedDomain keylet.
if (features[fixCleanup3_2_0])
{
Env env(*this, features);
auto const& [gw_, domainOwner, alice_, bob_, carol_, USD, domainID, credType] =
PermissionedDEX(env);
env(offer(bob_, XRP(10), USD(10)), Domain(uint256{}), Ter(temMALFORMED));
env.close();
}
// preclaim - someone outside of the domain cannot create domain offer
{
Env env(*this, features);
@@ -396,6 +410,24 @@ class PermissionedDEX_test : public beast::unit_test::Suite
env.close();
}
// test preflight - malformed DomainID being zero
// Only test this with fixCleanup3_2_0 enabled. Without the fix,
// an assert-enabled build can crash when Ledger::read() receives
// a zero-key PermissionedDomain keylet.
if (features[fixCleanup3_2_0])
{
Env env(*this, features);
auto const& [gw_, domainOwner, alice_, bob_, carol_, USD, domainID, credType] =
PermissionedDEX(env);
env(pay(bob_, alice_, USD(10)),
Path(~USD),
Sendmax(XRP(10)),
Domain(uint256{}),
Ter(temMALFORMED));
env.close();
}
// preclaim - cannot send payment with non existent domain
{
Env env(*this, features);
@@ -1772,7 +1804,9 @@ public:
// Test domain offer (w/o hybrid)
testOfferCreate(all);
testOfferCreate(all - fixCleanup3_2_0);
testPayment(all);
testPayment(all - fixCleanup3_2_0);
testBookStep(all);
testRippling(all);
testOfferTokenIssuerInDomain(all);

View File

@@ -520,6 +520,25 @@ public:
/////////////////////////////////////////////////////////////
// Create NodeStore with two backends to allow online deletion of data.
// Normally, SHAMapStoreImp handles all these details.
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
// Provide default values.
if (!nscfg.exists("cache_size"))
{
nscfg.set(
"cache_size",
std::to_string(
env.app().config().getValueFor(SizedItem::TreeCacheSize, std::nullopt)));
}
if (!nscfg.exists("cache_age"))
{
nscfg.set(
"cache_age",
std::to_string(
env.app().config().getValueFor(SizedItem::TreeCacheAge, std::nullopt)));
}
NodeStoreScheduler scheduler(env.app().getJobQueue());
std::string const writableDb = "write";
@@ -528,7 +547,6 @@ public:
auto archiveBackend = makeBackendRotating(env, scheduler, archiveDb);
static constexpr int kReadThreads = 4;
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
scheduler,
kReadThreads,

View File

@@ -784,7 +784,7 @@ RCLConsensus::Adaptor::buildLCL(
// Update fee computations based on accepted txs
using namespace std::chrono_literals;
app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s);
app_.getTxQ().processClosedLedger(app_, *built, roundTime > 2500ms);
// And stash the ledger in the ledger master
if (ledgerMaster_.storeLedger(built))

View File

@@ -998,6 +998,10 @@ public:
JLOG(journal_.debug()) << "MasterTransaction sweep. Size before: " << oldMasterTxSize
<< "; size after: " << masterTxCache.size();
}
{
// Sweep NodeStore database cache(s), if enabled.
getNodeStore().sweep();
}
{
std::size_t const oldLedgerMasterCacheSize = getLedgerMaster().getFetchPackCacheSize();

View File

@@ -165,6 +165,22 @@ std::unique_ptr<NodeStore::Database>
SHAMapStoreImp::makeNodeStore(int readThreads)
{
auto nscfg = app_.config().section(ConfigSection::nodeDatabase());
// Provide default values.
if (!nscfg.exists("cache_size"))
{
nscfg.set(
"cache_size",
std::to_string(app_.config().getValueFor(SizedItem::TreeCacheSize, std::nullopt)));
}
if (!nscfg.exists("cache_age"))
{
nscfg.set(
"cache_age",
std::to_string(app_.config().getValueFor(SizedItem::TreeCacheAge, std::nullopt)));
}
std::unique_ptr<NodeStore::Database> db;
if (deleteInterval_ != 0u)
@@ -254,6 +270,8 @@ SHAMapStoreImp::run()
LedgerIndex lastRotated = stateDb_.getState().lastRotated;
netOPs_ = &app_.getOPs();
ledgerMaster_ = &app_.getLedgerMaster();
fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache());
treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache());
if (advisoryDelete_)
canDelete_ = stateDb_.getCanDelete();
@@ -542,16 +560,16 @@ SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
// Also clear the FullBelowCache so its generation counter is bumped.
// This prevents stale "full below" markers from persisting across
// backend rotation/online deletion and interfering with SHAMap sync.
app_.getNodeFamily().getFullBelowCache()->clear();
fullBelowCache_->clear();
}
void
SHAMapStoreImp::freshenCaches()
{
if (freshenCache(*app_.getNodeFamily().getTreeNodeCache()))
if (freshenCache(*treeNodeCache_))
return;
if (freshenCache(app_.getMasterTransaction().getCache()))
return;
freshenCache(app_.getMasterTransaction().getCache());
}
void

View File

@@ -7,6 +7,8 @@
#include <xrpl/nodestore/Scheduler.h>
#include <xrpl/rdb/DatabaseCon.h>
#include <xrpl/server/State.h>
#include <xrpl/shamap/FullBelowCache.h>
#include <xrpl/shamap/TreeNodeCache.h>
#include <atomic>
#include <chrono>
@@ -93,6 +95,8 @@ private:
// as of run() or before
NetworkOPs* netOPs_ = nullptr;
LedgerMaster* ledgerMaster_ = nullptr;
FullBelowCache* fullBelowCache_ = nullptr;
TreeNodeCache* treeNodeCache_ = nullptr;
static constexpr auto kNodeStoreName = "NodeStore";

View File

@@ -59,10 +59,10 @@ struct ConsensusParms
std::size_t const minConsensusPct = 80;
//! The duration a ledger may remain idle before closing
std::chrono::milliseconds const ledgerIdleInterval = std::chrono::seconds{15};
std::chrono::milliseconds const ledgerIdleInterval = std::chrono::seconds{5};
//! The number of seconds we wait minimum to ensure participation
std::chrono::milliseconds const ledgerMinConsensus = std::chrono::milliseconds{1950};
std::chrono::milliseconds const ledgerMinConsensus = std::chrono::milliseconds{1000};
/** The maximum amount of time to spend pausing for laggards.
*
@@ -73,7 +73,7 @@ struct ConsensusParms
std::chrono::milliseconds const ledgerMaxConsensus = std::chrono::seconds{15};
//! Minimum number of seconds to wait to ensure others have computed the LCL
std::chrono::milliseconds const ledgerMinClose = std::chrono::seconds{2};
std::chrono::milliseconds const ledgerMinClose = std::chrono::seconds{1};
//! How often we check state or change positions
std::chrono::milliseconds const ledgerGRANULARITY = std::chrono::seconds{1};
@@ -99,7 +99,7 @@ struct ConsensusParms
twice the interval between proposals (0.7s) divided by
the interval between mid and late consensus ([85-50]/100).
*/
std::chrono::milliseconds const avMinConsensusTime = std::chrono::seconds{5};
std::chrono::milliseconds const avMinConsensusTime = std::chrono::seconds{2};
//------------------------------------------------------------------------------
// Avalanche tuning

View File

@@ -209,7 +209,7 @@ public:
// Work queue limits
int maxTransactions = 250;
static constexpr int kMaxJobQueueTx = 1000;
static constexpr int kMaxJobQueueTx = 100000;
static constexpr int kMinJobQueueTx = 100;
// Amendment majority time