define macro option to use std::map in nodestore... concurrent flatmap has a race condition on ledger cleaning cycle

This commit is contained in:
Richard Holland
2024-11-06 10:38:02 +11:00
parent 006bd729d8
commit 8b0d42785c
7 changed files with 164 additions and 49 deletions

View File

@@ -118,7 +118,9 @@ SHAMapStoreImp::SHAMapStoreImp(
get_if_exists(section, "online_delete", deleteInterval_);
if (deleteInterval_)
bool const isMem = config.mem_backend();
if (deleteInterval_ || isMem)
{
if (app_.config().reporting())
{
@@ -127,15 +129,8 @@ SHAMapStoreImp::SHAMapStoreImp(
"online_delete info from config");
}
if ((!app_.config().section(SECTION_RELATIONAL_DB).empty() &&
boost::iequals(get(app.config().section(SECTION_RELATIONAL_DB), "backend"), "memory")) ||
(!app_.config().section("node_db").empty() &&
boost::iequals(get(app.config().section("node_db"), "type"), "memory")))
{
Throw<std::runtime_error>(
"Memory does not support online_delete. Remove "
"online_delete info from config. Use [ledger_history] to set a history limit.");
}
if (isMem)
deleteInterval_ = config.LEDGER_HISTORY;
// Configuration that affects the behavior of online delete
get_if_exists(section, "delete_batch", deleteBatch_);
@@ -172,7 +167,8 @@ SHAMapStoreImp::SHAMapStoreImp(
}
state_db_.init(config, dbName_);
dbPaths();
if (!isMem)
dbPaths();
}
}
@@ -205,6 +201,7 @@ SHAMapStoreImp::makeNodeStore(int readThreads)
"online_delete info from config");
}
SavedState state = state_db_.getState();
auto writableBackend = makeBackendRotating(state.writableDb);
auto archiveBackend = makeBackendRotating(state.archiveDb);
if (!state.writableDb.size())
@@ -303,9 +300,14 @@ SHAMapStoreImp::run()
fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache(0));
treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache(0));
bool const isMem = app_.config().mem_backend();
std::cout << "SHAMapStoreImp: isMem = " << (isMem ? "true" : "false") << "\n";
std::cout << "SHAMapStoreImp: lastRotated = " << lastRotated << "\n";
if (advisoryDelete_)
canDelete_ = state_db_.getCanDelete();
while (true)
{
healthy_ = true;
@@ -353,7 +355,7 @@ SHAMapStoreImp::run()
if (waitForImport)
{
JLOG(journal_.info())
JLOG(journal_.warn())
<< "NOT rotating validatedSeq " << validatedSeq
<< " as rotation would interfere with ShardStore import";
}
@@ -372,12 +374,12 @@ SHAMapStoreImp::run()
if (healthWait() == stopping)
return;
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
JLOG(journal_.warn()) << "copying ledger " << validatedSeq;
std::uint64_t nodeCount = 0;
try
{
validatedLedger->stateMap().snapShot(false)->visitNodes(
validatedLedger->stateMap().snapShot(isMem)->visitNodes(
std::bind(
&SHAMapStoreImp::copyNode,
this,
@@ -395,19 +397,19 @@ SHAMapStoreImp::run()
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
JLOG(journal_.warn()) << "copied ledger " << validatedSeq
<< " nodecount " << nodeCount;
JLOG(journal_.debug()) << "freshening caches";
JLOG(journal_.warn()) << "freshening caches";
freshenCaches();
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
JLOG(journal_.warn()) << validatedSeq << " freshened caches";
JLOG(journal_.trace()) << "Making a new backend";
JLOG(journal_.warn()) << "Making a new backend";
auto newBackend = makeBackendRotating();
JLOG(journal_.debug())
JLOG(journal_.warn())
<< validatedSeq << " new backend " << newBackend->getName();
clearCaches(validatedSeq);