Files
rippled/src/xrpld/app/misc/SHAMapStoreImp.cpp

715 lines
22 KiB
C++

#include <xrpld/app/ledger/TransactionMaster.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/SHAMapStoreImp.h>
#include <xrpld/app/rdb/State.h>
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Scheduler.h>
#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
#include <xrpl/shamap/SHAMapMissingNode.h>
#include <boost/algorithm/string/predicate.hpp>
namespace ripple {
// Initialize the saved-state database by handing sqlDb_ to initStateDB
// together with the given configuration and database name.
void
SHAMapStoreImp::SavedStateDB::init(
    BasicConfig const& config,
    std::string const& dbName)
{
    // Hold mutex_ for the duration of the call.
    std::lock_guard guard{mutex_};
    initStateDB(sqlDb_, config, dbName);
}
// Return the "can delete" ledger index read from the state database.
LedgerIndex
SHAMapStoreImp::SavedStateDB::getCanDelete()
{
    // Hold mutex_ for the duration of the query.
    std::lock_guard guard{mutex_};
    return ripple::getCanDelete(sqlDb_);
}
// Store a new "can delete" ledger index in the state database and return
// whatever ripple::setCanDelete reports back.
LedgerIndex
SHAMapStoreImp::SavedStateDB::setCanDelete(LedgerIndex canDelete)
{
    // Hold mutex_ for the duration of the update.
    std::lock_guard guard{mutex_};
    return ripple::setCanDelete(sqlDb_, canDelete);
}
// Read the persisted SavedState record from the state database.
SavedState
SHAMapStoreImp::SavedStateDB::getState()
{
    // Hold mutex_ for the duration of the query.
    std::lock_guard guard{mutex_};
    return ripple::getSavedState(sqlDb_);
}
// Persist the given SavedState record to the state database.
void
SHAMapStoreImp::SavedStateDB::setState(SavedState const& state)
{
    // Hold mutex_ for the duration of the update.
    std::lock_guard guard{mutex_};
    ripple::setSavedState(sqlDb_, state);
}
// Persist the sequence of the most recently rotated ledger.
void
SHAMapStoreImp::SavedStateDB::setLastRotated(LedgerIndex seq)
{
    // Hold mutex_ for the duration of the update.
    std::lock_guard guard{mutex_};
    ripple::setLastRotated(sqlDb_, seq);
}
//------------------------------------------------------------------------------
// Construct the store from the node database section of the application
// configuration.
//
// Throws std::runtime_error when the node database section is missing, or
// when online_delete is enabled with a value below the minimum deletion
// interval or below ledger_history.
//
// When online_delete is non-zero this also reads the online-delete tuning
// values, initializes the state database and validates the database paths.
SHAMapStoreImp::SHAMapStoreImp(
    Application& app,
    NodeStore::Scheduler& scheduler,
    beast::Journal journal)
    : app_(app)
    , scheduler_(scheduler)
    , journal_(journal)
    , working_(true)
    , canDelete_(std::numeric_limits<LedgerIndex>::max())
{
    Config& config{app.config()};
    // Held by reference: defaults set below are written into the
    // application's configuration section itself.
    Section& section{config.section(ConfigSection::nodeDatabase())};

    if (section.empty())
    {
        Throw<std::runtime_error>(
            "Missing [" + ConfigSection::nodeDatabase() +
            "] entry in configuration file");
    }

    // RocksDB only. Use sensible defaults if no values specified.
    bool const usesRocksDb = boost::iequals(get(section, "type"), "RocksDB");
    if (usesRocksDb)
    {
        if (!section.exists("cache_mb"))
        {
            auto const cacheMb = config.getValueFor(SizedItem::hashNodeDBCache);
            section.set("cache_mb", std::to_string(cacheMb));
        }

        bool const needsFilterBits =
            !section.exists("filter_bits") && (config.NODE_SIZE >= 2);
        if (needsFilterBits)
            section.set("filter_bits", "10");
    }

    get_if_exists(section, "online_delete", deleteInterval_);

    // Everything below only matters when online delete is enabled.
    if (!deleteInterval_)
        return;

    // Configuration that affects the behavior of online delete
    get_if_exists(section, "delete_batch", deleteBatch_);

    std::uint32_t value;

    // "backOff" is included for backward compatibility with an
    // undocumented setting; it is only consulted when
    // "back_off_milliseconds" is absent.
    bool const hasBackOff =
        get_if_exists(section, "back_off_milliseconds", value) ||
        get_if_exists(section, "backOff", value);
    if (hasBackOff)
        backOff_ = std::chrono::milliseconds{value};

    if (get_if_exists(section, "age_threshold_seconds", value))
        ageThreshold_ = std::chrono::seconds{value};

    if (get_if_exists(section, "recovery_wait_seconds", value))
        recoveryWaitTime_ = std::chrono::seconds{value};

    get_if_exists(section, "advisory_delete", advisoryDelete_);

    // Standalone mode uses its own minimum interval.
    auto const lowestAllowed = config.standalone()
        ? minimumDeletionIntervalSA_
        : minimumDeletionInterval_;

    if (deleteInterval_ < lowestAllowed)
    {
        Throw<std::runtime_error>(
            "online_delete must be at least " +
            std::to_string(lowestAllowed));
    }

    if (config.LEDGER_HISTORY > deleteInterval_)
    {
        Throw<std::runtime_error>(
            "online_delete must not be less than ledger_history (currently " +
            std::to_string(config.LEDGER_HISTORY) + ")");
    }

    state_db_.init(config, dbName_);
    dbPaths();
}
// Build the node store database.
//
// @param readThreads Number of read threads handed to the database.
// @return The newly created database. When online delete is enabled
//         (deleteInterval_ is non-zero) this is a rotating database built
//         from a writable and an archive backend, and dbRotating_ is set
//         to point at it; otherwise it is whatever
//         NodeStore::Manager::make_Database returns.
//
// Side effect: adds the database's fdRequired() to fdRequired_.
std::unique_ptr<NodeStore::Database>
SHAMapStoreImp::makeNodeStore(int readThreads)
{
    // `auto` deduces a value type here, so the defaults set below go into
    // a local copy of the section rather than the application config.
    auto nscfg = app_.config().section(ConfigSection::nodeDatabase());

    // Provide default values:
    if (!nscfg.exists("cache_size"))
    {
        nscfg.set(
            "cache_size",
            std::to_string(app_.config().getValueFor(
                SizedItem::treeCacheSize, std::nullopt)));
    }

    if (!nscfg.exists("cache_age"))
    {
        nscfg.set(
            "cache_age",
            std::to_string(app_.config().getValueFor(
                SizedItem::treeCacheAge, std::nullopt)));
    }

    std::unique_ptr<NodeStore::Database> db;

    if (deleteInterval_)
    {
        SavedState state = state_db_.getState();
        auto writableBackend = makeBackendRotating(state.writableDb);
        auto archiveBackend = makeBackendRotating(state.archiveDb);

        // No backend names recorded yet: persist the ones just created.
        if (state.writableDb.empty())
        {
            state.writableDb = writableBackend->getName();
            state.archiveDb = archiveBackend->getName();
            state_db_.setState(state);
        }

        // Create NodeStore with two backends to allow online deletion of
        // data
        auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
            scheduler_,
            readThreads,
            std::move(writableBackend),
            std::move(archiveBackend),
            nscfg,
            app_.logs().journal(nodeStoreName_));
        fdRequired_ += dbr->fdRequired();

        // Keep a non-owning pointer for rotation, then transfer ownership
        // to the base-class pointer. The converting move keeps the object
        // owned at every step, unlike release() followed by reset().
        dbRotating_ = dbr.get();
        db = std::move(dbr);
    }
    else
    {
        db = NodeStore::Manager::instance().make_Database(
            megabytes(
                app_.config().getValueFor(SizedItem::burstSize, std::nullopt)),
            scheduler_,
            readThreads,
            nscfg,
            app_.logs().journal(nodeStoreName_));
        fdRequired_ += db->fdRequired();
    }

