Files
rippled/src/xrpld/app/misc/SHAMapStoreImp.cpp
2026-04-13 20:23:23 -04:00

657 lines
21 KiB
C++

#include <xrpld/app/ledger/TransactionMaster.h>
#include <xrpld/app/misc/SHAMapStoreImp.h>
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Scheduler.h>
#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/server/State.h>
#include <xrpl/shamap/SHAMapMissingNode.h>
#include <boost/algorithm/string/predicate.hpp>
namespace xrpl {
void
SHAMapStoreImp::SavedStateDB::init(BasicConfig const& config, std::string const& dbName)
{
std::lock_guard const lock(mutex_);
initStateDB(sqlDb_, config, dbName);
}
LedgerIndex
SHAMapStoreImp::SavedStateDB::getCanDelete()
{
std::lock_guard const lock(mutex_);
return xrpl::getCanDelete(sqlDb_);
}
LedgerIndex
SHAMapStoreImp::SavedStateDB::setCanDelete(LedgerIndex canDelete)
{
std::lock_guard const lock(mutex_);
return xrpl::setCanDelete(sqlDb_, canDelete);
}
SavedState
SHAMapStoreImp::SavedStateDB::getState()
{
std::lock_guard const lock(mutex_);
return xrpl::getSavedState(sqlDb_);
}
void
SHAMapStoreImp::SavedStateDB::setState(SavedState const& state)
{
std::lock_guard const lock(mutex_);
xrpl::setSavedState(sqlDb_, state);
}
void
SHAMapStoreImp::SavedStateDB::setLastRotated(LedgerIndex seq)
{
std::lock_guard const lock(mutex_);
xrpl::setLastRotated(sqlDb_, seq);
}
//------------------------------------------------------------------------------
SHAMapStoreImp::SHAMapStoreImp(
Application& app,
NodeStore::Scheduler& scheduler,
beast::Journal journal)
: app_(app)
, scheduler_(scheduler)
, journal_(journal)
, working_(true)
, canDelete_(std::numeric_limits<LedgerIndex>::max())
{
Config& config{app.config()};
Section& section{config.section(ConfigSection::nodeDatabase())};
if (section.empty())
{
Throw<std::runtime_error>(
"Missing [" + ConfigSection::nodeDatabase() + "] entry in configuration file");
}
// RocksDB only. Use sensible defaults if no values specified.
if (boost::iequals(get(section, "type"), "RocksDB"))
{
if (!section.exists("cache_mb"))
{
section.set("cache_mb", std::to_string(config.getValueFor(SizedItem::hashNodeDBCache)));
}
if (!section.exists("filter_bits") && (config.NODE_SIZE >= 2))
section.set("filter_bits", "10");
}
get_if_exists(section, "online_delete", deleteInterval_);
if (deleteInterval_ != 0u)
{
// Configuration that affects the behavior of online delete
get_if_exists(section, "delete_batch", deleteBatch_);
std::uint32_t temp = 0;
if (get_if_exists(section, "back_off_milliseconds", temp) ||
// Included for backward compatibility with an undocumented setting
get_if_exists(section, "backOff", temp))
{
backOff_ = std::chrono::milliseconds{temp};
}
if (get_if_exists(section, "age_threshold_seconds", temp))
ageThreshold_ = std::chrono::seconds{temp};
if (get_if_exists(section, "recovery_wait_seconds", temp))
recoveryWaitTime_ = std::chrono::seconds{temp};
get_if_exists(section, "advisory_delete", advisoryDelete_);
auto const minInterval =
config.standalone() ? minimumDeletionIntervalSA_ : minimumDeletionInterval_;
if (deleteInterval_ < minInterval)
{
Throw<std::runtime_error>(
"online_delete must be at least " + std::to_string(minInterval));
}
if (config.LEDGER_HISTORY > deleteInterval_)
{
Throw<std::runtime_error>(
"online_delete must not be less than ledger_history "
"(currently " +
std::to_string(config.LEDGER_HISTORY) + ")");
}
state_db_.init(config, dbName_);
dbPaths();
}
}
std::unique_ptr<NodeStore::Database>
SHAMapStoreImp::makeNodeStore(int readThreads)
{
auto nscfg = app_.config().section(ConfigSection::nodeDatabase());
std::unique_ptr<NodeStore::Database> db;
if (deleteInterval_ != 0u)
{
SavedState state = state_db_.getState();
auto writableBackend = makeBackendRotating(state.writableDb);
auto archiveBackend = makeBackendRotating(state.archiveDb);
if (state.writableDb.empty())
{
state.writableDb = writableBackend->getName();
state.archiveDb = archiveBackend->getName();
state_db_.setState(state);
}
// Create NodeStore with two backends to allow online deletion of
// data
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
scheduler_,
readThreads,
std::move(writableBackend),
std::move(archiveBackend),
nscfg,
app_.getJournal(nodeStoreName_));
fdRequired_ += dbr->fdRequired();
dbRotating_ = dbr.get();
db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));
}
else
{
db = NodeStore::Manager::instance().make_Database(
megabytes(app_.config().getValueFor(SizedItem::burstSize, std::nullopt)),
scheduler_,
readThreads,
nscfg,
app_.getJournal(nodeStoreName_));
fdRequired_ += db->fdRequired();
}
return db;
}
void
SHAMapStoreImp::onLedgerClosed(std::shared_ptr<Ledger const> const& ledger)
{
{
std::lock_guard const lock(mutex_);
newLedger_ = ledger;
working_ = true;
}
cond_.notify_one();
}
void
SHAMapStoreImp::rendezvous() const
{
if (!working_)
return;
std::unique_lock<std::mutex> lock(mutex_);
rendezvous_.wait(lock, [&] { return !working_; });
}
int
SHAMapStoreImp::fdRequired() const
{
return fdRequired_;
}
bool
SHAMapStoreImp::copyNode(std::uint64_t& nodeCount, SHAMapTreeNode const& node)
{
// Copy a single record from node to dbRotating_
dbRotating_->fetchNodeObject(
node.getHash().as_uint256(), 0, NodeStore::FetchType::synchronous, true);
if ((++nodeCount % checkHealthInterval_) == 0u)
{
if (healthWait() == stopping)
return false;
}
return true;
}
void
SHAMapStoreImp::run()
{
beast::setCurrentThreadName("SHAMapStore");
LedgerIndex lastRotated = state_db_.getState().lastRotated;
netOPs_ = &app_.getOPs();
ledgerMaster_ = &app_.getLedgerMaster();
if (advisoryDelete_)
canDelete_ = state_db_.getCanDelete();
while (true)
{
healthy_ = true;
std::shared_ptr<Ledger const> validatedLedger;
{
std::unique_lock<std::mutex> lock(mutex_);
working_ = false;
rendezvous_.notify_all();
if (stop_)
{
return;
}
cond_.wait(lock);
if (newLedger_)
{
validatedLedger = std::move(newLedger_);
}
else
{
continue;
}
}
LedgerIndex const validatedSeq = validatedLedger->header().seq;
if (lastRotated == 0u)
{
lastRotated = validatedSeq;
state_db_.setLastRotated(lastRotated);
}
bool const readyToRotate = validatedSeq >= lastRotated + deleteInterval_ &&
canDelete_ >= lastRotated - 1 && healthWait() == keepGoing;
JLOG(journal_.debug()) << "run: Setting lastGoodValidatedLedger_ to " << validatedSeq;
{
// Note that this is set after the healthWait() check, so that we
// don't start the rotation until the validated ledger is fully
// processed. It is not guaranteed to be done at this point. It also
// allows the testLedgerGaps unit test to work.
std::unique_lock<std::mutex> const lock(mutex_);
lastGoodValidatedLedger_ = validatedSeq;
}
// will delete up to (not including) lastRotated
if (readyToRotate)
{
JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq << " lastRotated "
<< lastRotated << " deleteInterval " << deleteInterval_
<< " canDelete_ " << canDelete_ << " state "
<< app_.getOPs().strOperatingMode(false) << " age "
<< ledgerMaster_->getValidatedLedgerAge().count()
<< "s. Complete ledgers: " << ledgerMaster_->getCompleteLedgers();
clearPrior(lastRotated);
if (healthWait() == stopping)
return;
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
std::uint64_t nodeCount = 0;
try
{
validatedLedger->stateMap().snapShot(false)->visitNodes(
std::bind(
&SHAMapStoreImp::copyNode,
this,
std::ref(nodeCount),
std::placeholders::_1));
}
catch (SHAMapMissingNode const& e)
{
JLOG(journal_.error())
<< "Missing node while copying ledger before rotate: " << e.what();
continue;
}
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug())
<< "copied ledger " << validatedSeq << " nodecount " << nodeCount;
JLOG(journal_.debug()) << "freshening caches";
freshenCaches();
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
JLOG(journal_.trace()) << "Making a new backend";
auto newBackend = makeBackendRotating();
JLOG(journal_.debug()) << validatedSeq << " new backend " << newBackend->getName();
clearCaches(validatedSeq);
if (healthWait() == stopping)
return;
lastRotated = validatedSeq;
dbRotating_->rotate(
std::move(newBackend),
[&](std::string const& writableName, std::string const& archiveName) {
SavedState savedState;
savedState.writableDb = writableName;
savedState.archiveDb = archiveName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);
clearCaches(validatedSeq);
});
JLOG(journal_.warn()) << "finished rotation. validatedSeq: " << validatedSeq
<< ", lastRotated: " << lastRotated
<< ". Complete ledgers: " << ledgerMaster_->getCompleteLedgers();
}
}
}
void
SHAMapStoreImp::dbPaths()
{
Section const section{app_.config().section(ConfigSection::nodeDatabase())};
boost::filesystem::path dbPath = get(section, "path");
if (boost::filesystem::exists(dbPath))
{
if (!boost::filesystem::is_directory(dbPath))
{
journal_.error() << "node db path must be a directory. " << dbPath.string();
Throw<std::runtime_error>("node db path must be a directory.");
}
}
else
{
boost::filesystem::create_directories(dbPath);
}
SavedState state = state_db_.getState();
{
auto update = [&dbPath](std::string& sPath) {
if (sPath.empty())
return false;
// Check if configured "path" matches stored directory path
using namespace boost::filesystem;
auto const stored{path(sPath)};
if (stored.parent_path() == dbPath)
return false;
sPath = (dbPath / stored.filename()).string();
return true;
};
if (update(state.writableDb))
{
update(state.archiveDb);
state_db_.setState(state);
}
}
bool writableDbExists = false;
bool archiveDbExists = false;
std::vector<boost::filesystem::path> pathsToDelete;
for (boost::filesystem::directory_iterator it(dbPath);
it != boost::filesystem::directory_iterator();
++it)
{
if (state.writableDb.compare(it->path().string()) == 0)
{
writableDbExists = true;
}
else if (state.archiveDb.compare(it->path().string()) == 0)
{
archiveDbExists = true;
}
else if (dbPrefix_.compare(it->path().stem().string()) == 0)
{
pathsToDelete.push_back(it->path());
}
}
if ((!writableDbExists && !state.writableDb.empty()) ||
(!archiveDbExists && !state.archiveDb.empty()) || (writableDbExists != archiveDbExists) ||
state.writableDb.empty() != state.archiveDb.empty())
{
boost::filesystem::path stateDbPathName = app_.config().legacy("database_path");
stateDbPathName /= dbName_;
stateDbPathName += "*";
journal_.error() << "state db error:\n"
<< " writableDbExists " << writableDbExists << " archiveDbExists "
<< archiveDbExists << '\n'
<< " writableDb '" << state.writableDb << "' archiveDb '"
<< state.archiveDb << "\n\n"
<< "The existing data is in a corrupted state.\n"
<< "To resume operation, remove the files matching "
<< stateDbPathName.string() << " and contents of the directory "
<< get(section, "path") << '\n'
<< "Optionally, you can move those files to another\n"
<< "location if you wish to analyze or back up the data.\n"
<< "However, there is no guarantee that the data in its\n"
<< "existing form is usable.";
Throw<std::runtime_error>("state db error");
}
// The necessary directories exist. Now, remove any others.
for (boost::filesystem::path const& p : pathsToDelete)
boost::filesystem::remove_all(p);
}
std::unique_ptr<NodeStore::Backend>
SHAMapStoreImp::makeBackendRotating(std::string path)
{
Section section{app_.config().section(ConfigSection::nodeDatabase())};
boost::filesystem::path newPath;
if (!path.empty())
{
newPath = path;
}
else
{
boost::filesystem::path p = get(section, "path");
p /= dbPrefix_;
p += ".%%%%";
newPath = boost::filesystem::unique_path(p);
}
section.set("path", newPath.string());
auto backend{NodeStore::Manager::instance().make_Backend(
section,
megabytes(app_.config().getValueFor(SizedItem::burstSize, std::nullopt)),
scheduler_,
app_.getJournal(nodeStoreName_))};
backend->open();
return backend;
}
void
SHAMapStoreImp::clearSql(
LedgerIndex lastRotated,
std::string const& TableName,
std::function<std::optional<LedgerIndex>()> const& getMinSeq,
std::function<void(LedgerIndex)> const& deleteBeforeSeq)
{
XRPL_ASSERT(deleteInterval_, "xrpl::SHAMapStoreImp::clearSql : nonzero delete interval");
LedgerIndex min = std::numeric_limits<LedgerIndex>::max();
{
JLOG(journal_.trace()) << "Begin: Look up lowest value of: " << TableName;
auto m = getMinSeq();
JLOG(journal_.trace()) << "End: Look up lowest value of: " << TableName;
if (!m)
return;
min = *m;
}
if (min > lastRotated || healthWait() == stopping)
return;
if (min == lastRotated)
{
// Micro-optimization mainly to clarify logs
JLOG(journal_.trace()) << "Nothing to delete from " << TableName;
return;
}
JLOG(journal_.debug()) << "start deleting in: " << TableName << " from " << min << " to "
<< lastRotated;
while (min < lastRotated)
{
min = std::min(lastRotated, min + deleteBatch_);
JLOG(journal_.trace()) << "Begin: Delete up to " << deleteBatch_
<< " rows with LedgerSeq < " << min << " from: " << TableName;
deleteBeforeSeq(min);
JLOG(journal_.trace()) << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < "
<< min << " from: " << TableName;
if (healthWait() == stopping)
return;
if (min < lastRotated)
std::this_thread::sleep_for(backOff_);
if (healthWait() == stopping)
return;
}
JLOG(journal_.debug()) << "finished deleting from: " << TableName;
}
void
SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
{
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
// Also clear the FullBelowCache so its generation counter is bumped.
// This prevents stale "full below" markers from persisting across
// backend rotation/online deletion and interfering with SHAMap sync.
app_.getNodeFamily().getFullBelowCache()->clear();
}
void
SHAMapStoreImp::freshenCaches()
{
if (freshenCache(*app_.getNodeFamily().getTreeNodeCache()))
return;
freshenCache(app_.getMasterTransaction().getCache());
}
void
SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
{
// Do not allow ledgers to be acquired from the network
// that are about to be deleted.
minimumOnline_ = lastRotated + 1;
JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to " << lastRotated;
ledgerMaster_->clearPriorLedgers(lastRotated);
JLOG(journal_.trace()) << "End: Clear internal ledgers up to " << lastRotated;
if (healthWait() == stopping)
return;
auto& db = app_.getRelationalDatabase();
clearSql(
lastRotated,
"Ledgers",
[&db]() -> std::optional<LedgerIndex> { return db.getMinLedgerSeq(); },
[&db](LedgerIndex min) -> void { db.deleteBeforeLedgerSeq(min); });
if (healthWait() == stopping)
return;
if (!app_.config().useTxTables())
return;
clearSql(
lastRotated,
"Transactions",
[&db]() -> std::optional<LedgerIndex> { return db.getTransactionsMinLedgerSeq(); },
[&db](LedgerIndex min) -> void { db.deleteTransactionsBeforeLedgerSeq(min); });
if (healthWait() == stopping)
return;
clearSql(
lastRotated,
"AccountTransactions",
[&db]() -> std::optional<LedgerIndex> { return db.getAccountTransactionsMinLedgerSeq(); },
[&db](LedgerIndex min) -> void { db.deleteAccountTransactionsBeforeLedgerSeq(min); });
if (healthWait() == stopping)
return;
}
SHAMapStoreImp::HealthResult
SHAMapStoreImp::healthWait()
{
auto index = ledgerMaster_->getValidLedgerIndex();
auto age = ledgerMaster_->getValidatedLedgerAge();
OperatingMode mode = netOPs_->getOperatingMode();
std::unique_lock lock(mutex_);
auto numMissing =
ledgerMaster_->missingFromCompleteLedgerRange(lastGoodValidatedLedger_, index);
while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_ || numMissing > 0))
{
// this value shouldn't change, so grab it while we have the
// lock
auto const lastGood = lastGoodValidatedLedger_;
lock.unlock();
auto const stream =
mode != OperatingMode::FULL || age > ageThreshold_ ? journal_.warn() : journal_.info();
JLOG(stream) << "Waiting " << recoveryWaitTime_.count()
<< "s for node to stabilize. state: "
<< app_.getOPs().strOperatingMode(mode, false) << ". age " << age.count()
<< "s. Missing ledgers: " << numMissing << ". Expect: " << lastGood << "-"
<< index << ". Complete ledgers: " << ledgerMaster_->getCompleteLedgers();
std::this_thread::sleep_for(recoveryWaitTime_);
index = ledgerMaster_->getValidLedgerIndex();
age = ledgerMaster_->getValidatedLedgerAge();
mode = netOPs_->getOperatingMode();
numMissing = ledgerMaster_->missingFromCompleteLedgerRange(lastGood, index);
lock.lock();
}
JLOG(journal_.debug()) << "healthWait: Setting lastGoodValidatedLedger_ to " << index;
lastGoodValidatedLedger_ = index;
return stop_ ? stopping : keepGoing;
}
void
SHAMapStoreImp::stop()
{
if (thread_.joinable())
{
{
std::lock_guard const lock(mutex_);
stop_ = true;
cond_.notify_one();
}
thread_.join();
}
}
std::optional<LedgerIndex>
SHAMapStoreImp::minimumOnline() const
{
// minimumOnline_ with 0 value is equivalent to unknown/not set.
// Don't attempt to acquire ledgers if that value is unknown.
if ((deleteInterval_ != 0u) && (minimumOnline_ != 0u))
return minimumOnline_.load();
return app_.getLedgerMaster().minSqlSeq();
}
//------------------------------------------------------------------------------
std::unique_ptr<SHAMapStore>
make_SHAMapStore(Application& app, NodeStore::Scheduler& scheduler, beast::Journal journal)
{
return std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
}
} // namespace xrpl