mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-19 10:35:50 +00:00
Maintain history back to the earliest persisted ledger:
This makes behavior consistent with configurations both with and without online delete.
This commit is contained in:
@@ -39,6 +39,7 @@
|
|||||||
#include <ripple/protocol/Protocol.h>
|
#include <ripple/protocol/Protocol.h>
|
||||||
#include <ripple/protocol/RippleLedgerHash.h>
|
#include <ripple/protocol/RippleLedgerHash.h>
|
||||||
#include <ripple/protocol/STValidation.h>
|
#include <ripple/protocol/STValidation.h>
|
||||||
|
#include <boost/optional.hpp>
|
||||||
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
||||||
@@ -280,11 +281,6 @@ private:
|
|||||||
// Try to publish ledgers, acquire missing ledgers. Always called with
|
// Try to publish ledgers, acquire missing ledgers. Always called with
|
||||||
// m_mutex locked. The passed lock is a reminder to callers.
|
// m_mutex locked. The passed lock is a reminder to callers.
|
||||||
void doAdvance(std::unique_lock<std::recursive_mutex>&);
|
void doAdvance(std::unique_lock<std::recursive_mutex>&);
|
||||||
bool shouldAcquire(
|
|
||||||
std::uint32_t const currentLedger,
|
|
||||||
std::uint32_t const ledgerHistory,
|
|
||||||
std::uint32_t const ledgerHistoryIndex,
|
|
||||||
std::uint32_t const candidateLedger) const;
|
|
||||||
|
|
||||||
std::vector<std::shared_ptr<Ledger const>>
|
std::vector<std::shared_ptr<Ledger const>>
|
||||||
findNewLedgersToPublish(std::unique_lock<std::recursive_mutex>&);
|
findNewLedgersToPublish(std::unique_lock<std::recursive_mutex>&);
|
||||||
@@ -295,7 +291,6 @@ private:
|
|||||||
// The passed lock is a reminder to callers.
|
// The passed lock is a reminder to callers.
|
||||||
bool newPFWork(const char *name, std::unique_lock<std::recursive_mutex>&);
|
bool newPFWork(const char *name, std::unique_lock<std::recursive_mutex>&);
|
||||||
|
|
||||||
private:
|
|
||||||
Application& app_;
|
Application& app_;
|
||||||
beast::Journal m_journal;
|
beast::Journal m_journal;
|
||||||
|
|
||||||
|
|||||||
@@ -38,6 +38,7 @@
|
|||||||
#include <ripple/basics/Log.h>
|
#include <ripple/basics/Log.h>
|
||||||
#include <ripple/basics/TaggedCache.h>
|
#include <ripple/basics/TaggedCache.h>
|
||||||
#include <ripple/basics/UptimeClock.h>
|
#include <ripple/basics/UptimeClock.h>
|
||||||
|
#include <ripple/core/DatabaseCon.h>
|
||||||
#include <ripple/core/TimeKeeper.h>
|
#include <ripple/core/TimeKeeper.h>
|
||||||
#include <ripple/nodestore/DatabaseShard.h>
|
#include <ripple/nodestore/DatabaseShard.h>
|
||||||
#include <ripple/overlay/Overlay.h>
|
#include <ripple/overlay/Overlay.h>
|
||||||
@@ -47,6 +48,7 @@
|
|||||||
#include <ripple/resource/Fees.h>
|
#include <ripple/resource/Fees.h>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
#include <limits>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@@ -140,6 +142,57 @@ static constexpr std::chrono::minutes MAX_LEDGER_AGE_ACQUIRE {1};
|
|||||||
// Don't acquire history if write load is too high
|
// Don't acquire history if write load is too high
|
||||||
static constexpr int MAX_WRITE_LOAD_ACQUIRE {8192};
|
static constexpr int MAX_WRITE_LOAD_ACQUIRE {8192};
|
||||||
|
|
||||||
|
// Helper function for LedgerMaster::doAdvance()
|
||||||
|
// Returns the minimum ledger sequence in SQL database, if any.
|
||||||
|
static boost::optional<LedgerIndex>
|
||||||
|
minSqlSeq(Application& app)
|
||||||
|
{
|
||||||
|
boost::optional<LedgerIndex> seq;
|
||||||
|
auto db = app.getLedgerDB().checkoutDb();
|
||||||
|
*db << "SELECT MIN(LedgerSeq) FROM Ledgers", soci::into(seq);
|
||||||
|
return seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function for LedgerMaster::doAdvance()
|
||||||
|
// Return true if candidateLedger should be fetched from the network.
|
||||||
|
static bool
|
||||||
|
shouldAcquire (
|
||||||
|
std::uint32_t const currentLedger,
|
||||||
|
std::uint32_t const ledgerHistory,
|
||||||
|
boost::optional<LedgerIndex> minSeq,
|
||||||
|
std::uint32_t const lastRotated,
|
||||||
|
std::uint32_t const candidateLedger,
|
||||||
|
beast::Journal j)
|
||||||
|
{
|
||||||
|
bool ret = [&]()
|
||||||
|
{
|
||||||
|
// Fetch ledger if it may be the current ledger
|
||||||
|
if (candidateLedger >= currentLedger)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
// Or if it is within our configured history range:
|
||||||
|
if (currentLedger - candidateLedger <= ledgerHistory)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
// Or it's greater than or equal to both:
|
||||||
|
// - the minimum persisted ledger or the maximum possible
|
||||||
|
// sequence value, if no persisted ledger, and
|
||||||
|
// - minimum ledger that will be persisted as of the next online
|
||||||
|
// deletion interval, or 1 if online deletion is disabled.
|
||||||
|
return
|
||||||
|
candidateLedger >= std::max(
|
||||||
|
minSeq.value_or(std::numeric_limits<LedgerIndex>::max()),
|
||||||
|
lastRotated + 1);
|
||||||
|
}();
|
||||||
|
|
||||||
|
JLOG (j.trace())
|
||||||
|
<< "Missing ledger "
|
||||||
|
<< candidateLedger
|
||||||
|
<< (ret ? " should" : " should NOT")
|
||||||
|
<< " be acquired";
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
LedgerMaster::LedgerMaster (Application& app, Stopwatch& stopwatch,
|
LedgerMaster::LedgerMaster (Application& app, Stopwatch& stopwatch,
|
||||||
Stoppable& parent,
|
Stoppable& parent,
|
||||||
beast::insight::Collector::ptr const& collector, beast::Journal journal)
|
beast::insight::Collector::ptr const& collector, beast::Journal journal)
|
||||||
@@ -1697,31 +1750,6 @@ LedgerMaster::releaseReplay ()
|
|||||||
return std::move (replayData);
|
return std::move (replayData);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
|
||||||
LedgerMaster::shouldAcquire (
|
|
||||||
std::uint32_t const currentLedger,
|
|
||||||
std::uint32_t const ledgerHistory,
|
|
||||||
std::uint32_t const ledgerHistoryIndex,
|
|
||||||
std::uint32_t const candidateLedger) const
|
|
||||||
{
|
|
||||||
|
|
||||||
// Fetch ledger if it might be the current ledger,
|
|
||||||
// is requested by the advisory delete setting, or
|
|
||||||
// is within our configured history range
|
|
||||||
|
|
||||||
bool ret (candidateLedger >= currentLedger ||
|
|
||||||
((ledgerHistoryIndex > 0) &&
|
|
||||||
(candidateLedger > ledgerHistoryIndex)) ||
|
|
||||||
(currentLedger - candidateLedger) <= ledgerHistory);
|
|
||||||
|
|
||||||
JLOG (m_journal.trace())
|
|
||||||
<< "Missing ledger "
|
|
||||||
<< candidateLedger
|
|
||||||
<< (ret ? " should" : " should NOT")
|
|
||||||
<< " be acquired";
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
LedgerMaster::fetchForHistory(
|
LedgerMaster::fetchForHistory(
|
||||||
std::uint32_t missing,
|
std::uint32_t missing,
|
||||||
@@ -1875,7 +1903,9 @@ void LedgerMaster::doAdvance (std::unique_lock<std::recursive_mutex>& sl)
|
|||||||
"tryAdvance discovered missing " << *missing;
|
"tryAdvance discovered missing " << *missing;
|
||||||
if ((mFillInProgress == 0 || *missing > mFillInProgress) &&
|
if ((mFillInProgress == 0 || *missing > mFillInProgress) &&
|
||||||
shouldAcquire(mValidLedgerSeq, ledger_history_,
|
shouldAcquire(mValidLedgerSeq, ledger_history_,
|
||||||
app_.getSHAMapStore().getCanDelete(), *missing))
|
minSqlSeq(app_),
|
||||||
|
app_.getSHAMapStore().getLastRotated(), *missing,
|
||||||
|
m_journal))
|
||||||
{
|
{
|
||||||
JLOG(m_journal.trace()) <<
|
JLOG(m_journal.trace()) <<
|
||||||
"advanceThread should acquire";
|
"advanceThread should acquire";
|
||||||
|
|||||||
@@ -56,7 +56,9 @@ public:
|
|||||||
/** Whether advisory delete is enabled. */
|
/** Whether advisory delete is enabled. */
|
||||||
virtual bool advisoryDelete() const = 0;
|
virtual bool advisoryDelete() const = 0;
|
||||||
|
|
||||||
/** Last ledger which was copied during rotation of backends. */
|
/** Maximum ledger that has been deleted, or will be deleted if
|
||||||
|
* currently in the act of online deletion.
|
||||||
|
*/
|
||||||
virtual LedgerIndex getLastRotated() = 0;
|
virtual LedgerIndex getLastRotated() = 0;
|
||||||
|
|
||||||
/** Highest ledger that may be deleted. */
|
/** Highest ledger that may be deleted. */
|
||||||
|
|||||||
@@ -322,7 +322,7 @@ void
|
|||||||
SHAMapStoreImp::run()
|
SHAMapStoreImp::run()
|
||||||
{
|
{
|
||||||
beast::setCurrentThreadName ("SHAMapStore");
|
beast::setCurrentThreadName ("SHAMapStore");
|
||||||
LedgerIndex lastRotated = state_db_.getState().lastRotated;
|
lastRotated_ = state_db_.getState().lastRotated;
|
||||||
netOPs_ = &app_.getOPs();
|
netOPs_ = &app_.getOPs();
|
||||||
ledgerMaster_ = &app_.getLedgerMaster();
|
ledgerMaster_ = &app_.getLedgerMaster();
|
||||||
fullBelowCache_ = &app_.family().fullbelow();
|
fullBelowCache_ = &app_.family().fullbelow();
|
||||||
@@ -333,7 +333,7 @@ SHAMapStoreImp::run()
|
|||||||
if (advisoryDelete_)
|
if (advisoryDelete_)
|
||||||
canDelete_ = state_db_.getCanDelete ();
|
canDelete_ = state_db_.getCanDelete ();
|
||||||
|
|
||||||
while (1)
|
while (true)
|
||||||
{
|
{
|
||||||
healthy_ = true;
|
healthy_ = true;
|
||||||
std::shared_ptr<Ledger const> validatedLedger;
|
std::shared_ptr<Ledger const> validatedLedger;
|
||||||
@@ -356,20 +356,20 @@ SHAMapStoreImp::run()
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
LedgerIndex validatedSeq = validatedLedger->info().seq;
|
LedgerIndex const validatedSeq = validatedLedger->info().seq;
|
||||||
if (!lastRotated)
|
if (!lastRotated_)
|
||||||
{
|
{
|
||||||
lastRotated = validatedSeq;
|
lastRotated_ = validatedSeq;
|
||||||
state_db_.setLastRotated (lastRotated);
|
state_db_.setLastRotated (lastRotated_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// will delete up to (not including) lastRotated)
|
// will delete up to (not including) lastRotated_
|
||||||
if (validatedSeq >= lastRotated + deleteInterval_
|
if (validatedSeq >= lastRotated_ + deleteInterval_
|
||||||
&& canDelete_ >= lastRotated - 1)
|
&& canDelete_ >= lastRotated_ - 1)
|
||||||
{
|
{
|
||||||
JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq
|
JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq
|
||||||
<< " lastRotated " << lastRotated << " deleteInterval "
|
<< " lastRotated_ " << lastRotated_ << " deleteInterval "
|
||||||
<< deleteInterval_ << " canDelete_ " << canDelete_;
|
<< deleteInterval_ << " canDelete_ " << canDelete_;
|
||||||
|
|
||||||
switch (health())
|
switch (health())
|
||||||
{
|
{
|
||||||
@@ -383,7 +383,7 @@ SHAMapStoreImp::run()
|
|||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
clearPrior (lastRotated);
|
clearPrior (lastRotated_);
|
||||||
switch (health())
|
switch (health())
|
||||||
{
|
{
|
||||||
case Health::stopping:
|
case Health::stopping:
|
||||||
@@ -448,13 +448,13 @@ SHAMapStoreImp::run()
|
|||||||
|
|
||||||
std::string nextArchiveDir =
|
std::string nextArchiveDir =
|
||||||
dbRotating_->getWritableBackend()->getName();
|
dbRotating_->getWritableBackend()->getName();
|
||||||
lastRotated = validatedSeq;
|
lastRotated_ = validatedSeq;
|
||||||
std::shared_ptr<NodeStore::Backend> oldBackend;
|
std::shared_ptr<NodeStore::Backend> oldBackend;
|
||||||
{
|
{
|
||||||
std::lock_guard lock (dbRotating_->peekMutex());
|
std::lock_guard lock (dbRotating_->peekMutex());
|
||||||
|
|
||||||
state_db_.setState (SavedState {newBackend->getName(),
|
state_db_.setState (SavedState {newBackend->getName(),
|
||||||
nextArchiveDir, lastRotated});
|
nextArchiveDir, lastRotated_});
|
||||||
clearCaches (validatedSeq);
|
clearCaches (validatedSeq);
|
||||||
oldBackend = dbRotating_->rotateBackends(
|
oldBackend = dbRotating_->rotateBackends(
|
||||||
std::move(newBackend),
|
std::move(newBackend),
|
||||||
|
|||||||
@@ -25,6 +25,7 @@
|
|||||||
#include <ripple/core/DatabaseCon.h>
|
#include <ripple/core/DatabaseCon.h>
|
||||||
#include <ripple/nodestore/DatabaseRotating.h>
|
#include <ripple/nodestore/DatabaseRotating.h>
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
@@ -84,6 +85,13 @@ private:
|
|||||||
static std::uint32_t const minimumDeletionInterval_ = 256;
|
static std::uint32_t const minimumDeletionInterval_ = 256;
|
||||||
// minimum # of ledgers required for standalone mode.
|
// minimum # of ledgers required for standalone mode.
|
||||||
static std::uint32_t const minimumDeletionIntervalSA_ = 8;
|
static std::uint32_t const minimumDeletionIntervalSA_ = 8;
|
||||||
|
// Ledger sequence at which the last deletion interval was triggered,
|
||||||
|
// or the current validated sequence as of first use
|
||||||
|
// if there have been no prior deletions. Deletion occurs up to (but
|
||||||
|
// not including) this value. All ledgers past this value are accumulated
|
||||||
|
// until the next online deletion. This value is persisted to SQLite
|
||||||
|
// nearly immediately after modification.
|
||||||
|
std::atomic<LedgerIndex> lastRotated_ {};
|
||||||
|
|
||||||
NodeStore::Scheduler& scheduler_;
|
NodeStore::Scheduler& scheduler_;
|
||||||
beast::Journal const journal_;
|
beast::Journal const journal_;
|
||||||
@@ -159,7 +167,7 @@ public:
|
|||||||
LedgerIndex
|
LedgerIndex
|
||||||
getLastRotated() override
|
getLastRotated() override
|
||||||
{
|
{
|
||||||
return state_db_.getState().lastRotated;
|
return lastRotated_;
|
||||||
}
|
}
|
||||||
|
|
||||||
// All ledgers before and including this are unprotected
|
// All ledgers before and including this are unprotected
|
||||||
|
|||||||
Reference in New Issue
Block a user