Simplify processing of inbound ledger data

This commit is contained in:
Nik Bougalis
2020-11-13 22:45:02 -08:00
parent 1bb294afbc
commit e39c452f31
2 changed files with 30 additions and 46 deletions

View File

@@ -36,7 +36,7 @@ class InboundLedgers
public:
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
virtual ~InboundLedgers() = 0;
virtual ~InboundLedgers() = default;
// VFALCO TODO Should this be called findOrAdd ?
//

View File

@@ -43,10 +43,8 @@ private:
beast::Journal const j_;
public:
using u256_acq_pair = std::pair<uint256, std::shared_ptr<InboundLedger>>;
// How long before we try again to acquire the same ledger
static const std::chrono::minutes kReacquireInterval;
static constexpr std::chrono::minutes const kReacquireInterval{5};
InboundLedgersImp(
Application& app,
@@ -166,40 +164,38 @@ public:
gotLedgerData(
LedgerHash const& hash,
std::shared_ptr<Peer> peer,
std::shared_ptr<protocol::TMLedgerData> packet_ptr) override
std::shared_ptr<protocol::TMLedgerData> packet) override
{
protocol::TMLedgerData& packet = *packet_ptr;
JLOG(j_.trace()) << "Got data (" << packet.nodes().size()
<< ") for acquiring ledger: " << hash;
auto ledger = find(hash);
if (!ledger)
if (auto ledger = find(hash))
{
JLOG(j_.trace()) << "Got data for ledger we're no longer acquiring";
JLOG(j_.trace()) << "Got data (" << packet->nodes().size()
<< ") for acquiring ledger: " << hash;
// If it's state node data, stash it because it still might be
// useful.
if (packet.type() == protocol::liAS_NODE)
{
// Stash the data for later processing and see if we need to
// dispatch
if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
app_.getJobQueue().addJob(
jtLEDGER_DATA, "gotStaleData", [this, packet_ptr](Job&) {
gotStaleData(packet_ptr);
jtLEDGER_DATA, "processLedgerData", [ledger](Job&) {
ledger->runData();
});
}
return false;
return true;
}
// Stash the data for later processing and see if we need to dispatch
if (ledger->gotData(std::weak_ptr<Peer>(peer), packet_ptr))
app_.getJobQueue().addJob(
jtLEDGER_DATA, "processLedgerData", [this, hash](Job&) {
doLedgerData(hash);
});
JLOG(j_.trace()) << "Got data for ledger " << hash
<< " which we're no longer acquiring";
return true;
// If it's state node data, stash it because it still might be
// useful.
if (packet->type() == protocol::liAS_NODE)
{
app_.getJobQueue().addJob(
jtLEDGER_DATA, "gotStaleData", [this, packet](Job&) {
gotStaleData(packet);
});
}
return false;
}
void
@@ -219,14 +215,6 @@ public:
return mRecentFailures.find(h) != mRecentFailures.end();
}
/** Called (indirectly) only by gotLedgerData(). */
void
doLedgerData(LedgerHash hash)
{
if (auto ledger = find(hash))
ledger->runData();
}
/** We got some data for a ledger we are no longer acquiring Since we paid
the price to receive it, we might as well stash it in case we need it.
@@ -296,15 +284,16 @@ public:
{
Json::Value ret(Json::objectValue);
std::vector<u256_acq_pair> acquires;
std::vector<std::pair<uint256, std::shared_ptr<InboundLedger>>> acqs;
{
ScopedLockType sl(mLock);
acquires.reserve(mLedgers.size());
acqs.reserve(mLedgers.size());
for (auto const& it : mLedgers)
{
assert(it.second);
acquires.push_back(it);
acqs.push_back(it);
}
for (auto const& it : mRecentFailures)
{
@@ -315,7 +304,7 @@ public:
}
}
for (auto const& it : acquires)
for (auto const& it : acqs)
{
// getJson is expensive, so call without the lock
std::uint32_t seq = it.second->getSeq();
@@ -419,11 +408,6 @@ private:
//------------------------------------------------------------------------------
decltype(InboundLedgersImp::kReacquireInterval)
InboundLedgersImp::kReacquireInterval{5};
InboundLedgers::~InboundLedgers() = default;
std::unique_ptr<InboundLedgers>
make_InboundLedgers(
Application& app,