rippled
Loading...
Searching...
No Matches
TransactionAcquire.cpp
1#include <xrpld/app/ledger/ConsensusTransSetSF.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/detail/TransactionAcquire.h>
5#include <xrpld/app/main/Application.h>
6#include <xrpld/app/misc/NetworkOPs.h>
7
8#include <memory>
9
10namespace xrpl {
11
12using namespace std::chrono_literals;
13
14// Timeout interval in milliseconds
// Handed to the TimeoutCounter base constructor as its third argument (see the
// constructor's initializer list). The `ms` suffix relies on the
// std::chrono_literals using-directive above.
15auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
16
// NOTE(review): the enumerators of this unnamed enum (listing lines 18-19) are
// missing from this extract, so their names and values cannot be documented
// here -- confirm against the original file.
17enum {
20};
21
// Constructor initializer list and body. The signature line (listing line 22)
// is missing from this extract; the body uses `app`, `hash` and `peerSet`.
//
// Forwards app, hash and TX_ACQUIRE_TIMEOUT to the TimeoutCounter base together
// with a jtTXN_DATA job description named "TxAcq" and the "TransactionAcquire"
// journal, starts with mHaveRoot == false, and takes ownership of the supplied
// PeerSet by moving it into mPeerSet.
23 : TimeoutCounter(app, hash, TX_ACQUIRE_TIMEOUT, {jtTXN_DATA, "TxAcq", {}}, app.journal("TransactionAcquire"))
24 , mHaveRoot(false)
25 , mPeerSet(std::move(peerSet))
26{
// Create the SHAMap that takeNodes() fills in as nodes arrive. It is
// constructed with the TRANSACTION map type, the target hash and the
// application's node family.
27 mMap = std::make_shared<SHAMap>(SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
// NOTE(review): presumably marks the map as not backed by the node store --
// confirm against SHAMap::setUnbacked().
28 mMap->setUnbacked();
29}
30
// Reports the outcome of the acquisition.
//
// NOTE(review): the line naming this function (listing line 32) is missing
// from this extract. Presumably this is TransactionAcquire::done(), which the
// timer hook, trigger() and other code in this file call after setting
// failed_ or complete_ -- confirm.
//
// If failed_ is set, only a debug message is logged. Otherwise the map is made
// immutable and a "ComplAcquire" job of type jtTXN_DATA is queued whose body
// calls InboundTransactions::giveSet(hash, map, true).
31void
33{
34 // We hold a PeerSet lock and so cannot do real work here
35
36 if (failed_)
37 {
38 JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
39 }
40 else
41 {
42 JLOG(journal_.debug()) << "Acquired TX set " << hash_;
43 mMap->setImmutable();
44
// Bind the hash, the map pointer and the Application pointer to locals so the
// lambda below can capture them by copy; the capture list does not include
// `this`.
45 uint256 const& hash(hash_);
46 std::shared_ptr<SHAMap> const& map(mMap);
47 auto const pap = &app_;
48 // Note that, when we're in the process of shutting down, addJob()
49 // may reject the request. If that happens then giveSet() will
50 // not be called. That's fine. According to David the giveSet() call
51 // just updates the consensus and related structures when we acquire
52 // a transaction set. No need to update them if we're shutting down.
// NOTE(review): the line that starts this call (listing line 53) is missing
// from this extract. The arguments below match
// addJob(JobType, std::string const&, JobHandler&&), so presumably it is an
// addJob() call on the application's JobQueue -- confirm.
54 jtTXN_DATA, "ComplAcquire", [pap, hash, map]() { pap->getInboundTransactions().giveSet(hash, map, true); });
55 }
56}
57
// Timer-driven step of the acquisition.
//
// NOTE(review): the signature line (listing line 59) is missing from this
// extract. Presumably this is TransactionAcquire::onTimer(bool progress,
// ScopedLockType&), the override declared for this class -- confirm.
58void
60{
// NOTE(review): the condition guarding this block (listing line 61) is
// missing. When it holds, the acquisition is marked failed and finished, so it
// is presumably a "too many timeouts" check -- confirm.
62 {
63 failed_ = true;
64 done();
65 return;
66 }
67
// NOTE(review): the condition on listing line 68 is missing; it presumably
// guards the trigger(nullptr) call below, which issues a request without
// naming a specific peer.
69 trigger(nullptr);
70
// Unless the failure path above returned early, ask the PeerSet for up to one
// additional peer on every call.
71 addPeers(1);
72}
73
79
// TransactionAcquire::trigger -- builds and sends the next TMGetLedger request
// for this transaction set through mPeerSet. The signature line (listing line
// 81) is missing from this extract; the class declares it as
// void trigger(std::shared_ptr<Peer> const&), and the body names the argument
// `peer`.
//
// `peer` may be null: the timer hook calls trigger(nullptr), and the trace log
// below distinguishes "havePeer" from "noPeer".
// NOTE(review): how PeerSet::sendRequest treats a null peer is not visible
// here -- confirm.
//
// Behaviour:
//  - already complete or failed: log at info level and return;
//  - no root node yet: request the root with a query depth of 3;
//  - map reports invalid: mark failed and call done();
//  - otherwise: request the missing nodes, or finish if none are missing.
80void
82{
83 if (complete_)
84 {
85 JLOG(journal_.info()) << "trigger after complete";
86 return;
87 }
88 if (failed_)
89 {
90 JLOG(journal_.info()) << "trigger after fail";
91 return;
92 }
93
94 if (!mHaveRoot)
95 {
96 JLOG(journal_.trace()) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer") << " no root";
97 protocol::TMGetLedger tmGL;
98 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
99 tmGL.set_itype(protocol::liTS_CANDIDATE);
100 tmGL.set_querydepth(3); // We probably need the whole thing
101
// Once at least one timeout has been counted, mark the query as indirect.
// NOTE(review): presumably this lets the receiving peer relay the query --
// confirm against the protocol definition of qtINDIRECT.
102 if (timeouts_ != 0)
103 tmGL.set_querytype(protocol::qtINDIRECT);
104
// Request a single node identified by a default-constructed SHAMapNodeID
// (presumably the root node ID -- confirm).
105 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
106 mPeerSet->sendRequest(tmGL, peer);
107 }
108 else if (!mMap->isValid())
109 {
110 failed_ = true;
111 done();
112 }
113 else
114 {
// NOTE(review): the declaration of `sf` (listing line 115) is missing from
// this extract; it is passed by address to getMissingNodes() below.
// NOTE(review): 256 is presumably the maximum number of missing nodes to
// return -- confirm against SHAMap::getMissingNodes().
116 auto nodes = mMap->getMissingNodes(256, &sf);
117
118 if (nodes.empty())
119 {
// Nothing is missing: the acquisition ends here, as complete if the map is
// still valid and as failed otherwise.
120 if (mMap->isValid())
121 complete_ = true;
122 else
123 failed_ = true;
124
125 done();
126 return;
127 }
128
129 protocol::TMGetLedger tmGL;
130 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
131 tmGL.set_itype(protocol::liTS_CANDIDATE);
132
133 if (timeouts_ != 0)
134 tmGL.set_querytype(protocol::qtINDIRECT);
135
// Add one node ID to the request per missing-node entry (taken from the
// `first` member of each element).
136 for (auto const& node : nodes)
137 {
138 *tmGL.add_nodeids() = node.first.getRawString();
139 }
140 mPeerSet->sendRequest(tmGL, peer);
141 }
142}
143
// TransactionAcquire::takeNodes -- feeds node data received from a peer into
// the map being acquired. The start of the signature (listing lines 144-146)
// is missing from this extract; the class declares it as
// SHAMapAddNode takeNodes(std::vector<std::pair<SHAMapNodeID, Slice>> const& data,
//                         std::shared_ptr<Peer> const&).
//
// Returns:
//  - a default-constructed SHAMapAddNode if the set is already complete or
//    failed;
//  - SHAMapAddNode::invalid() if `data` is empty, if a non-root node cannot be
//    added, or if a std::exception is caught;
//  - SHAMapAddNode::useful() otherwise, after calling trigger(peer) and
//    setting progress_.
147 std::shared_ptr<Peer> const& peer)
148{
// NOTE(review): listing line 149 is missing from this extract (presumably the
// lock acquisition -- confirm).
150
151 if (complete_)
152 {
153 JLOG(journal_.trace()) << "TX set complete";
154 return SHAMapAddNode();
155 }
156
157 if (failed_)
158 {
159 JLOG(journal_.trace()) << "TX set failed";
160 return SHAMapAddNode();
161 }
162
163 try
164 {
165 if (data.empty())
166 return SHAMapAddNode::invalid();
167
// NOTE(review): the declaration of `sf` (listing line 168) is missing from
// this extract; it is passed by address to addKnownNode() below.
169
170 for (auto const& d : data)
171 {
172 if (d.first.isRoot())
173 {
// Root node: ignore a duplicate, otherwise try to install it against hash_.
// NOTE(review): a bad root node is only logged. The loop continues and the
// call can still return useful(), unlike the non-root case, which returns
// invalid() immediately.
174 if (mHaveRoot)
175 JLOG(journal_.debug()) << "Got root TXS node, already have it";
176 else if (!mMap->addRootNode(SHAMapHash{hash_}, d.second, nullptr).isGood())
177 {
178 JLOG(journal_.warn()) << "TX acquire got bad root node";
179 }
180 else
181 mHaveRoot = true;
182 }
183 else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
184 {
185 JLOG(journal_.warn()) << "TX acquire got bad non-root node";
186 return SHAMapAddNode::invalid();
187 }
188 }
189
// Every node was processed without an early return: request whatever is still
// needed from the same peer and record that forward progress was made.
190 trigger(peer);
191 progress_ = true;
192 return SHAMapAddNode::useful();
193 }
194 catch (std::exception const& ex)
195 {
// NOTE(review): `peer` is dereferenced here without a null check; this assumes
// callers always pass a non-null peer -- confirm.
196 JLOG(journal_.error()) << "Peer " << peer->id() << " sent us junky transaction node data: " << ex.what();
197 return SHAMapAddNode::invalid();
198 }
199}
200
// TransactionAcquire::addPeers -- forwards to PeerSet::addPeers. The name line
// (listing line 202) is missing from this extract; the class declares it as
// void addPeers(std::size_t limit).
//
// Passes `limit` plus two lambdas: the first returns peer->hasTxSet(hash_),
// the second calls trigger(peer).
// NOTE(review): presumably the first lambda filters candidate peers and the
// second runs for each peer actually added -- confirm against
// PeerSet::addPeers().
201void
203{
204 mPeerSet->addPeers(
205 limit, [this](auto peer) { return peer->hasTxSet(hash_); }, [this](auto peer) { trigger(peer); });
206}
207
// Starts the acquisition: adds up to `numPeers` peers, then arms the timer.
//
// NOTE(review): the name/parameter line (listing line 209) and the line that
// declares `sl` (listing line 211) are missing from this extract. Presumably
// this is TransactionAcquire::init and `sl` is a ScopedLockType, since
// setTimer() takes a ScopedLockType& -- confirm.
208void
210{
212
213 addPeers(numPeers);
214
// setTimer() is documented as scheduling a call to queueJob() after
// mTimerInterval.
215 setTimer(sl);
216}
217
218void
227
228} // namespace xrpl
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual JobQueue & getJobQueue()=0
virtual NodeCache & getTempNodeCache()=0
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:146
static SHAMapAddNode useful()
static SHAMapAddNode invalid()
Identifies a node inside a SHAMap.
std::string getRawString() const
This class is an "active" object.
std::recursive_mutex mtx_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
bool progress_
Whether forward progress has been made.
beast::Journal journal_
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
std::shared_ptr< SHAMap > mMap
void addPeers(std::size_t limit)
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::unique_ptr< PeerSet > mPeerSet
iterator begin()
Definition base_uint.h:113
static constexpr std::size_t size()
Definition base_uint.h:495
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
@ jtTXN_DATA
Definition Job.h:49
auto constexpr TX_ACQUIRE_TIMEOUT
T what(T... args)