1#include <xrpld/app/ledger/LedgerReplayer.h>
2#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
12 , inboundLedgers_(inboundLedgers)
13 , peerSetBuilder_(
std::move(peerSetBuilder))
14 , j_(app.journal(
"LedgerReplayer"))
27 uint256 const& finishLedgerHash,
31 finishLedgerHash.
isNonZero() && totalNumLedgers > 0 &&
33 "ripple::LedgerReplayer::replay : valid inputs");
36 r, finishLedgerHash, totalNumLedgers);
40 bool newSkipList =
false;
47 JLOG(
j_.
info()) <<
"Too many replay tasks, dropping new task "
52 for (
auto const& t :
tasks_)
58 <<
" ledgers merged into an existing task.";
62 JLOG(
j_.
info()) <<
"Replay " << totalNumLedgers
63 <<
" ledgers. Finish ledger hash "
68 skipList = i->second.lock();
102 auto const& parameter = task->getTaskParameter();
103 JLOG(
j_.
trace()) <<
"Creating " << parameter.totalLedgers_ - 1 <<
" deltas";
104 if (parameter.totalLedgers_ > 1)
107 parameter.skipList_.begin(),
108 parameter.skipList_.end(),
109 parameter.startHash_);
110 if (skipListItem == parameter.skipList_.end() ||
111 ++skipListItem == parameter.skipList_.end())
113 JLOG(
j_.
error()) <<
"Task parameter error when creating deltas "
114 << parameter.finishHash_;
119 seq <= parameter.finishSeq_ &&
120 skipListItem != parameter.skipList_.end();
121 ++seq, ++skipListItem)
124 bool newDelta =
false;
129 auto i =
deltas_.find(*skipListItem);
131 delta = i->second.lock();
141 deltas_[*skipListItem] = delta;
146 task->addDelta(delta);
156 boost::intrusive_ptr<SHAMapItem const>
const& item)
164 skipList = i->second.lock();
173 skipList->processData(info.
seq, item);
187 delta = i->second.lock();
196 delta->processData(info, std::move(txns));
205 JLOG(
j_.
debug()) <<
"Sweeping, LedgerReplayer has " <<
tasks_.size()
207 <<
" skipLists, and " <<
deltas_.size() <<
" deltas.";
213 [
this](
auto const& t) ->
bool {
216 JLOG(j_.debug()) <<
"Sweep task "
217 << t->getTaskParameter().finishHash_;
224 auto removeCannotLocked = [](
auto& subTasks) {
225 for (
auto it = subTasks.begin(); it != subTasks.end();)
227 if (
auto item = it->second.lock(); !item)
229 it = subTasks.erase(it);
238 JLOG(j_.
debug()) <<
" LedgerReplayer sweep lock duration "
239 << std::chrono::duration_cast<std::chrono::milliseconds>(
246LedgerReplayer::stop()
248 JLOG(j_.
info()) <<
"Stopping...";
252 tasks_.begin(), tasks_.end(), [](
auto& i) { i->cancel(); });
254 auto lockAndCancel = [](
auto& i) {
255 if (
auto sptr = i.second.lock(); sptr)
260 std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
262 std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
266 JLOG(j_.
info()) <<
"Stopped";
Stream trace() const
Severity stream access functions.
virtual bool isStopping() const =0
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
void gotReplayDelta(LedgerInfo const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
void sweep()
Remove completed tasks.
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
void gotSkipList(LedgerInfo const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
InboundLedgers & inboundLedgers_
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
std::uint32_t constexpr MAX_TASK_SIZE
std::uint32_t constexpr MAX_TASKS
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.