1#include <xrpld/app/ledger/LedgerReplayer.h>
2#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
12 , inboundLedgers_(inboundLedgers)
13 , peerSetBuilder_(
std::move(peerSetBuilder))
14 , j_(app.journal(
"LedgerReplayer"))
29 "xrpl::LedgerReplayer::replay : valid inputs");
35 bool newSkipList =
false;
42 JLOG(
j_.
info()) <<
"Too many replay tasks, dropping new task " << parameter.
finishHash_;
46 for (
auto const& t :
tasks_)
50 JLOG(
j_.
info()) <<
"Task " << parameter.
finishHash_ <<
" with " << totalNumLedgers
51 <<
" ledgers merged into an existing task.";
55 JLOG(
j_.
info()) <<
"Replay " << totalNumLedgers <<
" ledgers. Finish ledger hash " << parameter.
finishHash_;
59 skipList = i->second.lock();
89 auto const& parameter = task->getTaskParameter();
90 JLOG(
j_.
trace()) <<
"Creating " << parameter.totalLedgers_ - 1 <<
" deltas";
91 if (parameter.totalLedgers_ > 1)
93 auto skipListItem =
std::find(parameter.skipList_.begin(), parameter.skipList_.end(), parameter.startHash_);
94 if (skipListItem == parameter.skipList_.end() || ++skipListItem == parameter.skipList_.end())
96 JLOG(
j_.
error()) <<
"Task parameter error when creating deltas " << parameter.finishHash_;
101 seq <= parameter.finishSeq_ && skipListItem != parameter.skipList_.end();
102 ++seq, ++skipListItem)
105 bool newDelta =
false;
110 auto i =
deltas_.find(*skipListItem);
112 delta = i->second.lock();
118 deltas_[*skipListItem] = delta;
123 task->addDelta(delta);
139 skipList = i->second.lock();
148 skipList->processData(info.
seq, item);
160 delta = i->second.lock();
169 delta->processData(info, std::move(txns));
179 <<
" skipLists, and " <<
deltas_.size() <<
" deltas.";
185 [
this](
auto const& t) ->
bool {
188 JLOG(j_.debug()) <<
"Sweep task " << t->getTaskParameter().finishHash_;
195 auto removeCannotLocked = [](
auto& subTasks) {
196 for (
auto it = subTasks.begin(); it != subTasks.end();)
198 if (
auto item = it->second.lock(); !item)
200 it = subTasks.erase(it);
210 <<
" LedgerReplayer sweep lock duration "
216LedgerReplayer::stop()
218 JLOG(j_.info()) <<
"Stopping...";
221 std::for_each(tasks_.begin(), tasks_.end(), [](
auto& i) { i->cancel(); });
223 auto lockAndCancel = [](
auto& i) {
224 if (
auto sptr = i.second.lock(); sptr)
229 std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
231 std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
235 JLOG(j_.info()) <<
"Stopped";
Stream trace() const
Severity stream access functions.
virtual bool isStopping() const =0
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
InboundLedgers & inboundLedgers_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
void gotReplayDelta(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
void gotSkipList(LedgerHeader const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
void sweep()
Remove completed tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
std::uint32_t constexpr MAX_TASKS
std::uint32_t constexpr MAX_TASK_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.