Refactor of InboundTransactions et al:

This refactor was primarily aimed at reducing the size of
objects derived from TimeoutCounter by improving the packing
of their structures. Other potential improvements that
surfaced during this process were also implemented.
Nik Bougalis
2023-01-23 09:16:00 -08:00
parent 266ce2b755
commit 81cc278cab
20 changed files with 631 additions and 642 deletions

View File

@@ -61,24 +61,10 @@ public:
     void
     update(std::uint32_t seq);
 
-    /** Returns true if we got all the data. */
-    bool
-    isComplete() const
-    {
-        return complete_;
-    }
-
-    /** Returns false if we failed to get the data. */
-    bool
-    isFailed() const
-    {
-        return failed_;
-    }
-
     std::shared_ptr<Ledger const>
     getLedger() const
     {
-        return mLedger;
+        return ledger_;
     }
 
     std::uint32_t
@@ -175,14 +161,9 @@ private:
     clock_type& m_clock;
     clock_type::time_point mLastAction;
 
-    std::shared_ptr<Ledger> mLedger;
-    bool mHaveHeader;
-    bool mHaveState;
-    bool mHaveTransactions;
-    bool mSignaled;
-    bool mByHash;
+    std::shared_ptr<Ledger> ledger_;
     std::uint32_t mSeq;
-    Reason const mReason;
+    Reason const reason_;
 
     std::set<uint256> mRecentNodes;
@@ -193,7 +174,6 @@ private:
     std::vector<
         std::pair<std::weak_ptr<Peer>, std::shared_ptr<protocol::TMLedgerData>>>
         mReceivedData;
-    bool mReceiveDispatched;
 
     std::unique_ptr<PeerSet> mPeerSet;
 };

View File

@@ -42,7 +42,7 @@ public:
     InboundTransactions&
     operator=(InboundTransactions const&) = delete;
 
-    virtual ~InboundTransactions() = 0;
+    virtual ~InboundTransactions() = default;
 
     /** Find and return a transaction set, or nullptr if it is missing.
      *
@@ -59,13 +59,13 @@ public:
      *
      * @param setHash The transaction set ID (digest of the SHAMap root node).
      * @param peer The peer that sent the message.
-     * @param message The LedgerData message.
+     * @param data The data we received.
      */
     virtual void
     gotData(
         uint256 const& setHash,
         std::shared_ptr<Peer> peer,
-        std::shared_ptr<protocol::TMLedgerData> message) = 0;
+        std::vector<std::pair<SHAMapNodeID, Slice>> const& data) = 0;
 
     /** Add a transaction set.
      *

View File

@@ -52,7 +52,8 @@ std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS = 10;
 
 // max number of peers that do not support the ledger replay feature
 // returned by the PeerSet before switch to fallback
-auto constexpr MAX_NO_FEATURE_PEER_COUNT = 2;
+std::uint8_t constexpr MAX_NO_FEATURE_PEER_COUNT = 2;
 
 // subtask timeout value after fallback
 auto constexpr SUB_TASK_FALLBACK_TIMEOUT = std::chrono::milliseconds{1000};

View File

@@ -74,6 +74,13 @@ enum {
 // millisecond for each ledger timeout
 auto constexpr ledgerAcquireTimeout = 3000ms;
 
+static constexpr std::uint8_t const IL_BY_HASH = 0x01;
+static constexpr std::uint8_t const IL_SIGNALED = 0x02;
+static constexpr std::uint8_t const IL_RECEIVE_DISPATCHED = 0x04;
+static constexpr std::uint8_t const IL_HAVE_HEADER = 0x08;
+static constexpr std::uint8_t const IL_HAVE_STATE = 0x10;
+static constexpr std::uint8_t const IL_HAVE_TXNS = 0x20;
+
 InboundLedger::InboundLedger(
     Application& app,
     uint256 const& hash,
@@ -85,17 +92,12 @@ InboundLedger::InboundLedger(
           app,
           hash,
           ledgerAcquireTimeout,
-          {jtLEDGER_DATA, "InboundLedger", 5},
+          {jtLEDGER_DATA, 5, "InboundLedger"},
+          IL_BY_HASH,
           app.journal("InboundLedger"))
     , m_clock(clock)
-    , mHaveHeader(false)
-    , mHaveState(false)
-    , mHaveTransactions(false)
-    , mSignaled(false)
-    , mByHash(true)
     , mSeq(seq)
-    , mReason(reason)
-    , mReceiveDispatched(false)
+    , reason_(reason)
     , mPeerSet(std::move(peerSet))
 {
     JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
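
The six removed bool members survive as single-bit masks over the one-byte
userdata_ field that TimeoutCounter now carries, which is what shrinks the
derived objects. A minimal sketch of the idiom (hypothetical names, not the
actual class):

    #include <atomic>
    #include <cstdint>

    class PackedFlags
    {
        static constexpr std::uint8_t HAVE_HEADER = 0x08;
        static constexpr std::uint8_t HAVE_STATE = 0x10;
        static constexpr std::uint8_t HAVE_TXNS = 0x20;

        // One byte replaces several individually padded bools.
        std::atomic<std::uint8_t> flags_{0};

    public:
        void
        markHaveHeader()
        {
            flags_ |= HAVE_HEADER;
        }

        bool
        haveHeader() const
        {
            return (flags_ & HAVE_HEADER) != 0;
        }

        bool
        haveAll() const
        {
            constexpr std::uint8_t all = HAVE_HEADER | HAVE_STATE | HAVE_TXNS;
            return (flags_ & all) == all;
        }
    };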
@@ -109,44 +111,43 @@ InboundLedger::init(ScopedLockType& collectionLock)
     collectionLock.unlock();
 
     tryDB(app_.getNodeFamily().db());
-    if (failed_)
+
+    if (hasFailed())
         return;
-    if (!complete_)
+
+    if (!hasCompleted())
     {
         auto shardStore = app_.getShardStore();
-        if (mReason == Reason::SHARD)
+        if (reason_ == Reason::SHARD)
         {
             if (!shardStore)
             {
                 JLOG(journal_.error())
                     << "Acquiring shard with no shard store available";
-                failed_ = true;
+                markFailed();
                 return;
             }
-            mHaveHeader = false;
-            mHaveTransactions = false;
-            mHaveState = false;
-            mLedger.reset();
+
+            userdata_ &= ~(IL_HAVE_HEADER | IL_HAVE_STATE | IL_HAVE_TXNS);
+            ledger_.reset();
+
             tryDB(app_.getShardFamily()->db());
-            if (failed_)
+
+            if (hasFailed())
                 return;
         }
         else if (shardStore && mSeq >= shardStore->earliestLedgerSeq())
         {
             if (auto l = shardStore->fetchLedger(hash_, mSeq))
             {
-                mHaveHeader = true;
-                mHaveTransactions = true;
-                mHaveState = true;
-                complete_ = true;
-                mLedger = std::move(l);
+                userdata_ |= (IL_HAVE_HEADER | IL_HAVE_STATE | IL_HAVE_TXNS);
+                markComplete();
+                ledger_ = std::move(l);
             }
         }
     }
-    if (!complete_)
+
+    if (!hasCompleted())
     {
         addPeers();
         queueJob(sl);
@@ -155,17 +156,17 @@ InboundLedger::init(ScopedLockType& collectionLock)
     JLOG(journal_.debug()) << "Acquiring ledger we already have in "
                            << " local store. " << hash_;
-    assert(mLedger->read(keylet::fees()));
-    mLedger->setImmutable();
+    assert(ledger_->read(keylet::fees()));
+    ledger_->setImmutable();
 
-    if (mReason == Reason::HISTORY || mReason == Reason::SHARD)
+    if (reason_ == Reason::HISTORY || reason_ == Reason::SHARD)
         return;
 
-    app_.getLedgerMaster().storeLedger(mLedger);
+    app_.getLedgerMaster().storeLedger(ledger_);
 
     // Check if this could be a newer fully-validated ledger
-    if (mReason == Reason::CONSENSUS)
-        app_.getLedgerMaster().checkAccept(mLedger);
+    if (reason_ == Reason::CONSENSUS)
+        app_.getLedgerMaster().checkAccept(ledger_);
 }
 
 std::size_t
@@ -196,13 +197,13 @@ InboundLedger::checkLocal()
     ScopedLockType sl(mtx_);
     if (!isDone())
     {
-        if (mLedger)
-            tryDB(mLedger->stateMap().family().db());
-        else if (mReason == Reason::SHARD)
+        if (ledger_)
+            tryDB(ledger_->stateMap().family().db());
+        else if (reason_ == Reason::SHARD)
             tryDB(app_.getShardFamily()->db());
         else
             tryDB(app_.getNodeFamily().db());
-        if (failed_ || complete_)
+
+        if (hasFailed() || hasCompleted())
        {
             done();
             return true;
@@ -220,14 +221,12 @@ InboundLedger::~InboundLedger()
         if (entry.second->type() == protocol::liAS_NODE)
             app_.getInboundLedgers().gotStaleData(entry.second);
     }
 
     if (!isDone())
     {
         JLOG(journal_.debug())
-            << "Acquire " << hash_ << " abort "
-            << ((timeouts_ == 0) ? std::string()
-                                 : (std::string("timeouts:") +
-                                    std::to_string(timeouts_) + " "))
-            << mStats.get();
+            << "Acquire " << hash_ << " aborted. Timeouts: " << timeouts_
+            << ", stats: " << mStats.get();
     }
 }
 
@@ -259,14 +258,14 @@ neededHashes(
 std::vector<uint256>
 InboundLedger::neededTxHashes(int max, SHAMapSyncFilter* filter) const
 {
-    return neededHashes(mLedger->info().txHash, mLedger->txMap(), max, filter);
+    return neededHashes(ledger_->info().txHash, ledger_->txMap(), max, filter);
 }
 
 std::vector<uint256>
 InboundLedger::neededStateHashes(int max, SHAMapSyncFilter* filter) const
 {
     return neededHashes(
-        mLedger->info().accountHash, mLedger->stateMap(), max, filter);
+        ledger_->info().accountHash, ledger_->stateMap(), max, filter);
 }
 
 LedgerInfo
@@ -304,24 +303,24 @@ deserializePrefixedHeader(Slice data, bool hasHash)
 void
 InboundLedger::tryDB(NodeStore::Database& srcDB)
 {
-    if (!mHaveHeader)
+    if (!(userdata_ & IL_HAVE_HEADER))
     {
         auto makeLedger = [&, this](Blob const& data) {
             JLOG(journal_.trace()) << "Ledger header found in fetch pack";
-            mLedger = std::make_shared<Ledger>(
+
+            ledger_ = std::make_shared<Ledger>(
                 deserializePrefixedHeader(makeSlice(data)),
                 app_.config(),
-                mReason == Reason::SHARD ? *app_.getShardFamily()
-                                         : app_.getNodeFamily());
-            if (mLedger->info().hash != hash_ ||
-                (mSeq != 0 && mSeq != mLedger->info().seq))
+                reason_ == Reason::SHARD ? *app_.getShardFamily()
+                                         : app_.getNodeFamily());
+
+            if (ledger_->info().hash != hash_ ||
+                (mSeq != 0 && mSeq != ledger_->info().seq))
             {
                 // We know for a fact the ledger can never be acquired
                 JLOG(journal_.warn())
                     << "hash " << hash_ << " seq " << std::to_string(mSeq)
                     << " cannot be a ledger";
-                mLedger.reset();
-                failed_ = true;
+                ledger_.reset();
+                markFailed();
             }
         };
@@ -331,16 +330,16 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
             JLOG(journal_.trace()) << "Ledger header found in local store";
 
             makeLedger(nodeObject->getData());
-            if (failed_)
+
+            if (hasFailed())
                 return;
 
             // Store the ledger header if the source and destination differ
-            auto& dstDB{mLedger->stateMap().family().db()};
+            auto& dstDB{ledger_->stateMap().family().db()};
             if (std::addressof(dstDB) != std::addressof(srcDB))
             {
                 Blob blob{nodeObject->getData()};
                 dstDB.store(
-                    hotLEDGER, std::move(blob), hash_, mLedger->info().seq);
+                    hotLEDGER, std::move(blob), hash_, ledger_->info().seq);
             }
         }
         else
@@ -353,72 +352,73 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
             JLOG(journal_.trace()) << "Ledger header found in fetch pack";
 
             makeLedger(*data);
-            if (failed_)
+
+            if (hasFailed())
                 return;
 
             // Store the ledger header in the ledger's database
-            mLedger->stateMap().family().db().store(
-                hotLEDGER, std::move(*data), hash_, mLedger->info().seq);
+            ledger_->stateMap().family().db().store(
+                hotLEDGER, std::move(*data), hash_, ledger_->info().seq);
         }
 
         if (mSeq == 0)
-            mSeq = mLedger->info().seq;
-        mLedger->stateMap().setLedgerSeq(mSeq);
-        mLedger->txMap().setLedgerSeq(mSeq);
-        mHaveHeader = true;
+            mSeq = ledger_->info().seq;
+
+        ledger_->stateMap().setLedgerSeq(mSeq);
+        ledger_->txMap().setLedgerSeq(mSeq);
+
+        userdata_ |= IL_HAVE_HEADER;
     }
 
-    if (!mHaveTransactions)
+    if (!(userdata_ & IL_HAVE_TXNS))
     {
-        if (mLedger->info().txHash.isZero())
+        if (ledger_->info().txHash.isZero())
         {
             JLOG(journal_.trace()) << "No TXNs to fetch";
-            mHaveTransactions = true;
+            userdata_ |= IL_HAVE_TXNS;
         }
         else
         {
             TransactionStateSF filter(
-                mLedger->txMap().family().db(), app_.getLedgerMaster());
-            if (mLedger->txMap().fetchRoot(
-                    SHAMapHash{mLedger->info().txHash}, &filter))
+                ledger_->txMap().family().db(), app_.getLedgerMaster());
+
+            if (ledger_->txMap().fetchRoot(
+                    SHAMapHash{ledger_->info().txHash}, &filter))
             {
                 if (neededTxHashes(1, &filter).empty())
                 {
                     JLOG(journal_.trace()) << "Had full txn map locally";
-                    mHaveTransactions = true;
+                    userdata_ |= IL_HAVE_TXNS;
                 }
             }
         }
     }
 
-    if (!mHaveState)
+    if (!(userdata_ & IL_HAVE_STATE))
     {
-        if (mLedger->info().accountHash.isZero())
+        if (ledger_->info().accountHash.isZero())
         {
             JLOG(journal_.fatal())
                 << "We are acquiring a ledger with a zero account hash";
-            failed_ = true;
+            markFailed();
             return;
         }
+
         AccountStateSF filter(
-            mLedger->stateMap().family().db(), app_.getLedgerMaster());
-        if (mLedger->stateMap().fetchRoot(
-                SHAMapHash{mLedger->info().accountHash}, &filter))
+            ledger_->stateMap().family().db(), app_.getLedgerMaster());
+
+        if (ledger_->stateMap().fetchRoot(
+                SHAMapHash{ledger_->info().accountHash}, &filter))
         {
             if (neededStateHashes(1, &filter).empty())
             {
                 JLOG(journal_.trace()) << "Had full AS map locally";
-                mHaveState = true;
+                userdata_ |= IL_HAVE_STATE;
             }
         }
     }
 
-    if (mHaveTransactions && mHaveState)
+    if ((IL_HAVE_TXNS | IL_HAVE_STATE) ==
+        (userdata_ & (IL_HAVE_TXNS | IL_HAVE_STATE)))
     {
         JLOG(journal_.debug()) << "Had everything locally";
-        complete_ = true;
-        assert(mLedger->read(keylet::fees()));
-        mLedger->setImmutable();
+        markComplete();
+        assert(ledger_->read(keylet::fees()));
+        ledger_->setImmutable();
     }
 }
@@ -447,7 +447,7 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
             JLOG(journal_.warn())
                 << timeouts_ << " timeouts for ledger " << hash_;
         }
-        failed_ = true;
+        markFailed();
         done();
         return;
     }
@@ -456,20 +456,19 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
     {
         checkLocal();
 
-        mByHash = true;
+        userdata_ |= IL_BY_HASH;
 
-        std::size_t pc = getPeerCount();
         JLOG(journal_.debug())
-            << "No progress(" << pc << ") for ledger " << hash_;
+            << "No progress(" << getPeerCount() << ") for ledger " << hash_;
 
         // addPeers triggers if the reason is not HISTORY
         // So if the reason IS HISTORY, need to trigger after we add
         // otherwise, we need to trigger before we add
         // so each peer gets triggered once
-        if (mReason != Reason::HISTORY)
+        if (reason_ != Reason::HISTORY)
             trigger(nullptr, TriggerReason::timeout);
         addPeers();
-        if (mReason == Reason::HISTORY)
+        if (reason_ == Reason::HISTORY)
             trigger(nullptr, TriggerReason::timeout);
     }
 }
@@ -484,7 +483,7 @@ InboundLedger::addPeers()
         [this](auto peer) {
             // For historical nodes, do not trigger too soon
             // since a fetch pack is probably coming
-            if (mReason != Reason::HISTORY)
+            if (reason_ != Reason::HISTORY)
                 trigger(peer, TriggerReason::added);
         });
 }
@@ -498,35 +497,36 @@ InboundLedger::pmDowncast()
 void
 InboundLedger::done()
 {
-    if (mSignaled)
+    // Nothing to do if it was already signaled.
+    if (userdata_.fetch_or(IL_SIGNALED) & IL_SIGNALED)
         return;
-    mSignaled = true;
+
     touch();
 
-    JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
+    JLOG(journal_.debug()) << "Acquire " << hash_
+                           << (hasFailed() ? " fail " : " ")
                            << ((timeouts_ == 0)
                                    ? std::string()
                                    : (std::string("timeouts:") +
                                       std::to_string(timeouts_) + " "))
                            << mStats.get();
 
-    assert(complete_ || failed_);
+    assert(hasCompleted() || hasFailed());
 
-    if (complete_ && !failed_ && mLedger)
+    if (hasCompleted() && !hasFailed() && ledger_)
     {
-        assert(mLedger->read(keylet::fees()));
-        mLedger->setImmutable();
+        assert(ledger_->read(keylet::fees()));
+        ledger_->setImmutable();
 
-        switch (mReason)
+        switch (reason_)
         {
             case Reason::SHARD:
-                app_.getShardStore()->setStored(mLedger);
+                app_.getShardStore()->setStored(ledger_);
                 [[fallthrough]];
             case Reason::HISTORY:
                 app_.getInboundLedgers().onLedgerFetched();
                 break;
             default:
-                app_.getLedgerMaster().storeLedger(mLedger);
+                app_.getLedgerMaster().storeLedger(ledger_);
                 break;
         }
     }
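
done() now uses an atomic read-modify-write as a once-only latch: fetch_or
sets IL_SIGNALED and returns the previous value, so exactly one caller ever
sees the bit clear and runs the completion logic. A standalone sketch of the
pattern (assuming userdata_ is a std::atomic<std::uint8_t>, as the call
implies):

    #include <atomic>
    #include <cstdint>

    constexpr std::uint8_t SIGNALED = 0x02;
    std::atomic<std::uint8_t> userdata{0};

    // Returns true for exactly one caller, even under concurrency.
    bool
    trySignal()
    {
        // fetch_or returns the value held *before* the bit was set.
        return (userdata.fetch_or(SIGNALED) & SIGNALED) == 0;
    }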
@@ -534,7 +534,7 @@ InboundLedger::done()
     // We hold the PeerSet lock, so must dispatch
     app_.getJobQueue().addJob(
         jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
-            if (self->complete_ && !self->failed_)
+            if (self->hasCompleted() && !self->hasFailed())
             {
                 self->app_.getLedgerMaster().checkAccept(self->getLedger());
                 self->app_.getLedgerMaster().tryAdvance();
@@ -554,9 +554,9 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
     if (isDone())
     {
-        JLOG(journal_.debug())
-            << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
-            << (failed_ ? " failed" : "");
+        JLOG(journal_.debug()) << "Trigger on ledger: " << hash_
+                               << (hasCompleted() ? " completed" : "")
+                               << (hasFailed() ? " failed" : "");
         return;
     }
@@ -567,19 +567,30 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         else
             stream << "Trigger acquiring ledger " << hash_;
 
-        if (complete_ || failed_)
-            stream << "complete=" << complete_ << " failed=" << failed_;
+        if (hasCompleted() || hasFailed())
+            stream << "complete=" << hasCompleted()
+                   << " failed=" << hasFailed();
         else
-            stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions
-                   << " as=" << mHaveState;
+        {
+            auto [haveHeader, haveState, haveTransactions] =
+                [](std::uint8_t flags) {
+                    return std::make_tuple(
+                        (flags & IL_HAVE_HEADER) == IL_HAVE_HEADER,
+                        (flags & IL_HAVE_STATE) == IL_HAVE_STATE,
+                        (flags & IL_HAVE_TXNS) == IL_HAVE_TXNS);
+                }(userdata_);
+
+            stream << "header=" << haveHeader << " tx=" << haveTransactions
+                   << " as=" << haveState;
+        }
     }
 
-    if (!mHaveHeader)
+    if (!(userdata_ & IL_HAVE_HEADER))
     {
         tryDB(
-            mReason == Reason::SHARD ? app_.getShardFamily()->db()
-                                     : app_.getNodeFamily().db());
-        if (failed_)
+            reason_ == Reason::SHARD ? app_.getShardFamily()->db()
+                                     : app_.getNodeFamily().db());
+
+        if (hasFailed())
         {
             JLOG(journal_.warn()) << " failed local for " << hash_;
             return;
@@ -594,7 +605,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         // Be more aggressive if we've timed out at least once
         tmGL.set_querytype(protocol::qtINDIRECT);
 
-        if (!progress_ && !failed_ && mByHash &&
+        if (!hasProgressed() && !hasFailed() && (userdata_ & IL_BY_HASH) &&
             (timeouts_ > ledgerBecomeAggressiveThreshold))
         {
             auto need = getNeededHashes();
@@ -631,7 +642,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
                 peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
                     if (auto p = app_.overlay().findPeerByShortID(id))
                     {
-                        mByHash = false;
+                        userdata_ &= ~IL_BY_HASH;
                         p->send(packet);
                     }
                 });
@@ -640,17 +651,15 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
             {
                 JLOG(journal_.info())
                     << "getNeededHashes says acquire is complete";
-                mHaveHeader = true;
-                mHaveTransactions = true;
-                mHaveState = true;
-                complete_ = true;
+                userdata_ |= (IL_HAVE_HEADER | IL_HAVE_STATE | IL_HAVE_TXNS);
+                markComplete();
             }
         }
     }
 
     // We can't do much without the header data because we don't know the
     // state or transaction root hashes.
-    if (!mHaveHeader && !failed_)
+    if (!(userdata_ & IL_HAVE_HEADER) && !hasFailed())
     {
         tmGL.set_itype(protocol::liBASE);
         if (mSeq != 0)
@@ -661,8 +670,8 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         return;
     }
 
-    if (mLedger)
-        tmGL.set_ledgerseq(mLedger->info().seq);
+    if (ledger_)
+        tmGL.set_ledgerseq(ledger_->info().seq);
 
     if (reason != TriggerReason::reply)
     {
@@ -679,15 +688,16 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
     // Get the state data first because it's the most likely to be useful
     // if we wind up abandoning this fetch.
-    if (mHaveHeader && !mHaveState && !failed_)
+    if (((userdata_ & (IL_HAVE_HEADER | IL_HAVE_STATE)) == IL_HAVE_HEADER) &&
+        !hasFailed())
     {
-        assert(mLedger);
+        assert(ledger_);
 
-        if (!mLedger->stateMap().isValid())
+        if (!ledger_->stateMap().isValid())
         {
-            failed_ = true;
+            markFailed();
         }
-        else if (mLedger->stateMap().getHash().isZero())
+        else if (ledger_->stateMap().getHash().isZero())
         {
             // we need the root node
             tmGL.set_itype(protocol::liAS_NODE);
@@ -700,27 +710,27 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         else
         {
             AccountStateSF filter(
-                mLedger->stateMap().family().db(), app_.getLedgerMaster());
+                ledger_->stateMap().family().db(), app_.getLedgerMaster());
 
             // Release the lock while we process the large state map
             sl.unlock();
             auto nodes =
-                mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
+                ledger_->stateMap().getMissingNodes(missingNodesFind, &filter);
             sl.lock();
 
             // Make sure nothing happened while we released the lock
-            if (!failed_ && !complete_ && !mHaveState)
+            if (!hasFailed() && !hasCompleted() && !(userdata_ & IL_HAVE_STATE))
             {
                 if (nodes.empty())
                 {
-                    if (!mLedger->stateMap().isValid())
-                        failed_ = true;
+                    if (!ledger_->stateMap().isValid())
+                        markFailed();
                     else
                     {
-                        mHaveState = true;
-                        if (mHaveTransactions)
-                            complete_ = true;
+                        userdata_ |= IL_HAVE_STATE;
+
+                        if (userdata_ & IL_HAVE_TXNS)
+                            markComplete();
                     }
                 }
                 else
@@ -751,15 +761,16 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         }
     }
 
-    if (mHaveHeader && !mHaveTransactions && !failed_)
+    if (((userdata_ & (IL_HAVE_HEADER | IL_HAVE_TXNS)) == IL_HAVE_HEADER) &&
+        !hasFailed())
     {
-        assert(mLedger);
+        assert(ledger_);
 
-        if (!mLedger->txMap().isValid())
+        if (!ledger_->txMap().isValid())
         {
-            failed_ = true;
+            markFailed();
         }
-        else if (mLedger->txMap().getHash().isZero())
+        else if (ledger_->txMap().getHash().isZero())
         {
             // we need the root node
             tmGL.set_itype(protocol::liTX_NODE);
@@ -772,21 +783,21 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         else
         {
             TransactionStateSF filter(
-                mLedger->txMap().family().db(), app_.getLedgerMaster());
+                ledger_->txMap().family().db(), app_.getLedgerMaster());
 
             auto nodes =
-                mLedger->txMap().getMissingNodes(missingNodesFind, &filter);
+                ledger_->txMap().getMissingNodes(missingNodesFind, &filter);
 
             if (nodes.empty())
             {
-                if (!mLedger->txMap().isValid())
-                    failed_ = true;
+                if (!ledger_->txMap().isValid())
+                    markFailed();
                 else
                 {
-                    mHaveTransactions = true;
-                    if (mHaveState)
-                        complete_ = true;
+                    userdata_ |= IL_HAVE_TXNS;
+
+                    if (userdata_ & IL_HAVE_STATE)
+                        markComplete();
                 }
             }
             else
@@ -814,11 +825,11 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
         }
     }
 
-    if (complete_ || failed_)
+    if (hasCompleted() || hasFailed())
     {
         JLOG(journal_.debug())
-            << "Done:" << (complete_ ? " complete" : "")
-            << (failed_ ? " failed " : " ") << mLedger->info().seq;
+            << "Done:" << (hasCompleted() ? " complete" : "")
+            << (hasFailed() ? " failed " : " ") << ledger_->info().seq;
         sl.unlock();
         done();
     }
@@ -876,41 +887,41 @@ InboundLedger::takeHeader(std::string const& data)
     // Return value: true=normal, false=bad data
     JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
 
-    if (complete_ || failed_ || mHaveHeader)
+    if (hasCompleted() || hasFailed() || (userdata_ & IL_HAVE_HEADER))
         return true;
 
-    auto* f = mReason == Reason::SHARD ? app_.getShardFamily()
-                                       : &app_.getNodeFamily();
-    mLedger = std::make_shared<Ledger>(
+    auto* f = reason_ == Reason::SHARD ? app_.getShardFamily()
+                                       : &app_.getNodeFamily();
+
+    ledger_ = std::make_shared<Ledger>(
         deserializeHeader(makeSlice(data)), app_.config(), *f);
-    if (mLedger->info().hash != hash_ ||
-        (mSeq != 0 && mSeq != mLedger->info().seq))
+
+    if (ledger_->info().hash != hash_ ||
+        (mSeq != 0 && mSeq != ledger_->info().seq))
     {
         JLOG(journal_.warn())
-            << "Acquire hash mismatch: " << mLedger->info().hash
+            << "Acquire hash mismatch: " << ledger_->info().hash
             << "!=" << hash_;
-        mLedger.reset();
+        ledger_.reset();
         return false;
     }
 
     if (mSeq == 0)
-        mSeq = mLedger->info().seq;
-    mLedger->stateMap().setLedgerSeq(mSeq);
-    mLedger->txMap().setLedgerSeq(mSeq);
-    mHaveHeader = true;
+        mSeq = ledger_->info().seq;
+
+    ledger_->stateMap().setLedgerSeq(mSeq);
+    ledger_->txMap().setLedgerSeq(mSeq);
+
+    userdata_ |= IL_HAVE_HEADER;
 
     Serializer s(data.size() + 4);
     s.add32(HashPrefix::ledgerMaster);
     s.addRaw(data.data(), data.size());
     f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);
 
-    if (mLedger->info().txHash.isZero())
-        mHaveTransactions = true;
+    if (ledger_->info().txHash.isZero())
+        userdata_ |= IL_HAVE_TXNS;
 
-    if (mLedger->info().accountHash.isZero())
-        mHaveState = true;
+    if (ledger_->info().accountHash.isZero())
+        userdata_ |= IL_HAVE_STATE;
 
-    mLedger->txMap().setSynching();
-    mLedger->stateMap().setSynching();
+    ledger_->txMap().setSynching();
+    ledger_->stateMap().setSynching();
 
     return true;
 }
@@ -921,7 +932,7 @@ InboundLedger::takeHeader(std::string const& data)
 void
 InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
 {
-    if (!mHaveHeader)
+    if (!(userdata_ & IL_HAVE_HEADER))
     {
         JLOG(journal_.warn()) << "Missing ledger header";
         san.incInvalid();
@@ -929,13 +940,13 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
     }
     if (packet.type() == protocol::liTX_NODE)
     {
-        if (mHaveTransactions || failed_)
+        if ((userdata_ & IL_HAVE_TXNS) || hasFailed())
        {
             san.incDuplicate();
             return;
         }
     }
-    else if (mHaveState || failed_)
+    else if ((userdata_ & IL_HAVE_STATE) || hasFailed())
     {
         san.incDuplicate();
         return;
@@ -945,15 +956,15 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
         -> std::tuple<SHAMap&, SHAMapHash, std::unique_ptr<SHAMapSyncFilter>> {
         if (packet.type() == protocol::liTX_NODE)
             return {
-                mLedger->txMap(),
-                SHAMapHash{mLedger->info().txHash},
+                ledger_->txMap(),
+                SHAMapHash{ledger_->info().txHash},
                 std::make_unique<TransactionStateSF>(
-                    mLedger->txMap().family().db(), app_.getLedgerMaster())};
+                    ledger_->txMap().family().db(), app_.getLedgerMaster())};
         return {
-            mLedger->stateMap(),
-            SHAMapHash{mLedger->info().accountHash},
+            ledger_->stateMap(),
+            SHAMapHash{ledger_->info().accountHash},
             std::make_unique<AccountStateSF>(
-                mLedger->stateMap().family().db(), app_.getLedgerMaster())};
+                ledger_->stateMap().family().db(), app_.getLedgerMaster())};
     }();
 
     try
@@ -993,13 +1004,14 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
         if (!map.isSynching())
         {
             if (packet.type() == protocol::liTX_NODE)
-                mHaveTransactions = true;
+                userdata_ |= IL_HAVE_TXNS;
             else
-                mHaveState = true;
+                userdata_ |= IL_HAVE_STATE;
 
-            if (mHaveTransactions && mHaveState)
+            if ((IL_HAVE_STATE | IL_HAVE_TXNS) ==
+                (userdata_ & (IL_HAVE_STATE | IL_HAVE_TXNS)))
             {
-                complete_ = true;
+                markComplete();
                 done();
             }
         }
@@ -1011,22 +1023,22 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
 bool
 InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
 {
-    if (failed_ || mHaveState)
+    if (hasFailed() || (userdata_ & IL_HAVE_STATE))
     {
         san.incDuplicate();
         return true;
     }
 
-    if (!mHaveHeader)
+    if (!(userdata_ & IL_HAVE_HEADER))
     {
         assert(false);
         return false;
     }
 
     AccountStateSF filter(
-        mLedger->stateMap().family().db(), app_.getLedgerMaster());
-    san += mLedger->stateMap().addRootNode(
-        SHAMapHash{mLedger->info().accountHash}, data, &filter);
+        ledger_->stateMap().family().db(), app_.getLedgerMaster());
+    san += ledger_->stateMap().addRootNode(
+        SHAMapHash{ledger_->info().accountHash}, data, &filter);
     return san.isGood();
 }
@@ -1036,22 +1048,22 @@ InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
 bool
 InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
 {
-    if (failed_ || mHaveTransactions)
+    if (hasFailed() || (userdata_ & IL_HAVE_TXNS))
     {
         san.incDuplicate();
         return true;
     }
 
-    if (!mHaveHeader)
+    if (!(userdata_ & IL_HAVE_HEADER))
     {
         assert(false);
         return false;
     }
 
     TransactionStateSF filter(
-        mLedger->txMap().family().db(), app_.getLedgerMaster());
-    san += mLedger->txMap().addRootNode(
-        SHAMapHash{mLedger->info().txHash}, data, &filter);
+        ledger_->txMap().family().db(), app_.getLedgerMaster());
+    san += ledger_->txMap().addRootNode(
+        SHAMapHash{ledger_->info().txHash}, data, &filter);
     return san.isGood();
 }
@@ -1060,17 +1072,17 @@ InboundLedger::getNeededHashes()
 {
     std::vector<neededHash_t> ret;
 
-    if (!mHaveHeader)
+    if (!(userdata_ & IL_HAVE_HEADER))
     {
         ret.push_back(
             std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
         return ret;
     }
 
-    if (!mHaveState)
+    if (!(userdata_ & IL_HAVE_STATE))
     {
         AccountStateSF filter(
-            mLedger->stateMap().family().db(), app_.getLedgerMaster());
+            ledger_->stateMap().family().db(), app_.getLedgerMaster());
         for (auto const& h : neededStateHashes(4, &filter))
         {
             ret.push_back(
@@ -1078,10 +1090,10 @@ InboundLedger::getNeededHashes()
         }
     }
 
-    if (!mHaveTransactions)
+    if (!(userdata_ & IL_HAVE_TXNS))
     {
         TransactionStateSF filter(
-            mLedger->txMap().family().db(), app_.getLedgerMaster());
+            ledger_->txMap().family().db(), app_.getLedgerMaster());
         for (auto const& h : neededTxHashes(4, &filter))
         {
             ret.push_back(std::make_pair(
@@ -1107,10 +1119,9 @@ InboundLedger::gotData(
     mReceivedData.emplace_back(peer, data);
 
-    if (mReceiveDispatched)
+    if (userdata_.fetch_or(IL_RECEIVE_DISPATCHED) & IL_RECEIVE_DISPATCHED)
         return false;
-    mReceiveDispatched = true;
+
     return true;
 }
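
The same fetch_or latch replaces the old mReceiveDispatched flag: the first
call to gotData after the receive queue drains wins the right to dispatch a
processing job, and runData clears IL_RECEIVE_DISPATCHED once the queue is
empty again (see the @@ -1324,7 hunk below), so at most one job is in flight
at a time.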
@@ -1142,7 +1153,7 @@ InboundLedger::processData(
     try
     {
-        if (!mHaveHeader)
+        if (!(userdata_ & IL_HAVE_HEADER))
         {
             if (!takeHeader(packet.nodes(0).nodedata()))
             {
@@ -1154,13 +1165,13 @@ InboundLedger::processData(
             san.incUseful();
         }
 
-        if (!mHaveState && (packet.nodes().size() > 1) &&
+        if (!(userdata_ & IL_HAVE_STATE) && (packet.nodes().size() > 1) &&
             !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
         {
             JLOG(journal_.warn()) << "Included AS root invalid";
         }
 
-        if (!mHaveTransactions && (packet.nodes().size() > 2) &&
+        if (!(userdata_ & IL_HAVE_TXNS) && (packet.nodes().size() > 2) &&
             !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
         {
             JLOG(journal_.warn()) << "Included TX root invalid";
@@ -1175,7 +1186,7 @@ InboundLedger::processData(
         }
 
         if (san.isUseful())
-            progress_ = true;
+            makeProgress();
 
         mStats += san;
         return san.getGood();
@@ -1216,7 +1227,7 @@ InboundLedger::processData(
                            << " node stats: " << san.get();
 
         if (san.isUseful())
-            progress_ = true;
+            makeProgress();
 
         mStats += san;
         return san.getGood();
@@ -1324,7 +1335,7 @@ InboundLedger::runData()
         if (mReceivedData.empty())
         {
-            mReceiveDispatched = false;
+            userdata_ &= ~IL_RECEIVE_DISPATCHED;
             break;
         }
@@ -1358,45 +1369,49 @@ InboundLedger::getJson(int)
     ret[jss::hash] = to_string(hash_);
 
-    if (complete_)
+    if (hasCompleted())
         ret[jss::complete] = true;
 
-    if (failed_)
+    if (hasFailed())
         ret[jss::failed] = true;
 
-    if (!complete_ && !failed_)
+    if (!hasCompleted() && !hasFailed())
         ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());
 
-    ret[jss::have_header] = mHaveHeader;
+    auto [haveHeader, haveState, haveTransactions] = [](std::uint8_t flags) {
+        return std::make_tuple(
+            (flags & IL_HAVE_HEADER) == IL_HAVE_HEADER,
+            (flags & IL_HAVE_STATE) == IL_HAVE_STATE,
+            (flags & IL_HAVE_TXNS) == IL_HAVE_TXNS);
+    }(userdata_);
 
-    if (mHaveHeader)
+    ret[jss::have_header] = haveHeader;
+
+    if (haveHeader)
     {
-        ret[jss::have_state] = mHaveState;
-        ret[jss::have_transactions] = mHaveTransactions;
+        ret[jss::have_state] = haveState;
+
+        if (haveHeader && !haveState)
+        {
+            Json::Value hv(Json::arrayValue);
+
+            for (auto const& h : neededStateHashes(16, nullptr))
+                hv.append(to_string(h));
+
+            ret[jss::needed_state_hashes] = hv;
+        }
+
+        ret[jss::have_transactions] = haveTransactions;
+
+        if (haveHeader && !haveTransactions)
+        {
+            Json::Value hv(Json::arrayValue);
+
+            for (auto const& h : neededTxHashes(16, nullptr))
+                hv.append(to_string(h));
+
+            ret[jss::needed_transaction_hashes] = hv;
+        }
     }
 
     ret[jss::timeouts] = timeouts_;
-    if (mHaveHeader && !mHaveState)
-    {
-        Json::Value hv(Json::arrayValue);
-        for (auto const& h : neededStateHashes(16, nullptr))
-        {
-            hv.append(to_string(h));
-        }
-        ret[jss::needed_state_hashes] = hv;
-    }
-    if (mHaveHeader && !mHaveTransactions)
-    {
-        Json::Value hv(Json::arrayValue);
-        for (auto const& h : neededTxHashes(16, nullptr))
-        {
-            hv.append(to_string(h));
-        }
-        ret[jss::needed_transaction_hashes] = hv;
-    }
 
     return ret;
 }
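
In both getJson and trigger, the flag bits are decoded through a lambda that
takes the flags byte by value, so userdata_ is read once and all three
booleans come from the same consistent snapshot rather than three separate
atomic loads. A sketch of the decode (hypothetical free function):

    #include <cstdint>
    #include <tuple>

    constexpr std::uint8_t HAVE_HEADER = 0x08;
    constexpr std::uint8_t HAVE_STATE = 0x10;
    constexpr std::uint8_t HAVE_TXNS = 0x20;

    // Decode one snapshot of the flags byte into named booleans.
    std::tuple<bool, bool, bool>
    decodeFlags(std::uint8_t flags)
    {
        return {
            (flags & HAVE_HEADER) != 0,
            (flags & HAVE_STATE) != 0,
            (flags & HAVE_TXNS) != 0};
    }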

View File

@@ -110,13 +110,13 @@ public:
             }
         }
 
-        if (inbound->isFailed())
+        if (inbound->hasFailed())
             return {};
 
         if (!isNew)
             inbound->update(seq);
 
-        if (!inbound->isComplete())
+        if (!inbound->hasCompleted())
             return {};
 
         if (reason == InboundLedger::Reason::HISTORY)

View File

@@ -17,49 +17,42 @@
 */
 //==============================================================================
 
-#include <ripple/app/ledger/InboundLedgers.h>
 #include <ripple/app/ledger/InboundTransactions.h>
 #include <ripple/app/ledger/impl/TransactionAcquire.h>
 #include <ripple/app/main/Application.h>
-#include <ripple/app/misc/NetworkOPs.h>
-#include <ripple/basics/Log.h>
-#include <ripple/core/JobQueue.h>
-#include <ripple/protocol/RippleLedgerHash.h>
 #include <ripple/resource/Fees.h>
 
-#include <cassert>
 #include <memory>
 #include <mutex>
 
 namespace ripple {
 
-enum {
-    // Ideal number of peers to start with
-    startPeers = 2,
-
-    // How many rounds to keep a set
-    setKeepRounds = 3,
-};
-
-class InboundTransactionSet
+struct InboundTransactionSet
 {
     // A transaction set we generated, acquired, or are acquiring
-public:
-    std::uint32_t mSeq;
-    TransactionAcquire::pointer mAcquire;
-    std::shared_ptr<SHAMap> mSet;
+    std::shared_ptr<TransactionAcquire> acquire;
+    std::shared_ptr<SHAMap> txset;
+    std::uint32_t seq = 0;
 
-    InboundTransactionSet(std::uint32_t seq, std::shared_ptr<SHAMap> const& set)
-        : mSeq(seq), mSet(set)
-    {
-        ;
-    }
-    InboundTransactionSet() : mSeq(0)
-    {
-        ;
-    }
+    InboundTransactionSet() = default;
+    InboundTransactionSet(InboundTransactionSet&& other) = default;
+
+    InboundTransactionSet&
+    operator=(InboundTransactionSet&& other) = delete;
+
+    InboundTransactionSet(InboundTransactionSet const& other) = delete;
+    InboundTransactionSet&
+    operator=(InboundTransactionSet const& other) = delete;
 };
 
 class InboundTransactionsImp : public InboundTransactions
 {
+    /** The initial number of peers to query when fetching a transaction set. */
+    static constexpr int const startPeers = 2;
+
+    /** How many rounds to keep an inbound transaction set for */
+    static constexpr std::uint32_t const setKeepRounds = 3;
+
 public:
     InboundTransactionsImp(
         Application& app,
@@ -67,112 +60,78 @@ public:
         std::function<void(std::shared_ptr<SHAMap> const&, bool)> gotSet,
         std::unique_ptr<PeerSetBuilder> peerSetBuilder)
         : app_(app)
-        , m_seq(0)
-        , m_zeroSet(m_map[uint256()])
         , m_gotSet(std::move(gotSet))
-        , m_peerSetBuilder(std::move(peerSetBuilder))
-        , j_(app_.journal("InboundTransactions"))
+        , peerSetBuilder_(std::move(peerSetBuilder))
     {
-        m_zeroSet.mSet = std::make_shared<SHAMap>(
+        emptyMap_ = std::make_shared<SHAMap>(
             SHAMapType::TRANSACTION, uint256(), app_.getNodeFamily());
-        m_zeroSet.mSet->setUnbacked();
-    }
-
-    TransactionAcquire::pointer
-    getAcquire(uint256 const& hash)
-    {
-        {
-            std::lock_guard sl(mLock);
-
-            auto it = m_map.find(hash);
-
-            if (it != m_map.end())
-                return it->second.mAcquire;
-        }
-        return {};
+        emptyMap_->setUnbacked();
     }
 
     std::shared_ptr<SHAMap>
     getSet(uint256 const& hash, bool acquire) override
     {
-        TransactionAcquire::pointer ta;
+        if (hash.isZero())
+            return emptyMap_;
+
+        std::shared_ptr<TransactionAcquire> ta;
 
         {
-            std::lock_guard sl(mLock);
+            std::lock_guard sl(lock_);
+
+            if (stopping_)
+                return {};
 
             if (auto it = m_map.find(hash); it != m_map.end())
             {
                 if (acquire)
                 {
-                    it->second.mSeq = m_seq;
-                    if (it->second.mAcquire)
-                    {
-                        it->second.mAcquire->stillNeed();
-                    }
+                    it->second.seq = m_seq;
+
+                    if (it->second.acquire)
+                        it->second.acquire->stillNeed();
                 }
-                return it->second.mSet;
+
+                return it->second.txset;
             }
 
-            if (!acquire || stopping_)
-                return std::shared_ptr<SHAMap>();
+            if (acquire)
+            {
+                ta = std::make_shared<TransactionAcquire>(
+                    app_, hash, peerSetBuilder_->build());
 
-            ta = std::make_shared<TransactionAcquire>(
-                app_, hash, m_peerSetBuilder->build());
-
-            auto& obj = m_map[hash];
-            obj.mAcquire = ta;
-            obj.mSeq = m_seq;
+                auto& obj = m_map[hash];
+                obj.acquire = ta;
+                obj.seq = m_seq;
+            }
         }
 
-        ta->init(startPeers);
+        if (ta)
+            ta->init(startPeers);
 
         return {};
     }
-    /** We received a TMLedgerData from a peer.
-     */
+    /** We received data from a peer. */
     void
     gotData(
-        LedgerHash const& hash,
+        uint256 const& hash,
         std::shared_ptr<Peer> peer,
-        std::shared_ptr<protocol::TMLedgerData> packet_ptr) override
+        std::vector<std::pair<SHAMapNodeID, Slice>> const& data) override
     {
-        protocol::TMLedgerData& packet = *packet_ptr;
-
-        JLOG(j_.trace()) << "Got data (" << packet.nodes().size()
-                         << ") for acquiring ledger: " << hash;
-
-        TransactionAcquire::pointer ta = getAcquire(hash);
-
-        if (ta == nullptr)
-        {
-            peer->charge(Resource::feeUnwantedData);
-            return;
-        }
-
-        std::vector<std::pair<SHAMapNodeID, Slice>> data;
-        data.reserve(packet.nodes().size());
-
-        for (auto const& node : packet.nodes())
-        {
-            if (!node.has_nodeid() || !node.has_nodedata())
-            {
-                peer->charge(Resource::feeInvalidRequest);
-                return;
-            }
-
-            auto const id = deserializeSHAMapNodeID(node.nodeid());
-
-            if (!id)
-            {
-                peer->charge(Resource::feeBadData);
-                return;
-            }
-
-            data.emplace_back(std::make_pair(*id, makeSlice(node.nodedata())));
-        }
-
-        if (!ta->takeNodes(data, peer).isUseful())
+        assert(!data.empty());
+
+        if (hash.isZero())
+            return;
+
+        auto ta = [this, &hash]() -> std::shared_ptr<TransactionAcquire> {
+            std::lock_guard sl(lock_);
+
+            if (auto it = m_map.find(hash); it != m_map.end())
+                return it->second.acquire;
+
+            return {};
+        }();
+
+        if (!ta || !ta->takeNodes(data, peer).isUseful())
             peer->charge(Resource::feeUnwantedData);
     }
@@ -182,22 +141,28 @@ public:
         std::shared_ptr<SHAMap> const& set,
         bool fromAcquire) override
     {
+        if (hash.isZero())
+            return;
+
         bool isNew = true;
 
         {
-            std::lock_guard sl(mLock);
+            std::lock_guard sl(lock_);
+
+            if (stopping_)
+                return;
 
             auto& inboundSet = m_map[hash];
 
-            if (inboundSet.mSeq < m_seq)
-                inboundSet.mSeq = m_seq;
+            if (inboundSet.seq < m_seq)
+                inboundSet.seq = m_seq;
 
-            if (inboundSet.mSet)
+            if (inboundSet.txset)
                 isNew = false;
             else
-                inboundSet.mSet = set;
+                inboundSet.txset = set;
 
-            inboundSet.mAcquire.reset();
+            inboundSet.acquire.reset();
         }
 
         if (isNew)
@@ -207,24 +172,23 @@ public:
     void
     newRound(std::uint32_t seq) override
     {
-        std::lock_guard lock(mLock);
+        assert(
+            seq <= std::numeric_limits<std::uint32_t>::max() - setKeepRounds);
 
-        // Protect zero set from expiration
-        m_zeroSet.mSeq = seq;
+        std::lock_guard lock(lock_);
 
-        if (m_seq != seq)
+        if (!stopping_ && m_seq != seq)
         {
             m_seq = seq;
 
             auto it = m_map.begin();
 
-            std::uint32_t const minSeq =
-                (seq < setKeepRounds) ? 0 : (seq - setKeepRounds);
-            std::uint32_t maxSeq = seq + setKeepRounds;
+            std::uint32_t const maxSeq = seq + setKeepRounds;
+            std::uint32_t const minSeq = seq - std::min(seq, setKeepRounds);
 
             while (it != m_map.end())
             {
-                if (it->second.mSeq < minSeq || it->second.mSeq > maxSeq)
+                if (it->second.seq < minSeq || it->second.seq > maxSeq)
                     it = m_map.erase(it);
                 else
                     ++it;
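
The rewritten newRound keeps any set whose sequence lies within setKeepRounds
of the current round on either side. The expression
seq - std::min(seq, setKeepRounds) clamps the lower bound at zero without
unsigned underflow, and the new assert documents that seq + setKeepRounds
must not wrap. A self-contained sketch of the window check (hypothetical
names):

    #include <algorithm>
    #include <cstdint>

    constexpr std::uint32_t keepRounds = 3;

    // True if a set tagged with setSeq should survive round seq.
    bool
    keep(std::uint32_t setSeq, std::uint32_t seq)
    {
        // The lower bound clamps at 0 instead of wrapping around.
        std::uint32_t const minSeq = seq - std::min(seq, keepRounds);

        // The caller asserts that this cannot overflow.
        std::uint32_t const maxSeq = seq + keepRounds;

        return setSeq >= minSeq && setSeq <= maxSeq;
    }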
@@ -235,36 +199,32 @@ public:
     void
     stop() override
     {
-        std::lock_guard lock(mLock);
+        std::lock_guard lock(lock_);
 
         stopping_ = true;
         m_map.clear();
     }
 
 private:
-    using MapType = hash_map<uint256, InboundTransactionSet>;
-
     Application& app_;
-    std::recursive_mutex mLock;
+    std::mutex lock_;
 
-    bool stopping_{false};
-    MapType m_map;
-    std::uint32_t m_seq;
+    hash_map<uint256, InboundTransactionSet> m_map;
 
-    // The empty transaction set whose hash is zero
-    InboundTransactionSet& m_zeroSet;
+    // The empty transaction set (whose hash is zero)
+    std::shared_ptr<SHAMap> emptyMap_;
 
     std::function<void(std::shared_ptr<SHAMap> const&, bool)> m_gotSet;
-    std::unique_ptr<PeerSetBuilder> m_peerSetBuilder;
-    beast::Journal j_;
+    std::unique_ptr<PeerSetBuilder> peerSetBuilder_;
+
+    std::uint32_t m_seq = 0;
+    bool stopping_ = false;
 };
 
 //------------------------------------------------------------------------------
 
-InboundTransactions::~InboundTransactions() = default;
-
 std::unique_ptr<InboundTransactions>
 make_InboundTransactions(
     Application& app,

View File

@@ -39,8 +39,9 @@ LedgerDeltaAcquire::LedgerDeltaAcquire(
           ledgerHash,
           LedgerReplayParameters::SUB_TASK_TIMEOUT,
           {jtREPLAY_TASK,
-           "LedgerReplayDelta",
-           LedgerReplayParameters::MAX_QUEUED_TASKS},
+           LedgerReplayParameters::MAX_QUEUED_TASKS,
+           "LedgerReplayDelta"},
+          0,
           app.journal("LedgerReplayDelta"))
     , inboundLedgers_(inboundLedgers)
     , ledgerSeq_(ledgerSeq)
@@ -71,13 +72,13 @@ LedgerDeltaAcquire::trigger(std::size_t limit, ScopedLockType& sl)
         fullLedger_ = app_.getLedgerMaster().getLedgerByHash(hash_);
         if (fullLedger_)
         {
-            complete_ = true;
+            markComplete();
             JLOG(journal_.trace()) << "existing ledger " << hash_;
             notify(sl);
             return;
         }
 
-    if (!fallBack_)
+    if (userdata_ <= LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
     {
         peerSet_->addPeers(
             limit,
@@ -93,22 +94,20 @@ LedgerDeltaAcquire::trigger(std::size_t limit, ScopedLockType& sl)
                     protocol::TMReplayDeltaRequest request;
                     request.set_ledgerhash(hash_.data(), hash_.size());
                     peerSet_->sendRequest(request, peer);
-                    return;
                 }
-                else
+
+                if (++userdata_ ==
+                    LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
                 {
-                    if (++noFeaturePeerCount >=
-                        LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
-                    {
-                        JLOG(journal_.debug()) << "Fall back for " << hash_;
-                        timerInterval_ =
-                            LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT;
-                        fallBack_ = true;
-                    }
+                    JLOG(journal_.debug()) << "Fall back for " << hash_;
+                    timerInterval_ =
+                        LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT;
                 }
             });
     }
 
-    if (fallBack_)
+    if (userdata_ > LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
         inboundLedgers_.acquire(
             hash_, ledgerSeq_, InboundLedger::Reason::GENERIC);
 }
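
With fallBack_ and noFeaturePeerCount gone, the one-byte userdata_ doubles as
the no-feature-peer counter: while it is at or below
MAX_NO_FEATURE_PEER_COUNT the task keeps querying replay-capable peers, the
increment that reaches the threshold switches to the shorter fallback timer,
and any value above it routes the request through inboundLedgers_.acquire
instead. A simplified sketch of that state machine (hypothetical names):

    #include <cstdint>

    constexpr std::uint8_t maxNoFeaturePeers = 2;

    struct FallbackCounter
    {
        std::uint8_t count = 0;  // plays the role of userdata_

        // Record one peer lacking the feature; true exactly when the
        // threshold is first reached (time to shorten the timer).
        bool
        addNoFeaturePeer()
        {
            return ++count == maxNoFeaturePeers;
        }

        // Once past the threshold, acquisition falls back entirely.
        bool
        fellBack() const
        {
            return count > maxNoFeaturePeers;
        }
    };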
@@ -119,7 +118,7 @@ LedgerDeltaAcquire::onTimer(bool progress, ScopedLockType& sl)
     JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
     if (timeouts_ > LedgerReplayParameters::SUB_TASK_MAX_TIMEOUTS)
     {
-        failed_ = true;
+        markFailed();
         JLOG(journal_.debug()) << "too many timeouts " << hash_;
         notify(sl);
     }
@@ -152,7 +151,7 @@ LedgerDeltaAcquire::processData(
             std::make_shared<Ledger>(info, app_.config(), app_.getNodeFamily());
         if (replayTemp_)
         {
-            complete_ = true;
+            markComplete();
             orderedTxns_ = std::move(orderedTxns);
             JLOG(journal_.debug()) << "ready to replay " << hash_;
             notify(sl);
@@ -160,7 +159,7 @@ LedgerDeltaAcquire::processData(
         }
     }
 
-    failed_ = true;
+    markFailed();
     JLOG(journal_.error())
         << "failed to create a (info only) ledger from verified data " << hash_;
     notify(sl);
@@ -196,7 +195,7 @@ LedgerDeltaAcquire::tryBuild(std::shared_ptr<Ledger const> const& parent)
     if (fullLedger_)
         return fullLedger_;
 
-    if (failed_ || !complete_ || !replayTemp_)
+    if (hasFailed() || !hasCompleted() || !replayTemp_)
         return {};
 
     assert(parent->seq() + 1 == replayTemp_->seq());
@@ -212,8 +211,7 @@ LedgerDeltaAcquire::tryBuild(std::shared_ptr<Ledger const> const& parent)
     }
     else
     {
-        failed_ = true;
-        complete_ = false;
+        markFailed();
         JLOG(journal_.error()) << "tryBuild failed " << hash_ << " with parent "
                                << parent->info().hash;
         Throw<std::runtime_error>("Cannot replay ledger");
@@ -265,7 +263,7 @@ LedgerDeltaAcquire::notify(ScopedLockType& sl)
     assert(isDone());
     std::vector<OnDeltaDataCB> toCall;
     std::swap(toCall, dataReadyCallbacks_);
-    auto const good = !failed_;
+    auto const good = !hasFailed();
     sl.unlock();
 
     for (auto& cb : toCall)

View File

@@ -154,8 +154,6 @@ private:
     std::map<std::uint32_t, std::shared_ptr<STTx const>> orderedTxns_;
     std::vector<OnDeltaDataCB> dataReadyCallbacks_;
     std::set<InboundLedger::Reason> reasons_;
-    std::uint32_t noFeaturePeerCount = 0;
-    bool fallBack_ = false;
 
     friend class LedgerReplayTask;  // for asserts only
     friend class test::LedgerReplayClient;

View File

@@ -92,8 +92,9 @@ LedgerReplayTask::LedgerReplayTask(
           parameter.finishHash_,
           LedgerReplayParameters::TASK_TIMEOUT,
           {jtREPLAY_TASK,
-           "LedgerReplayTask",
-           LedgerReplayParameters::MAX_QUEUED_TASKS},
+           LedgerReplayParameters::MAX_QUEUED_TASKS,
+           "LedgerReplayTask"},
+          0,
           app.journal("LedgerReplayTask"))
     , inboundLedgers_(inboundLedgers)
     , replayer_(replayer)
@@ -213,12 +214,12 @@ LedgerReplayTask::tryAdvance(ScopedLockType& sl)
                 return;
         }
 
-        complete_ = true;
+        markComplete();
         JLOG(journal_.info()) << "Completed " << hash_;
     }
     catch (std::runtime_error const&)
     {
-        failed_ = true;
+        markFailed();
     }
 }
@@ -235,7 +236,7 @@ LedgerReplayTask::updateSkipList(
         if (!parameter_.update(hash, seq, sList))
         {
             JLOG(journal_.error()) << "Parameter update failed " << hash_;
-            failed_ = true;
+            markFailed();
             return;
         }
     }
@@ -252,7 +253,7 @@ LedgerReplayTask::onTimer(bool progress, ScopedLockType& sl)
     JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
     if (timeouts_ > maxTimeouts_)
     {
-        failed_ = true;
+        markFailed();
         JLOG(journal_.debug())
             << "LedgerReplayTask Failed, too many timeouts " << hash_;
     }

View File

@@ -36,8 +36,9 @@ SkipListAcquire::SkipListAcquire(
           ledgerHash,
           LedgerReplayParameters::SUB_TASK_TIMEOUT,
           {jtREPLAY_TASK,
-           "SkipListAcquire",
-           LedgerReplayParameters::MAX_QUEUED_TASKS},
+           LedgerReplayParameters::MAX_QUEUED_TASKS,
+           "SkipListAcquire"},
+          0,
           app.journal("LedgerReplaySkipList"))
     , inboundLedgers_(inboundLedgers)
     , peerSet_(std::move(peerSet))
@@ -71,7 +72,7 @@ SkipListAcquire::trigger(std::size_t limit, ScopedLockType& sl)
         return;
     }
 
-    if (!fallBack_)
+    if (userdata_ <= LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
     {
         peerSet_->addPeers(
             limit,
@@ -91,24 +92,22 @@ SkipListAcquire::trigger(std::size_t limit, ScopedLockType& sl)
                     request.set_type(
                         protocol::TMLedgerMapType::lmACCOUNT_STATE);
                     peerSet_->sendRequest(request, peer);
-                    return;
                 }
-                else
+
+                JLOG(journal_.trace()) << "Add a no feature peer " << peer->id()
+                                       << " for " << hash_;
+
+                if (++userdata_ ==
+                    LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
                 {
-                    JLOG(journal_.trace()) << "Add a no feature peer "
-                                           << peer->id() << " for " << hash_;
-                    if (++noFeaturePeerCount_ >=
-                        LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
-                    {
-                        JLOG(journal_.debug()) << "Fall back for " << hash_;
-                        timerInterval_ =
-                            LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT;
-                        fallBack_ = true;
-                    }
+                    JLOG(journal_.debug()) << "Fall back for " << hash_;
+                    timerInterval_ =
+                        LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT;
                 }
             });
     }
 
-    if (fallBack_)
+    if (userdata_ > LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT)
         inboundLedgers_.acquire(hash_, 0, InboundLedger::Reason::GENERIC);
 }
@@ -118,7 +117,7 @@ SkipListAcquire::onTimer(bool progress, ScopedLockType& sl)
     JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
     if (timeouts_ > LedgerReplayParameters::SUB_TASK_MAX_TIMEOUTS)
     {
-        failed_ = true;
+        markFailed();
         JLOG(journal_.debug()) << "too many timeouts " << hash_;
         notify(sl);
     }
@@ -161,7 +160,7 @@ SkipListAcquire::processData(
     {
     }
 
-    failed_ = true;
+    markFailed();
     JLOG(journal_.error()) << "failed to retrieve Skip list from verified data "
                            << hash_;
     notify(sl);
@@ -203,7 +202,7 @@ SkipListAcquire::retrieveSkipList(
         }
     }
 
-    failed_ = true;
+    markFailed();
     JLOG(journal_.error()) << "failed to retrieve Skip list from a ledger "
                            << hash_;
     notify(sl);
@@ -215,7 +214,7 @@ SkipListAcquire::onSkipListAcquired(
     std::uint32_t ledgerSeq,
     ScopedLockType& sl)
 {
-    complete_ = true;
+    markComplete();
     data_ = std::make_shared<SkipListData>(ledgerSeq, skipList);
     JLOG(journal_.debug()) << "Skip list acquired " << hash_;
     notify(sl);
@@ -227,7 +226,7 @@ SkipListAcquire::notify(ScopedLockType& sl)
     assert(isDone());
     std::vector<OnSkipListDataCB> toCall;
     std::swap(toCall, dataReadyCallbacks_);
-    auto const good = !failed_;
+    auto const good = !hasFailed();
     sl.unlock();
 
     for (auto& cb : toCall)

View File

@@ -157,8 +157,6 @@ private:
std::unique_ptr<PeerSet> peerSet_; std::unique_ptr<PeerSet> peerSet_;
std::vector<OnSkipListDataCB> dataReadyCallbacks_; std::vector<OnSkipListDataCB> dataReadyCallbacks_;
std::shared_ptr<SkipListData const> data_; std::shared_ptr<SkipListData const> data_;
std::uint32_t noFeaturePeerCount_ = 0;
bool fallBack_ = false;
friend class test::LedgerReplayClient; friend class test::LedgerReplayClient;
}; };

View File

@@ -31,16 +31,14 @@ TimeoutCounter::TimeoutCounter(
uint256 const& hash, uint256 const& hash,
std::chrono::milliseconds interval, std::chrono::milliseconds interval,
QueueJobParameter&& jobParameter, QueueJobParameter&& jobParameter,
std::uint8_t userdata,
beast::Journal journal) beast::Journal journal)
: app_(app) : app_(app)
, journal_(journal) , journal_(journal)
, hash_(hash)
, timeouts_(0)
, complete_(false)
, failed_(false)
, progress_(false)
, timerInterval_(interval) , timerInterval_(interval)
, queueJobParameter_(std::move(jobParameter)) , queueJobParameter_(std::move(jobParameter))
, hash_(hash)
, userdata_(userdata)
, timer_(app_.getIOService()) , timer_(app_.getIOService())
{ {
assert((timerInterval_ > 10ms) && (timerInterval_ < 30s)); assert((timerInterval_ > 10ms) && (timerInterval_ < 30s));
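The initializer list now follows the header's new declaration order: the wide members (interval, job parameters, hash, mutex) come first and the byte-sized fields trail, which is where the size reduction in TimeoutCounter-derived objects comes from. A toy illustration of the padding effect, assuming a typical 64-bit ABI (the struct names are made up):

#include <cstdint>

struct Loose
{
    bool complete;    // offset 0, then 7 bytes of padding
    std::uint64_t a;  // offset 8
    bool failed;      // offset 16, then 7 bytes of padding
    std::uint64_t b;  // offset 24
};                    // sizeof(Loose) == 32

// Packing the flags into one trailing byte removes the interior
// padding, which is what the single state_ byte achieves:
struct Packed
{
    std::uint64_t a;     // offset 0
    std::uint64_t b;     // offset 8
    std::uint8_t flags;  // offset 16, then tail padding to 24
};

static_assert(sizeof(Packed) < sizeof(Loose));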
@@ -70,6 +68,7 @@ TimeoutCounter::queueJob(ScopedLockType& sl)
{ {
if (isDone()) if (isDone())
return; return;
if (queueJobParameter_.jobLimit && if (queueJobParameter_.jobLimit &&
app_.getJobQueue().getJobCountTotal(queueJobParameter_.jobType) >= app_.getJobQueue().getJobCountTotal(queueJobParameter_.jobType) >=
queueJobParameter_.jobLimit) queueJobParameter_.jobLimit)
@@ -85,42 +84,41 @@ TimeoutCounter::queueJob(ScopedLockType& sl)
queueJobParameter_.jobName, queueJobParameter_.jobName,
[wptr = pmDowncast()]() { [wptr = pmDowncast()]() {
if (auto sptr = wptr.lock(); sptr) if (auto sptr = wptr.lock(); sptr)
sptr->invokeOnTimer(); {
ScopedLockType sl(sptr->mtx_);
if (sptr->isDone())
return;
auto const progress = [&sptr]() {
if (sptr->state_.fetch_and(~PROGRESSING) & PROGRESSING)
return true;
return false;
}();
if (!progress)
{
++sptr->timeouts_;
JLOG(sptr->journal_.debug())
<< "Acquiring " << sptr->hash_ << ": timeout ("
<< sptr->timeouts_ << ")";
}
sptr->onTimer(progress, sl);
if (!sptr->isDone())
sptr->setTimer(sl);
}
}); });
} }
void
TimeoutCounter::invokeOnTimer()
{
ScopedLockType sl(mtx_);
if (isDone())
return;
if (!progress_)
{
++timeouts_;
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
<< " acquiring " << hash_;
onTimer(false, sl);
}
else
{
progress_ = false;
onTimer(true, sl);
}
if (!isDone())
setTimer(sl);
}
void void
TimeoutCounter::cancel() TimeoutCounter::cancel()
{ {
ScopedLockType sl(mtx_); ScopedLockType sl(mtx_);
if (!isDone()) if (!isDone())
{ {
failed_ = true; markFailed();
JLOG(journal_.info()) << "Cancel " << hash_; JLOG(journal_.info()) << "Cancel " << hash_;
} }
} }
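The job body above replaces invokeOnTimer() and folds the old read-then-reset of progress_ into one atomic read-modify-write: fetch_and returns the value held before the AND, so testing PROGRESSING in that result both observes and clears the bit, with no window in which a concurrent makeProgress() could be lost. The idiom in isolation:

#include <atomic>
#include <cstdint>

constexpr std::uint8_t PROGRESSING = 0x04;

// Atomically clear PROGRESSING and report whether it had been set.
bool
testAndClearProgress(std::atomic<std::uint8_t>& state)
{
    auto const prior =
        state.fetch_and(static_cast<std::uint8_t>(~PROGRESSING));
    return (prior & PROGRESSING) != 0;
}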

View File

@@ -25,6 +25,7 @@
#include <ripple/beast/utility/Journal.h> #include <ripple/beast/utility/Journal.h>
#include <ripple/core/Job.h> #include <ripple/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp> #include <boost/asio/basic_waitable_timer.hpp>
#include <atomic>
#include <mutex> #include <mutex>
namespace ripple { namespace ripple {
@@ -40,11 +41,11 @@ namespace ripple {
1. The entry point is `setTimer`. 1. The entry point is `setTimer`.
2. After `mTimerInterval`, `queueJob` is called, which schedules a job to 2. After `mTimerInterval`, `queueJob` is called, which schedules a job to
call `invokeOnTimer` (or loops back to setTimer if there are too many do some housekeeping and invoke the timer callback (or loops back to
concurrent jobs). setTimer if there are too many concurrent jobs).
3. The job queue calls `invokeOnTimer` which either breaks the loop if 3. The job queue performs housekeeping and either breaks the loop if
`isDone` or calls `onTimer`. `isDone` or calls the virtual `onTimer`.
4. `onTimer` is the only "real" virtual method in this class. It is the 4. `onTimer` is the only "real" virtual method in this class. It is the
callback for when the timeout expires. Generally, its only responsibility callback for when the timeout expires. Generally, its only responsibility
@@ -53,7 +54,7 @@ namespace ripple {
timeouts. timeouts.
5. Once `onTimer` returns, if the object is still not `isDone`, then 5. Once `onTimer` returns, if the object is still not `isDone`, then
`invokeOnTimer` sets another timeout by looping back to setTimer. we set another timeout by looping back to `setTimer`.
This loop executes concurrently with another asynchronous sequence, This loop executes concurrently with another asynchronous sequence,
implemented by the subtype, that is trying to make progress and eventually implemented by the subtype, that is trying to make progress and eventually
@@ -77,14 +78,32 @@ public:
virtual void virtual void
cancel(); cancel();
bool
hasFailed() const
{
return (state_ & FAILED) == FAILED;
}
/** True if the object has completed its work. */
bool
hasCompleted() const
{
return (state_ & COMPLETED) == COMPLETED;
}
protected: protected:
using ScopedLockType = std::unique_lock<std::recursive_mutex>; using ScopedLockType = std::unique_lock<std::recursive_mutex>;
struct QueueJobParameter struct QueueJobParameter
{ {
JobType jobType; JobType const jobType;
std::string jobName;
std::optional<std::uint32_t> jobLimit; // If at least this many jobs of this type are already
// queued, we do not queue another.
std::uint32_t const jobLimit;
// The description of the job
std::string const jobName;
}; };
TimeoutCounter( TimeoutCounter(
@@ -92,6 +111,7 @@ protected:
uint256 const& targetHash, uint256 const& targetHash,
std::chrono::milliseconds timeoutInterval, std::chrono::milliseconds timeoutInterval,
QueueJobParameter&& jobParameter, QueueJobParameter&& jobParameter,
std::uint8_t userdata,
beast::Journal journal); beast::Journal journal);
virtual ~TimeoutCounter() = default; virtual ~TimeoutCounter() = default;
@@ -100,11 +120,11 @@ protected:
void void
setTimer(ScopedLockType&); setTimer(ScopedLockType&);
/** Queue a job to call invokeOnTimer(). */ /** Queue a job to call the timer callback. */
void void
queueJob(ScopedLockType&); queueJob(ScopedLockType&);
/** Hook called from invokeOnTimer(). */ /** Hook called when the timer fires. */
virtual void virtual void
onTimer(bool progress, ScopedLockType&) = 0; onTimer(bool progress, ScopedLockType&) = 0;
@@ -115,34 +135,68 @@ protected:
bool bool
isDone() const isDone() const
{ {
return complete_ || failed_; return (state_ & (COMPLETED | FAILED)) != 0;
}
/** True if the object has made progress since the last time we checked. */
bool
hasProgressed() const
{
return PROGRESSING == (state_ & PROGRESSING);
}
/** Indicate that this object has completed its work. */
void
markComplete()
{
state_ = (state_ & ~FAILED) | COMPLETED;
}
void
markFailed()
{
state_ = (state_ & ~COMPLETED) | FAILED;
}
/** Indicate that this object made progress. */
void
makeProgress()
{
state_ |= PROGRESSING;
} }
// Used in this class for access to boost::asio::io_service and // Used in this class for access to boost::asio::io_service and
// ripple::Overlay. Used in subtypes for the kitchen sink. // ripple::Overlay. Used in subtypes for the kitchen sink.
Application& app_; Application& app_;
beast::Journal journal_; beast::Journal journal_;
mutable std::recursive_mutex mtx_;
/** The hash of the object (in practice, always a ledger) we are trying to
* fetch. */
uint256 const hash_;
int timeouts_;
bool complete_;
bool failed_;
/** Whether forward progress has been made. */
bool progress_;
/** The minimum time to wait between calls to execute(). */ /** The minimum time to wait between calls to execute(). */
std::chrono::milliseconds timerInterval_; std::chrono::milliseconds timerInterval_;
QueueJobParameter queueJobParameter_; QueueJobParameter queueJobParameter_;
/** The hash of the object (typically a ledger) we are trying to fetch. */
uint256 const hash_;
mutable std::recursive_mutex mtx_;
std::uint16_t timeouts_ = 0;
/** A small amount of data for derived classes to use as needed. */
std::atomic<std::uint8_t> userdata_ = 0;
private: private:
/** Calls onTimer() if in the right state. /** Used to track the current state of the object. */
* Only called by queueJob(). std::atomic<std::uint8_t> state_ = 0;
*/
void /** If set, the acquisition has been completed. */
invokeOnTimer(); static constexpr std::uint8_t const COMPLETED = 0x01;
/** If set, the acquisition has failed. */
static constexpr std::uint8_t const FAILED = 0x02;
/** If set, the acquisition has made some progress. */
static constexpr std::uint8_t const PROGRESSING = 0x04;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_; boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
}; };
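markComplete() and markFailed() keep the two terminal bits mutually exclusive by clearing one while setting the other, so isDone() reduces to a single mask test. The invariant, checked at compile time with free functions standing in for the members:

#include <cstdint>

constexpr std::uint8_t COMPLETED = 0x01;
constexpr std::uint8_t FAILED = 0x02;

constexpr std::uint8_t
markComplete(std::uint8_t state)
{
    // Setting COMPLETED clears FAILED (and vice versa), so the two
    // terminal states can never be observed together.
    return (state & ~FAILED) | COMPLETED;
}

constexpr std::uint8_t
markFailed(std::uint8_t state)
{
    return (state & ~COMPLETED) | FAILED;
}

constexpr bool
isDone(std::uint8_t state)
{
    return (state & (COMPLETED | FAILED)) != 0;
}

static_assert(!isDone(0));
static_assert(isDone(markComplete(markFailed(0))));
static_assert((markComplete(markFailed(0)) & FAILED) == 0);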

View File

@@ -35,10 +35,11 @@ using namespace std::chrono_literals;
// Timeout interval in milliseconds // Timeout interval in milliseconds
auto constexpr TX_ACQUIRE_TIMEOUT = 250ms; auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
enum { // If set, we have acquired the root of the TX SHAMap
NORM_TIMEOUTS = 4, static constexpr std::uint8_t const TX_HAVE_ROOT = 0x01;
MAX_TIMEOUTS = 20,
}; static constexpr std::uint16_t const TX_NORM_TIMEOUTS = 4;
static constexpr std::uint16_t const TX_MAX_TIMEOUTS = 20;
TransactionAcquire::TransactionAcquire( TransactionAcquire::TransactionAcquire(
Application& app, Application& app,
@@ -48,9 +49,9 @@ TransactionAcquire::TransactionAcquire(
app, app,
hash, hash,
TX_ACQUIRE_TIMEOUT, TX_ACQUIRE_TIMEOUT,
{jtTXN_DATA, "TransactionAcquire", {}}, {jtTXN_DATA, 0, "TransactionAcquire"},
0,
app.journal("TransactionAcquire")) app.journal("TransactionAcquire"))
, mHaveRoot(false)
, mPeerSet(std::move(peerSet)) , mPeerSet(std::move(peerSet))
{ {
mMap = std::make_shared<SHAMap>( mMap = std::make_shared<SHAMap>(
@@ -61,43 +62,36 @@ TransactionAcquire::TransactionAcquire(
void void
TransactionAcquire::done() TransactionAcquire::done()
{ {
// We hold a PeerSet lock and so cannot do real work here
if (failed_) if (hasFailed())
{ {
JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_; JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
return;
} }
else
{
JLOG(journal_.debug()) << "Acquired TX set " << hash_; JLOG(journal_.debug()) << "Acquired TX set " << hash_;
mMap->setImmutable(); mMap->setImmutable();
uint256 const& hash(hash_);
std::shared_ptr<SHAMap> const& map(mMap);
auto const pap = &app_;
// Note that, when we're in the process of shutting down, addJob() // If we are in the process of shutting down, the job queue may refuse
// may reject the request. If that happens then giveSet() will // to queue the job; that's fine.
// not be called. That's fine. According to David the giveSet() call
// just updates the consensus and related structures when we acquire
// a transaction set. No need to update them if we're shutting down.
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtTXN_DATA, "completeAcquire", [pap, hash, map]() { jtTXN_DATA,
"completeAcquire",
[&app = app_, hash = hash_, map = mMap]() {
pap->getInboundTransactions().giveSet(hash, map, true); app.getInboundTransactions().giveSet(hash, map, true);
}); });
}
} }
void void
TransactionAcquire::onTimer(bool progress, ScopedLockType& psl) TransactionAcquire::onTimer(bool progress, ScopedLockType& psl)
{ {
if (timeouts_ > MAX_TIMEOUTS) if (timeouts_ > TX_MAX_TIMEOUTS)
{ {
failed_ = true; markFailed();
done(); done();
return; return;
} }
if (timeouts_ >= NORM_TIMEOUTS) if (timeouts_ >= TX_NORM_TIMEOUTS)
trigger(nullptr); trigger(nullptr);
addPeers(1); addPeers(1);
@@ -112,18 +106,18 @@ TransactionAcquire::pmDowncast()
void void
TransactionAcquire::trigger(std::shared_ptr<Peer> const& peer) TransactionAcquire::trigger(std::shared_ptr<Peer> const& peer)
{ {
if (complete_) if (hasCompleted())
{ {
JLOG(journal_.info()) << "trigger after complete"; JLOG(journal_.info()) << "trigger after complete";
return; return;
} }
if (failed_) if (hasFailed())
{ {
JLOG(journal_.info()) << "trigger after fail"; JLOG(journal_.info()) << "trigger after fail";
return; return;
} }
if (!mHaveRoot) if (!(userdata_ & TX_HAVE_ROOT))
{ {
JLOG(journal_.trace()) << "TransactionAcquire::trigger " JLOG(journal_.trace()) << "TransactionAcquire::trigger "
<< (peer ? "havePeer" : "noPeer") << " no root"; << (peer ? "havePeer" : "noPeer") << " no root";
@@ -140,7 +134,7 @@ TransactionAcquire::trigger(std::shared_ptr<Peer> const& peer)
} }
else if (!mMap->isValid()) else if (!mMap->isValid())
{ {
failed_ = true; markFailed();
done(); done();
} }
else else
@@ -151,9 +145,9 @@ TransactionAcquire::trigger(std::shared_ptr<Peer> const& peer)
if (nodes.empty()) if (nodes.empty())
{ {
if (mMap->isValid()) if (mMap->isValid())
complete_ = true; markComplete();
else else
failed_ = true; markFailed();
done(); done();
return; return;
@@ -181,16 +175,16 @@ TransactionAcquire::takeNodes(
{ {
ScopedLockType sl(mtx_); ScopedLockType sl(mtx_);
if (complete_) if (hasCompleted())
{ {
JLOG(journal_.trace()) << "TX set complete"; JLOG(journal_.trace()) << "TX set complete";
return SHAMapAddNode(); return {};
} }
if (failed_) if (hasFailed())
{ {
JLOG(journal_.trace()) << "TX set failed"; JLOG(journal_.trace()) << "TX set failed";
return SHAMapAddNode(); return {};
} }
try try
@@ -204,7 +198,7 @@ TransactionAcquire::takeNodes(
{ {
if (d.first.isRoot()) if (d.first.isRoot())
{ {
if (mHaveRoot) if (userdata_ & TX_HAVE_ROOT)
JLOG(journal_.debug()) JLOG(journal_.debug())
<< "Got root TXS node, already have it"; << "Got root TXS node, already have it";
else if (!mMap->addRootNode( else if (!mMap->addRootNode(
@@ -214,7 +208,7 @@ TransactionAcquire::takeNodes(
JLOG(journal_.warn()) << "TX acquire got bad root node"; JLOG(journal_.warn()) << "TX acquire got bad root node";
} }
else else
mHaveRoot = true; userdata_ |= TX_HAVE_ROOT;
} }
else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood()) else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
{ {
@@ -224,7 +218,7 @@ TransactionAcquire::takeNodes(
} }
trigger(peer); trigger(peer);
progress_ = true; makeProgress();
return SHAMapAddNode::useful(); return SHAMapAddNode::useful();
} }
catch (std::exception const& ex) catch (std::exception const& ex)
@@ -260,8 +254,8 @@ TransactionAcquire::stillNeed()
{ {
ScopedLockType sl(mtx_); ScopedLockType sl(mtx_);
if (timeouts_ > NORM_TIMEOUTS) if (timeouts_ > TX_NORM_TIMEOUTS)
timeouts_ = NORM_TIMEOUTS; timeouts_ = TX_NORM_TIMEOUTS;
} }
} // namespace ripple } // namespace ripple
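With mHaveRoot gone, the flag lives in the spare userdata_ byte inherited from TimeoutCounter. Since userdata_ is a std::atomic, the compound operator used above is an atomic fetch_or, and reads go through an atomic load. The same usage in miniature (names illustrative):

#include <atomic>
#include <cstdint>

constexpr std::uint8_t TX_HAVE_ROOT = 0x01;

std::atomic<std::uint8_t> userdata{0};

void
gotRoot()
{
    userdata |= TX_HAVE_ROOT;  // atomic fetch_or
}

bool
haveRoot()
{
    return (userdata & TX_HAVE_ROOT) != 0;  // atomic load, then mask
}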

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_APP_LEDGER_TRANSACTIONACQUIRE_H_INCLUDED #ifndef RIPPLE_APP_LEDGER_TRANSACTIONACQUIRE_H_INCLUDED
#define RIPPLE_APP_LEDGER_TRANSACTIONACQUIRE_H_INCLUDED #define RIPPLE_APP_LEDGER_TRANSACTIONACQUIRE_H_INCLUDED
#include <ripple/app/ledger/impl/TimeoutCounter.h>
#include <ripple/app/main/Application.h> #include <ripple/app/main/Application.h>
#include <ripple/overlay/PeerSet.h> #include <ripple/overlay/PeerSet.h>
#include <ripple/shamap/SHAMap.h> #include <ripple/shamap/SHAMap.h>
@@ -55,12 +56,16 @@ public:
private: private:
std::shared_ptr<SHAMap> mMap; std::shared_ptr<SHAMap> mMap;
bool mHaveRoot;
std::unique_ptr<PeerSet> mPeerSet; std::unique_ptr<PeerSet> mPeerSet;
void void
onTimer(bool progress, ScopedLockType& peerSetLock) override; onTimer(bool progress, ScopedLockType& peerSetLock) override;
/** We have acquired the tx set we were looking for.
@note This function is called with the PeerSet lock
held, so we cannot do real work in it.
*/
void void
done(); done();
@@ -69,6 +74,7 @@ private:
void void
trigger(std::shared_ptr<Peer> const&); trigger(std::shared_ptr<Peer> const&);
std::weak_ptr<TimeoutCounter> std::weak_ptr<TimeoutCounter>
pmDowncast() override; pmDowncast() override;
}; };

View File

@@ -395,7 +395,8 @@ public:
*this, *this,
m_collectorManager->collector(), m_collectorManager->collector(),
[this](std::shared_ptr<SHAMap> const& set, bool fromAcquire) { [this](std::shared_ptr<SHAMap> const& set, bool fromAcquire) {
gotTXSet(set, fromAcquire); if (set)
m_networkOPs->mapComplete(set, fromAcquire);
})) }))
, m_ledgerReplayer(std::make_unique<LedgerReplayer>( , m_ledgerReplayer(std::make_unique<LedgerReplayer>(
@@ -650,13 +651,6 @@ public:
return m_acceptedLedgerCache; return m_acceptedLedgerCache;
} }
void
gotTXSet(std::shared_ptr<SHAMap> const& set, bool fromAcquire)
{
if (set)
m_networkOPs->mapComplete(set, fromAcquire);
}
TransactionMaster& TransactionMaster&
getMasterTransaction() override getMasterTransaction() override
{ {

View File

@@ -133,6 +133,9 @@ public:
public: public:
Application(); Application();
Application(Application const&) = delete;
Application(Application&&) = delete;
virtual ~Application() = default; virtual ~Application() = default;
virtual bool virtual bool

View File

@@ -249,8 +249,8 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (detaching_) if (detaching_)
return; return;
auto validator = m->getValidatorKey(); if (auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator)) validator && !squelch_.expireSquelch(*validator))
return; return;
overlay_.reportTraffic( overlay_.reportTraffic(
@@ -267,30 +267,21 @@ PeerImp::send(std::shared_ptr<Message> const& m)
// a small senq periodically // a small senq periodically
large_sendq_ = 0; large_sendq_ = 0;
} }
else if (auto sink = journal_.debug();
sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
{
std::string const n = name();
sink << (n.empty() ? remote_address_.to_string() : n)
<< " sendq: " << sendq_size;
}
send_queue_.push(m); send_queue_.push(m);
if (sendq_size != 0) if (sendq_size == 0)
return;
boost::asio::async_write( boost::asio::async_write(
stream_, stream_,
boost::asio::buffer( boost::asio::buffer(
send_queue_.front()->getBuffer(compressionEnabled_)), send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor( bind_executor(
strand_, strand_,
std::bind( std::bind(
&PeerImp::onWriteMessage, &PeerImp::onWriteMessage,
shared_from_this(), shared_from_this(),
std::placeholders::_1, std::placeholders::_1,
std::placeholders::_2))); std::placeholders::_2)));
} }
void void
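The restructured send() preserves the invariant that at most one async_write per peer is in flight: only the caller that finds the queue empty starts a write, and onWriteMessage pops the finished message and chains the next. A simplified, self-contained sketch of the idiom over a plain TCP socket; the Writer class is illustrative, not rippled's types:

#include <boost/asio.hpp>

#include <memory>
#include <queue>
#include <string>
#include <utility>

class Writer : public std::enable_shared_from_this<Writer>
{
    boost::asio::ip::tcp::socket socket_;
    std::queue<std::string> queue_;

public:
    explicit Writer(boost::asio::ip::tcp::socket s) : socket_(std::move(s))
    {
    }

    void
    send(std::string msg)
    {
        bool const idle = queue_.empty();
        queue_.push(std::move(msg));

        // Only the push that found the queue empty starts a write; every
        // later push relies on the completion handler to drain the queue.
        if (idle)
            write();
    }

private:
    void
    write()
    {
        boost::asio::async_write(
            socket_,
            boost::asio::buffer(queue_.front()),
            [self = shared_from_this()](auto const& ec, std::size_t) {
                if (ec)
                    return;
                self->queue_.pop();
                if (!self->queue_.empty())
                    self->write();
            });
    }
};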
@@ -359,10 +350,9 @@ PeerImp::charge(Resource::Charge const& fee)
bool bool
PeerImp::crawl() const PeerImp::crawl() const
{ {
auto const iter = headers_.find("Crawl"); if (auto const iter = headers_.find("Crawl"); iter != headers_.end())
if (iter == headers_.end()) return boost::iequals(iter->value(), "public");
return false; return false;
return boost::iequals(iter->value(), "public");
} }
bool bool
@@ -1985,14 +1975,38 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
// Otherwise check if received data for a candidate transaction set // Otherwise check if received data for a candidate transaction set
if (m->type() == protocol::liTS_CANDIDATE) if (m->type() == protocol::liTS_CANDIDATE)
{ {
std::weak_ptr<PeerImp> weak{shared_from_this()}; std::vector<std::pair<SHAMapNodeID, Slice>> data;
data.reserve(m->nodes().size());
for (auto const& node : m->nodes())
{
if (!node.has_nodeid() || !node.has_nodedata())
{
charge(Resource::feeInvalidRequest);
return;
}
auto const id = deserializeSHAMapNodeID(node.nodeid());
if (!id)
{
charge(Resource::feeBadData);
return;
}
data.emplace_back(*id, makeSlice(node.nodedata()));
}
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() { jtTXN_DATA,
if (auto peer = weak.lock()) "recvPeerData",
{ [w = weak_from_this(), ledgerHash, d = std::move(data), m]() {
peer->app_.getInboundTransactions().gotData( // We capture `m` to keep its data alive, because we're
ledgerHash, peer, m); // implicitly referencing it from `d` (it holds slices!)
} (void)m;
if (auto p = w.lock())
p->app_.getInboundTransactions().gotData(ledgerHash, p, d);
}); });
return; return;
} }
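The (void)m in the lambda above is purely about lifetime: every Slice in d points into buffers owned by the protobuf message, so the job must co-own the message for as long as it holds the slices. The pattern in miniature, with std::string_view standing in for Slice:

#include <cstddef>
#include <memory>
#include <string>
#include <string_view>
#include <vector>

auto
makeJob(std::shared_ptr<std::string const> m)
{
    // `views` refers into *m; capturing `m` by value pins that storage
    // for the lifetime of the callback, even though `m` is never read.
    std::vector<std::string_view> views{std::string_view{*m}};
    return [m, views = std::move(views)]() -> std::size_t {
        (void)m;  // held only to keep the buffer behind `views` alive
        return views.front().size();
    };
}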

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_SHAMAP_SHAMAPADDNODE_H_INCLUDED #ifndef RIPPLE_SHAMAP_SHAMAPADDNODE_H_INCLUDED
#define RIPPLE_SHAMAP_SHAMAPADDNODE_H_INCLUDED #define RIPPLE_SHAMAP_SHAMAPADDNODE_H_INCLUDED
#include <cstdint>
#include <string> #include <string>
namespace ripple { namespace ripple {
@@ -28,102 +29,97 @@ namespace ripple {
class SHAMapAddNode class SHAMapAddNode
{ {
private: private:
int mGood; std::uint32_t good_;
int mBad; std::uint32_t bad_;
int mDuplicate; std::uint32_t duplicate_;
constexpr SHAMapAddNode(int good, int bad, int duplicate)
: good_(good), bad_(bad), duplicate_(duplicate)
{
}
public: public:
SHAMapAddNode(); constexpr SHAMapAddNode() : SHAMapAddNode(0, 0, 0)
{
}
void void
incInvalid(); incInvalid();
void void
incUseful(); incUseful();
void void
incDuplicate(); incDuplicate();
void
reset();
int [[nodiscard]] std::uint32_t
getGood() const; getGood() const;
bool [[nodiscard]] bool
isGood() const; isGood() const;
bool [[nodiscard]] bool
isInvalid() const; isInvalid() const;
bool [[nodiscard]] bool
isUseful() const; isUseful() const;
std::string [[nodiscard]] std::string
get() const; get() const;
SHAMapAddNode& SHAMapAddNode&
operator+=(SHAMapAddNode const& n); operator+=(SHAMapAddNode const& n);
static SHAMapAddNode [[nodiscard]] static SHAMapAddNode
duplicate(); duplicate();
static SHAMapAddNode [[nodiscard]] static SHAMapAddNode
useful(); useful();
static SHAMapAddNode [[nodiscard]] static SHAMapAddNode
invalid(); invalid();
private:
SHAMapAddNode(int good, int bad, int duplicate);
}; };
inline SHAMapAddNode::SHAMapAddNode() : mGood(0), mBad(0), mDuplicate(0)
{
}
inline SHAMapAddNode::SHAMapAddNode(int good, int bad, int duplicate)
: mGood(good), mBad(bad), mDuplicate(duplicate)
{
}
inline void inline void
SHAMapAddNode::incInvalid() SHAMapAddNode::incInvalid()
{ {
++mBad; ++bad_;
} }
inline void inline void
SHAMapAddNode::incUseful() SHAMapAddNode::incUseful()
{ {
++mGood; ++good_;
} }
inline void inline void
SHAMapAddNode::incDuplicate() SHAMapAddNode::incDuplicate()
{ {
++mDuplicate; ++duplicate_;
} }
inline void
SHAMapAddNode::reset()
{
mGood = mBad = mDuplicate = 0;
}
inline int inline std::uint32_t
SHAMapAddNode::getGood() const SHAMapAddNode::getGood() const
{ {
return mGood; return good_;
} }
inline bool inline bool
SHAMapAddNode::isInvalid() const SHAMapAddNode::isInvalid() const
{ {
return mBad > 0; return bad_ != 0;
} }
inline bool inline bool
SHAMapAddNode::isUseful() const SHAMapAddNode::isUseful() const
{ {
return mGood > 0; return good_ != 0;
} }
inline SHAMapAddNode& inline SHAMapAddNode&
SHAMapAddNode::operator+=(SHAMapAddNode const& n) SHAMapAddNode::operator+=(SHAMapAddNode const& n)
{ {
mGood += n.mGood; good_ += n.good_;
mBad += n.mBad; bad_ += n.bad_;
mDuplicate += n.mDuplicate; duplicate_ += n.duplicate_;
return *this; return *this;
} }
@@ -131,53 +127,33 @@ SHAMapAddNode::operator+=(SHAMapAddNode const& n)
inline bool inline bool
SHAMapAddNode::isGood() const SHAMapAddNode::isGood() const
{ {
return (mGood + mDuplicate) > mBad; return (good_ + duplicate_) > bad_;
} }
inline SHAMapAddNode inline SHAMapAddNode
SHAMapAddNode::duplicate() SHAMapAddNode::duplicate()
{ {
return SHAMapAddNode(0, 0, 1); return {0, 0, 1};
} }
inline SHAMapAddNode inline SHAMapAddNode
SHAMapAddNode::useful() SHAMapAddNode::useful()
{ {
return SHAMapAddNode(1, 0, 0); return {1, 0, 0};
} }
inline SHAMapAddNode inline SHAMapAddNode
SHAMapAddNode::invalid() SHAMapAddNode::invalid()
{ {
return SHAMapAddNode(0, 1, 0); return {0, 1, 0};
} }
inline std::string inline std::string
SHAMapAddNode::get() const SHAMapAddNode::get() const
{ {
std::string ret; return "{ good: " + std::to_string(good_) +
if (mGood > 0) ", bad: " + std::to_string(bad_) +
{ ", dup: " + std::to_string(duplicate_) + " }";
ret.append("good:");
ret.append(std::to_string(mGood));
}
if (mBad > 0)
{
if (!ret.empty())
ret.append(" ");
ret.append("bad:");
ret.append(std::to_string(mBad));
}
if (mDuplicate > 0)
{
if (!ret.empty())
ret.append(" ");
ret.append("dupe:");
ret.append(std::to_string(mDuplicate));
}
if (ret.empty())
ret = "no nodes processed";
return ret;
} }
} // namespace ripple } // namespace ripple
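With reset() removed, a SHAMapAddNode only ever accumulates; batches are combined with operator+= and judged with isGood(), where duplicates count in the peer's favor. A hypothetical aggregation, assuming the header above is on the include path:

#include <ripple/shamap/SHAMapAddNode.h>

#include <cstdint>

ripple::SHAMapAddNode
summarize(std::uint32_t useful, std::uint32_t dup, std::uint32_t bad)
{
    ripple::SHAMapAddNode total;
    for (std::uint32_t i = 0; i != useful; ++i)
        total += ripple::SHAMapAddNode::useful();
    for (std::uint32_t i = 0; i != dup; ++i)
        total += ripple::SHAMapAddNode::duplicate();
    for (std::uint32_t i = 0; i != bad; ++i)
        total += ripple::SHAMapAddNode::invalid();
    return total;  // total.get() renders "{ good: ..., bad: ..., dup: ... }"
}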

View File

@@ -722,9 +722,9 @@ public:
TaskStatus TaskStatus
taskStatus(std::shared_ptr<T> const& t) taskStatus(std::shared_ptr<T> const& t)
{ {
if (t->failed_) if (t->hasFailed())
return TaskStatus::Failed; return TaskStatus::Failed;
if (t->complete_) if (t->hasCompleted())
return TaskStatus::Completed; return TaskStatus::Completed;
return TaskStatus::NotDone; return TaskStatus::NotDone;
} }