refactor: acquireAsync will dispatch the job, not the other way around

This commit is contained in:
Ed Hennis
2025-02-04 16:54:26 -05:00
parent 7b72b9cc82
commit 46a5bc74db
5 changed files with 41 additions and 33 deletions

View File

@@ -91,6 +91,8 @@ public:
virtual void
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override

View File

@@ -118,15 +118,12 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
// Tell the ledger acquire system that we need the consensus ledger
acquiringLedger_ = hash;
app_.getJobQueue().addJob(
app_.getInboundLedgers().acquireAsync(
jtADVANCE,
"getConsensusLedger1",
[id = hash, &app = app_, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(
id, 0, InboundLedger::Reason::CONSENSUS);
});
hash,
0,
InboundLedger::Reason::CONSENSUS);
}
return std::nullopt;
}

View File

@@ -120,15 +120,12 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
JLOG(j_.warn())
<< "Need validated ledger for preferred ledger analysis " << hash;
Application* pApp = &app_;
app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(
hash, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getInboundLedgers().acquireAsync(
jtADVANCE,
"getConsensusLedger2",
hash,
0,
InboundLedger::Reason::CONSENSUS);
return std::nullopt;
}

View File

@@ -28,6 +28,8 @@ public:
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;

View File

@@ -123,28 +123,38 @@ public:
void
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash})
if (auto check = std::make_shared<CanProcess const>(
acquiresMutex_, pendingAcquires_, hash);
*check)
{
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
app_.getJobQueue().addJob(
type, name, [check, name, hash, seq, reason, this]() {
JLOG(j_.debug())
<< "JOB acquireAsync " << name << " started ";
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new "
"inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn())
<< "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
});
}
}