Clear old Validations during online delete (RIPD-870):

* Add Validations.LedgerSeq and .InitialSeq fields.
* Clean up logging.
* Lower online delete minimum for standalone mode.
* Unit tests of online_delete.
This commit is contained in:
Edward Hennis
2015-12-09 19:30:05 -05:00
committed by Nik Bougalis
parent 70d5c4eca7
commit eb62959216
14 changed files with 917 additions and 69 deletions

View File

@@ -41,6 +41,7 @@ class SHAMapStore
public:
struct Setup
{
bool standalone = false;
std::uint32_t deleteInterval = 0;
bool advisoryDelete = false;
std::uint32_t ledgerHistory = 0;

View File

@@ -29,6 +29,7 @@
#include <boost/format.hpp>
#include <boost/optional.hpp>
#include <memory>
#include <chrono>
namespace ripple {
void SHAMapStoreImp::SavedStateDB::init (BasicConfig const& config,
@@ -179,21 +180,24 @@ SHAMapStoreImp::SHAMapStoreImp (
, scheduler_ (scheduler)
, journal_ (journal)
, nodeStoreJournal_ (nodeStoreJournal)
, rotating_(false)
, transactionMaster_ (transactionMaster)
, canDelete_ (std::numeric_limits <LedgerIndex>::max())
{
if (setup_.deleteInterval)
{
if (setup_.deleteInterval < minimumDeletionInterval_)
auto const minInterval = setup.standalone ?
minimumDeletionIntervalSA_ : minimumDeletionInterval_;
if (setup_.deleteInterval < minInterval)
{
Throw<std::runtime_error> ("online_delete must be at least " +
std::to_string (minimumDeletionInterval_));
std::to_string (minInterval));
}
if (setup_.ledgerHistory > setup_.deleteInterval)
{
Throw<std::runtime_error> (
"online_delete must be less than ledger_history (currently " +
"online_delete must not be less than ledger_history (currently " +
std::to_string (setup_.ledgerHistory) + ")");
}
@@ -283,7 +287,8 @@ SHAMapStoreImp::run()
while (1)
{
healthy_ = true;
validatedLedger_.reset();
std::shared_ptr<Ledger const> validatedLedger;
rotating_ = false;
{
std::unique_lock <std::mutex> lock (mutex_);
@@ -294,12 +299,15 @@ SHAMapStoreImp::run()
}
cond_.wait (lock);
if (newLedger_)
validatedLedger_ = std::move (newLedger_);
{
rotating_ = true;
validatedLedger = std::move(newLedger_);
}
else
continue;
}
LedgerIndex validatedSeq = validatedLedger_->info().seq;
LedgerIndex validatedSeq = validatedLedger->info().seq;
if (!lastRotated)
{
lastRotated = validatedSeq;
@@ -340,7 +348,7 @@ SHAMapStoreImp::run()
}
std::uint64_t nodeCount = 0;
validatedLedger_->stateMap().snapShot (
validatedLedger->stateMap().snapShot (
false)->visitNodes (
std::bind (&SHAMapStoreImp::copyNode, this,
std::ref(nodeCount), std::placeholders::_1));
@@ -504,7 +512,7 @@ SHAMapStoreImp::makeDatabaseRotating (std::string const& name,
readThreads, writableBackend, archiveBackend, nodeStoreJournal_);
}
void
bool
SHAMapStoreImp::clearSql (DatabaseCon& database,
LedgerIndex lastRotated,
std::string const& minQuery,
@@ -517,12 +525,12 @@ SHAMapStoreImp::clearSql (DatabaseCon& database,
boost::optional<std::uint64_t> m;
*db << minQuery, soci::into(m);
if (!m)
return;
return false;
min = *m;
}
if (health() != Health::ok)
return;
if(min > lastRotated || health() != Health::ok)
return false;
boost::format formattedDeleteQuery (deleteQuery);
@@ -530,19 +538,19 @@ SHAMapStoreImp::clearSql (DatabaseCon& database,
"start: " << deleteQuery << " from " << min << " to " << lastRotated;
while (min < lastRotated)
{
min = (min + setup_.deleteBatch >= lastRotated) ? lastRotated :
min + setup_.deleteBatch;
min = std::min(lastRotated, min + setup_.deleteBatch);
{
auto db = database.checkoutDb ();
*db << boost::str (formattedDeleteQuery % min);
}
if (health())
return;
return true;
if (min < lastRotated)
std::this_thread::sleep_for (
std::chrono::milliseconds (setup_.backOff));
}
JLOG(journal_.debug) << "finished: " << deleteQuery;
return true;
}
void
@@ -566,25 +574,10 @@ SHAMapStoreImp::freshenCaches()
void
SHAMapStoreImp::clearPrior (LedgerIndex lastRotated)
{
ledgerMaster_->clearPriorLedgers (lastRotated);
if (health())
return;
// TODO This won't remove validations for ledgers that do not get
// validated. That will likely require inserting LedgerSeq into
// the validations table.
//
// This query has poor performance with large data sets.
// The schema needs to be redesigned to avoid the JOIN, or an
// RDBMS that supports concurrency should be used.
/*
clearSql (*ledgerDb_, lastRotated,
"SELECT MIN(LedgerSeq) FROM Ledgers;",
"DELETE FROM Validations WHERE LedgerHash IN "
"(SELECT Ledgers.LedgerHash FROM Validations JOIN Ledgers ON "
"Validations.LedgerHash=Ledgers.LedgerHash WHERE Ledgers.LedgerSeq < %u);");
*/
ledgerMaster_->clearPriorLedgers (lastRotated);
if (health())
return;
@@ -594,6 +587,118 @@ SHAMapStoreImp::clearPrior (LedgerIndex lastRotated)
if (health())
return;
{
/*
Steps for migration:
Assume: online_delete = 100, lastRotated = 1000,
Last shutdown was at ledger # 1080.
The current network validated ledger is 1090.
Implies: Ledgers has entries from 900 to 1080.
Validations has entries for all 1080 ledgers,
including orphan validations that were not included
in a validated ledger.
1) Columns are created in Validations with default NULL values.
2) During syncing, Ledgers and Validations for 1080 - 1090
are received from the network. Records are created in
Validations with InitialSeq approximately 1080 (exact value
doesn't matter), and later validated with the matching
LedgerSeq value.
3) rippled participates in ledgers 1091-1100. Validations
received are created with InitialSeq in that range, and
appropriate LedgerSeqs. Maybe some of those ledgers are
not accepted, so LedgerSeq stays null.
4) At ledger 1100, this function is called with
lastRotated = 1000. The first query tries to delete
rows WHERE LedgerSeq < 1000. It finds none.
5) The second round of deletions does not run.
6) Ledgers continue to advance from 1100-1200 as described
in step 3.
7) At ledger 1200, this function is called again with
lastRotated = 1100. The first query again tries to delete
rows WHERE LedgerSeq < 1100. It finds the rows for 1080-1099.
8) The second round of deletions runs. It gets
WHERE v.LedgerSeq is NULL AND
(v.InitialSeq IS NULL OR v.InitialSeq < 1100)
The rows that are found include (a) ALL of the Validations
for the first 1080 ledgers. (b) Any orphan validations that
were created in step 3.
9) This continues. The next rotation cycle does the same as steps
7 & 8, except that none of the original Validations (8a) exist
anymore, and 8b gets the orphans from step 6.
*/
static auto anyValDeleted = false;
auto const valDeleted = clearSql(*ledgerDb_, lastRotated,
"SELECT MIN(LedgerSeq) FROM Validations;",
"DELETE FROM Validations WHERE LedgerSeq < %u;");
anyValDeleted |= valDeleted;
if (health())
return;
if (anyValDeleted)
{
/* Delete the old NULL LedgerSeqs - the Validations that
aren't linked to a validated ledger - but only once the
`clearSql` call above has reported deletable rows (in this
or any earlier rotation; the flag is sticky), and only
for those created with an old InitialSeq.
*/
using namespace std::chrono;
auto const deleteBatch = setup_.deleteBatch;
auto const continueLimit = (deleteBatch + 1) / 2;
std::string const deleteQuery(
R"sql(DELETE FROM Validations
WHERE LedgerHash IN
(
SELECT v.LedgerHash
FROM Validations v
WHERE v.LedgerSeq is NULL AND
(v.InitialSeq IS NULL OR v.InitialSeq < )sql" +
std::to_string(lastRotated) +
") LIMIT " +
std::to_string (deleteBatch) +
");");
JLOG(journal_.debug) << "start: " << deleteQuery << " of "
<< deleteBatch << " rows.";
long long totalRowsAffected = 0;
long long rowsAffected;
soci::statement st = [&]
{
auto db = ledgerDb_->checkoutDb();
return (db->prepare << deleteQuery);
}();
if (health())
return;
do
{
{
auto db = ledgerDb_->checkoutDb();
auto const start = high_resolution_clock::now();
st.execute(true);
rowsAffected = st.get_affected_rows();
totalRowsAffected += rowsAffected;
auto const ms = duration_cast<milliseconds>(
high_resolution_clock::now() - start).count();
JLOG(journal_.trace) << "step: deleted " << rowsAffected
<< " rows in " << ms << "ms.";
}
if (health())
return;
if (rowsAffected >= continueLimit)
std::this_thread::sleep_for(
std::chrono::milliseconds(setup_.backOff));
}
while (rowsAffected && rowsAffected >= continueLimit);
JLOG(journal_.debug) << "finished: " << deleteQuery << ". Deleted "
<< totalRowsAffected << " rows.";
}
}
if (health())
return;
clearSql (*transactionDb_, lastRotated,
"SELECT MIN(LedgerSeq) FROM Transactions;",
"DELETE FROM Transactions WHERE LedgerSeq < %u;");
@@ -675,6 +780,8 @@ setup_SHAMapStore (Config const& c)
{
SHAMapStore::Setup setup;
setup.standalone = c.RUN_STANDALONE;
// Get existing settings and add some default values if not specified:
setup.nodeDatabase = c.section (ConfigSection::nodeDatabase ());

View File

@@ -80,7 +80,9 @@ private:
// check health/stop status as records are copied
std::uint64_t const checkHealthInterval_ = 1000;
// minimum # of ledgers to maintain for health of network
std::uint32_t minimumDeletionInterval_ = 256;
static std::uint32_t const minimumDeletionInterval_ = 256;
// minimum # of ledgers required for standalone mode.
static std::uint32_t const minimumDeletionIntervalSA_ = 8;
Setup setup_;
NodeStore::Scheduler& scheduler_;
@@ -94,7 +96,7 @@ private:
mutable std::condition_variable cond_;
mutable std::mutex mutex_;
std::shared_ptr<Ledger const> newLedger_;
std::shared_ptr<Ledger const> validatedLedger_;
std::atomic<bool> rotating_;
TransactionMaster& transactionMaster_;
std::atomic <LedgerIndex> canDelete_;
// these do not exist upon SHAMapStore creation, but do exist
@@ -106,6 +108,12 @@ private:
DatabaseCon* transactionDb_ = nullptr;
DatabaseCon* ledgerDb_ = nullptr;
public:
/** Report the current value of the rotation flag.
 *
 * run() clears the flag at the top of each loop iteration and sets it
 * once a new validated ledger has been picked up for processing.
 *
 * @return the value of rotating_ at the time of the call.
 */
bool rotating() const
{
// Explicit atomic read; equivalent to the implicit conversion.
return rotating_.load();
}
public:
SHAMapStoreImp (Application& app,
Setup const& setup,
@@ -203,8 +211,10 @@ private:
/** delete from sqlite table in batches to not lock the db excessively
* pause briefly to extend access time to other users
* call with mutex object unlocked
* @return true if any deletable rows were found (though not
* necessarily deleted).
*/
void clearSql (DatabaseCon& database, LedgerIndex lastRotated,
bool clearSql (DatabaseCon& database, LedgerIndex lastRotated,
std::string const& minQuery, std::string const& deleteQuery);
void clearCaches (LedgerIndex validatedSeq);
void freshenCaches();

View File

@@ -461,8 +461,10 @@ private:
void doWrite ()
{
LoadEvent::autoptr event (app_.getJobQueue ().getLoadEventAP (jtDISK, "ValidationWrite"));
boost::format insVal ("INSERT INTO Validations "
"(LedgerHash,NodePubKey,SignTime,RawData) VALUES ('%s','%s','%u',%s);");
std::string insVal ("INSERT INTO Validations "
"(InitialSeq, LedgerSeq, LedgerHash,NodePubKey,SignTime,RawData) "
"VALUES (:initialSeq, :ledgerSeq, :ledgerHash,:nodePubKey,:signTime,:rawData);");
std::string findSeq("SELECT LedgerSeq FROM Ledgers WHERE Ledgerhash=:ledgerHash;");
ScopedLockType sl (mLock);
assert (mWriting);
@@ -484,13 +486,34 @@ private:
{
s.erase ();
it->add (s);
*db << boost::str (
insVal % to_string (it->getLedgerHash ()) %
toBase58(
TokenType::TOKEN_NODE_PUBLIC,
it->getSignerPublic ()) %
it->getSignTime().time_since_epoch().count() %
sqlEscape (s.peekData ()));
auto const ledgerHash = to_string(it->getLedgerHash());
boost::optional<std::uint64_t> ledgerSeq;
*db << findSeq, soci::use(ledgerHash),
soci::into(ledgerSeq);
auto const initialSeq = ledgerSeq.value_or(
app_.getLedgerMaster().getCurrentLedgerIndex());
auto const nodePubKey = toBase58(
TokenType::TOKEN_NODE_PUBLIC,
it->getSignerPublic());
auto const signTime =
it->getSignTime().time_since_epoch().count();
soci::blob rawData(*db);
rawData.append(reinterpret_cast<const char*>(
s.peekData().data()), s.peekData().size());
assert(rawData.get_len() == s.peekData().size());
*db <<
insVal,
soci::use(initialSeq),
soci::use(ledgerSeq),
soci::use(ledgerHash),
soci::use(nodePubKey),
soci::use(signTime),
soci::use(rawData);
}
tr.commit ();