rippled
Loading...
Searching...
No Matches
LedgerReplayTask.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2020 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/ledger/InboundLedgers.h>
21#include <xrpld/app/ledger/LedgerReplayTask.h>
22#include <xrpld/app/ledger/LedgerReplayer.h>
23#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
24#include <xrpld/app/ledger/detail/SkipListAcquire.h>
25
26namespace ripple {
27
30 uint256 const& finishLedgerHash,
31 std::uint32_t totalNumLedgers)
32 : reason_(r), finishHash_(finishLedgerHash), totalLedgers_(totalNumLedgers)
33{
34 XRPL_ASSERT(
35 finishLedgerHash.isNonZero() && totalNumLedgers > 0,
36 "ripple::LedgerReplayTask::TaskParameter::TaskParameter : valid "
37 "inputs");
38}
39
40bool
42 uint256 const& hash,
43 std::uint32_t seq,
44 std::vector<uint256> const& sList)
45{
46 if (finishHash_ != hash || sList.size() + 1 < totalLedgers_ || full_)
47 return false;
48
49 finishSeq_ = seq;
50 skipList_ = sList;
51 skipList_.emplace_back(finishHash_);
52 startHash_ = skipList_[skipList_.size() - totalLedgers_];
53 XRPL_ASSERT(
54 startHash_.isNonZero(),
55 "ripple::LedgerReplayTask::TaskParameter::update : nonzero start hash");
56 startSeq_ = finishSeq_ - totalLedgers_ + 1;
57 full_ = true;
58 return true;
59}
60
61bool
63 TaskParameter const& existingTask) const
64{
65 if (reason_ == existingTask.reason_)
66 {
67 if (finishHash_ == existingTask.finishHash_ &&
68 totalLedgers_ <= existingTask.totalLedgers_)
69 {
70 return true;
71 }
72
73 if (existingTask.full_)
74 {
75 auto const& exList = existingTask.skipList_;
76 if (auto i = std::find(exList.begin(), exList.end(), finishHash_);
77 i != exList.end())
78 {
79 return existingTask.totalLedgers_ >=
80 totalLedgers_ + (exList.end() - i) - 1;
81 }
82 }
83 }
84
85 return false;
86}
87
89 Application& app,
90 InboundLedgers& inboundLedgers,
91 LedgerReplayer& replayer,
92 std::shared_ptr<SkipListAcquire>& skipListAcquirer,
93 TaskParameter&& parameter)
95 app,
96 parameter.finishHash_,
97 LedgerReplayParameters::TASK_TIMEOUT,
99 "LedgerReplayTask",
101 app.journal("LedgerReplayTask"))
102 , inboundLedgers_(inboundLedgers)
103 , replayer_(replayer)
104 , parameter_(parameter)
105 , maxTimeouts_(std::max(
107 parameter.totalLedgers_ *
109 , skipListAcquirer_(skipListAcquirer)
110{
111 JLOG(journal_.trace()) << "Create " << hash_;
112}
113
115{
116 JLOG(journal_.trace()) << "Destroy " << hash_;
117}
118
119void
121{
122 JLOG(journal_.debug()) << "Task start " << hash_;
123
125 skipListAcquirer_->addDataCallback([wptr](bool good, uint256 const& hash) {
126 if (auto sptr = wptr.lock(); sptr)
127 {
128 if (!good)
129 {
130 sptr->cancel();
131 }
132 else
133 {
134 auto const skipListData = sptr->skipListAcquirer_->getData();
135 sptr->updateSkipList(
136 hash, skipListData->ledgerSeq, skipListData->skipList);
137 }
138 }
139 });
140
142 if (!isDone())
143 {
144 trigger(sl);
145 setTimer(sl);
146 }
147}
148
149void
151{
152 JLOG(journal_.trace()) << "trigger " << hash_;
153 if (!parameter_.full_)
154 return;
155
156 if (!parent_)
157 {
159 if (!parent_)
160 {
165 }
166 if (parent_)
167 {
168 JLOG(journal_.trace())
169 << "Got start ledger " << parameter_.startHash_ << " for task "
170 << hash_;
171 }
172 }
173
174 tryAdvance(sl);
175}
176
177void
179{
180 JLOG(journal_.trace()) << "Delta " << deltaHash << " ready for task "
181 << hash_;
183 if (!isDone())
184 tryAdvance(sl);
185}
186
187void
189{
190 JLOG(journal_.trace()) << "tryAdvance task " << hash_
191 << (parameter_.full_ ? ", full parameter"
192 : ", waiting to fill parameter")
193 << ", deltaIndex=" << deltaToBuild_
194 << ", totalDeltas=" << deltas_.size() << ", parent "
195 << (parent_ ? parent_->info().hash : uint256());
196
197 bool shouldTry = parent_ && parameter_.full_ &&
198 parameter_.totalLedgers_ - 1 == deltas_.size();
199 if (!shouldTry)
200 return;
201
202 try
203 {
204 for (; deltaToBuild_ < deltas_.size(); ++deltaToBuild_)
205 {
206 auto& delta = deltas_[deltaToBuild_];
207 XRPL_ASSERT(
208 parent_->seq() + 1 == delta->ledgerSeq_,
209 "ripple::LedgerReplayTask::tryAdvance : consecutive sequence");
210 if (auto l = delta->tryBuild(parent_); l)
211 {
212 JLOG(journal_.debug())
213 << "Task " << hash_ << " got ledger " << l->info().hash
214 << " deltaIndex=" << deltaToBuild_
215 << " totalDeltas=" << deltas_.size();
216 parent_ = l;
217 }
218 else
219 return;
220 }
221
222 complete_ = true;
223 JLOG(journal_.info()) << "Completed " << hash_;
224 }
225 catch (std::runtime_error const&)
226 {
227 failed_ = true;
228 }
229}
230
231void
233 uint256 const& hash,
234 std::uint32_t seq,
235 std::vector<uint256> const& sList)
236{
237 {
239 if (isDone())
240 return;
241 if (!parameter_.update(hash, seq, sList))
242 {
243 JLOG(journal_.error()) << "Parameter update failed " << hash_;
244 failed_ = true;
245 return;
246 }
247 }
248
251 if (!isDone())
252 trigger(sl);
253}
254
255void
257{
258 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
260 {
261 failed_ = true;
262 JLOG(journal_.debug())
263 << "LedgerReplayTask Failed, too many timeouts " << hash_;
264 }
265 else
266 {
267 trigger(sl);
268 }
269}
270
276
277void
279{
281 delta->addDataCallback(
282 parameter_.reason_, [wptr](bool good, uint256 const& hash) {
283 if (auto sptr = wptr.lock(); sptr)
284 {
285 if (!good)
286 sptr->cancel();
287 else
288 sptr->deltaReady(hash);
289 }
290 });
291
292 ScopedLockType sl(mtx_);
293 if (!isDone())
294 {
295 JLOG(journal_.trace())
296 << "addDelta task " << hash_ << " deltaIndex=" << deltaToBuild_
297 << " totalDeltas=" << deltas_.size();
298 XRPL_ASSERT(
299 deltas_.empty() ||
300 deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_,
301 "ripple::LedgerReplayTask::addDelta : no deltas or consecutive "
302 "sequence", );
303 deltas_.push_back(delta);
304 }
305}
306
307bool
308LedgerReplayTask::finished() const
309{
310 ScopedLockType sl(mtx_);
311 return isDone();
312}
313
314} // namespace ripple
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
virtual LedgerMaster & getLedgerMaster()=0
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
TaskParameter(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
constructor
bool update(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
fill all the fields that was not filled during construction
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
void trigger(ScopedLockType &sl)
Trigger another round.
std::vector< std::shared_ptr< LedgerDeltaAcquire > > deltas_
void addDelta(std::shared_ptr< LedgerDeltaAcquire > const &delta)
add a new LedgerDeltaAcquire subtask
void deltaReady(uint256 const &deltaHash)
Notify this task (by a LedgerDeltaAcquire subtask) that a delta is ready.
void init()
Start the task.
InboundLedgers & inboundLedgers_
std::shared_ptr< SkipListAcquire > skipListAcquirer_
void updateSkipList(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
Update this task (by a SkipListAcquire subtask) when skip list is ready.
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
LedgerReplayTask(Application &app, InboundLedgers &inboundLedgers, LedgerReplayer &replayer, std::shared_ptr< SkipListAcquire > &skipListAcquirer, TaskParameter &&parameter)
Constructor.
void tryAdvance(ScopedLockType &sl)
Try to build more ledgers.
std::shared_ptr< Ledger const > parent_
void onTimer(bool progress, ScopedLockType &sl) override
Hook called from invokeOnTimer().
Manages the lifetime of ledger replay tasks.
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
This class is an "active" object.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
static constexpr std::size_t size()
Definition base_uint.h:526
bool isNonZero() const
Definition base_uint.h:545
T emplace_back(T... args)
T find(T... args)
T lock(T... args)
T max(T... args)
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MINIMUM
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr TASK_MAX_TIMEOUTS_MULTIPLIER
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
base_uint< 256 > uint256
Definition base_uint.h:558
@ jtREPLAY_TASK
Definition Job.h:61
T size(T... args)