mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
The DatabaseImp has threads that asynchronously call JobQueue to perform database reads. Formerly these threads had the same lifespan as Database, which was until the end-of-life of ApplicationImp. During shutdown these threads could call JobQueue after JobQueue had already stopped. Or, even worse, occasionally call JobQueue after JobQueue's destructor had run. To avoid these shutdown conditions, Database is made a Stoppable, with JobQueue as its parent. When Database stops, it shuts down its asynchronous read threads. This prevents Database from accessing JobQueue after JobQueue has stopped, but allows Database to perform stores for the remainder of shutdown. During development it was noted that the Database::close() method was never called. So that method is removed from Database and all derived classes. Stoppable is also adjusted so it can be constructed using either a char const* or a std::string. For those files touched for other reasons, unneeded #includes are removed.
847 lines
26 KiB
C++
847 lines
26 KiB
C++
//------------------------------------------------------------------------------
|
|
/*
|
|
This file is part of rippled: https://github.com/ripple/rippled
|
|
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
|
|
|
Permission to use, copy, modify, and/or distribute this software for any
|
|
purpose with or without fee is hereby granted, provided that the above
|
|
copyright notice and this permission notice appear in all copies.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
//==============================================================================
|
|
|
|
#include <BeastConfig.h>
|
|
|
|
#include <ripple/app/misc/SHAMapStoreImp.h>
|
|
#include <ripple/app/ledger/TransactionMaster.h>
|
|
#include <ripple/app/misc/NetworkOPs.h>
|
|
#include <ripple/core/ConfigSections.h>
|
|
#include <ripple/beast/core/CurrentThreadName.h>
|
|
|
|
namespace ripple {
|
|
// Open (creating if needed) the SQLite state database that records which
// node store directories are in use and the online_delete watermarks.
// Guarantees that the DbState and CanDelete tables each contain their
// single row (Key = 1), inserting default rows on first run.
void SHAMapStoreImp::SavedStateDB::init (BasicConfig const& config,
    std::string const& dbName)
{
    std::lock_guard<std::mutex> lock (mutex_);

    open(session_, config, dbName);

    // FULL synchronous mode: this database must survive a crash, because
    // it records which backend directory is safe to delete on restart.
    session_ << "PRAGMA synchronous=FULL;";

    session_ <<
        "CREATE TABLE IF NOT EXISTS DbState ("
        " Key INTEGER PRIMARY KEY,"
        " WritableDb TEXT,"
        " ArchiveDb TEXT,"
        " LastRotatedLedger INTEGER"
        ");"
        ;

    session_ <<
        "CREATE TABLE IF NOT EXISTS CanDelete ("
        " Key INTEGER PRIMARY KEY,"
        " CanDeleteSeq INTEGER"
        ");"
        ;

    std::int64_t count = 0;
    {
        // COUNT(*) always yields a row; a missing value means the query
        // itself failed, not merely that the table is empty.
        boost::optional<std::int64_t> countO;
        session_ <<
            "SELECT COUNT(Key) FROM DbState WHERE Key = 1;"
            , soci::into (countO);
        if (!countO)
            Throw<std::runtime_error> ("Failed to fetch Key Count from DbState.");
        count = *countO;
    }

    // First run: seed the single DbState row with empty paths and
    // LastRotatedLedger = 0 (meaning "never rotated").
    if (!count)
    {
        session_ <<
            "INSERT INTO DbState VALUES (1, '', '', 0);";
    }

    {
        boost::optional<std::int64_t> countO;
        session_ <<
            "SELECT COUNT(Key) FROM CanDelete WHERE Key = 1;"
            , soci::into (countO);
        if (!countO)
            Throw<std::runtime_error> ("Failed to fetch Key Count from CanDelete.");
        count = *countO;
    }

    // First run: seed the single CanDelete row with sequence 0.
    if (!count)
    {
        session_ <<
            "INSERT INTO CanDelete VALUES (1, 0);";
    }
}
|
|
|
|
// Return the advisory-delete watermark: the highest ledger sequence that
// the operator has marked as deletable (row Key = 1 of CanDelete).
LedgerIndex
SHAMapStoreImp::SavedStateDB::getCanDelete()
{
    LedgerIndex seq;
    std::lock_guard<std::mutex> lock (mutex_);

    session_ <<
        "SELECT CanDeleteSeq FROM CanDelete WHERE Key = 1;"
        , soci::into (seq);
    ;

    return seq;
}
|
|
|
|
// Persist a new advisory-delete watermark and echo it back to the caller.
LedgerIndex
SHAMapStoreImp::SavedStateDB::setCanDelete (LedgerIndex canDelete)
{
    std::lock_guard<std::mutex> lock (mutex_);

    session_ <<
        "UPDATE CanDelete SET CanDeleteSeq = :canDelete WHERE Key = 1;"
        , soci::use (canDelete)
        ;

    return canDelete;
}
|
|
|
|
// Load the persisted rotation state (writable/archive backend paths and
// the last rotation ledger) from the single DbState row.
SHAMapStoreImp::SavedState
SHAMapStoreImp::SavedStateDB::getState()
{
    SavedState state;

    std::lock_guard<std::mutex> lock (mutex_);

    session_ <<
        "SELECT WritableDb, ArchiveDb, LastRotatedLedger"
        " FROM DbState WHERE Key = 1;"
        , soci::into (state.writableDb), soci::into (state.archiveDb)
        , soci::into (state.lastRotated)
        ;

    return state;
}
|
|
|
|
// Atomically persist the full rotation state (both backend paths and the
// last rotation ledger) into the single DbState row.
void
SHAMapStoreImp::SavedStateDB::setState (SavedState const& state)
{
    std::lock_guard<std::mutex> lock (mutex_);
    session_ <<
        "UPDATE DbState"
        " SET WritableDb = :writableDb,"
        " ArchiveDb = :archiveDb,"
        " LastRotatedLedger = :lastRotated"
        " WHERE Key = 1;"
        , soci::use (state.writableDb)
        , soci::use (state.archiveDb)
        , soci::use (state.lastRotated)
        ;
}
|
|
|
|
// Persist only the last-rotated ledger sequence, leaving the backend
// paths untouched.
void
SHAMapStoreImp::SavedStateDB::setLastRotated (LedgerIndex seq)
{
    std::lock_guard<std::mutex> lock (mutex_);
    session_ <<
        "UPDATE DbState SET LastRotatedLedger = :seq"
        " WHERE Key = 1;"
        , soci::use (seq)
        ;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// Construct the SHAMapStore. When online_delete is enabled
// (setup_.deleteInterval != 0), validates the configured intervals,
// initializes the rotation state database, and verifies/creates the
// node database directory layout. Throws std::runtime_error on any
// configuration inconsistency.
SHAMapStoreImp::SHAMapStoreImp (
    Application& app,
    Setup const& setup,
    Stoppable& parent,
    NodeStore::Scheduler& scheduler,
    beast::Journal journal,
    beast::Journal nodeStoreJournal,
    TransactionMaster& transactionMaster,
    BasicConfig const& config)
    : SHAMapStore (parent)
    , app_ (app)
    , setup_ (setup)
    , scheduler_ (scheduler)
    , journal_ (journal)
    , nodeStoreJournal_ (nodeStoreJournal)
    , working_(true)
    , transactionMaster_ (transactionMaster)
    , canDelete_ (std::numeric_limits <LedgerIndex>::max())
{
    if (setup_.deleteInterval)
    {
        // Standalone mode permits a smaller minimum deletion interval
        // (presumably for testing -- two distinct class constants).
        auto const minInterval = setup.standalone ?
            minimumDeletionIntervalSA_ : minimumDeletionInterval_;
        if (setup_.deleteInterval < minInterval)
        {
            Throw<std::runtime_error> ("online_delete must be at least " +
                std::to_string (minInterval));
        }

        // Ledger history must fit inside one deletion interval, otherwise
        // online deletion would discard ledgers we promised to keep.
        if (setup_.ledgerHistory > setup_.deleteInterval)
        {
            Throw<std::runtime_error> (
                "online_delete must not be less than ledger_history (currently " +
                std::to_string (setup_.ledgerHistory) + ")");
        }

        state_db_.init (config, dbName_);

        dbPaths();
    }
}
|
|
|
|
// Create the node store database. With online_delete enabled, builds a
// rotating database over two backends (writable + archive) and remembers
// a raw pointer to it in database_ for later rotation; otherwise builds
// a plain NodeStore::Database. Also records the combined file-descriptor
// requirement in fdlimit_.
std::unique_ptr <NodeStore::Database>
SHAMapStoreImp::makeDatabase (std::string const& name,
    std::int32_t readThreads, Stoppable& parent)
{
    std::unique_ptr <NodeStore::Database> db;

    if (setup_.deleteInterval)
    {
        SavedState state = state_db_.getState();

        // Reopen the backends recorded in the state db; empty paths make
        // makeBackendRotating generate fresh unique directories.
        std::shared_ptr <NodeStore::Backend> writableBackend (
            makeBackendRotating (state.writableDb));
        std::shared_ptr <NodeStore::Backend> archiveBackend (
            makeBackendRotating (state.archiveDb));

        fdlimit_ = writableBackend->fdlimit() + archiveBackend->fdlimit();

        std::unique_ptr <NodeStore::DatabaseRotating> dbr =
            makeDatabaseRotating (name, readThreads, parent,
                writableBackend, archiveBackend);

        // First run: persist the freshly generated backend paths.
        if (!state.writableDb.size())
        {
            state.writableDb = writableBackend->getName();
            state.archiveDb = archiveBackend->getName();
            state_db_.setState (state);
        }

        // Keep a non-owning pointer for rotation; ownership transfers to
        // the returned unique_ptr.
        database_ = dbr.get();
        // NOTE(review): dynamic_cast after release() would leak on a
        // failed cast -- presumably DatabaseRotating derives publicly
        // from Database so this is an always-successful upcast; confirm
        // against the NodeStore headers.
        db.reset (dynamic_cast <NodeStore::Database*>(dbr.release()));
    }
    else
    {
        db = NodeStore::Manager::instance().make_Database (name, scheduler_,
            readThreads, parent, setup_.nodeDatabase, nodeStoreJournal_);
        fdlimit_ = db->fdlimit();
    }

    return db;
}
|
|
|
|
// Hand the latest validated ledger to the worker thread. Sets working_
// under the lock so rendezvous() callers will wait until run() finishes
// processing this ledger, then wakes the worker.
void
SHAMapStoreImp::onLedgerClosed(
    std::shared_ptr<Ledger const> const& ledger)
{
    {
        std::lock_guard <std::mutex> lock (mutex_);
        newLedger_ = ledger;
        working_ = true;
    }
    // Notify outside the lock to avoid waking the worker into a held mutex.
    cond_.notify_one();
}
|
|
|
|
void
|
|
SHAMapStoreImp::rendezvous() const
|
|
{
|
|
if (!working_)
|
|
return;
|
|
|
|
std::unique_lock <std::mutex> lock(mutex_);
|
|
rendezvous_.wait(lock, [&]
|
|
{
|
|
return !working_;
|
|
});
|
|
}
|
|
|
|
// Number of file descriptors this store's backends require, as computed
// in makeDatabase(). Zero until makeDatabase() has run.
int
SHAMapStoreImp::fdlimit () const
{
    return fdlimit_;
}
|
|
|
|
bool
|
|
SHAMapStoreImp::copyNode (std::uint64_t& nodeCount,
|
|
SHAMapAbstractNode const& node)
|
|
{
|
|
// Copy a single record from node to database_
|
|
database_->fetchNode (node.getNodeHash().as_uint256());
|
|
if (! (++nodeCount % checkHealthInterval_))
|
|
{
|
|
if (health())
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
// Worker thread body. Waits for validated ledgers from onLedgerClosed()
// and, once a full deleteInterval of ledgers has accumulated past
// lastRotated (and the advisory-delete watermark permits), performs a
// rotation: clears old SQL rows and caches, copies the validated state
// map into the writable backend, swaps in a fresh writable backend, and
// deletes the old archive backend. health() is polled between phases;
// Health::stopping exits the thread, Health::unhealthy abandons this
// rotation attempt and waits for the next ledger.
void
SHAMapStoreImp::run()
{
    beast::setCurrentThreadName ("SHAMapStore");
    LedgerIndex lastRotated = state_db_.getState().lastRotated;
    // Cache pointers to collaborators; safe to do once the application
    // is running (this thread starts after construction).
    netOPs_ = &app_.getOPs();
    ledgerMaster_ = &app_.getLedgerMaster();
    fullBelowCache_ = &app_.family().fullbelow();
    treeNodeCache_ = &app_.family().treecache();
    transactionDb_ = &app_.getTxnDB();
    ledgerDb_ = &app_.getLedgerDB();

    if (setup_.advisoryDelete)
        canDelete_ = state_db_.getCanDelete ();

    while (1)
    {
        healthy_ = true;
        std::shared_ptr<Ledger const> validatedLedger;

        {
            std::unique_lock <std::mutex> lock (mutex_);
            // Signal rendezvous() waiters that the previous ledger has
            // been fully processed before sleeping.
            working_ = false;
            rendezvous_.notify_all();
            if (stop_)
            {
                stopped();
                return;
            }
            cond_.wait (lock);
            // cond_.wait has no predicate: a spurious wakeup (or a stop
            // notification) leaves newLedger_ empty, so loop again.
            if (newLedger_)
            {
                validatedLedger = std::move(newLedger_);
            }
            else
                continue;
        }

        LedgerIndex validatedSeq = validatedLedger->info().seq;
        // First ever validated ledger: establish the rotation baseline.
        if (!lastRotated)
        {
            lastRotated = validatedSeq;
            state_db_.setLastRotated (lastRotated);
        }

        // will delete up to (not including) lastRotated)
        if (validatedSeq >= lastRotated + setup_.deleteInterval
            && canDelete_ >= lastRotated - 1)
        {
            JLOG(journal_.debug()) << "rotating validatedSeq " << validatedSeq
                << " lastRotated " << lastRotated << " deleteInterval "
                << setup_.deleteInterval << " canDelete_ " << canDelete_;

            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:
                    ;
            }

            // Phase 1: delete old SQL rows and in-memory prior ledgers.
            clearPrior (lastRotated);
            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:
                    ;
            }

            // Phase 2: copy the validated ledger's state map into the
            // writable backend so it survives the rotation. copyNode
            // returning true (visitNodes abort) is detected via health()
            // below rather than from the traversal itself.
            std::uint64_t nodeCount = 0;
            validatedLedger->stateMap().snapShot (
                false)->visitNodes (
                    std::bind (&SHAMapStoreImp::copyNode, this,
                        std::ref(nodeCount), std::placeholders::_1));
            JLOG(journal_.debug()) << "copied ledger " << validatedSeq
                << " nodecount " << nodeCount;
            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:
                    ;
            }

            // Phase 3: touch cached entries so they migrate to the
            // writable backend before the archive backend is dropped.
            freshenCaches();
            JLOG(journal_.debug()) << validatedSeq << " freshened caches";
            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:
                    ;
            }

            // Phase 4: create the new writable backend.
            std::shared_ptr <NodeStore::Backend> newBackend =
                makeBackendRotating();
            JLOG(journal_.debug()) << validatedSeq << " new backend "
                << newBackend->getName();
            std::shared_ptr <NodeStore::Backend> oldBackend;

            clearCaches (validatedSeq);
            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:
                    ;
            }

            std::string nextArchiveDir =
                database_->getWritableBackend()->getName();
            lastRotated = validatedSeq;
            {
                // Persist the new state and swap backends atomically with
                // respect to the database's own operations.
                std::lock_guard <std::mutex> lock (database_->peekMutex());

                state_db_.setState (SavedState {newBackend->getName(),
                    nextArchiveDir, lastRotated});
                clearCaches (validatedSeq);
                oldBackend = database_->rotateBackends (newBackend);
            }
            JLOG(journal_.debug()) << "finished rotation " << validatedSeq;

            // The retired archive backend deletes its files on destruction.
            oldBackend->setDeletePath();
        }
    }
}
|
|
|
|
// Validate (and, if absent, create) the node database directory, then
// cross-check the on-disk backend directories against the persisted
// rotation state. Stray directories matching dbPrefix_ that are neither
// the writable nor archive backend are removed. Any inconsistency
// between disk and state db is fatal: the operator must intervene.
void
SHAMapStoreImp::dbPaths()
{
    boost::filesystem::path dbPath =
        get<std::string>(setup_.nodeDatabase, "path");

    if (boost::filesystem::exists (dbPath))
    {
        if (! boost::filesystem::is_directory (dbPath))
        {
            journal_.error() << "node db path must be a directory. "
                << dbPath.string();
            Throw<std::runtime_error> (
                "node db path must be a directory.");
        }
    }
    else
    {
        boost::filesystem::create_directories (dbPath);
    }

    SavedState state = state_db_.getState();
    bool writableDbExists = false;
    bool archiveDbExists = false;

    // Scan the db directory: note which recorded backends exist and
    // delete leftover rotation directories from prior runs.
    for (boost::filesystem::directory_iterator it (dbPath);
        it != boost::filesystem::directory_iterator(); ++it)
    {
        if (! state.writableDb.compare (it->path().string()))
            writableDbExists = true;
        else if (! state.archiveDb.compare (it->path().string()))
            archiveDbExists = true;
        else if (! dbPrefix_.compare (it->path().stem().string()))
            boost::filesystem::remove_all (it->path());
    }

    // Fatal inconsistencies: a recorded backend is missing on disk, only
    // one of the pair exists, or only one of the paths is recorded.
    if ((!writableDbExists && state.writableDb.size()) ||
        (!archiveDbExists && state.archiveDb.size()) ||
        (writableDbExists != archiveDbExists) ||
        state.writableDb.empty() != state.archiveDb.empty())
    {
        boost::filesystem::path stateDbPathName = setup_.databasePath;
        stateDbPathName /= dbName_;
        stateDbPathName += "*";

        journal_.error() << "state db error: " << std::endl
            << "  writableDbExists " << writableDbExists
            << " archiveDbExists " << archiveDbExists << std::endl
            << "  writableDb '" << state.writableDb
            << "' archiveDb '" << state.archiveDb << "'"
            << std::endl << std::endl
            << "To resume operation, make backups of and "
            << "remove the files matching "
            << stateDbPathName.string()
            << " and contents of the directory "
            << get<std::string>(setup_.nodeDatabase, "path")
            << std::endl;

        Throw<std::runtime_error> ("state db error");
    }
}
|
|
|
|
// Build one backend for the rotating database. An explicit path reopens
// an existing backend directory; an empty path creates a fresh, uniquely
// named directory under the configured node database path.
std::shared_ptr <NodeStore::Backend>
SHAMapStoreImp::makeBackendRotating (std::string path)
{
    Section parameters = setup_.nodeDatabase;
    boost::filesystem::path backendPath;

    if (path.empty())
    {
        // e.g. "<node db path>/<dbPrefix_>.1a2b" via unique_path.
        boost::filesystem::path candidate =
            get<std::string>(parameters, "path");
        candidate /= dbPrefix_;
        candidate += ".%%%%";
        backendPath = boost::filesystem::unique_path (candidate);
    }
    else
    {
        backendPath = path;
    }
    parameters.set("path", backendPath.string());

    return NodeStore::Manager::instance().make_Backend (parameters,
        scheduler_, nodeStoreJournal_);
}
|
|
|
|
// Thin factory wrapper: delegate construction of the rotating database
// to the NodeStore manager with this store's scheduler and journal.
std::unique_ptr <NodeStore::DatabaseRotating>
SHAMapStoreImp::makeDatabaseRotating (std::string const& name,
    std::int32_t readThreads, Stoppable& parent,
    std::shared_ptr <NodeStore::Backend> writableBackend,
    std::shared_ptr <NodeStore::Backend> archiveBackend) const
{
    return NodeStore::Manager::instance().make_DatabaseRotating (
        name, scheduler_, readThreads, parent,
        writableBackend, archiveBackend, nodeStoreJournal_);
}
|
|
|
|
// Incrementally delete rows older than lastRotated from one SQL table.
// minQuery finds the oldest LedgerSeq present; deleteQuery is a
// boost::format pattern (one %u placeholder) deleting rows below a
// bound. Deletes in batches of setup_.deleteBatch, sleeping
// setup_.backOff ms between batches to limit db contention. Returns
// false when there was nothing to do (or health degraded before
// starting); true once deletion ran (even if interrupted by health()).
bool
SHAMapStoreImp::clearSql (DatabaseCon& database,
    LedgerIndex lastRotated,
    std::string const& minQuery,
    std::string const& deleteQuery)
{
    LedgerIndex min = std::numeric_limits <LedgerIndex>::max();

    {
        auto db = database.checkoutDb ();
        boost::optional<std::uint64_t> m;
        *db << minQuery, soci::into(m);
        // Empty optional: table has no rows.
        if (!m)
            return false;
        min = *m;
    }

    if(min > lastRotated || health() != Health::ok)
        return false;

    boost::format formattedDeleteQuery (deleteQuery);

    JLOG(journal_.debug()) <<
        "start: " << deleteQuery << " from " << min << " to " << lastRotated;
    while (min < lastRotated)
    {
        // Raise the deletion bound by one batch, capped at lastRotated
        // (rows at lastRotated itself are kept).
        min = std::min(lastRotated, min + setup_.deleteBatch);
        {
            auto db = database.checkoutDb ();
            *db << boost::str (formattedDeleteQuery % min);
        }
        // Any non-ok health (stopping or unhealthy) aborts early.
        if (health())
            return true;
        if (min < lastRotated)
            std::this_thread::sleep_for (
                std::chrono::milliseconds (setup_.backOff));
    }
    JLOG(journal_.debug()) << "finished: " << deleteQuery;
    return true;
}
|
|
|
|
// Drop cached state that may reference nodes about to be deleted:
// ledgers prior to validatedSeq and the entire full-below cache.
void
SHAMapStoreImp::clearCaches (LedgerIndex validatedSeq)
{
    ledgerMaster_->clearLedgerCachePrior (validatedSeq);
    fullBelowCache_->clear();
}
|
|
|
|
void
|
|
SHAMapStoreImp::freshenCaches()
|
|
{
|
|
if (freshenCache (database_->getPositiveCache()))
|
|
return;
|
|
if (freshenCache (*treeNodeCache_))
|
|
return;
|
|
if (freshenCache (transactionMaster_.getCache()))
|
|
return;
|
|
}
|
|
|
|
// Delete all SQL rows describing ledgers prior to lastRotated: Ledgers,
// Validations (two passes -- see the migration comment below), then
// Transactions and AccountTransactions. health() is polled between every
// phase; any non-ok result abandons the rest of the work (it will be
// retried on the next rotation).
void
SHAMapStoreImp::clearPrior (LedgerIndex lastRotated)
{
    // NOTE(review): `if (health())` treats both stopping and unhealthy
    // as "abort" -- Health::ok is presumably the zero enumerator.
    if (health())
        return;

    ledgerMaster_->clearPriorLedgers (lastRotated);
    if (health())
        return;

    clearSql (*ledgerDb_, lastRotated,
        "SELECT MIN(LedgerSeq) FROM Ledgers;",
        "DELETE FROM Ledgers WHERE LedgerSeq < %u;");
    if (health())
        return;

    {
        /*
            Steps for migration:

            Assume: online_delete = 100, lastRotated = 1000,
                Last shutdown was at ledger # 1080.
                The current network validated ledger is 1090.
            Implies: Ledgers has entries from 900 to 1080.
                Validations has entries for all 1080 ledgers,
                including orphan validations that were not included
                in a validated ledger.

            1) Columns are created in Validations with default NULL values.
            2) During syncing, Ledgers and Validations for 1080 - 1090
                are received from the network. Records are created in
                Validations with InitialSeq approximately 1080 (exact value
                doesn't matter), and later validated with the matching
                LedgerSeq value.
            3) rippled participates in ledgers 1091-1100. Validations
                received are created with InitialSeq in that range, and
                appropriate LedgerSeqs. Maybe some of those ledgers are
                not accepted, so LedgerSeq stays null.
            4) At ledger 1100, this function is called with
                lastRotated = 1000. The first query tries to delete
                rows WHERE LedgerSeq < 1000. It finds none.
            5) The second round of deletions does not run.
            6) Ledgers continue to advance from 1100-1200 as described
                in step 3.
            7) At ledger 1200, this function is called again with
                lastRotated = 1100. The first query again tries to delete
                rows WHERE LedgerSeq < 1100. It finds the rows for 1080-1099.
            8) The second round of deletions runs. It gets
                WHERE v.LedgerSeq is NULL AND
                    (v.InitialSeq IS NULL OR v.InitialSeq < 1100)
                The rows that are found include (a) ALL of the Validations
                for the first 1080 ledgers. (b) Any orphan validations that
                were created in step 3.
            9) This continues. The next rotation cycle does the same as steps
                7 & 8, except that none of the original Validations (8a) exist
                anymore, and 8b gets the orphans from step 6.
        */

        // Sticky across rotations within this process: orphan cleanup
        // only runs once a LedgerSeq-based Validations delete has
        // actually happened (see migration steps above).
        static auto anyValDeleted = false;
        auto const valDeleted = clearSql(*ledgerDb_, lastRotated,
            "SELECT MIN(LedgerSeq) FROM Validations;",
            "DELETE FROM Validations WHERE LedgerSeq < %u;");
        anyValDeleted |= valDeleted;

        if (health())
            return;

        if (anyValDeleted)
        {
            /* Delete the old NULL LedgerSeqs - the Validations that
               aren't linked to a validated ledger - but only if we
               deleted rows in the matching `clearSql` call, and only
               for those created with an old InitialSeq.
            */
            using namespace std::chrono;
            auto const deleteBatch = setup_.deleteBatch;
            // Keep looping while each batch stays at least half full.
            auto const continueLimit = (deleteBatch + 1) / 2;

            std::string const deleteQuery(
                R"sql(DELETE FROM Validations
                    WHERE LedgerHash IN
                    (
                    SELECT v.LedgerHash
                    FROM Validations v
                    WHERE v.LedgerSeq is NULL AND
                    (v.InitialSeq IS NULL OR v.InitialSeq < )sql" +
                std::to_string(lastRotated) +
                ") LIMIT " +
                std::to_string (deleteBatch) +
                ");");

            JLOG(journal_.debug()) << "start: " << deleteQuery << " of "
                << deleteBatch << " rows.";
            long long totalRowsAffected = 0;
            long long rowsAffected;
            // Prepare once (under a checkout), execute repeatedly below.
            auto st = [&]
            {
                auto db = ledgerDb_->checkoutDb();
                return soci::statement(db->prepare << deleteQuery);
            }();
            if (health())
                return;
            do
            {
                {
                    auto db = ledgerDb_->checkoutDb();
                    auto const start = high_resolution_clock::now();
                    st.execute(true);
                    rowsAffected = st.get_affected_rows();
                    totalRowsAffected += rowsAffected;
                    auto const ms = duration_cast<milliseconds>(
                        high_resolution_clock::now() - start).count();
                    JLOG(journal_.trace()) << "step: deleted " << rowsAffected
                        << " rows in " << ms << "ms.";
                }
                if (health())
                    return;
                // Back off only when another full batch is likely.
                if (rowsAffected >= continueLimit)
                    std::this_thread::sleep_for(
                        std::chrono::milliseconds(setup_.backOff));
            }
            while (rowsAffected && rowsAffected >= continueLimit);
            JLOG(journal_.debug()) << "finished: " << deleteQuery << ". Deleted "
                << totalRowsAffected << " rows.";
        }
    }

    if (health())
        return;

    clearSql (*transactionDb_, lastRotated,
        "SELECT MIN(LedgerSeq) FROM Transactions;",
        "DELETE FROM Transactions WHERE LedgerSeq < %u;");
    if (health())
        return;

    clearSql (*transactionDb_, lastRotated,
        "SELECT MIN(LedgerSeq) FROM AccountTransactions;",
        "DELETE FROM AccountTransactions WHERE LedgerSeq < %u;");
    if (health())
        return;
}
|
|
|
|
// Report whether it is currently safe to continue online deletion.
// stopping: shutdown requested. unhealthy: the server is not in full
// sync (mode != omFULL) or the validated ledger is too old; once set,
// healthy_ stays false until run() resets it for the next ledger.
SHAMapStoreImp::Health
SHAMapStoreImp::health()
{
    {
        std::lock_guard<std::mutex> lock (mutex_);
        if (stop_)
            return Health::stopping;
    }
    // Before run() has initialized netOPs_ there is nothing to check.
    // NOTE(review): netOPs_ and healthy_ are read without mutex_ here;
    // presumably only the worker thread calls health() after
    // initialization -- confirm against callers.
    if (!netOPs_)
        return Health::ok;

    NetworkOPs::OperatingMode mode = netOPs_->getOperatingMode();

    auto age = ledgerMaster_->getValidatedLedgerAge();
    if (mode != NetworkOPs::omFULL || age.count() >= setup_.ageThreshold)
    {
        JLOG(journal_.warn()) << "Not deleting. state: " << mode
            << " age " << age.count()
            << " age threshold " << setup_.ageThreshold;
        healthy_ = false;
    }

    if (healthy_)
        return Health::ok;
    else
        return Health::unhealthy;
}
|
|
|
|
// Stoppable hook. With online_delete active, signal the worker thread to
// exit (it calls stopped() itself once it observes stop_); otherwise
// there is no worker and we can report stopped immediately.
void
SHAMapStoreImp::onStop()
{
    if (setup_.deleteInterval)
    {
        {
            std::lock_guard <std::mutex> lock (mutex_);
            stop_ = true;
        }
        cond_.notify_one();
    }
    else
    {
        stopped();
    }
}
|
|
|
|
// Stoppable hook, intentionally identical to onStop(): whichever fires,
// the worker is told to exit; setting stop_ twice is harmless.
void
SHAMapStoreImp::onChildrenStopped()
{
    if (setup_.deleteInterval)
    {
        {
            std::lock_guard <std::mutex> lock (mutex_);
            stop_ = true;
        }
        cond_.notify_one();
    }
    else
    {
        stopped();
    }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Assemble a SHAMapStore::Setup from the rippled Config: copy the
// [node_db] section, supply RocksDB defaults, and pull the
// online-delete tuning knobs if present.
SHAMapStore::Setup
setup_SHAMapStore (Config const& c)
{
    SHAMapStore::Setup result;

    result.standalone = c.standalone();

    // Start from the configured node database section.
    result.nodeDatabase = c.section (ConfigSection::nodeDatabase ());

    // RocksDB-only parameters: provide sensible defaults when the
    // operator specified none.
    if (!result.nodeDatabase.exists ("cache_mb"))
    {
        result.nodeDatabase.set ("cache_mb",
            std::to_string (c.getSize (siHashNodeDBCache)));
    }
    if (!result.nodeDatabase.exists ("filter_bits") && (c.NODE_SIZE >= 2))
        result.nodeDatabase.set ("filter_bits", "10");

    get_if_exists (result.nodeDatabase, "online_delete",
        result.deleteInterval);

    // advisory_delete is only meaningful when online_delete is enabled.
    if (result.deleteInterval)
    {
        get_if_exists (result.nodeDatabase, "advisory_delete",
            result.advisoryDelete);
    }

    result.ledgerHistory = c.LEDGER_HISTORY;
    result.databasePath = c.legacy("database_path");

    get_if_exists (result.nodeDatabase, "delete_batch", result.deleteBatch);
    get_if_exists (result.nodeDatabase, "backOff", result.backOff);
    get_if_exists (result.nodeDatabase, "age_threshold", result.ageThreshold);

    return result;
}
|
|
|
|
// Factory: construct the concrete SHAMapStoreImp behind the SHAMapStore
// interface.
std::unique_ptr<SHAMapStore>
make_SHAMapStore (Application& app,
    SHAMapStore::Setup const& s,
    Stoppable& parent,
    NodeStore::Scheduler& scheduler,
    beast::Journal journal,
    beast::Journal nodeStoreJournal,
    TransactionMaster& transactionMaster,
    BasicConfig const& config)
{
    return std::make_unique<SHAMapStoreImp>(app, s, parent, scheduler,
        journal, nodeStoreJournal, transactionMaster,
        config);
}
|
|
|
|
}
|