rippled
Loading...
Searching...
No Matches
TransactionAcquire.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/ledger/ConsensusTransSetSF.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/detail/TransactionAcquire.h>
24#include <xrpld/app/main/Application.h>
25#include <xrpld/app/misc/NetworkOPs.h>
26#include <xrpld/overlay/Overlay.h>
27#include <xrpld/overlay/detail/ProtocolMessage.h>
28
29#include <memory>
30
31namespace ripple {
32
33using namespace std::chrono_literals;
34
35// Timeout interval in milliseconds
36auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
37
38enum {
41};
42
44 Application& app,
45 uint256 const& hash,
48 app,
49 hash,
51 {jtTXN_DATA, "TransactionAcquire", {}},
52 app.journal("TransactionAcquire"))
53 , mHaveRoot(false)
54 , mPeerSet(std::move(peerSet))
55{
56 mMap = std::make_shared<SHAMap>(
57 SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
58 mMap->setUnbacked();
59}
60
61void
63{
64 // We hold a PeerSet lock and so cannot do real work here
65
66 if (failed_)
67 {
68 JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
69 }
70 else
71 {
72 JLOG(journal_.debug()) << "Acquired TX set " << hash_;
73 mMap->setImmutable();
74
75 uint256 const& hash(hash_);
76 std::shared_ptr<SHAMap> const& map(mMap);
77 auto const pap = &app_;
78 // Note that, when we're in the process of shutting down, addJob()
79 // may reject the request. If that happens then giveSet() will
80 // not be called. That's fine. According to David the giveSet() call
81 // just updates the consensus and related structures when we acquire
82 // a transaction set. No need to update them if we're shutting down.
84 jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
85 pap->getInboundTransactions().giveSet(hash, map, true);
86 });
87 }
88}
89
90void
92{
94 {
95 failed_ = true;
96 done();
97 return;
98 }
99
101 trigger(nullptr);
102
103 addPeers(1);
104}
105
108{
109 return shared_from_this();
110}
111
112void
114{
115 if (complete_)
116 {
117 JLOG(journal_.info()) << "trigger after complete";
118 return;
119 }
120 if (failed_)
121 {
122 JLOG(journal_.info()) << "trigger after fail";
123 return;
124 }
125
126 if (!mHaveRoot)
127 {
128 JLOG(journal_.trace()) << "TransactionAcquire::trigger "
129 << (peer ? "havePeer" : "noPeer") << " no root";
130 protocol::TMGetLedger tmGL;
131 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
132 tmGL.set_itype(protocol::liTS_CANDIDATE);
133 tmGL.set_querydepth(3); // We probably need the whole thing
134
135 if (timeouts_ != 0)
136 tmGL.set_querytype(protocol::qtINDIRECT);
137
138 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
139 mPeerSet->sendRequest(tmGL, peer);
140 }
141 else if (!mMap->isValid())
142 {
143 failed_ = true;
144 done();
145 }
146 else
147 {
149 auto nodes = mMap->getMissingNodes(256, &sf);
150
151 if (nodes.empty())
152 {
153 if (mMap->isValid())
154 complete_ = true;
155 else
156 failed_ = true;
157
158 done();
159 return;
160 }
161
162 protocol::TMGetLedger tmGL;
163 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
164 tmGL.set_itype(protocol::liTS_CANDIDATE);
165
166 if (timeouts_ != 0)
167 tmGL.set_querytype(protocol::qtINDIRECT);
168
169 for (auto const& node : nodes)
170 {
171 *tmGL.add_nodeids() = node.first.getRawString();
172 }
173 mPeerSet->sendRequest(tmGL, peer);
174 }
175}
176
180 std::shared_ptr<Peer> const& peer)
181{
183
184 if (complete_)
185 {
186 JLOG(journal_.trace()) << "TX set complete";
187 return SHAMapAddNode();
188 }
189
190 if (failed_)
191 {
192 JLOG(journal_.trace()) << "TX set failed";
193 return SHAMapAddNode();
194 }
195
196 try
197 {
198 if (data.empty())
199 return SHAMapAddNode::invalid();
200
202
203 for (auto const& d : data)
204 {
205 if (d.first.isRoot())
206 {
207 if (mHaveRoot)
208 JLOG(journal_.debug())
209 << "Got root TXS node, already have it";
210 else if (!mMap->addRootNode(
211 SHAMapHash{hash_}, d.second, nullptr)
212 .isGood())
213 {
214 JLOG(journal_.warn()) << "TX acquire got bad root node";
215 }
216 else
217 mHaveRoot = true;
218 }
219 else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
220 {
221 JLOG(journal_.warn()) << "TX acquire got bad non-root node";
222 return SHAMapAddNode::invalid();
223 }
224 }
225
226 trigger(peer);
227 progress_ = true;
228 return SHAMapAddNode::useful();
229 }
230 catch (std::exception const& ex)
231 {
232 JLOG(journal_.error())
233 << "Peer " << peer->id()
234 << " sent us junky transaction node data: " << ex.what();
235 return SHAMapAddNode::invalid();
236 }
237}
238
239void
241{
242 mPeerSet->addPeers(
243 limit,
244 [this](auto peer) { return peer->hasTxSet(hash_); },
245 [this](auto peer) { trigger(peer); });
246}
247
248void
250{
252
253 addPeers(numPeers);
254
255 setTimer(sl);
256}
257
258void
260{
262
265 failed_ = false;
266}
267
268} // namespace ripple
Stream error() const
Definition: Journal.h:335
Stream debug() const
Definition: Journal.h:317
Stream info() const
Definition: Journal.h:323
Stream trace() const
Severity stream access functions.
Definition: Journal.h:311
Stream warn() const
Definition: Journal.h:329
virtual NodeCache & getTempNodeCache()=0
virtual JobQueue & getJobQueue()=0
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
static SHAMapAddNode invalid()
static SHAMapAddNode useful()
Identifies a node inside a SHAMap.
Definition: SHAMapNodeID.h:34
std::string getRawString() const
This class is an "active" object.
bool progress_
Whether forward progress has been made.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
beast::Journal journal_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
void trigger(std::shared_ptr< Peer > const &)
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
std::shared_ptr< SHAMap > mMap
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
void addPeers(std::size_t limit)
std::unique_ptr< PeerSet > mPeerSet
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice > > const &data, std::shared_ptr< Peer > const &)
iterator begin()
Definition: base_uint.h:135
static constexpr std::size_t size()
Definition: base_uint.h:525
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
auto constexpr TX_ACQUIRE_TIMEOUT
@ jtTXN_DATA
Definition: Job.h:69
T what(T... args)