rippled
Loading...
Searching...
No Matches
TransactionAcquire.cpp
1#include <xrpld/app/ledger/ConsensusTransSetSF.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/detail/TransactionAcquire.h>
5#include <xrpld/app/main/Application.h>
6#include <xrpld/app/misc/NetworkOPs.h>
7
8#include <memory>
9
10namespace ripple {
11
12using namespace std::chrono_literals;
13
14// Timeout interval in milliseconds
// The value is a std::chrono duration of 250 milliseconds; the `ms` literal
// suffix is available because of the `using namespace std::chrono_literals`
// directive earlier in this file.
15auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
16
// Anonymous enum of file-local constants.
// NOTE(review): the enumerator lines are not present in this listing, so
// their names and values cannot be documented here — presumably they are
// the timeout-count limits tested by the timer hook; confirm against the
// upstream file.
17enum {
20};
21
// Constructor. The opening line and a few initializer lines are missing
// from this listing; what is visible shows the following:
//   - `app` and `hash` are forwarded to a base-class initializer (presumably
//     TimeoutCounter — TODO confirm) together with a job description using
//     job type jtTXN_DATA and the name "TransactionAcquire", and a journal
//     obtained from app.journal("TransactionAcquire").
//   - mHaveRoot starts out false (no root node received yet).
//   - mPeerSet takes ownership of the `peerSet` argument via std::move.
//   - The body sets up mMap with SHAMapType::TRANSACTION and `hash`
//     (the start of that statement is not visible), then calls
//     mMap->setUnbacked().
23 Application& app,
24 uint256 const& hash,
27 app,
28 hash,
30 {jtTXN_DATA, "TransactionAcquire", {}},
31 app.journal("TransactionAcquire"))
32 , mHaveRoot(false)
33 , mPeerSet(std::move(peerSet))
34{
// NOTE(review): the line that begins this statement (presumably the
// assignment that creates mMap) is missing from this listing.
36 SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
37 mMap->setUnbacked();
38}
39
// Completion handler (presumably TransactionAcquire::done(); the line
// carrying the function name is missing from this listing).
//
// On failure it only writes a debug log entry. On success it logs, marks
// the acquired map immutable, and hands the result to
// InboundTransactions::giveSet() from a separately queued job rather than
// calling it inline (see the lock remark below).
40void
42{
43 // We hold a PeerSet lock and so cannot do real work here
44
45 if (failed_)
46 {
47 JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
48 }
49 else
50 {
51 JLOG(journal_.debug()) << "Acquired TX set " << hash_;
52 mMap->setImmutable();
53
// Local aliases so the lambda below can capture by value: the closure
// receives its own copy of the hash and of the shared_ptr to the map,
// plus a raw pointer to the application object.
54 uint256 const& hash(hash_);
55 std::shared_ptr<SHAMap> const& map(mMap);
56 auto const pap = &app_;
57 // Note that, when we're in the process of shutting down, addJob()
58 // may reject the request. If that happens then giveSet() will
59 // not be called. That's fine. According to David the giveSet() call
60 // just updates the consensus and related structures when we acquire
61 // a transaction set. No need to update them if we're shutting down.
// NOTE(review): the line that starts this call is missing from the
// listing — presumably a JobQueue addJob() call, as the comment above
// suggests. The visible arguments are the job type, the job name and
// the handler.
63 jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
64 pap->getInboundTransactions().giveSet(hash, map, true);
65 });
66 }
67}
68
// Timer hook (presumably TransactionAcquire::onTimer(bool, ScopedLockType&);
// the signature line is missing from this listing).
//
// NOTE(review): the two conditions that guard the first braced block and
// the trigger(nullptr) call are also missing from this listing, so the
// exact thresholds cannot be documented here — confirm against upstream.
69void
71{
// Give-up path: mark the acquisition as failed, run the completion
// handler and stop.
73 {
74 failed_ = true;
75 done();
76 return;
77 }
78
// Re-issue the request without naming a specific peer.
80 trigger(nullptr);
81
// Ask the peer set for one more peer on every pass that did not give up.
82 addPeers(1);
83}
84
90
// TransactionAcquire::trigger — builds and sends the next TMGetLedger
// request for this transaction set (the signature line is missing from
// this listing; `peer` is the function's parameter and is passed straight
// through to mPeerSet->sendRequest).
//
// Behaviour by state:
//   - already complete or failed: log at info level and do nothing;
//   - root not yet received: request the node identified by a
//     default-constructed SHAMapNodeID (presumably the root) with query
//     depth 3;
//   - root received but map invalid: mark failed and call done();
//   - otherwise: ask the map for missing nodes and either finish (none
//     missing) or request those nodes.
91void
93{
94 if (complete_)
95 {
96 JLOG(journal_.info()) << "trigger after complete";
97 return;
98 }
99 if (failed_)
100 {
101 JLOG(journal_.info()) << "trigger after fail";
102 return;
103 }
104
105 if (!mHaveRoot)
106 {
107 JLOG(journal_.trace()) << "TransactionAcquire::trigger "
108 << (peer ? "havePeer" : "noPeer") << " no root";
109 protocol::TMGetLedger tmGL;
110 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
111 tmGL.set_itype(protocol::liTS_CANDIDATE);
112 tmGL.set_querydepth(3); // We probably need the whole thing
113
// After at least one timeout the request is flagged as an indirect query.
114 if (timeouts_ != 0)
115 tmGL.set_querytype(protocol::qtINDIRECT);
116
117 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
118 mPeerSet->sendRequest(tmGL, peer);
119 }
120 else if (!mMap->isValid())
121 {
122 failed_ = true;
123 done();
124 }
125 else
126 {
// NOTE(review): the declaration of `sf` (the object whose address is
// passed to getMissingNodes below) is missing from this listing.
// 256 is the first argument to getMissingNodes — presumably the maximum
// number of missing nodes to report; confirm against SHAMap.
128 auto nodes = mMap->getMissingNodes(256, &sf);
129
130 if (nodes.empty())
131 {
// Nothing left to fetch: the final state depends on map validity.
132 if (mMap->isValid())
133 complete_ = true;
134 else
135 failed_ = true;
136
137 done();
138 return;
139 }
140
141 protocol::TMGetLedger tmGL;
142 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
143 tmGL.set_itype(protocol::liTS_CANDIDATE);
144
145 if (timeouts_ != 0)
146 tmGL.set_querytype(protocol::qtINDIRECT);
147
// One node id per missing node; only the `first` member of each entry
// is used for the request.
148 for (auto const& node : nodes)
149 {
150 *tmGL.add_nodeids() = node.first.getRawString();
151 }
152 mPeerSet->sendRequest(tmGL, peer);
153 }
154}
155
// TransactionAcquire::takeNodes — feeds node data received from a peer
// into the map being acquired. The return type, function name and first
// parameter line are missing from this listing; the body uses `data` as a
// range of pairs (node id in `first`, node payload in `second`).
//
// Returns:
//   - a default-constructed SHAMapAddNode if the set is already complete
//     or has already failed;
//   - SHAMapAddNode::invalid() if `data` is empty, if a non-root node is
//     rejected by the map, or if a std::exception is thrown while
//     processing;
//   - SHAMapAddNode::useful() once every entry has been processed.
159 std::shared_ptr<Peer> const& peer)
160{
// NOTE(review): a statement on the next listing line is missing here
// (presumably the lock acquisition on mtx_ — confirm against upstream).
162
163 if (complete_)
164 {
165 JLOG(journal_.trace()) << "TX set complete";
166 return SHAMapAddNode();
167 }
168
169 if (failed_)
170 {
171 JLOG(journal_.trace()) << "TX set failed";
172 return SHAMapAddNode();
173 }
174
175 try
176 {
177 if (data.empty())
178 return SHAMapAddNode::invalid();
179
// NOTE(review): the declaration of `sf`, used by addKnownNode below, is
// on a line missing from this listing.
181
182 for (auto const& d : data)
183 {
184 if (d.first.isRoot())
185 {
186 if (mHaveRoot)
187 JLOG(journal_.debug())
188 << "Got root TXS node, already have it";
189 else if (!mMap->addRootNode(
190 SHAMapHash{hash_}, d.second, nullptr)
191 .isGood())
192 {
// A bad root node is only logged; processing continues with the
// remaining entries instead of returning invalid().
193 JLOG(journal_.warn()) << "TX acquire got bad root node";
194 }
195 else
196 mHaveRoot = true;
197 }
198 else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
199 {
// A bad non-root node aborts the whole batch.
200 JLOG(journal_.warn()) << "TX acquire got bad non-root node";
201 return SHAMapAddNode::invalid();
202 }
203 }
204
// Ask the same peer for whatever is still missing, then record that
// forward progress was made.
205 trigger(peer);
206 progress_ = true;
207 return SHAMapAddNode::useful();
208 }
209 catch (std::exception const& ex)
210 {
// NOTE(review): `peer` is dereferenced here without a null check —
// presumably callers always pass a non-null peer; confirm.
211 JLOG(journal_.error())
212 << "Peer " << peer->id()
213 << " sent us junky transaction node data: " << ex.what();
214 return SHAMapAddNode::invalid();
215 }
216}
217
// TransactionAcquire::addPeers — forwards to mPeerSet->addPeers (the
// signature line is missing from this listing; `limit` is the parameter).
//
// Arguments passed to the peer set:
//   - `limit`, unchanged;
//   - a predicate that accepts a peer when peer->hasTxSet(hash_) is true;
//   - a callback that calls trigger(peer), presumably invoked for each
//     peer the peer set adds (confirm against PeerSet::addPeers).
// Both lambdas capture `this`.
218void
220{
221 mPeerSet->addPeers(
222 limit,
223 [this](auto peer) { return peer->hasTxSet(hash_); },
224 [this](auto peer) { trigger(peer); });
225}
226
// Start-up routine (presumably TransactionAcquire::init(int numPeers); the
// signature line is missing from this listing). It requests `numPeers`
// peers via addPeers() and then arms the timer via setTimer().
227void
229{
// NOTE(review): the declaration of `sl`, passed to setTimer below, is on a
// line missing from this listing — presumably a scoped lock on mtx_;
// confirm against upstream.
231
232 addPeers(numPeers);
233
234 setTimer(sl);
235}
236
237void
246
247} // namespace ripple
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual NodeCache & getTempNodeCache()=0
virtual JobQueue & getJobQueue()=0
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
static SHAMapAddNode invalid()
static SHAMapAddNode useful()
Identifies a node inside a SHAMap.
std::string getRawString() const
This class is an "active" object.
bool progress_
Whether forward progress has been made.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
std::shared_ptr< SHAMap > mMap
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void addPeers(std::size_t limit)
std::unique_ptr< PeerSet > mPeerSet
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
iterator begin()
Definition base_uint.h:117
static constexpr std::size_t size()
Definition base_uint.h:507
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
auto constexpr TX_ACQUIRE_TIMEOUT
@ jtTXN_DATA
Definition Job.h:50
T what(T... args)