rippled
Loading...
Searching...
No Matches
SkipListAcquire.cpp
1#include <xrpld/app/ledger/InboundLedger.h>
2#include <xrpld/app/ledger/LedgerReplayer.h>
3#include <xrpld/app/ledger/detail/SkipListAcquire.h>
4#include <xrpld/app/main/Application.h>
5#include <xrpld/overlay/PeerSet.h>
6
7namespace xrpl {
8
// Fragment of the SkipListAcquire constructor. This listing omits source
// lines 9, 13, 14 and 18, so the declarator, the remaining parameter(s) and
// part of the initializer list are not visible here.
10 Application& app,
11 InboundLedgers& inboundLedgers,
12 uint256 const& ledgerHash,
// Arguments handed to an initializer whose name sits on an omitted line
// (presumably the TimeoutCounter base - TODO confirm): the application, the
// hash to acquire, LedgerReplayParameters::SUB_TASK_TIMEOUT, and a journal
// named "LedgerReplaySkipList".
15 app,
16 ledgerHash,
17 LedgerReplayParameters::SUB_TASK_TIMEOUT,
19 app.journal("LedgerReplaySkipList"))
20 , inboundLedgers_(inboundLedgers)
// peerSet is moved into the member; its parameter declaration is on an
// omitted line.
21 , peerSet_(std::move(peerSet))
22{
23 JLOG(journal_.trace()) << "Create " << hash_;
24}
25
// Body of a definition whose header (source line 26) is missing from this
// listing - presumably the destructor, judging by the log message. It only
// emits a trace-level log entry containing hash_.
27{
28 JLOG(journal_.trace()) << "Destroy " << hash_;
29}
30
// Appears to be init(numPeers): unless the task is already done, runs a first
// trigger() round with numPeers as the peer limit and then arms the timer.
// NOTE(review): source lines 32 (the declarator) and 34 are missing from this
// listing; line 34 presumably declares the lock `sl` used below - confirm.
31void
33{
35 if (!isDone())
36 {
37 trigger(numPeers, sl);
38 setTimer(sl);
39 }
40}
41
// One acquisition round; appears to be trigger(limit, sl).
// NOTE(review): the declarator (source line 43) and source lines 72, 75 and 83
// are missing from this listing, so parts of the fall-back logic cannot be
// read here.
42void
44{
// Fast path: the ledger with hash_ is already known to the LedgerMaster, so
// the skip list is read straight from it and no peer is contacted.
45 if (auto const l = app_.getLedgerMaster().getLedgerByHash(hash_); l)
46 {
47 JLOG(journal_.trace()) << "existing ledger " << hash_;
48 retrieveSkipList(l, sl);
49 return;
50 }
51
// Peer path; skipped once fallBack_ has been set.
52 if (!fallBack_)
53 {
54 peerSet_->addPeers(
55 limit,
// First lambda: true only for peers that support the LedgerReplay feature and
// report having ledger hash_.
56 [this](auto peer) {
57 return peer->supportsFeature(ProtocolFeature::LedgerReplay) && peer->hasLedger(hash_, 0);
58 },
// Second lambda: called with a peer (presumably once per peer that addPeers
// accepts - confirm against PeerSet::addPeers).
// NOTE(review): the first lambda already requires the feature, so whether the
// else branch below is reachable depends on how addPeers applies that filter,
// which is not visible here.
59 [this](auto peer) {
60 if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
61 {
62 JLOG(journal_.trace()) << "Add a peer " << peer->id() << " for " << hash_;
// Ask the peer for a proof path to the skip-list key (keylet::skip()) in the
// account-state map of ledger hash_.
63 protocol::TMProofPathRequest request;
64 request.set_ledgerhash(hash_.data(), hash_.size());
65 request.set_key(keylet::skip().key.data(), keylet::skip().key.size());
66 request.set_type(protocol::TMLedgerMapType::lmACCOUNT_STATE);
67 peerSet_->sendRequest(request, peer);
68 }
69 else
70 {
71 JLOG(journal_.trace()) << "Add a no feature peer " << peer->id() << " for " << hash_;
// NOTE(review): source line 72 (the condition guarding the braces below) and
// source line 75 are missing from this listing. The visible part logs the
// fall-back and sets fallBack_.
73 {
74 JLOG(journal_.debug()) << "Fall back for " << hash_;
76 fallBack_ = true;
77 }
78 }
79 });
80 }
81
// NOTE(review): the statement controlled by this `if` (source line 83) is
// missing from this listing.
82 if (fallBack_)
84}
85
// Timer hook; appears to be onTimer().
// NOTE(review): the declarator (source line 87) and the condition on source
// line 90 are missing from this listing.
86void
88{
89 JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
// Give-up branch: marks the task failed and notifies the callbacks.
// Presumably taken when timeouts_ exceeds a maximum (see the log text below);
// confirm against source line 90.
91 {
92 failed_ = true;
93 JLOG(journal_.debug()) << "too many timeouts " << hash_;
94 notify(sl);
95 }
96 else
97 {
// Otherwise run another round with a peer limit of one.
98 trigger(1, sl);
99 }
100}
101
107
// Process the skip-list item extracted from a peer's reply.
//
// ledgerSeq: ledger sequence; asserted non-zero and passed through to
//            onSkipListAcquired().
// item:      serialized item; asserted non-null, then deserialized as an SLE.
//
// On success the skip list is handed to onSkipListAcquired(); on a
// deserialization failure the task is marked failed and callbacks are notified.
108void
109SkipListAcquire::processData(std::uint32_t ledgerSeq, boost::intrusive_ptr<SHAMapItem const> const& item)
110{
111 XRPL_ASSERT(ledgerSeq != 0 && item, "xrpl::SkipListAcquire::processData : valid inputs");
// NOTE(review): source line 112 is missing from this listing; it presumably
// declares the lock `sl` used below - confirm.
// Data arriving after the task has finished is ignored.
113 if (isDone())
114 return;
115
116 JLOG(journal_.trace()) << "got data for " << hash_;
117 try
118 {
119 if (auto sle = std::make_shared<SLE>(SerialIter{item->slice()}, item->key()); sle)
120 {
121 if (auto const& skipList = sle->getFieldV256(sfHashes).value(); !skipList.empty())
122 onSkipListAcquired(skipList, ledgerSeq, sl);
// NOTE(review): this return also runs when skipList is empty, so that case
// leaves without setting failed_ or calling notify(). retrieveSkipList() in
// this file treats an empty list as a failure instead - confirm the
// difference is intended.
123 return;
124 }
125 }
// Any exception thrown above is swallowed on purpose so that control reaches
// the failure handling below.
126 catch (...)
127 {
128 }
129
130 failed_ = true;
131 JLOG(journal_.error()) << "failed to retrieve Skip list from verified data " << hash_;
132 notify(sl);
133}
134
// Appears to be addDataCallback(cb): stores cb in dataReadyCallbacks_ and, if
// the task is already done, notifies straight away.
// NOTE(review): the declarator (source line 136) and source line 138 are
// missing from this listing; line 138 presumably declares the lock `sl` -
// confirm.
135void
137{
139 dataReadyCallbacks_.emplace_back(std::move(cb));
140 if (isDone())
141 {
142 JLOG(journal_.debug()) << "task added to a finished SkipListAcquire " << hash_;
143 notify(sl);
144 }
145}
146
// Body that returns the data_ member; appears to be getData().
// NOTE(review): the declarator (source lines 147-148) and source line 150 are
// missing from this listing, so any locking done before the return cannot be
// seen here.
149{
151 return data_;
152}
153
// Retrieve the skip list from a ledger; appears to be
// retrieveSkipList(ledger, sl).
// NOTE(review): the declarator (source line 155) is missing from this listing.
154void
156{
// Read the skip-list entry (keylet::skip()) and require the sfHashes field.
157 if (auto const hashIndex = ledger->read(keylet::skip()); hashIndex && hashIndex->isFieldPresent(sfHashes))
158 {
159 auto const& slist = hashIndex->getFieldV256(sfHashes).value();
160 if (!slist.empty())
161 {
162 onSkipListAcquired(slist, ledger->seq(), sl);
163 return;
164 }
165 }
166
// Reached when the entry or field is absent, or the list is empty: mark the
// task failed and notify the callbacks.
167 failed_ = true;
168 JLOG(journal_.error()) << "failed to retrieve Skip list from a ledger " << hash_;
169 notify(sl);
170}
171
// Success path; appears to be onSkipListAcquired(skipList, ledgerSeq, sl).
// Marks the task complete, stores the result in data_ as a SkipListData built
// from ledgerSeq and skipList, then notifies the callbacks.
// NOTE(review): the declarator (source line 173) is missing from this listing.
172void
174{
175 complete_ = true;
176 data_ = std::make_shared<SkipListData>(ledgerSeq, skipList);
177 JLOG(journal_.debug()) << "Skip list acquired " << hash_;
178 notify(sl);
179}
180
// Calls the callbacks with the outcome; appears to be notify(sl). Must only
// be called once the task is done (asserted below).
// NOTE(review): the declarator (source line 182) and source lines 185-186 are
// missing from this listing; they presumably create `toCall` from
// dataReadyCallbacks_ - confirm.
181void
183{
184 XRPL_ASSERT(isDone(), "xrpl::SkipListAcquire::notify : is done");
// Capture the outcome before unlocking, then invoke the callbacks without
// holding the lock.
187 auto const good = !failed_;
188 sl.unlock();
189
190 for (auto& cb : toCall)
191 {
192 cb(good, hash_);
193 }
194
// Re-acquire the lock so the caller's lock object is locked again on return.
195 sl.lock();
196}
197
198} // namespace xrpl
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
virtual LedgerMaster & getLedgerMaster()=0
Manages the lifetime of inbound ledgers.
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
void addDataCallback(OnSkipListDataCB &&cb)
Add a callback that will be called when the skipList is ready or failed.
std::uint32_t noFeaturePeerCount_
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > peerSet_
std::shared_ptr< SkipListData const > data_
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
void notify(ScopedLockType &sl)
Call the OnSkipListDataCB callbacks.
std::vector< OnSkipListDataCB > dataReadyCallbacks_
std::shared_ptr< SkipListData const > getData() const
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
InboundLedgers & inboundLedgers_
void onSkipListAcquired(std::vector< uint256 > const &skipList, std::uint32_t ledgerSeq, ScopedLockType &sl)
Process the skip list.
void retrieveSkipList(std::shared_ptr< Ledger const > const &ledger, ScopedLockType &sl)
Retrieve the skip list from the ledger.
void processData(std::uint32_t ledgerSeq, boost::intrusive_ptr< SHAMapItem const > const &item)
Process the data extracted from a peer's reply.
SkipListAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::unique_ptr< PeerSet > peerSet)
Constructor.
void init(int numPeers)
Start the SkipListAcquire task.
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
pointer data()
Definition base_uint.h:102
static constexpr std::size_t size()
Definition base_uint.h:495
T is_same_v
T lock(T... args)
auto constexpr SUB_TASK_FALLBACK_TIMEOUT
auto constexpr MAX_NO_FEATURE_PEER_COUNT
std::uint32_t constexpr MAX_QUEUED_TASKS
std::uint32_t constexpr SUB_TASK_MAX_TIMEOUTS
Keylet const & skip() noexcept
The index of the "short" skip list.
Definition Indexes.cpp:172
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
@ jtREPLAY_TASK
Definition Job.h:41
uint256 key
Definition Keylet.h:21
T swap(T... args)
T unlock(T... args)