mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-04 19:25:51 +00:00
Maintain history back to the earliest persisted ledger:
This makes behavior consistent with configurations both with and without online delete.
This commit is contained in:
@@ -39,6 +39,7 @@
|
||||
#include <ripple/protocol/Protocol.h>
|
||||
#include <ripple/protocol/RippleLedgerHash.h>
|
||||
#include <ripple/protocol/STValidation.h>
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
#include <mutex>
|
||||
|
||||
@@ -280,11 +281,6 @@ private:
|
||||
// Try to publish ledgers, acquire missing ledgers. Always called with
|
||||
// m_mutex locked. The passed lock is a reminder to callers.
|
||||
void doAdvance(std::unique_lock<std::recursive_mutex>&);
|
||||
bool shouldAcquire(
|
||||
std::uint32_t const currentLedger,
|
||||
std::uint32_t const ledgerHistory,
|
||||
std::uint32_t const ledgerHistoryIndex,
|
||||
std::uint32_t const candidateLedger) const;
|
||||
|
||||
std::vector<std::shared_ptr<Ledger const>>
|
||||
findNewLedgersToPublish(std::unique_lock<std::recursive_mutex>&);
|
||||
@@ -295,7 +291,6 @@ private:
|
||||
// The passed lock is a reminder to callers.
|
||||
bool newPFWork(const char *name, std::unique_lock<std::recursive_mutex>&);
|
||||
|
||||
private:
|
||||
Application& app_;
|
||||
beast::Journal m_journal;
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
#include <ripple/basics/Log.h>
|
||||
#include <ripple/basics/TaggedCache.h>
|
||||
#include <ripple/basics/UptimeClock.h>
|
||||
#include <ripple/core/DatabaseCon.h>
|
||||
#include <ripple/core/TimeKeeper.h>
|
||||
#include <ripple/nodestore/DatabaseShard.h>
|
||||
#include <ripple/overlay/Overlay.h>
|
||||
@@ -47,6 +48,7 @@
|
||||
#include <ripple/resource/Fees.h>
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
@@ -140,6 +142,57 @@ static constexpr std::chrono::minutes MAX_LEDGER_AGE_ACQUIRE {1};
|
||||
// Don't acquire history if write load is too high
|
||||
static constexpr int MAX_WRITE_LOAD_ACQUIRE {8192};
|
||||
|
||||
// Helper function for LedgerMaster::doAdvance()
|
||||
// Returns the minimum ledger sequence in SQL database, if any.
|
||||
static boost::optional<LedgerIndex>
|
||||
minSqlSeq(Application& app)
|
||||
{
|
||||
boost::optional<LedgerIndex> seq;
|
||||
auto db = app.getLedgerDB().checkoutDb();
|
||||
*db << "SELECT MIN(LedgerSeq) FROM Ledgers", soci::into(seq);
|
||||
return seq;
|
||||
}
|
||||
|
||||
// Helper function for LedgerMaster::doAdvance()
|
||||
// Return true if candidateLedger should be fetched from the network.
|
||||
static bool
|
||||
shouldAcquire (
|
||||
std::uint32_t const currentLedger,
|
||||
std::uint32_t const ledgerHistory,
|
||||
boost::optional<LedgerIndex> minSeq,
|
||||
std::uint32_t const lastRotated,
|
||||
std::uint32_t const candidateLedger,
|
||||
beast::Journal j)
|
||||
{
|
||||
bool ret = [&]()
|
||||
{
|
||||
// Fetch ledger if it may be the current ledger
|
||||
if (candidateLedger >= currentLedger)
|
||||
return true;
|
||||
|
||||
// Or if it is within our configured history range:
|
||||
if (currentLedger - candidateLedger <= ledgerHistory)
|
||||
return true;
|
||||
|
||||
// Or it's greater than or equal to both:
|
||||
// - the minimum persisted ledger or the maximum possible
|
||||
// sequence value, if no persisted ledger, and
|
||||
// - minimum ledger that will be persisted as of the next online
|
||||
// deletion interval, or 1 if online deletion is disabled.
|
||||
return
|
||||
candidateLedger >= std::max(
|
||||
minSeq.value_or(std::numeric_limits<LedgerIndex>::max()),
|
||||
lastRotated + 1);
|
||||
}();
|
||||
|
||||
JLOG (j.trace())
|
||||
<< "Missing ledger "
|
||||
<< candidateLedger
|
||||
<< (ret ? " should" : " should NOT")
|
||||
<< " be acquired";
|
||||
return ret;
|
||||
}
|
||||
|
||||
LedgerMaster::LedgerMaster (Application& app, Stopwatch& stopwatch,
|
||||
Stoppable& parent,
|
||||
beast::insight::Collector::ptr const& collector, beast::Journal journal)
|
||||
@@ -1697,31 +1750,6 @@ LedgerMaster::releaseReplay ()
|
||||
return std::move (replayData);
|
||||
}
|
||||
|
||||
bool
|
||||
LedgerMaster::shouldAcquire (
|
||||
std::uint32_t const currentLedger,
|
||||
std::uint32_t const ledgerHistory,
|
||||
std::uint32_t const ledgerHistoryIndex,
|
||||
std::uint32_t const candidateLedger) const
|
||||
{
|
||||
|
||||
// Fetch ledger if it might be the current ledger,
|
||||
// is requested by the advisory delete setting, or
|
||||
// is within our configured history range
|
||||
|
||||
bool ret (candidateLedger >= currentLedger ||
|
||||
((ledgerHistoryIndex > 0) &&
|
||||
(candidateLedger > ledgerHistoryIndex)) ||
|
||||
(currentLedger - candidateLedger) <= ledgerHistory);
|
||||
|
||||
JLOG (m_journal.trace())
|
||||
<< "Missing ledger "
|
||||
<< candidateLedger
|
||||
<< (ret ? " should" : " should NOT")
|
||||
<< " be acquired";
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
LedgerMaster::fetchForHistory(
|
||||
std::uint32_t missing,
|
||||
@@ -1875,7 +1903,9 @@ void LedgerMaster::doAdvance (std::unique_lock<std::recursive_mutex>& sl)
|
||||
"tryAdvance discovered missing " << *missing;
|
||||
if ((mFillInProgress == 0 || *missing > mFillInProgress) &&
|
||||
shouldAcquire(mValidLedgerSeq, ledger_history_,
|
||||
app_.getSHAMapStore().getCanDelete(), *missing))
|
||||
minSqlSeq(app_),
|
||||
app_.getSHAMapStore().getLastRotated(), *missing,
|
||||
m_journal))
|
||||
{
|
||||
JLOG(m_journal.trace()) <<
|
||||
"advanceThread should acquire";
|
||||
|
||||
@@ -56,7 +56,9 @@ public:
|
||||
/** Whether advisory delete is enabled. */
|
||||
virtual bool advisoryDelete() const = 0;
|
||||
|
||||
/** Last ledger which was copied during rotation of backends. */
|
||||
/** Maximum ledger that has been deleted, or will be deleted if
|
||||
* currently in the act of online deletion.
|
||||
*/
|
||||
virtual LedgerIndex getLastRotated() = 0;
|
||||
|
||||
/** Highest ledger that may be deleted. */
|
||||
|
||||
@@ -322,7 +322,7 @@ void
|
||||
SHAMapStoreImp::run()
|
||||
{
|
||||
beast::setCurrentThreadName ("SHAMapStore");
|
||||
LedgerIndex lastRotated = state_db_.getState().lastRotated;
|
||||
lastRotated_ = state_db_.getState().lastRotated;
|
||||
netOPs_ = &app_.getOPs();
|
||||
ledgerMaster_ = &app_.getLedgerMaster();
|
||||
fullBelowCache_ = &app_.family().fullbelow();
|
||||
@@ -333,7 +333,7 @@ SHAMapStoreImp::run()
|
||||
if (advisoryDelete_)
|
||||
canDelete_ = state_db_.getCanDelete ();
|
||||
|
||||
while (1)
|
||||
while (true)
|
||||
{
|
||||
healthy_ = true;
|
||||
std::shared_ptr<Ledger const> validatedLedger;
|
||||
@@ -356,20 +356,20 @@ SHAMapStoreImp::run()
|
||||
continue;
|
||||
}
|
||||
|
||||
LedgerIndex validatedSeq = validatedLedger->info().seq;
|
||||
if (!lastRotated)
|
||||
LedgerIndex const validatedSeq = validatedLedger->info().seq;
|
||||
if (!lastRotated_)
|
||||
{
|
||||
lastRotated = validatedSeq;
|
||||
state_db_.setLastRotated (lastRotated);
|
||||
lastRotated_ = validatedSeq;
|
||||
state_db_.setLastRotated (lastRotated_);
|
||||
}
|
||||
|
||||
// will delete up to (not including) lastRotated)
|
||||
if (validatedSeq >= lastRotated + deleteInterval_
|
||||
&& canDelete_ >= lastRotated - 1)
|
||||
// will delete up to (not including) lastRotated_
|
||||
if (validatedSeq >= lastRotated_ + deleteInterval_
|
||||
&& canDelete_ >= lastRotated_ - 1)
|
||||
{
|
||||
JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq
|
||||
<< " lastRotated " << lastRotated << " deleteInterval "
|
||||
<< deleteInterval_ << " canDelete_ " << canDelete_;
|
||||
<< " lastRotated_ " << lastRotated_ << " deleteInterval "
|
||||
<< deleteInterval_ << " canDelete_ " << canDelete_;
|
||||
|
||||
switch (health())
|
||||
{
|
||||
@@ -383,7 +383,7 @@ SHAMapStoreImp::run()
|
||||
;
|
||||
}
|
||||
|
||||
clearPrior (lastRotated);
|
||||
clearPrior (lastRotated_);
|
||||
switch (health())
|
||||
{
|
||||
case Health::stopping:
|
||||
@@ -448,13 +448,13 @@ SHAMapStoreImp::run()
|
||||
|
||||
std::string nextArchiveDir =
|
||||
dbRotating_->getWritableBackend()->getName();
|
||||
lastRotated = validatedSeq;
|
||||
lastRotated_ = validatedSeq;
|
||||
std::shared_ptr<NodeStore::Backend> oldBackend;
|
||||
{
|
||||
std::lock_guard lock (dbRotating_->peekMutex());
|
||||
|
||||
state_db_.setState (SavedState {newBackend->getName(),
|
||||
nextArchiveDir, lastRotated});
|
||||
nextArchiveDir, lastRotated_});
|
||||
clearCaches (validatedSeq);
|
||||
oldBackend = dbRotating_->rotateBackends(
|
||||
std::move(newBackend),
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include <ripple/core/DatabaseCon.h>
|
||||
#include <ripple/nodestore/DatabaseRotating.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <thread>
|
||||
|
||||
@@ -84,6 +85,13 @@ private:
|
||||
static std::uint32_t const minimumDeletionInterval_ = 256;
|
||||
// minimum # of ledgers required for standalone mode.
|
||||
static std::uint32_t const minimumDeletionIntervalSA_ = 8;
|
||||
// Ledger sequence at which the last deletion interval was triggered,
|
||||
// or the current validated sequence as of first use
|
||||
// if there have been no prior deletions. Deletion occurs up to (but
|
||||
// not including) this value. All ledgers past this value are accumulated
|
||||
// until the next online deletion. This value is persisted to SQLite
|
||||
// nearly immediately after modification.
|
||||
std::atomic<LedgerIndex> lastRotated_ {};
|
||||
|
||||
NodeStore::Scheduler& scheduler_;
|
||||
beast::Journal const journal_;
|
||||
@@ -159,7 +167,7 @@ public:
|
||||
LedgerIndex
|
||||
getLastRotated() override
|
||||
{
|
||||
return state_db_.getState().lastRotated;
|
||||
return lastRotated_;
|
||||
}
|
||||
|
||||
// All ledgers before and including this are unprotected
|
||||
|
||||
Reference in New Issue
Block a user