1#include <xrpld/app/ledger/InboundLedgers.h>
2#include <xrpld/app/ledger/LedgerReplayTask.h>
3#include <xrpld/app/ledger/LedgerReplayer.h>
4#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
5#include <xrpld/app/ledger/detail/SkipListAcquire.h>
11 uint256 const& finishLedgerHash,
13 : reason_(r), finishHash_(finishLedgerHash), totalLedgers_(totalNumLedgers)
16 finishLedgerHash.
isNonZero() && totalNumLedgers > 0,
17 "ripple::LedgerReplayTask::TaskParameter::TaskParameter : valid "
27 if (finishHash_ != hash || sList.
size() + 1 < totalLedgers_ || full_)
33 startHash_ = skipList_[skipList_.size() - totalLedgers_];
35 startHash_.isNonZero(),
36 "ripple::LedgerReplayTask::TaskParameter::update : nonzero start hash");
37 startSeq_ = finishSeq_ - totalLedgers_ + 1;
46 if (reason_ == existingTask.
reason_)
54 if (existingTask.
full_)
56 auto const& exList = existingTask.
skipList_;
57 if (
auto i =
std::find(exList.begin(), exList.end(), finishHash_);
61 totalLedgers_ + (exList.end() - i) - 1;
77 parameter.finishHash_,
78 LedgerReplayParameters::TASK_TIMEOUT,
82 app.journal(
"LedgerReplayTask"))
83 , inboundLedgers_(inboundLedgers)
85 , parameter_(parameter)
88 parameter.totalLedgers_ *
90 , skipListAcquirer_(skipListAcquirer)
92 JLOG(journal_.trace()) <<
"Create " << hash_;
107 if (
auto sptr = wptr.
lock(); sptr)
115 auto const skipListData = sptr->skipListAcquirer_->getData();
116 sptr->updateSkipList(
117 hash, skipListData->ledgerSeq, skipListData->skipList);
161 JLOG(
journal_.
trace()) <<
"Delta " << deltaHash <<
" ready for task "
173 :
", waiting to fill parameter")
175 <<
", totalDeltas=" <<
deltas_.
size() <<
", parent "
189 parent_->seq() + 1 == delta->ledgerSeq_,
190 "ripple::LedgerReplayTask::tryAdvance : consecutive sequence");
191 if (
auto l = delta->tryBuild(
parent_); l)
194 <<
"Task " <<
hash_ <<
" got ledger " << l->info().hash
244 <<
"LedgerReplayTask Failed, too many timeouts " <<
hash_;
262 delta->addDataCallback(
264 if (auto sptr = wptr.lock(); sptr)
269 sptr->deltaReady(hash);
273 ScopedLockType sl(mtx_);
276 JLOG(journal_.trace())
277 <<
"addDelta task " << hash_ <<
" deltaIndex=" << deltaToBuild_
278 <<
" totalDeltas=" << deltas_.size();
281 deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_,
282 "ripple::LedgerReplayTask::addDelta : no deltas or consecutive "
284 deltas_.push_back(delta);
289LedgerReplayTask::finished()
const
Stream trace() const
Severity stream access functions.
virtual LedgerMaster & getLedgerMaster()=0
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
TaskParameter(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
constructor
bool update(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Fill all the fields that were not filled during construction.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
std::vector< uint256 > skipList_
InboundLedger::Reason reason_
std::uint32_t totalLedgers_
void trigger(ScopedLockType &sl)
Trigger another round.
std::vector< std::shared_ptr< LedgerDeltaAcquire > > deltas_
void addDelta(std::shared_ptr< LedgerDeltaAcquire > const &delta)
add a new LedgerDeltaAcquire subtask
void deltaReady(uint256 const &deltaHash)
Notify this task (by a LedgerDeltaAcquire subtask) that a delta is ready.
LedgerReplayer & replayer_
void init()
Start the task.
InboundLedgers & inboundLedgers_
std::shared_ptr< SkipListAcquire > skipListAcquirer_
void updateSkipList(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Update this task (by a SkipListAcquire subtask) when skip list is ready.
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
LedgerReplayTask(Application &app, InboundLedgers &inboundLedgers, LedgerReplayer &replayer, std::shared_ptr< SkipListAcquire > &skipListAcquirer, TaskParameter &&parameter)
Constructor.
void tryAdvance(ScopedLockType &sl)
Try to build more ledgers.
std::shared_ptr< Ledger const > parent_
void onTimer(bool progress, ScopedLockType &sl) override
Hook called from invokeOnTimer().
Manages the lifetime of ledger replay tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
This class is an "active" object.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
static constexpr std::size_t size()
T emplace_back(T... args)
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MINIMUM
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MULTIPLIER
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
T shared_from_this(T... args)