    return db;
}
// Hand a newly closed ledger to the worker thread: record it in
// newLedger_, mark the store as working and wake one waiter on cond_.
void
SHAMapStoreImp::onLedgerClosed(std::shared_ptr<Ledger const> const& ledger)
{
    {
        std::lock_guard guard{mutex_};
        working_ = true;
        newLedger_ = ledger;
    }

    // Notify after the lock has been released.
    cond_.notify_one();
}
void
SHAMapStoreImp::rendezvous() const
{
if (!working_)
return;
std::unique_lock<std::mutex> lock(mutex_);
rendezvous_.wait(lock, [&] { return !working_; });
}
// Return the accumulated fdRequired_ count (makeNodeStore adds the
// database's fdRequired() to it).
int
SHAMapStoreImp::fdRequired() const
{
    return fdRequired_;
}
// Visitor callback used while copying a ledger's state map before rotation.
//
// @param nodeCount Running count of visited nodes; incremented on each call.
// @param node      The tree node whose hash is fetched through dbRotating_.
// @return false when a periodic health check reports that the store is
//         stopping, true otherwise.
bool
SHAMapStoreImp::copyNode(std::uint64_t& nodeCount, SHAMapTreeNode const& node)
{
    // Copy a single record from node to dbRotating_. The fetch result is
    // deliberately unused.
    // NOTE(review): the trailing `true` presumably asks the rotating
    // database to duplicate the object into the writable backend - confirm
    // against fetchNodeObject's declaration.
    auto const hash = node.getHash().as_uint256();
    dbRotating_->fetchNodeObject(
        hash, 0, NodeStore::FetchType::synchronous, true);

    ++nodeCount;

    // Every checkHealthInterval_ nodes, wait for the node to be healthy;
    // healthWait() is not called on the other iterations.
    bool const healthCheckDue = (nodeCount % checkHealthInterval_) == 0;
    if (healthCheckDue && healthWait() == stopping)
        return false;

    return true;
}
// Worker loop of the store. Names the current thread "SHAMapStore", then
// repeatedly waits on cond_ for a ledger published through newLedger_ and,
// when the rotation conditions hold, deletes prior data, copies the
// ledger's state map into the rotating database and rotates the backends.
// Returns when stop_ is observed, either at the top of the loop or through
// healthWait() reporting `stopping`.
void
SHAMapStoreImp::run()
{
beast::setCurrentThreadName("SHAMapStore");
// Start from the last rotation point recorded in the state database.
LedgerIndex lastRotated = state_db_.getState().lastRotated;
// Cache pointers to the application services used by the helpers.
netOPs_ = &app_.getOPs();
ledgerMaster_ = &app_.getLedgerMaster();
fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache());
treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache());
// With advisory delete, the deletion limit comes from the state database;
// otherwise canDelete_ keeps the value it was constructed with.
if (advisoryDelete_)
canDelete_ = state_db_.getCanDelete();
while (true)
{
healthy_ = true;
std::shared_ptr<Ledger const> validatedLedger;
{
std::unique_lock<std::mutex> lock(mutex_);
// Announce that the worker is idle so rendezvous() callers can proceed.
working_ = false;
rendezvous_.notify_all();
if (stop_)
{
return;
}
// No predicate is passed to wait(); a wakeup that finds no new ledger
// (including a stop notification) simply goes around the loop again.
cond_.wait(lock);
if (newLedger_)
{
validatedLedger = std::move(newLedger_);
}
else
continue;
}
LedgerIndex const validatedSeq = validatedLedger->header().seq;
// A zero lastRotated means no rotation point is recorded yet: adopt the
// current ledger as the starting point and persist it.
if (!lastRotated)
{
lastRotated = validatedSeq;
state_db_.setLastRotated(lastRotated);
}
// Rotate only when deleteInterval_ ledgers have passed since the last
// rotation, canDelete_ permits it, and the node is healthy. healthWait()
// is the last operand, so it only runs when the first two conditions hold.
bool const readyToRotate =
validatedSeq >= lastRotated + deleteInterval_ &&
canDelete_ >= lastRotated - 1 && healthWait() == keepGoing;
JLOG(journal_.debug())
<< "run: Setting lastGoodValidatedLedger_ to " << validatedSeq;
{
// Note that this is set after the healthWait() check, so that we
// don't start the rotation until the validated ledger is fully
// processed. It is not guaranteed to be done at this point. It also
// allows the testLedgerGaps unit test to work.
std::unique_lock<std::mutex> lock(mutex_);
lastGoodValidatedLedger_ = validatedSeq;
}
// will delete up to (not including) lastRotated
if (readyToRotate)
{
JLOG(journal_.warn())
<< "rotating validatedSeq " << validatedSeq << " lastRotated "
<< lastRotated << " deleteInterval " << deleteInterval_
<< " canDelete_ " << canDelete_ << " state "
<< app_.getOPs().strOperatingMode(false) << " age "
<< ledgerMaster_->getValidatedLedgerAge().count()
<< "s. Complete ledgers: "
<< ledgerMaster_->getCompleteLedgers();
// Step 1: clear in-memory ledgers and SQL rows prior to lastRotated.
clearPrior(lastRotated);
if (healthWait() == stopping)
return;
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
std::uint64_t nodeCount = 0;
// Step 2: visit every node of a snapshot of the ledger's state map;
// copyNode fetches each one through dbRotating_ and counts it.
try
{
validatedLedger->stateMap().snapShot(false)->visitNodes(
std::bind(
&SHAMapStoreImp::copyNode,
this,
std::ref(nodeCount),
std::placeholders::_1));
}
catch (SHAMapMissingNode const& e)
{
// The copy could not complete: log it and skip the rotation for this
// ledger. lastRotated is left unchanged.
JLOG(journal_.error())
<< "Missing node while copying ledger before rotate: "
<< e.what();
continue;
}
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
<< " nodecount " << nodeCount;
// Step 3: freshen the caches before rotating.
JLOG(journal_.debug()) << "freshening caches";
freshenCaches();
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
// Step 4: create the backend that will become the new writable one.
JLOG(journal_.trace()) << "Making a new backend";
auto newBackend = makeBackendRotating();
JLOG(journal_.debug())
<< validatedSeq << " new backend " << newBackend->getName();
clearCaches(validatedSeq);
if (healthWait() == stopping)
return;
// Step 5: rotate. The callback receives the new writable and archive
// backend names, persists them together with the new rotation point, and
// clears the caches again.
lastRotated = validatedSeq;
dbRotating_->rotate(
std::move(newBackend),
[&](std::string const& writableName,
std::string const& archiveName) {
SavedState savedState;
savedState.writableDb = writableName;
savedState.archiveDb = archiveName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);
clearCaches(validatedSeq);
});
JLOG(journal_.warn())
<< "finished rotation. validatedSeq: " << validatedSeq
<< ", lastRotated: " << lastRotated << ". Complete ledgers: "
<< ledgerMaster_->getCompleteLedgers();
}
}
}
void
SHAMapStoreImp::dbPaths()
{
Section section{app_.config().section(ConfigSection::nodeDatabase())};
boost::filesystem::path dbPath = get(section, "path");
if (boost::filesystem::exists(dbPath))
{
if (!boost::filesystem::is_directory(dbPath))
{
journal_.error()
<< "node db path must be a directory. " << dbPath.string();
Throw<std::runtime_error>("node db path must be a directory.");
}
}
else
{
boost::filesystem::create_directories(dbPath);
}
SavedState state = state_db_.getState();
{
auto update = [&dbPath](std::string& sPath) {
if (sPath.empty())
return false;
// Check if configured "path" matches stored directory path
using namespace boost::filesystem;
auto const stored{path(sPath)};
if (stored.parent_path() == dbPath)
return false;
sPath = (dbPath / stored.filename()).string();
return true;
};
if (update(state.writableDb))
{
update(state.archiveDb);
state_db_.setState(state);
}
}
bool writableDbExists = false;
bool archiveDbExists = false;
std::vector<boost::filesystem::path> pathsToDelete;
for (boost::filesystem::directory_iterator it(dbPath);
it != boost::filesystem::directory_iterator();
++it)
{
if (!state.writableDb.compare(it->path().string()))
writableDbExists = true;
else if (!state.archiveDb.compare(it->path().string()))
archiveDbExists = true;
else if (!dbPrefix_.compare(it->path().stem().string()))
pathsToDelete.push_back(it->path());
}
if ((!writableDbExists && state.writableDb.size()) ||
(!archiveDbExists && state.archiveDb.size()) ||
(writableDbExists != archiveDbExists) ||
state.writableDb.empty() != state.archiveDb.empty())
{
boost::filesystem::path stateDbPathName =
app_.config().legacy("database_path");
stateDbPathName /= dbName_;
stateDbPathName += "*";
journal_.error()
<< "state db error:\n"
<< " writableDbExists " << writableDbExists << " archiveDbExists "
<< archiveDbExists << '\n'
<< " writableDb '" << state.writableDb << "' archiveDb '"
<< state.archiveDb << "\n\n"
<< "The existing data is in a corrupted state.\n"
<< "To resume operation, remove the files matching "
<< stateDbPathName.string() << " and contents of the directory "
<< get(section, "path") << '\n'
<< "Optionally, you can move those files to another\n"
<< "location if you wish to analyze or back up the data.\n"
<< "However, there is no guarantee that the data in its\n"
<< "existing form is usable.";
Throw<std::runtime_error>("state db error");
}
// The necessary directories exist. Now, remove any others.
for (boost::filesystem::path& p : pathsToDelete)
boost::filesystem::remove_all(p);
}
std::unique_ptr<NodeStore::Backend>
SHAMapStoreImp::makeBackendRotating(std::string path)
{
Section section{app_.config().section(ConfigSection::nodeDatabase())};
boost::filesystem::path newPath;
if (path.size())
{
newPath = path;
}
else
{
boost::filesystem::path p = get(section, "path");
p /= dbPrefix_;
p += ".%%%%";
newPath = boost::filesystem::unique_path(p);
}
section.set("path", newPath.string());
auto backend{NodeStore::Manager::instance().make_Backend(
section,
megabytes(
app_.config().getValueFor(SizedItem::burstSize, std::nullopt)),
scheduler_,
app_.logs().journal(nodeStoreName_))};
backend->open();
return backend;
}
// Delete rows with a ledger sequence below lastRotated from one SQL table,
// in batches, pausing between batches and checking node health.
//
// @param lastRotated     Rows with a sequence below this value are deleted.
// @param TableName       Table name, used for logging only.
// @param getMinSeq       Returns the lowest sequence currently in the
//                        table, or nullopt when there is none.
// @param deleteBeforeSeq Deletes rows with a sequence below its argument.
//
// Returns early, without finishing, whenever healthWait() reports that the
// store is stopping.
void
SHAMapStoreImp::clearSql(
    LedgerIndex lastRotated,
    std::string const& TableName,
    std::function<std::optional<LedgerIndex>()> const& getMinSeq,
    std::function<void(LedgerIndex)> const& deleteBeforeSeq)
{
    XRPL_ASSERT(
        deleteInterval_,
        "ripple::SHAMapStoreImp::clearSql : nonzero delete interval");

    LedgerIndex min = std::numeric_limits<LedgerIndex>::max();

    {
        JLOG(journal_.trace())
            << "Begin: Look up lowest value of: " << TableName;
        auto m = getMinSeq();
        JLOG(journal_.trace()) << "End: Look up lowest value of: " << TableName;
        if (!m)
            return;
        min = *m;
    }

    if (min > lastRotated || healthWait() == stopping)
        return;

    if (min == lastRotated)
    {
        // Micro-optimization mainly to clarify logs
        JLOG(journal_.trace()) << "Nothing to delete from " << TableName;
        return;
    }

    JLOG(journal_.debug()) << "start deleting in: " << TableName << " from "
                           << min << " to " << lastRotated;

    while (min < lastRotated)
    {
        // Advance the cursor by one batch, clamped to lastRotated. The
        // clamp compares against the remaining distance instead of forming
        // min + deleteBatch_ first, because that sum can overflow for a
        // very large configured delete_batch and would then move the
        // cursor backwards.
        // NOTE(review): a delete_batch of 0 never advances the cursor, so
        // this loop only ends on stop; consider rejecting it at
        // configuration time.
        LedgerIndex const remaining = lastRotated - min;
        min = (deleteBatch_ < remaining) ? min + deleteBatch_ : lastRotated;

        JLOG(journal_.trace())
            << "Begin: Delete up to " << deleteBatch_
            << " rows with LedgerSeq < " << min << " from: " << TableName;
        deleteBeforeSeq(min);
        JLOG(journal_.trace())
            << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < "
            << min << " from: " << TableName;

        if (healthWait() == stopping)
            return;

        // Back off between batches, but not after the final one.
        if (min < lastRotated)
            std::this_thread::sleep_for(backOff_);

        if (healthWait() == stopping)
            return;
    }

    JLOG(journal_.debug()) << "finished deleting from: " << TableName;
}
// Drop cached ledger data prior to validatedSeq from the ledger master,
// then empty the full-below cache.
void
SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
{
    ledgerMaster_->clearLedgerCachePrior(validatedSeq);
    fullBelowCache_->clear();
}
// Freshen the tree node cache and then the master transaction cache.
// The second cache is skipped when freshening the first returns true.
// NOTE(review): a true return presumably means the pass was cut short by
// a stop request - confirm against freshenCache.
void
SHAMapStoreImp::freshenCaches()
{
    if (!freshenCache(*treeNodeCache_))
    {
        // The result of the final pass does not affect anything here.
        freshenCache(app_.getMasterTransaction().getCache());
    }
}
// Remove everything prior to lastRotated: in-memory ledgers held by the
// ledger master, then rows of the Ledgers table and, when transaction
// tables are in use, the Transactions and AccountTransactions tables.
//
// A health check runs between the steps; the function returns early when
// the store is stopping. Throws std::runtime_error when the relational
// database is not a SQLiteDatabase.
void
SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
{
    // Do not allow ledgers to be acquired from the network
    // that are about to be deleted.
    minimumOnline_ = lastRotated + 1;

    JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to "
                           << lastRotated;
    ledgerMaster_->clearPriorLedgers(lastRotated);
    JLOG(journal_.trace()) << "End: Clear internal ledgers up to "
                           << lastRotated;
    if (healthWait() == stopping)
        return;

    auto* const sqlite =
        dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase());
    if (sqlite == nullptr)
        Throw<std::runtime_error>("Failed to get relational database");

    // The lambdas below capture the database pointer by value; clearSql
    // invokes them before it returns.
    clearSql(
        lastRotated,
        "Ledgers",
        [sqlite]() -> std::optional<LedgerIndex> {
            return sqlite->getMinLedgerSeq();
        },
        [sqlite](LedgerIndex seq) -> void {
            sqlite->deleteBeforeLedgerSeq(seq);
        });
    if (healthWait() == stopping)
        return;

    // Without transaction tables there is nothing further to clear.
    if (!app_.config().useTxTables())
        return;

    clearSql(
        lastRotated,
        "Transactions",
        [sqlite]() -> std::optional<LedgerIndex> {
            return sqlite->getTransactionsMinLedgerSeq();
        },
        [sqlite](LedgerIndex seq) -> void {
            sqlite->deleteTransactionsBeforeLedgerSeq(seq);
        });
    if (healthWait() == stopping)
        return;

    clearSql(
        lastRotated,
        "AccountTransactions",
        [sqlite]() -> std::optional<LedgerIndex> {
            return sqlite->getAccountTransactionsMinLedgerSeq();
        },
        [sqlite](LedgerIndex seq) -> void {
            sqlite->deleteAccountTransactionsBeforeLedgerSeq(seq);
        });

    // The result is irrelevant at the end of the function, but the call is
    // kept: healthWait() waits for the node to stabilize and updates
    // lastGoodValidatedLedger_.
    if (healthWait() == stopping)
        return;
}
// Block until the node looks healthy enough to continue online deletion,
// or until a stop has been requested.
//
// The node counts as unhealthy while it is not in OperatingMode::FULL, the
// validated ledger is older than ageThreshold_, or ledgers are missing
// between lastGoodValidatedLedger_ and the current valid ledger index.
// While unhealthy, the function logs, sleeps for recoveryWaitTime_ and
// samples again.
//
// On exit lastGoodValidatedLedger_ is set to the most recently sampled
// valid ledger index.
//
// @return `stopping` when stop_ is set, otherwise `keepGoing`.
SHAMapStoreImp::HealthResult
SHAMapStoreImp::healthWait()
{
    auto index = ledgerMaster_->getValidLedgerIndex();
    auto age = ledgerMaster_->getValidatedLedgerAge();
    OperatingMode mode = netOPs_->getOperatingMode();

    std::unique_lock lock(mutex_);

    auto numMissing = ledgerMaster_->missingFromCompleteLedgerRange(
        lastGoodValidatedLedger_, index);

    // True while the node is out of sync or its validated ledger is stale.
    auto const lagging = [&] {
        return mode != OperatingMode::FULL || age > ageThreshold_;
    };

    // The lock is held every time this condition is evaluated.
    while (!stop_ && (lagging() || numMissing > 0))
    {
        // this value shouldn't change, so grab it while we have the
        // lock
        auto const lastGood = lastGoodValidatedLedger_;

        // Release the lock for the log, the sleep and the re-sampling.
        lock.unlock();

        // Missing ledgers alone only rate an info message.
        auto const stream = lagging() ? journal_.warn() : journal_.info();
        JLOG(stream) << "Waiting " << recoveryWaitTime_.count()
                     << "s for node to stabilize. state: "
                     << app_.getOPs().strOperatingMode(mode, false) << ". age "
                     << age.count() << "s. Missing ledgers: " << numMissing
                     << ". Expect: " << lastGood << "-" << index
                     << ". Complete ledgers: "
                     << ledgerMaster_->getCompleteLedgers();

        std::this_thread::sleep_for(recoveryWaitTime_);

        index = ledgerMaster_->getValidLedgerIndex();
        age = ledgerMaster_->getValidatedLedgerAge();
        mode = netOPs_->getOperatingMode();
        numMissing =
            ledgerMaster_->missingFromCompleteLedgerRange(lastGood, index);

        lock.lock();
    }

    JLOG(journal_.debug()) << "healthWait: Setting lastGoodValidatedLedger_ to "
                           << index;
    lastGoodValidatedLedger_ = index;

    if (stop_)
        return stopping;
    return keepGoing;
}
// Ask the worker thread to stop and wait for it to finish. Does nothing
// when the thread is not joinable.
void
SHAMapStoreImp::stop()
{
    if (!thread_.joinable())
        return;

    {
        // Set the flag and notify while holding the lock.
        std::lock_guard guard{mutex_};
        stop_ = true;
        cond_.notify_one();
    }

    thread_.join();
}
// Return the lowest ledger sequence expected to remain online.
//
// When online delete is enabled and minimumOnline_ has been set, that
// value is returned; otherwise the ledger master's minSqlSeq() is used.
std::optional<LedgerIndex>
SHAMapStoreImp::minimumOnline() const
{
    // minimumOnline_ with 0 value is equivalent to unknown/not set.
    // Don't attempt to acquire ledgers if that value is unknown.
    if (!deleteInterval_ || !minimumOnline_)
        return app_.getLedgerMaster().minSqlSeq();

    return minimumOnline_.load();
}
//------------------------------------------------------------------------------
// Factory: construct a SHAMapStoreImp and return it through the
// SHAMapStore interface.
std::unique_ptr<SHAMapStore>
make_SHAMapStore(
    Application& app,
    NodeStore::Scheduler& scheduler,
    beast::Journal journal)
{
    auto store = std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
    return store;
}
} // namespace ripple