Allow server to stabilize after online delete health check failure

Author:       Mark Travis
Date:         2022-04-05 18:02:53 -07:00
Committed by: manojsdoshi
Parent:       dfe69f1b76
Commit:       5aedb0e07a
3 changed files with 52 additions and 112 deletions

View File

@@ -1140,17 +1140,10 @@
 # The online delete process checks periodically
 # that rippled is still in sync with the network,
 # and that the validated ledger is less than
-# 'age_threshold_seconds' old. By default, if it
-# is not the online delete process aborts and
-# tries again later. If 'recovery_wait_seconds'
-# is set and rippled is out of sync, but likely to
-# recover quickly, then online delete will wait
-# this number of seconds for rippled to get back
-# into sync before it aborts.
-# Set this value if the node is otherwise staying
-# in sync, or recovering quickly, but the online
-# delete process is unable to finish.
-# Default is unset.
+# 'age_threshold_seconds' old. If not, then continue
+# sleeping for this number of seconds and
+# checking until healthy.
+# Default is 5.
 #
 # Optional keys for Cassandra:
 #
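For reference, these keys live in the [node_db] stanza of the server configuration. A minimal illustrative snippet (hypothetical path and values, not recommendations) showing the setting alongside its neighbors with the new behavior:

    [node_db]
    type=NuDB
    path=/var/lib/rippled/db/nudb
    # Keep this many of the most recent ledgers online and enable online delete.
    online_delete=2000
    # Online delete treats the server as healthy only while it is in sync and
    # the validated ledger is younger than this many seconds.
    age_threshold_seconds=60
    # When unhealthy, sleep this long between re-checks instead of aborting the
    # deletion pass; defaults to 5 if omitted.
    recovery_wait_seconds=5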

View File

@@ -138,7 +138,7 @@ SHAMapStoreImp::SHAMapStoreImp(
         if (get_if_exists(section, "age_threshold_seconds", temp))
             ageThreshold_ = std::chrono::seconds{temp};
         if (get_if_exists(section, "recovery_wait_seconds", temp))
-            recoveryWaitTime_.emplace(std::chrono::seconds{temp});
+            recoveryWaitTime_ = std::chrono::seconds{temp};
 
         get_if_exists(section, "advisory_delete", advisoryDelete_);
@@ -268,7 +268,7 @@ SHAMapStoreImp::copyNode(std::uint64_t& nodeCount, SHAMapTreeNode const& node)
         true);
     if (!(++nodeCount % checkHealthInterval_))
     {
-        if (health())
+        if (stopping())
             return false;
     }
@@ -326,7 +326,7 @@ SHAMapStoreImp::run()
             bool const readyToRotate =
                 validatedSeq >= lastRotated + deleteInterval_ &&
-                canDelete_ >= lastRotated - 1 && !health();
+                canDelete_ >= lastRotated - 1 && !stopping();
 
             // Make sure we don't delete ledgers currently being
             // imported into the ShardStore
@@ -358,15 +358,8 @@ SHAMapStoreImp::run()
                     << ledgerMaster_->getValidatedLedgerAge().count() << 's';
 
                 clearPrior(lastRotated);
-                switch (health())
-                {
-                    case Health::stopping:
-                        return;
-                    case Health::unhealthy:
-                        continue;
-                    case Health::ok:
-                    default:;
-                }
+                if (stopping())
+                    return;
 
                 JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
                 std::uint64_t nodeCount = 0;
@@ -375,30 +368,16 @@ SHAMapStoreImp::run()
                     this,
                     std::ref(nodeCount),
                     std::placeholders::_1));
-                switch (health())
-                {
-                    case Health::stopping:
-                        return;
-                    case Health::unhealthy:
-                        continue;
-                    case Health::ok:
-                    default:;
-                }
+                if (stopping())
+                    return;
 
                 // Only log if we completed without a "health" abort
                 JLOG(journal_.debug()) << "copied ledger " << validatedSeq
                                        << " nodecount " << nodeCount;
                 JLOG(journal_.debug()) << "freshening caches";
                 freshenCaches();
-                switch (health())
-                {
-                    case Health::stopping:
-                        return;
-                    case Health::unhealthy:
-                        continue;
-                    case Health::ok:
-                    default:;
-                }
+                if (stopping())
+                    return;
 
                 // Only log if we completed without a "health" abort
                 JLOG(journal_.debug()) << validatedSeq << " freshened caches";
@@ -408,15 +387,8 @@ SHAMapStoreImp::run()
                     << validatedSeq << " new backend " << newBackend->getName();
 
                 clearCaches(validatedSeq);
-                switch (health())
-                {
-                    case Health::stopping:
-                        return;
-                    case Health::unhealthy:
-                        continue;
-                    case Health::ok:
-                    default:;
-                }
+                if (stopping())
+                    return;
 
                 lastRotated = validatedSeq;
@@ -580,7 +552,7 @@ SHAMapStoreImp::clearSql(
             min = *m;
     }
 
-    if (min > lastRotated || health() != Health::ok)
+    if (min > lastRotated || stopping())
         return;
     if (min == lastRotated)
     {
@@ -601,11 +573,11 @@ SHAMapStoreImp::clearSql(
         JLOG(journal_.trace())
             << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < "
             << min << " from: " << TableName;
-        if (health())
+        if (stopping())
             return;
         if (min < lastRotated)
             std::this_thread::sleep_for(backOff_);
-        if (health())
+        if (stopping())
             return;
     }
     JLOG(journal_.debug()) << "finished deleting from: " << TableName;
@@ -645,7 +617,7 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
     ledgerMaster_->clearPriorLedgers(lastRotated);
     JLOG(journal_.trace()) << "End: Clear internal ledgers up to "
                            << lastRotated;
-    if (health())
+    if (stopping())
         return;
 
     RelationalDBInterfaceSqlite* iface =
@@ -661,7 +633,7 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
         [&iface](LedgerIndex min) -> void {
             iface->deleteBeforeLedgerSeq(min);
         });
-    if (health())
+    if (stopping())
         return;
 
     if (!app_.config().useTxTables())
@@ -676,7 +648,7 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
         [&iface](LedgerIndex min) -> void {
             iface->deleteTransactionsBeforeLedgerSeq(min);
         });
-    if (health())
+    if (stopping())
         return;
 
     clearSql(
@@ -688,52 +660,30 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
         [&iface](LedgerIndex min) -> void {
             iface->deleteAccountTransactionsBeforeLedgerSeq(min);
         });
-    if (health())
+    if (stopping())
         return;
 }
 
-SHAMapStoreImp::Health
-SHAMapStoreImp::health()
+bool
+SHAMapStoreImp::stopping()
 {
+    auto age = ledgerMaster_->getValidatedLedgerAge();
+    OperatingMode mode = netOPs_->getOperatingMode();
+    std::unique_lock lock(mutex_);
+    while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_))
     {
-        std::lock_guard lock(mutex_);
-        if (stop_)
-            return Health::stopping;
-    }
-    if (!netOPs_)
-        return Health::ok;
-    assert(deleteInterval_);
-
-    if (healthy_)
-    {
-        auto age = ledgerMaster_->getValidatedLedgerAge();
-        OperatingMode mode = netOPs_->getOperatingMode();
-        if (recoveryWaitTime_ && mode == OperatingMode::SYNCING &&
-            age < ageThreshold_)
-        {
-            JLOG(journal_.warn())
-                << "Waiting " << recoveryWaitTime_->count()
-                << "s for node to get back into sync with network. state: "
-                << app_.getOPs().strOperatingMode(mode, false) << ". age "
-                << age.count() << 's';
-            std::this_thread::sleep_for(*recoveryWaitTime_);
-
-            age = ledgerMaster_->getValidatedLedgerAge();
-            mode = netOPs_->getOperatingMode();
-        }
-        if (mode != OperatingMode::FULL || age > ageThreshold_)
-        {
-            JLOG(journal_.warn()) << "Not deleting. state: "
-                                  << app_.getOPs().strOperatingMode(mode, false)
-                                  << ". age " << age.count() << 's';
-            healthy_ = false;
-        }
+        lock.unlock();
+        JLOG(journal_.warn()) << "Waiting " << recoveryWaitTime_.count()
+                              << "s for node to stabilize. state: "
+                              << app_.getOPs().strOperatingMode(mode, false)
+                              << ". age " << age.count() << 's';
+        std::this_thread::sleep_for(recoveryWaitTime_);
+
+        age = ledgerMaster_->getValidatedLedgerAge();
+        mode = netOPs_->getOperatingMode();
+        lock.lock();
     }
-
-    if (healthy_)
-        return Health::ok;
-    else
-        return Health::unhealthy;
+    return stop_;
 }
 
 void
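In short, the tri-state Health enum (ok / unhealthy / stopping) collapses into a boolean: stopping() now blocks until the node is stable again or a shutdown is requested, so callers no longer need a Health::unhealthy branch that abandons the current deletion pass. A minimal, self-contained sketch of that wait loop follows; the StabilizeWaiter type and healthy() callback are hypothetical stand-ins, not the rippled classes:

    #include <chrono>
    #include <functional>
    #include <mutex>
    #include <thread>

    // Sketch only: StabilizeWaiter and healthy() stand in for SHAMapStoreImp's
    // members and its ledger-age / operating-mode checks.
    struct StabilizeWaiter
    {
        std::mutex mutex_;
        bool stop_ = false;                         // set by the shutdown path
        std::chrono::seconds recoveryWaitTime_{5};  // mirrors recovery_wait_seconds

        // Block until healthy() reports stable or a stop is requested.
        // Returns true if the server is stopping.
        bool
        stopping(std::function<bool()> const& healthy)
        {
            std::unique_lock lock(mutex_);
            while (!stop_ && !healthy())
            {
                lock.unlock();  // do not hold the lock across the sleep
                std::this_thread::sleep_for(recoveryWaitTime_);
                lock.lock();    // re-acquire before re-reading stop_
            }
            return stop_;
        }
    };

Releasing the lock around the sleep matters: a shutdown path that takes the same mutex to set stop_ would otherwise have to wait out a full recovery_wait_seconds interval before the loop could observe it.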

View File

@@ -40,8 +40,6 @@ class NetworkOPs;
 class SHAMapStoreImp : public SHAMapStore
 {
 private:
-    enum Health : std::uint8_t { ok = 0, stopping, unhealthy };
-
     class SavedStateDB
     {
     public:
@@ -106,12 +104,12 @@ private:
     std::uint32_t deleteBatch_ = 100;
     std::chrono::milliseconds backOff_{100};
     std::chrono::seconds ageThreshold_{60};
-    /// If set, and the node is out of sync during an
+    /// If the node is out of sync during an
     /// online_delete health check, sleep the thread
-    /// for this time and check again so the node can
-    /// recover.
+    /// for this time, and continue checking until
+    /// recovery.
     /// See also: "recovery_wait_seconds" in rippled-example.cfg
-    std::optional<std::chrono::seconds> recoveryWaitTime_;
+    std::chrono::seconds recoveryWaitTime_{5};
 
     // these do not exist upon SHAMapStore creation, but do exist
     // as of run() or before
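The member change above turns the wait from opt-in into always-on: an unset std::optional used to mean "abort this pass and try again later", whereas the plain duration now always applies, defaulting to 5 seconds. A hypothetical sketch of the resulting configuration semantics (not the rippled parser):

    #include <chrono>
    #include <optional>

    // Before: an absent recovery_wait_seconds left the wait disabled and the
    //         health check aborted instead of waiting.
    // After:  an absent key simply means the 5-second default.
    std::chrono::seconds
    effectiveRecoveryWait(std::optional<int> const& configured)
    {
        return std::chrono::seconds{configured.value_or(5)};
    }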
@@ -201,7 +199,7 @@ private:
     {
         dbRotating_->fetchNodeObject(
             key, 0, NodeStore::FetchType::synchronous, true);
-        if (!(++check % checkHealthInterval_) && health())
+        if (!(++check % checkHealthInterval_) && stopping())
             return true;
     }
@@ -225,16 +223,15 @@ private:
     void
     clearPrior(LedgerIndex lastRotated);
 
-    // If rippled is not healthy, defer rotate-delete.
-    // If already unhealthy, do not change state on further check.
-    // Assume that, once unhealthy, a necessary step has been
-    // aborted, so the online-delete process needs to restart
-    // at next ledger.
-    // If recoveryWaitTime_ is set, this may sleep to give rippled
-    // time to recover, so never call it from any thread other than
-    // the main "run()".
-    Health
-    health();
+    /**
+     * This is a health check for online deletion that waits until rippled is
+     * stable before returning. If the server is stopping, then it returns
+     * "true" to inform the caller to allow the server to stop.
+     *
+     * @return Whether the server is stopping.
+     */
+    bool
+    stopping();
 
 public:
     void