InboundLedger.cpp
#include <xrpld/app/ledger/AccountStateSF.h>
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/TransactionStateSF.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/overlay/Overlay.h>

#include <xrpl/basics/Log.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/protocol/HashPrefix.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/shamap/SHAMapNodeID.h>

#include <boost/iterator/function_output_iterator.hpp>

#include <algorithm>
#include <random>

namespace xrpl {

using namespace std::chrono_literals;

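// Tuning constants for the acquire process. Timeouts drive recovery: each
// timeout can add peers, enough timeouts switch to aggressive by-hash
// fetching, and too many timeouts abandon the acquire.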
enum {
    // Number of peers to start with
    peerCountStart = 5

    // Number of peers to add on a timeout
    ,
    peerCountAdd = 3

    // how many timeouts before we give up
    ,
    ledgerTimeoutRetriesMax = 10

    // how many timeouts before we get aggressive
    ,
    ledgerBecomeAggressiveThreshold = 6

    // Number of nodes to find initially
    ,
    missingNodesFind = 256

    // Number of nodes to request for a reply
    ,
    reqNodesReply = 128

    // Number of nodes to request blindly
    ,
    reqNodes = 12
};

// milliseconds for each ledger timeout
auto constexpr ledgerAcquireTimeout = 3000ms;

InboundLedger::InboundLedger(
    Application& app,
    uint256 const& hash,
    std::uint32_t seq,
    Reason reason,
    clock_type& clock,
    std::unique_ptr<PeerSet> peerSet)
    : TimeoutCounter(app, hash, ledgerAcquireTimeout, {jtLEDGER_DATA, "InboundLedger", 5}, app.journal("InboundLedger"))
    , m_clock(clock)
    , mHaveHeader(false)
    , mHaveState(false)
    , mHaveTransactions(false)
    , mSignaled(false)
    , mByHash(true)
    , mSeq(seq)
    , mReason(reason)
    , mReceiveDispatched(false)
    , mPeerSet(std::move(peerSet))
{
    JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
    touch();
}

void
InboundLedger::init(ScopedLockType& collectionLock)
{
    ScopedLockType sl(mtx_);
    collectionLock.unlock();

    tryDB(app_.getNodeFamily().db());
    if (failed_)
        return;

    if (!complete_)
    {
        addPeers();
        queueJob(sl);
        return;
    }

    JLOG(journal_.debug()) << "Acquiring ledger we already have in "
                           << " local store. " << hash_;
    XRPL_ASSERT(
        mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES || mLedger->read(keylet::fees()),
        "xrpl::InboundLedger::init : valid ledger fees");
    mLedger->setImmutable();

    if (mReason == Reason::HISTORY)
        return;

    app_.getLedgerMaster().storeLedger(mLedger);

    // Check if this could be a newer fully-validated ledger
    if (mReason == Reason::CONSENSUS)
        app_.getLedgerMaster().checkAccept(mLedger);
}

std::size_t
InboundLedger::getPeerCount() const
{
    auto const& peerIds = mPeerSet->getPeerIds();
    return std::count_if(
        peerIds.begin(), peerIds.end(), [this](auto id) { return (app_.overlay().findPeerByShortID(id) != nullptr); });
}

void
InboundLedger::update(std::uint32_t seq)
{
    ScopedLockType sl(mtx_);

    // If we didn't know the sequence number, but now do, save it
    if ((seq != 0) && (mSeq == 0))
        mSeq = seq;

    // Prevent this from being swept
    touch();
}
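
// See whether the ledger can be completed (or declared failed) from local
// storage alone; returns true if this acquire is now done.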
bool
InboundLedger::checkLocal()
{
    ScopedLockType sl(mtx_);
    if (!isDone())
    {
        if (mLedger)
            tryDB(mLedger->stateMap().family().db());
        else
            tryDB(app_.getNodeFamily().db());
        if (failed_ || complete_)
        {
            done();
            return true;
        }
    }
    return false;
}

InboundLedger::~InboundLedger()
{
    // Save any received AS data not processed. It could be useful
    // for populating a different ledger
    for (auto& entry : mReceivedData)
    {
        if (entry.second->type() == protocol::liAS_NODE)
            app_.getInboundLedgers().gotStaleData(entry.second);
    }
    if (!isDone())
    {
        JLOG(journal_.debug()) << "Acquire " << hash_ << " abort "
                               << ((timeouts_ == 0) ? std::string()
                                                    : (std::string("timeouts:") + std::to_string(timeouts_) + " "))
                               << mStats.get();
    }
}
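
// Collect up to `max` hashes of nodes still missing from `map`; if the map
// has no root yet, the root hash itself is the one missing piece.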
static std::vector<uint256>
neededHashes(uint256 const& root, SHAMap& map, int max, SHAMapSyncFilter* filter)
{
    std::vector<uint256> ret;

    if (!root.isZero())
    {
        if (map.getHash().isZero())
            ret.push_back(root);
        else
        {
            auto mn = map.getMissingNodes(max, filter);
            ret.reserve(mn.size());
            for (auto const& n : mn)
                ret.push_back(n.second);
        }
    }

    return ret;
}

std::vector<uint256>
InboundLedger::neededTxHashes(int max, SHAMapSyncFilter* filter) const
{
    return neededHashes(mLedger->header().txHash, mLedger->txMap(), max, filter);
}

std::vector<uint256>
InboundLedger::neededStateHashes(int max, SHAMapSyncFilter* filter) const
{
    return neededHashes(mLedger->header().accountHash, mLedger->stateMap(), max, filter);
}

// See how much of the ledger data is stored locally
// Data found in a fetch pack will be stored
void
InboundLedger::tryDB(NodeStore::Database& srcDB)
{
    if (!mHaveHeader)
    {
        auto makeLedger = [&, this](Blob const& data) {
            JLOG(journal_.trace()) << "Ledger header found in fetch pack";
            mLedger = std::make_shared<Ledger>(
                deserializePrefixedHeader(makeSlice(data)), app_.config(), app_.getNodeFamily());
            if (mLedger->header().hash != hash_ || (mSeq != 0 && mSeq != mLedger->header().seq))
            {
                // We know for a fact the ledger can never be acquired
                JLOG(journal_.warn()) << "hash " << hash_ << " seq " << std::to_string(mSeq) << " cannot be a ledger";
                mLedger.reset();
                failed_ = true;
            }
        };

        // Try to fetch the ledger header from the DB
        if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
        {
            JLOG(journal_.trace()) << "Ledger header found in local store";

            makeLedger(nodeObject->getData());
            if (failed_)
                return;

            // Store the ledger header if the source and destination differ
            auto& dstDB{mLedger->stateMap().family().db()};
            if (std::addressof(dstDB) != std::addressof(srcDB))
            {
                Blob blob{nodeObject->getData()};
                dstDB.store(hotLEDGER, std::move(blob), hash_, mLedger->header().seq);
            }
        }
        else
        {
            // Try to fetch the ledger header from a fetch pack
            auto data = app_.getLedgerMaster().getFetchPack(hash_);
            if (!data)
                return;

            JLOG(journal_.trace()) << "Ledger header found in fetch pack";

            makeLedger(*data);
            if (failed_)
                return;

            // Store the ledger header in the ledger's database
            mLedger->stateMap().family().db().store(hotLEDGER, std::move(*data), hash_, mLedger->header().seq);
        }

        if (mSeq == 0)
            mSeq = mLedger->header().seq;
        mLedger->stateMap().setLedgerSeq(mSeq);
        mLedger->txMap().setLedgerSeq(mSeq);
        mHaveHeader = true;
    }

    if (!mHaveTransactions)
    {
        if (mLedger->header().txHash.isZero())
        {
            JLOG(journal_.trace()) << "No TXNs to fetch";
            mHaveTransactions = true;
        }
        else
        {
            TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
            if (mLedger->txMap().fetchRoot(SHAMapHash{mLedger->header().txHash}, &filter))
            {
                if (neededTxHashes(1, &filter).empty())
                {
                    JLOG(journal_.trace()) << "Had full txn map locally";
                    mHaveTransactions = true;
                }
            }
        }
    }

    if (!mHaveState)
    {
        if (mLedger->header().accountHash.isZero())
        {
            JLOG(journal_.fatal()) << "We are acquiring a ledger with a zero account hash";
            failed_ = true;
            return;
        }
        AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
        if (mLedger->stateMap().fetchRoot(SHAMapHash{mLedger->header().accountHash}, &filter))
        {
            if (neededStateHashes(1, &filter).empty())
            {
                JLOG(journal_.trace()) << "Had full AS map locally";
                mHaveState = true;
            }
        }
    }

    if (mHaveTransactions && mHaveState)
    {
        JLOG(journal_.debug()) << "Had everything locally";
        complete_ = true;
        XRPL_ASSERT(
            mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES || mLedger->read(keylet::fees()),
            "xrpl::InboundLedger::tryDB : valid ledger fees");
        mLedger->setImmutable();
    }
}

/** Called with a lock by the PeerSet when the timer expires
*/
void
InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
{
    mRecentNodes.clear();

    if (isDone())
    {
        JLOG(journal_.info()) << "Already done " << hash_;
        return;
    }

    if (timeouts_ > ledgerTimeoutRetriesMax)
    {
        if (mSeq != 0)
        {
            JLOG(journal_.warn()) << timeouts_ << " timeouts for ledger " << mSeq;
        }
        else
        {
            JLOG(journal_.warn()) << timeouts_ << " timeouts for ledger " << hash_;
        }
        failed_ = true;
        done();
        return;
    }

    if (!wasProgress)
    {
        checkLocal();

        mByHash = true;

        std::size_t pc = getPeerCount();
        JLOG(journal_.debug()) << "No progress(" << pc << ") for ledger " << hash_;

        // addPeers triggers if the reason is not HISTORY
        // So if the reason IS HISTORY, need to trigger after we add
        // otherwise, we need to trigger before we add
        // so each peer gets triggered once
        if (mReason != Reason::HISTORY)
            trigger(nullptr, TriggerReason::timeout);
        addPeers();
        if (mReason == Reason::HISTORY)
            trigger(nullptr, TriggerReason::timeout);
    }
}

/** Add more peers to the set, if possible */
void
InboundLedger::addPeers()
{
    mPeerSet->addPeers(
        (getPeerCount() == 0) ? peerCountStart : peerCountAdd,
        [this](auto peer) { return peer->hasLedger(hash_, mSeq); },
        [this](auto peer) {
            // For historical nodes, do not trigger too soon
            // since a fetch pack is probably coming
            if (mReason != Reason::HISTORY)
                trigger(peer, TriggerReason::added);
        });
}

std::weak_ptr<TimeoutCounter>
InboundLedger::pmDowncast()
{
    return shared_from_this();
}

void
InboundLedger::done()
{
    if (mSignaled)
        return;

    mSignaled = true;
    touch();

    JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
                           << ((timeouts_ == 0) ? std::string()
                                                : (std::string("timeouts:") + std::to_string(timeouts_) + " "))
                           << mStats.get();

    XRPL_ASSERT(complete_ || failed_, "xrpl::InboundLedger::done : complete or failed");

    if (complete_ && !failed_ && mLedger)
    {
        XRPL_ASSERT(
            mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES || mLedger->read(keylet::fees()),
            "xrpl::InboundLedger::done : valid ledger fees");
        mLedger->setImmutable();
        switch (mReason)
        {
            case Reason::HISTORY:
                app_.getInboundLedgers().onLedgerFetched();
                break;
            default:
                app_.getLedgerMaster().storeLedger(mLedger);
                break;
        }
    }

    // We hold the PeerSet lock, so must dispatch
    app_.getJobQueue().addJob(jtLEDGER_DATA, "AcqDone", [self = shared_from_this()]() {
        if (self->complete_ && !self->failed_)
        {
            self->app_.getLedgerMaster().checkAccept(self->getLedger());
            self->app_.getLedgerMaster().tryAdvance();
        }
        else
            self->app_.getInboundLedgers().logFailure(self->hash_, self->mSeq);
    });
}

/** Request more nodes, perhaps from a specific peer
*/
void
InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
{
    ScopedLockType sl(mtx_);

    if (isDone())
    {
        JLOG(journal_.debug()) << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
                               << (failed_ ? " failed" : "");
        return;
    }

    if (auto stream = journal_.debug())
    {
        std::stringstream ss;
        ss << "Trigger acquiring ledger " << hash_;
        if (peer)
            ss << " from " << peer;

        if (complete_ || failed_)
            ss << " complete=" << complete_ << " failed=" << failed_;
        else
            ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions << " as=" << mHaveState;
        stream << ss.str();
    }

    if (!mHaveHeader)
    {
        tryDB(app_.getNodeFamily().db());
        if (failed_)
        {
            JLOG(journal_.warn()) << " failed local for " << hash_;
            return;
        }
    }

    protocol::TMGetLedger tmGL;
    tmGL.set_ledgerhash(hash_.begin(), hash_.size());

    if (timeouts_ != 0)
    {
        // Be more aggressive if we've timed out at least once
        tmGL.set_querytype(protocol::qtINDIRECT);

        if (!progress_ && !failed_ && mByHash && (timeouts_ > ledgerBecomeAggressiveThreshold))
        {
            auto need = getNeededHashes();

            if (!need.empty())
            {
                protocol::TMGetObjectByHash tmBH;
                bool typeSet = false;
                tmBH.set_query(true);
                tmBH.set_ledgerhash(hash_.begin(), hash_.size());
                for (auto const& p : need)
                {
                    JLOG(journal_.debug()) << "Want: " << p.second;

                    if (!typeSet)
                    {
                        tmBH.set_type(p.first);
                        typeSet = true;
                    }

                    if (p.first == tmBH.type())
                    {
                        protocol::TMIndexedObject* io = tmBH.add_objects();
                        io->set_hash(p.second.begin(), p.second.size());
                        if (mSeq != 0)
                            io->set_ledgerseq(mSeq);
                    }
                }

                auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
                auto const& peerIds = mPeerSet->getPeerIds();
                std::for_each(peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
                    if (auto p = app_.overlay().findPeerByShortID(id))
                    {
                        mByHash = false;
                        p->send(packet);
                    }
                });
            }
            else
            {
                JLOG(journal_.info()) << "getNeededHashes says acquire is complete";
                mHaveHeader = true;
                mHaveTransactions = true;
                mHaveState = true;
                complete_ = true;
            }
        }
    }

    // We can't do much without the header data because we don't know the
    // state or transaction root hashes.
    if (!mHaveHeader && !failed_)
    {
        tmGL.set_itype(protocol::liBASE);
        if (mSeq != 0)
            tmGL.set_ledgerseq(mSeq);
        JLOG(journal_.trace()) << "Sending header request to " << (peer ? "selected peer" : "all peers");
        mPeerSet->sendRequest(tmGL, peer);
        return;
    }

    if (mLedger)
        tmGL.set_ledgerseq(mLedger->header().seq);

    if (reason != TriggerReason::reply)
    {
        // If we're querying blind, don't query deep
        tmGL.set_querydepth(0);
    }
    else if (peer && peer->isHighLatency())
    {
        // If the peer has high latency, query extra deep
        tmGL.set_querydepth(2);
    }
    else
        tmGL.set_querydepth(1);

    // Get the state data first because it's the most likely to be useful
    // if we wind up abandoning this fetch.
    if (mHaveHeader && !mHaveState && !failed_)
    {
        XRPL_ASSERT(
            mLedger,
            "xrpl::InboundLedger::trigger : non-null ledger to read state "
            "from");

        if (!mLedger->stateMap().isValid())
        {
            failed_ = true;
        }
        else if (mLedger->stateMap().getHash().isZero())
        {
            // we need the root node
            tmGL.set_itype(protocol::liAS_NODE);
            *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
            JLOG(journal_.trace()) << "Sending AS root request to " << (peer ? "selected peer" : "all peers");
            mPeerSet->sendRequest(tmGL, peer);
            return;
        }
        else
        {
            AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());

            // Release the lock while we process the large state map
            sl.unlock();
            auto nodes = mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
            sl.lock();

            // Make sure nothing happened while we released the lock
            if (!failed_ && !complete_ && !mHaveState)
            {
                if (nodes.empty())
                {
                    if (!mLedger->stateMap().isValid())
                        failed_ = true;
                    else
                    {
                        mHaveState = true;

                        if (mHaveTransactions)
                            complete_ = true;
                    }
                }
                else
                {
                    filterNodes(nodes, reason);

                    if (!nodes.empty())
                    {
                        tmGL.set_itype(protocol::liAS_NODE);
                        for (auto const& id : nodes)
                        {
                            *(tmGL.add_nodeids()) = id.first.getRawString();
                        }

                        JLOG(journal_.trace()) << "Sending AS node request (" << nodes.size() << ") to "
                                               << (peer ? "selected peer" : "all peers");
                        mPeerSet->sendRequest(tmGL, peer);
                        return;
                    }
                    else
                    {
                        JLOG(journal_.trace()) << "All AS nodes filtered";
                    }
                }
            }
        }
    }

    if (mHaveHeader && !mHaveTransactions && !failed_)
    {
        XRPL_ASSERT(
            mLedger,
            "xrpl::InboundLedger::trigger : non-null ledger to read "
            "transactions from");

        if (!mLedger->txMap().isValid())
        {
            failed_ = true;
        }
        else if (mLedger->txMap().getHash().isZero())
        {
            // we need the root node
            tmGL.set_itype(protocol::liTX_NODE);
            *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
            JLOG(journal_.trace()) << "Sending TX root request to " << (peer ? "selected peer" : "all peers");
            mPeerSet->sendRequest(tmGL, peer);
            return;
        }
        else
        {
            TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());

            auto nodes = mLedger->txMap().getMissingNodes(missingNodesFind, &filter);

            if (nodes.empty())
            {
                if (!mLedger->txMap().isValid())
                    failed_ = true;
                else
                {
                    mHaveTransactions = true;

                    if (mHaveState)
                        complete_ = true;
                }
            }
            else
            {
                filterNodes(nodes, reason);

                if (!nodes.empty())
                {
                    tmGL.set_itype(protocol::liTX_NODE);
                    for (auto const& n : nodes)
                    {
                        *(tmGL.add_nodeids()) = n.first.getRawString();
                    }
                    JLOG(journal_.trace()) << "Sending TX node request (" << nodes.size() << ") to "
                                           << (peer ? "selected peer" : "all peers");
                    mPeerSet->sendRequest(tmGL, peer);
                    return;
                }
                else
                {
                    JLOG(journal_.trace()) << "All TX nodes filtered";
                }
            }
        }
    }

    if (complete_ || failed_)
    {
        JLOG(journal_.debug()) << "Done:" << (complete_ ? " complete" : "") << (failed_ ? " failed " : " ")
                               << mLedger->header().seq;
        sl.unlock();
        done();
    }
}

void
InboundLedger::filterNodes(std::vector<std::pair<SHAMapNodeID, uint256>>& nodes, TriggerReason reason)
{
    // Sort nodes so that the ones we haven't recently
    // requested come before the ones we have.
    auto dup = std::stable_partition(
        nodes.begin(), nodes.end(), [this](auto const& item) { return mRecentNodes.count(item.second) == 0; });

    // If everything is a duplicate we don't want to send
    // any query at all except on a timeout where we need
    // to query everyone:
    if (dup == nodes.begin())
    {
        JLOG(journal_.trace()) << "filterNodes: all duplicates";

        if (reason != TriggerReason::timeout)
        {
            nodes.clear();
            return;
        }
    }
    else
    {
        JLOG(journal_.trace()) << "filterNodes: pruning duplicates";

        nodes.erase(dup, nodes.end());
    }

    std::size_t const limit = (reason == TriggerReason::reply) ? reqNodesReply : reqNodes;

    if (nodes.size() > limit)
        nodes.resize(limit);

    for (auto const& n : nodes)
        mRecentNodes.insert(n.second);
}
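
// Build the ledger from raw header data received over the network.
// Called with the acquire lock held.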
// data must not have hash prefix
bool
InboundLedger::takeHeader(std::string const& data)
{
    // Return value: true=normal, false=bad data
    JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;

    if (complete_ || failed_ || mHaveHeader)
        return true;

    auto* f = &app_.getNodeFamily();
    mLedger = std::make_shared<Ledger>(deserializeHeader(makeSlice(data)), app_.config(), *f);
    if (mLedger->header().hash != hash_ || (mSeq != 0 && mSeq != mLedger->header().seq))
    {
        JLOG(journal_.warn()) << "Acquire hash mismatch: " << mLedger->header().hash << "!=" << hash_;
        mLedger.reset();
        return false;
    }
    if (mSeq == 0)
        mSeq = mLedger->header().seq;
    mLedger->stateMap().setLedgerSeq(mSeq);
    mLedger->txMap().setLedgerSeq(mSeq);
    mHaveHeader = true;

    Serializer s(data.size() + 4);
    s.add32(HashPrefix::ledgerMaster);
    s.addRaw(data.data(), data.size());
    f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);

    if (mLedger->header().txHash.isZero())
        mHaveTransactions = true;

    if (mLedger->header().accountHash.isZero())
        mHaveState = true;

    mLedger->txMap().setSynching();
    mLedger->stateMap().setSynching();

    return true;
}
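
// Process a batch of TX or AS tree nodes received from a peer.
// Called with the acquire lock held.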
void
InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
{
    if (!mHaveHeader)
    {
        JLOG(journal_.warn()) << "Missing ledger header";
        san.incInvalid();
        return;
    }
    if (packet.type() == protocol::liTX_NODE)
    {
        if (mHaveTransactions || failed_)
        {
            san.incDuplicate();
            return;
        }
    }
    else if (mHaveState || failed_)
    {
        san.incDuplicate();
        return;
    }

    auto [map, rootHash, filter] = [&]() -> std::tuple<SHAMap&, SHAMapHash, std::unique_ptr<SHAMapSyncFilter>> {
        if (packet.type() == protocol::liTX_NODE)
            return {
                mLedger->txMap(),
                SHAMapHash{mLedger->header().txHash},
                std::make_unique<TransactionStateSF>(mLedger->txMap().family().db(), app_.getLedgerMaster())};
        return {
            mLedger->stateMap(),
            SHAMapHash{mLedger->header().accountHash},
            std::make_unique<AccountStateSF>(mLedger->stateMap().family().db(), app_.getLedgerMaster())};
    }();

    try
    {
        auto const f = filter.get();

        for (auto const& node : packet.nodes())
        {
            auto const nodeID = deserializeSHAMapNodeID(node.nodeid());

            if (!nodeID)
                throw std::runtime_error("data does not properly deserialize");

            if (nodeID->isRoot())
            {
                san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
            }
            else
            {
                san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
            }

            if (!san.isGood())
            {
                JLOG(journal_.warn()) << "Received bad node data";
                return;
            }
        }
    }
    catch (std::exception const& e)
    {
        JLOG(journal_.error()) << "Received bad node data: " << e.what();
        san.incInvalid();
        return;
    }

    if (!map.isSynching())
    {
        if (packet.type() == protocol::liTX_NODE)
            mHaveTransactions = true;
        else
            mHaveState = true;

        if (mHaveTransactions && mHaveState)
        {
            complete_ = true;
            done();
        }
    }
}
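
// Process the state (AS) map root node received from a peer.
// Called with the acquire lock held.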
bool
InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
{
    if (failed_ || mHaveState)
    {
        san.incDuplicate();
        return true;
    }

    if (!mHaveHeader)
    {
        // LCOV_EXCL_START
        UNREACHABLE("xrpl::InboundLedger::takeAsRootNode : no ledger header");
        return false;
        // LCOV_EXCL_STOP
    }

    AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
    san += mLedger->stateMap().addRootNode(SHAMapHash{mLedger->header().accountHash}, data, &filter);
    return san.isGood();
}
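
// Process the transaction (TX) map root node received from a peer.
// Called with the acquire lock held.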
bool
InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
{
    if (failed_ || mHaveTransactions)
    {
        san.incDuplicate();
        return true;
    }

    if (!mHaveHeader)
    {
        // LCOV_EXCL_START
        UNREACHABLE("xrpl::InboundLedger::takeTxRootNode : no ledger header");
        return false;
        // LCOV_EXCL_STOP
    }

    TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
    san += mLedger->txMap().addRootNode(SHAMapHash{mLedger->header().txHash}, data, &filter);
    return san.isGood();
}
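
// List the (object type, hash) pairs still needed to complete this ledger,
// used by the aggressive by-hash fetch path in trigger().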
std::vector<InboundLedger::neededHash_t>
InboundLedger::getNeededHashes()
{
    std::vector<neededHash_t> ret;

    if (!mHaveHeader)
    {
        ret.push_back(std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
        return ret;
    }

    if (!mHaveState)
    {
        AccountStateSF filter(mLedger->stateMap().family().db(), app_.getLedgerMaster());
        for (auto const& h : neededStateHashes(4, &filter))
        {
            ret.push_back(std::make_pair(protocol::TMGetObjectByHash::otSTATE_NODE, h));
        }
    }

    if (!mHaveTransactions)
    {
        TransactionStateSF filter(mLedger->txMap().family().db(), app_.getLedgerMaster());
        for (auto const& h : neededTxHashes(4, &filter))
        {
            ret.push_back(std::make_pair(protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
        }
    }

    return ret;
}
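
// Queue a TMLedgerData packet for later processing by runData().
// Returns true if the caller should dispatch a job to process it.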
bool
InboundLedger::gotData(std::weak_ptr<Peer> peer, std::shared_ptr<protocol::TMLedgerData> const& data)
{
    std::lock_guard sl(mReceivedDataLock);

    if (isDone())
        return false;

    mReceivedData.emplace_back(peer, data);

    if (mReceiveDispatched)
        return false;

    mReceiveDispatched = true;
    return true;
}

/** Process one TMLedgerData
    Returns the number of useful nodes
*/
// VFALCO NOTE, it is not necessary to pass the entire Peer,
// we can get away with just a Resource::Consumer endpoint.
//
// TODO Change peer to Consumer
//
int
InboundLedger::processData(std::shared_ptr<Peer> peer, protocol::TMLedgerData& packet)
{
    if (packet.type() == protocol::liBASE)
    {
        if (packet.nodes().empty())
        {
            JLOG(journal_.warn()) << peer->id() << ": empty header data";
            peer->charge(Resource::feeMalformedRequest, "ledger_data empty header");
            return -1;
        }

        SHAMapAddNode san;

        ScopedLockType sl(mtx_);

        try
        {
            if (!mHaveHeader)
            {
                if (!takeHeader(packet.nodes(0).nodedata()))
                {
                    JLOG(journal_.warn()) << "Got invalid header data";
                    peer->charge(Resource::feeMalformedRequest, "ledger_data invalid header");
                    return -1;
                }

                san.incUseful();
            }

            if (!mHaveState && (packet.nodes().size() > 1) &&
                !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
            {
                JLOG(journal_.warn()) << "Included AS root invalid";
            }

            if (!mHaveTransactions && (packet.nodes().size() > 2) &&
                !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
            {
                JLOG(journal_.warn()) << "Included TX root invalid";
            }
        }
        catch (std::exception const& ex)
        {
            JLOG(journal_.warn()) << "Included AS/TX root invalid: " << ex.what();
            using namespace std::string_literals;
            peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
            return -1;
        }

        if (san.isUseful())
            progress_ = true;

        mStats += san;
        return san.getGood();
    }

    if ((packet.type() == protocol::liTX_NODE) || (packet.type() == protocol::liAS_NODE))
    {
        std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: " : "liAS_NODE: ";

        if (packet.nodes().empty())
        {
            JLOG(journal_.info()) << peer->id() << ": response with no nodes";
            peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
            return -1;
        }

        ScopedLockType sl(mtx_);

        // Verify node IDs and data are complete
        for (auto const& node : packet.nodes())
        {
            if (!node.has_nodeid() || !node.has_nodedata())
            {
                JLOG(journal_.warn()) << "Got bad node";
                peer->charge(Resource::feeMalformedRequest, "ledger_data bad node");
                return -1;
            }
        }

        SHAMapAddNode san;
        receiveNode(packet, san);

        JLOG(journal_.debug()) << "Ledger " << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
                               << " node stats: " << san.get();

        if (san.isUseful())
            progress_ = true;

        mStats += san;
        return san.getGood();
    }

    return -1;
}

namespace detail {
// Track the amount of useful data that each peer returns
struct PeerDataCounts
{
    // Map from peer to amount of useful the peer returned
    std::unordered_map<std::shared_ptr<Peer>, int> counts;
    // The largest amount of useful data that any peer returned
    int maxCount = 0;

    // Update the data count for a peer
    void
    update(std::shared_ptr<Peer>&& peer, int dataCount)
    {
        if (dataCount <= 0)
            return;
        maxCount = std::max(maxCount, dataCount);
        auto i = counts.find(peer);
        if (i == counts.end())
        {
            counts.emplace(std::move(peer), dataCount);
            return;
        }
        i->second = std::max(i->second, dataCount);
    }

    // Prune all the peers that didn't return enough data.
    void
    prune()
    {
        // Remove all the peers that didn't return at least half as much data as
        // the best peer
        auto const thresh = maxCount / 2;
        auto i = counts.begin();
        while (i != counts.end())
        {
            if (i->second < thresh)
                i = counts.erase(i);
            else
                ++i;
        }
    }

    // call F with the `peer` parameter with a random sample of at most n values
    // of the counts vector.
    template <class F>
    void
    sampleN(std::size_t n, F&& f)
    {
        if (counts.empty())
            return;

        auto outFunc = [&f](auto&& v) { f(v.first); };
        std::minstd_rand rng{std::random_device{}()};
#if _MSC_VER
        std::vector<std::pair<std::shared_ptr<Peer>, int>> s;
        s.reserve(n);
        std::sample(counts.begin(), counts.end(), std::back_inserter(s), n, rng);
        for (auto& v : s)
        {
            outFunc(v);
        }
#else
        std::sample(counts.begin(), counts.end(), boost::make_function_output_iterator(outFunc), n, rng);
#endif
    }
};
} // namespace detail
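
// Process all queued TMLedgerData packets, then re-trigger a random sample
// of the peers that returned the most useful data.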
void
InboundLedger::runData()
{
    // Maximum number of peers to request data from
    constexpr std::size_t maxUsefulPeers = 6;

    decltype(mReceivedData) data;

    // Reserve some memory so the first couple iterations don't reallocate
    data.reserve(8);

    detail::PeerDataCounts dataCounts;

    for (;;)
    {
        data.clear();

        {
            std::lock_guard sl(mReceivedDataLock);

            if (mReceivedData.empty())
            {
                mReceiveDispatched = false;
                break;
            }

            data.swap(mReceivedData);
        }

        for (auto& entry : data)
        {
            if (auto peer = entry.first.lock())
            {
                int count = processData(peer, *(entry.second));
                dataCounts.update(std::move(peer), count);
            }
        }
    }

    // Select a random sample of the peers that gives us the most nodes that are
    // useful
    dataCounts.prune();
    dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) { trigger(peer, TriggerReason::reply); });
}

Json::Value
InboundLedger::getJson(int)
{
    Json::Value ret(Json::objectValue);

    ScopedLockType sl(mtx_);

    ret[jss::hash] = to_string(hash_);

    if (complete_)
        ret[jss::complete] = true;

    if (failed_)
        ret[jss::failed] = true;

    if (!complete_ && !failed_)
        ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());

    ret[jss::have_header] = mHaveHeader;

    if (mHaveHeader)
    {
        ret[jss::have_state] = mHaveState;
        ret[jss::have_transactions] = mHaveTransactions;
    }

    ret[jss::timeouts] = timeouts_;

    if (mHaveHeader && !mHaveState)
    {
        Json::Value hv(Json::arrayValue);
        for (auto const& h : neededStateHashes(16, nullptr))
        {
            hv.append(to_string(h));
        }
        ret[jss::needed_state_hashes] = hv;
    }

    if (mHaveHeader && !mHaveTransactions)
    {
        Json::Value hv(Json::arrayValue);
        for (auto const& h : neededTxHashes(16, nullptr))
        {
            hv.append(to_string(h));
        }
        ret[jss::needed_transaction_hashes] = hv;
    }

    return ret;
}

} // namespace xrpl