rippled
Loading...
Searching...
No Matches
LedgerReplayer.cpp
1#include <xrpld/app/ledger/LedgerReplayer.h>
2#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
4
5namespace xrpl {
6
8 Application& app,
9 InboundLedgers& inboundLedgers,
11 : app_(app)
12 , inboundLedgers_(inboundLedgers)
13 , peerSetBuilder_(std::move(peerSetBuilder))
14 , j_(app.journal("LedgerReplayer"))
15{
16}
17
23
24void
25LedgerReplayer::replay(InboundLedger::Reason r, uint256 const& finishLedgerHash, std::uint32_t totalNumLedgers)
26{
27 XRPL_ASSERT(
28 finishLedgerHash.isNonZero() && totalNumLedgers > 0 && totalNumLedgers <= LedgerReplayParameters::MAX_TASK_SIZE,
29 "xrpl::LedgerReplayer::replay : valid inputs");
30
31 LedgerReplayTask::TaskParameter parameter(r, finishLedgerHash, totalNumLedgers);
32
35 bool newSkipList = false;
36 {
38 if (app_.isStopping())
39 return;
41 {
42 JLOG(j_.info()) << "Too many replay tasks, dropping new task " << parameter.finishHash_;
43 return;
44 }
45
46 for (auto const& t : tasks_)
47 {
48 if (parameter.canMergeInto(t->getTaskParameter()))
49 {
50 JLOG(j_.info()) << "Task " << parameter.finishHash_ << " with " << totalNumLedgers
51 << " ledgers merged into an existing task.";
52 return;
53 }
54 }
55 JLOG(j_.info()) << "Replay " << totalNumLedgers << " ledgers. Finish ledger hash " << parameter.finishHash_;
56
57 auto i = skipLists_.find(parameter.finishHash_);
58 if (i != skipLists_.end())
59 skipList = i->second.lock();
60
61 if (!skipList) // cannot find, or found but cannot lock
62 {
64 app_, inboundLedgers_, parameter.finishHash_, peerSetBuilder_->build());
65 skipLists_[parameter.finishHash_] = skipList;
66 newSkipList = true;
67 }
68
69 task = std::make_shared<LedgerReplayTask>(app_, inboundLedgers_, *this, skipList, std::move(parameter));
70 tasks_.push_back(task);
71 }
72
73 if (newSkipList)
74 skipList->init(1);
75 // task init after skipList init, could save a timeout
76 task->init();
77}
78
79void
81{
82 {
83 // TODO for use cases like Consensus (i.e. totalLedgers = 1 or small):
84 // check if the last closed or validated ledger l the local node has
85 // is in the skip list and is an ancestor of parameter.startLedger
86 // that has to be downloaded, if so expand the task to start with l.
87 }
88
89 auto const& parameter = task->getTaskParameter();
90 JLOG(j_.trace()) << "Creating " << parameter.totalLedgers_ - 1 << " deltas";
91 if (parameter.totalLedgers_ > 1)
92 {
93 auto skipListItem = std::find(parameter.skipList_.begin(), parameter.skipList_.end(), parameter.startHash_);
94 if (skipListItem == parameter.skipList_.end() || ++skipListItem == parameter.skipList_.end())
95 {
96 JLOG(j_.error()) << "Task parameter error when creating deltas " << parameter.finishHash_;
97 return;
98 }
99
100 for (std::uint32_t seq = parameter.startSeq_ + 1;
101 seq <= parameter.finishSeq_ && skipListItem != parameter.skipList_.end();
102 ++seq, ++skipListItem)
103 {
105 bool newDelta = false;
106 {
108 if (app_.isStopping())
109 return;
110 auto i = deltas_.find(*skipListItem);
111 if (i != deltas_.end())
112 delta = i->second.lock();
113
114 if (!delta) // cannot find, or found but cannot lock
115 {
117 app_, inboundLedgers_, *skipListItem, seq, peerSetBuilder_->build());
118 deltas_[*skipListItem] = delta;
119 newDelta = true;
120 }
121 }
122
123 task->addDelta(delta);
124 if (newDelta)
125 delta->init(1);
126 }
127 }
128}
129
130void
131LedgerReplayer::gotSkipList(LedgerHeader const& info, boost::intrusive_ptr<SHAMapItem const> const& item)
132{
134 {
136 auto i = skipLists_.find(info.hash);
137 if (i == skipLists_.end())
138 return;
139 skipList = i->second.lock();
140 if (!skipList)
141 {
142 skipLists_.erase(i);
143 return;
144 }
145 }
146
147 if (skipList)
148 skipList->processData(info.seq, item);
149}
150
151void
153{
155 {
157 auto i = deltas_.find(info.hash);
158 if (i == deltas_.end())
159 return;
160 delta = i->second.lock();
161 if (!delta)
162 {
163 deltas_.erase(i);
164 return;
165 }
166 }
167
168 if (delta)
169 delta->processData(info, std::move(txns));
170}
171
172void
174{
175 auto const start = std::chrono::steady_clock::now();
176 {
178 JLOG(j_.debug()) << "Sweeping, LedgerReplayer has " << tasks_.size() << " tasks, " << skipLists_.size()
179 << " skipLists, and " << deltas_.size() << " deltas.";
180
181 tasks_.erase(
183 tasks_.begin(),
184 tasks_.end(),
185 [this](auto const& t) -> bool {
186 if (t->finished())
187 {
188 JLOG(j_.debug()) << "Sweep task " << t->getTaskParameter().finishHash_;
189 return true;
190 }
191 return false;
192 }),
193 tasks_.end());
194
195 auto removeCannotLocked = [](auto& subTasks) {
196 for (auto it = subTasks.begin(); it != subTasks.end();)
197 {
198 if (auto item = it->second.lock(); !item)
199 {
200 it = subTasks.erase(it);
201 }
202 else
203 ++it;
204 }
205 };
206 removeCannotLocked(skipLists_);
207 removeCannotLocked(deltas_);
208 }
209 JLOG(j_.debug())
210 << " LedgerReplayer sweep lock duration "
211 << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count()
212 << "ms";
213}
214
215void
216LedgerReplayer::stop()
217{
218 JLOG(j_.info()) << "Stopping...";
219 {
221 std::for_each(tasks_.begin(), tasks_.end(), [](auto& i) { i->cancel(); });
222 tasks_.clear();
223 auto lockAndCancel = [](auto& i) {
224 if (auto sptr = i.second.lock(); sptr)
225 {
226 sptr->cancel();
227 }
228 };
229 std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
230 skipLists_.clear();
231 std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
232 deltas_.clear();
233 }
234
235 JLOG(j_.info()) << "Stopped";
236}
237
238} // namespace xrpl
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
virtual bool isStopping() const =0
Manages the lifetime of inbound ledgers.
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
InboundLedgers & inboundLedgers_
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
void gotReplayDelta(LedgerHeader const &info, std::map< std::uint32_t, std::shared_ptr< STTx const > > &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
void gotSkipList(LedgerHeader const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
void sweep()
Remove completed tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
bool isNonZero() const
Definition base_uint.h:514
T find(T... args)
T for_each(T... args)
T is_same_v
STL namespace.
std::uint32_t constexpr MAX_TASKS
std::uint32_t constexpr MAX_TASK_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T remove_if(T... args)
Information about the notional ledger backing the view.