rippled
Loading...
Searching...
No Matches
InboundLedger.cpp
1#include <xrpld/app/ledger/AccountStateSF.h>
2#include <xrpld/app/ledger/InboundLedger.h>
3#include <xrpld/app/ledger/InboundLedgers.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/ledger/TransactionStateSF.h>
6#include <xrpld/app/main/Application.h>
7#include <xrpld/core/JobQueue.h>
8#include <xrpld/overlay/Overlay.h>
9
10#include <xrpl/basics/Log.h>
11#include <xrpl/protocol/HashPrefix.h>
12#include <xrpl/protocol/jss.h>
13#include <xrpl/resource/Fees.h>
14#include <xrpl/shamap/SHAMapNodeID.h>
15
16#include <boost/iterator/function_output_iterator.hpp>
17
18#include <algorithm>
19#include <random>
20
21namespace ripple {
22
// Enables duration literals such as the 3000ms timeout below.
23using namespace std::chrono_literals;
24
// Tuning constants for ledger acquisition.
// NOTE(review): several enumerator names were lost in extraction (the lines
// holding them are missing).  The cross-reference index for this translation
// unit shows at least `ledgerTimeoutRetriesMax` and
// `ledgerBecomeAggressiveThreshold` are declared here; restore the missing
// enumerators from upstream before compiling.
25enum {
26 // Number of peers to start with
28
29 // Number of peers to add on a timeout
30 ,
31 peerCountAdd = 3
32
33 // how many timeouts before we give up
34 ,
36
37 // how many timeouts before we get aggressive
38 ,
40
41 // Number of nodes to find initially
42 ,
44
45 // Number of nodes to request for a reply
46 ,
47 reqNodesReply = 128
48
49 // Number of nodes to request blindly
50 ,
51 reqNodes = 12
52};
53
54// Timeout interval for a single ledger acquisition attempt
55auto constexpr ledgerAcquireTimeout = 3000ms;
56
// Constructor.  Per the class declaration this is
// InboundLedger(Application&, uint256 const& hash, std::uint32_t seq,
//               Reason, clock_type&, std::unique_ptr<PeerSet> peerSet).
// Records the target hash/sequence and acquire reason, clears the
// header/state/transaction progress flags, and touch()es the timeout
// counter so the acquisition starts out fresh.
// NOTE(review): the ctor name line, the peerSet parameter line, and the
// head of the TimeoutCounter base-initializer were lost in extraction --
// restore from upstream before compiling.
58 Application& app,
59 uint256 const& hash,
60 std::uint32_t seq,
61 Reason reason,
62 clock_type& clock,
65 app,
66 hash,
68 {jtLEDGER_DATA, "InboundLedger", 5},
69 app.journal("InboundLedger"))
70 , m_clock(clock)
71 , mHaveHeader(false)
72 , mHaveState(false)
73 , mHaveTransactions(false)
74 , mSignaled(false)
75 , mByHash(true)
76 , mSeq(seq)
77 , mReason(reason)
78 , mReceiveDispatched(false)
79 , mPeerSet(std::move(peerSet))
80{
81 JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
82 touch();
83}
84
// init(ScopedLockType& collectionLock): kick off the acquisition.
// Releases the caller's collection lock, then either starts fetching
// (addPeers + queueJob) when the ledger is not already complete, or
// finalizes a locally-available ledger (setImmutable).
// NOTE(review): several lines were lost in extraction -- visibly missing
// are the function signature, the local acquire-lock declaration used by
// queueJob(sl), the tryDB(...) call that sets failed_/complete_, and the
// tail that stores/validates an already-held ledger.  Restore from
// upstream before compiling.
85void
87{
89 collectionLock.unlock();
90
92 if (failed_)
93 return;
94
95 if (!complete_)
96 {
97 addPeers();
98 queueJob(sl);
99 return;
100 }
101
102 JLOG(journal_.debug()) << "Acquiring ledger we already have in "
103 << " local store. " << hash_;
104 XRPL_ASSERT(
105 mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES ||
106 mLedger->read(keylet::fees()),
107 "ripple::InboundLedger::init : valid ledger fees");
108 mLedger->setImmutable();
109
111 return;
112
114
115 // Check if this could be a newer fully-validated ledger
118}
119
// getPeerCount() const: number of peers in our peer set that are still
// connected (resolvable through the overlay by short ID).
// NOTE(review): the `std::size_t` return-type line and the function-name
// line were lost in extraction.
122{
123 auto const& peerIds = mPeerSet->getPeerIds();
124 return std::count_if(peerIds.begin(), peerIds.end(), [this](auto id) {
125 return (app_.overlay().findPeerByShortID(id) != nullptr);
126 });
127}
128
// update(std::uint32_t seq): note newly-learned information about the
// target ledger and refresh the sweep timer.
// NOTE(review): the signature line and the lock acquisition at the top of
// the body were lost in extraction.
129void
131{
133
134 // If we didn't know the sequence number, but now do, save it
135 if ((seq != 0) && (mSeq == 0))
136 mSeq = seq;
137
138 // Prevent this from being swept
139 touch();
140}
141
// checkLocal(): re-probe local storage for the ledger's data; returns true
// if that settled the acquisition (done() was called), false otherwise.
// NOTE(review): the signature line, the lock acquisition, and the
// `else` branch body (presumably a tryDB on the node family's db) were
// lost in extraction.
142bool
144{
146 if (!isDone())
147 {
148 if (mLedger)
149 tryDB(mLedger->stateMap().family().db());
150 else
152 if (failed_ || complete_)
153 {
154 done();
155 return true;
156 }
157 }
158 return false;
159}
160
// Destructor.  Re-queues any unprocessed account-state (AS) node data as
// "stale" so InboundLedgers can reuse it for other ledgers, and logs an
// abort message if this acquisition never finished.
// NOTE(review): the `InboundLedger::~InboundLedger()` signature line and
// the std::to_string(timeouts_) line inside the log expression were lost
// in extraction.
162{
163 // Save any received AS data not processed. It could be useful
164 // for populating a different ledger
165 for (auto& entry : mReceivedData)
166 {
167 if (entry.second->type() == protocol::liAS_NODE)
168 app_.getInboundLedgers().gotStaleData(entry.second);
169 }
170 if (!isDone())
171 {
172 JLOG(journal_.debug())
173 << "Acquire " << hash_ << " abort "
174 << ((timeouts_ == 0) ? std::string()
175 : (std::string("timeouts:") +
177 << mStats.get();
178 }
179}
180
// static neededHashes(root, map, max, filter): list up to `max` node
// hashes still missing from `map`.  If the expected root hash is zero the
// map is trivially complete (empty result); if the map has no root yet,
// the root hash itself is the one needed node.
// NOTE(review): the `static std::vector<uint256> neededHashes(` head and
// the declaration of the local `ret` vector were lost in extraction.
183 uint256 const& root,
184 SHAMap& map,
185 int max,
186 SHAMapSyncFilter* filter)
187{
189
190 if (!root.isZero())
191 {
192 if (map.getHash().isZero())
193 ret.push_back(root);
194 else
195 {
196 auto mn = map.getMissingNodes(max, filter);
197 ret.reserve(mn.size());
198 for (auto const& n : mn)
199 ret.push_back(n.second);
200 }
201 }
202
203 return ret;
204}
205
// neededTxHashes(max, filter) / neededStateHashes(max, filter): thin
// wrappers that forward to the static neededHashes() helper with the
// transaction map and account-state map respectively.
// NOTE(review): both function signature lines were lost in extraction
// (declared as std::vector<uint256> neededTxHashes(int, SHAMapSyncFilter*)
// const and the matching neededStateHashes per the class declaration).
208{
209 return neededHashes(
210 mLedger->header().txHash, mLedger->txMap(), max, filter);
211}
212
215{
216 return neededHashes(
217 mLedger->header().accountHash, mLedger->stateMap(), max, filter);
218}
219
220// See how much of the ledger data is stored locally
221// Data found in a fetch pack will be stored
// tryDB(NodeStore::Database& srcDB): try to satisfy the acquisition from
// local storage / fetch packs.  Sets mHaveHeader / mHaveTransactions /
// mHaveState (and complete_) as data is found; sets failed_ if the stored
// header cannot possibly be the wanted ledger.
// NOTE(review): lines lost in extraction include the function signature,
// two lines inside the makeLedger lambda (the Ledger construction from the
// deserialized header), the `if (!mHaveTransactions)` guard before the tx
// section, and the `if (mHaveTransactions && mHaveState)` guard before the
// final completion block.  Restore from upstream before compiling.
222void
224{
225 if (!mHaveHeader)
226 {
        // Builds mLedger from a raw serialized header and validates that
        // it matches the hash (and, when known, the sequence) we want.
227 auto makeLedger = [&, this](Blob const& data) {
228 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
231 app_.config(),
233 if (mLedger->header().hash != hash_ ||
234 (mSeq != 0 && mSeq != mLedger->header().seq))
235 {
236 // We know for a fact the ledger can never be acquired
237 JLOG(journal_.warn())
238 << "hash " << hash_ << " seq " << std::to_string(mSeq)
239 << " cannot be a ledger";
240 mLedger.reset();
241 failed_ = true;
242 }
243 };
244
245 // Try to fetch the ledger header from the DB
246 if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
247 {
248 JLOG(journal_.trace()) << "Ledger header found in local store";
249
250 makeLedger(nodeObject->getData());
251 if (failed_)
252 return;
253
254 // Store the ledger header if the source and destination differ
255 auto& dstDB{mLedger->stateMap().family().db()};
256 if (std::addressof(dstDB) != std::addressof(srcDB))
257 {
258 Blob blob{nodeObject->getData()};
259 dstDB.store(
260 hotLEDGER, std::move(blob), hash_, mLedger->header().seq);
261 }
262 }
263 else
264 {
265 // Try to fetch the ledger header from a fetch pack
266 auto data = app_.getLedgerMaster().getFetchPack(hash_);
267 if (!data)
268 return;
269
270 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
271
272 makeLedger(*data);
273 if (failed_)
274 return;
275
276 // Store the ledger header in the ledger's database
277 mLedger->stateMap().family().db().store(
278 hotLEDGER, std::move(*data), hash_, mLedger->header().seq);
279 }
280
281 if (mSeq == 0)
282 mSeq = mLedger->header().seq;
283 mLedger->stateMap().setLedgerSeq(mSeq);
284 mLedger->txMap().setLedgerSeq(mSeq);
285 mHaveHeader = true;
286 }
287
    // Transaction map: a zero root hash means there are no transactions.
289 {
290 if (mLedger->header().txHash.isZero())
291 {
292 JLOG(journal_.trace()) << "No TXNs to fetch";
293 mHaveTransactions = true;
294 }
295 else
296 {
297 TransactionStateSF filter(
298 mLedger->txMap().family().db(), app_.getLedgerMaster());
299 if (mLedger->txMap().fetchRoot(
300 SHAMapHash{mLedger->header().txHash}, &filter))
301 {
302 if (neededTxHashes(1, &filter).empty())
303 {
304 JLOG(journal_.trace()) << "Had full txn map locally";
305 mHaveTransactions = true;
306 }
307 }
308 }
309 }
310
311 if (!mHaveState)
312 {
        // A valid ledger always has a non-zero account-state root.
313 if (mLedger->header().accountHash.isZero())
314 {
315 JLOG(journal_.fatal())
316 << "We are acquiring a ledger with a zero account hash";
317 failed_ = true;
318 return;
319 }
320 AccountStateSF filter(
321 mLedger->stateMap().family().db(), app_.getLedgerMaster());
322 if (mLedger->stateMap().fetchRoot(
323 SHAMapHash{mLedger->header().accountHash}, &filter))
324 {
325 if (neededStateHashes(1, &filter).empty())
326 {
327 JLOG(journal_.trace()) << "Had full AS map locally";
328 mHaveState = true;
329 }
330 }
331 }
332
334 {
335 JLOG(journal_.debug()) << "Had everything locally";
336 complete_ = true;
337 XRPL_ASSERT(
338 mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES ||
339 mLedger->read(keylet::fees()),
340 "ripple::InboundLedger::tryDB : valid ledger fees");
341 mLedger->setImmutable();
342 }
343}
344
// onTimer(bool wasProgress, ScopedLockType&): called by the PeerSet (with
// the lock held) when the acquire timer expires.  Clears the recently-
// requested-node cache, gives up (failed_ + done()) after too many
// timeouts, and otherwise widens the peer set and re-triggers.
// NOTE(review): lines lost in extraction include the signature, the
// timeout-limit condition guarding the give-up branch (the index shows a
// `ledgerTimeoutRetriesMax` constant in this TU), the declaration of the
// local `pc` peer count, and the HISTORY-ordering trigger calls that the
// comment below describes.  Restore from upstream before compiling.
347void
349{
350 mRecentNodes.clear();
351
352 if (isDone())
353 {
354 JLOG(journal_.info()) << "Already done " << hash_;
355 return;
356 }
357
359 {
360 if (mSeq != 0)
361 {
362 JLOG(journal_.warn())
363 << timeouts_ << " timeouts for ledger " << mSeq;
364 }
365 else
366 {
367 JLOG(journal_.warn())
368 << timeouts_ << " timeouts for ledger " << hash_;
369 }
370 failed_ = true;
371 done();
372 return;
373 }
374
375 if (!wasProgress)
376 {
377 checkLocal();
378
379 mByHash = true;
380
382 JLOG(journal_.debug())
383 << "No progress(" << pc << ") for ledger " << hash_;
384
385 // addPeers triggers if the reason is not HISTORY
386 // So if the reason IS HISTORY, need to trigger after we add
387 // otherwise, we need to trigger before we add
388 // so each peer gets triggered once
391 addPeers();
394 }
395}
396
// addPeers(): ask the PeerSet to add peers, preferring those that claim to
// have this ledger (first lambda); the second lambda runs per added peer.
// NOTE(review): the signature line, the peer-count argument, and the body
// of the per-peer callback (which the comment says avoids triggering
// HISTORY acquisitions too early) were lost in extraction.
398void
400{
401 mPeerSet->addPeers(
403 [this](auto peer) { return peer->hasLedger(hash_, mSeq); },
404 [this](auto peer) {
405 // For historical nodes, do not trigger too soon
406 // since a fetch pack is probably coming
409 });
410}
411
417
// done(): finalize the acquisition exactly once (guarded by mSignaled).
// On success makes the ledger immutable, stores it per the acquire
// Reason, then dispatches a job (we hold the PeerSet lock, so the
// LedgerMaster work must not run inline) to accept/advance or log the
// failure.
// NOTE(review): lines lost in extraction include the signature (and,
// above this function, the entire pmDowncast() override), the
// std::to_string(timeouts_) line in the log expression, the asserted
// condition in the first XRPL_ASSERT, the storeLedger/onLedgerFetched
// calls inside the switch cases, and the app_.getJobQueue().addJob( head
// of the dispatch.  Restore from upstream before compiling.
418void
420{
421 if (mSignaled)
422 return;
423
424 mSignaled = true;
425 touch();
426
427 JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
428 << ((timeouts_ == 0)
429 ? std::string()
430 : (std::string("timeouts:") +
432 << mStats.get();
433
434 XRPL_ASSERT(
436 "ripple::InboundLedger::done : complete or failed");
437
438 if (complete_ && !failed_ && mLedger)
439 {
440 XRPL_ASSERT(
441 mLedger->header().seq < XRP_LEDGER_EARLIEST_FEES ||
442 mLedger->read(keylet::fees()),
443 "ripple::InboundLedger::done : valid ledger fees");
444 mLedger->setImmutable();
445 switch (mReason)
446 {
447 case Reason::HISTORY:
449 break;
450 default:
452 break;
453 }
454 }
455
456 // We hold the PeerSet lock, so must dispatch
458 jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
459 if (self->complete_ && !self->failed_)
460 {
461 self->app_.getLedgerMaster().checkAccept(self->getLedger());
462 self->app_.getLedgerMaster().tryAdvance();
463 }
464 else
465 self->app_.getInboundLedgers().logFailure(
466 self->hash_, self->mSeq);
467 });
468}
469
// trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason):
// request more nodes, optionally from a specific peer.  Escalates to
// indirect/by-hash queries after timeouts, requests the header first,
// then missing account-state nodes, then missing transaction nodes,
// tuning query depth by trigger reason and peer latency.
// NOTE(review): lines lost in extraction include the signature, the
// acquire-lock declaration (sl), the std::stringstream declaration in the
// debug block, the tryDB(...) call under `if (!mHaveHeader)`, the
// aggressiveness-threshold condition (the index shows
// `ledgerBecomeAggressiveThreshold` in this TU), and the std::for_each(
// head of the by-hash send loop.  Restore from upstream before compiling.
472void
474{
476
477 if (isDone())
478 {
479 JLOG(journal_.debug())
480 << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
481 << (failed_ ? " failed" : "");
482 return;
483 }
484
485 if (auto stream = journal_.debug())
486 {
488 ss << "Trigger acquiring ledger " << hash_;
489 if (peer)
490 ss << " from " << peer;
491
492 if (complete_ || failed_)
493 ss << " complete=" << complete_ << " failed=" << failed_;
494 else
495 ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
496 << " as=" << mHaveState;
497 stream << ss.str();
498 }
499
500 if (!mHaveHeader)
501 {
503 if (failed_)
504 {
505 JLOG(journal_.warn()) << " failed local for " << hash_;
506 return;
507 }
508 }
509
510 protocol::TMGetLedger tmGL;
511 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
512
513 if (timeouts_ != 0)
514 {
515 // Be more aggressive if we've timed out at least once
516 tmGL.set_querytype(protocol::qtINDIRECT);
517
518 if (!progress_ && !failed_ && mByHash &&
520 {
521 auto need = getNeededHashes();
522
523 if (!need.empty())
524 {
525 protocol::TMGetObjectByHash tmBH;
526 bool typeSet = false;
527 tmBH.set_query(true);
528 tmBH.set_ledgerhash(hash_.begin(), hash_.size());
529 for (auto const& p : need)
530 {
531 JLOG(journal_.debug()) << "Want: " << p.second;
532
533 if (!typeSet)
534 {
535 tmBH.set_type(p.first);
536 typeSet = true;
537 }
538
                    // Only one object type fits per TMGetObjectByHash.
539 if (p.first == tmBH.type())
540 {
541 protocol::TMIndexedObject* io = tmBH.add_objects();
542 io->set_hash(p.second.begin(), p.second.size());
543 if (mSeq != 0)
544 io->set_ledgerseq(mSeq);
545 }
546 }
547
548 auto packet =
549 std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
550 auto const& peerIds = mPeerSet->getPeerIds();
552 peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
553 if (auto p = app_.overlay().findPeerByShortID(id))
554 {
555 mByHash = false;
556 p->send(packet);
557 }
558 });
559 }
560 else
561 {
562 JLOG(journal_.info())
563 << "getNeededHashes says acquire is complete";
564 mHaveHeader = true;
565 mHaveTransactions = true;
566 mHaveState = true;
567 complete_ = true;
568 }
569 }
570 }
571
572 // We can't do much without the header data because we don't know the
573 // state or transaction root hashes.
574 if (!mHaveHeader && !failed_)
575 {
576 tmGL.set_itype(protocol::liBASE);
577 if (mSeq != 0)
578 tmGL.set_ledgerseq(mSeq);
579 JLOG(journal_.trace()) << "Sending header request to "
580 << (peer ? "selected peer" : "all peers");
581 mPeerSet->sendRequest(tmGL, peer);
582 return;
583 }
584
585 if (mLedger)
586 tmGL.set_ledgerseq(mLedger->header().seq);
587
588 if (reason != TriggerReason::reply)
589 {
590 // If we're querying blind, don't query deep
591 tmGL.set_querydepth(0);
592 }
593 else if (peer && peer->isHighLatency())
594 {
595 // If the peer has high latency, query extra deep
596 tmGL.set_querydepth(2);
597 }
598 else
599 tmGL.set_querydepth(1);
600
601 // Get the state data first because it's the most likely to be useful
602 // if we wind up abandoning this fetch.
603 if (mHaveHeader && !mHaveState && !failed_)
604 {
605 XRPL_ASSERT(
606 mLedger,
607 "ripple::InboundLedger::trigger : non-null ledger to read state "
608 "from");
609
610 if (!mLedger->stateMap().isValid())
611 {
612 failed_ = true;
613 }
614 else if (mLedger->stateMap().getHash().isZero())
615 {
616 // we need the root node
617 tmGL.set_itype(protocol::liAS_NODE);
618 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
619 JLOG(journal_.trace()) << "Sending AS root request to "
620 << (peer ? "selected peer" : "all peers");
621 mPeerSet->sendRequest(tmGL, peer);
622 return;
623 }
624 else
625 {
626 AccountStateSF filter(
627 mLedger->stateMap().family().db(), app_.getLedgerMaster());
628
629 // Release the lock while we process the large state map
630 sl.unlock();
631 auto nodes =
632 mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
633 sl.lock();
634
635 // Make sure nothing happened while we released the lock
636 if (!failed_ && !complete_ && !mHaveState)
637 {
638 if (nodes.empty())
639 {
640 if (!mLedger->stateMap().isValid())
641 failed_ = true;
642 else
643 {
644 mHaveState = true;
645
646 if (mHaveTransactions)
647 complete_ = true;
648 }
649 }
650 else
651 {
652 filterNodes(nodes, reason);
653
654 if (!nodes.empty())
655 {
656 tmGL.set_itype(protocol::liAS_NODE);
657 for (auto const& id : nodes)
658 {
659 *(tmGL.add_nodeids()) = id.first.getRawString();
660 }
661
662 JLOG(journal_.trace())
663 << "Sending AS node request (" << nodes.size()
664 << ") to "
665 << (peer ? "selected peer" : "all peers");
666 mPeerSet->sendRequest(tmGL, peer);
667 return;
668 }
669 else
670 {
671 JLOG(journal_.trace()) << "All AS nodes filtered";
672 }
673 }
674 }
675 }
676 }
677
678 if (mHaveHeader && !mHaveTransactions && !failed_)
679 {
680 XRPL_ASSERT(
681 mLedger,
682 "ripple::InboundLedger::trigger : non-null ledger to read "
683 "transactions from");
684
685 if (!mLedger->txMap().isValid())
686 {
687 failed_ = true;
688 }
689 else if (mLedger->txMap().getHash().isZero())
690 {
691 // we need the root node
692 tmGL.set_itype(protocol::liTX_NODE);
693 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
694 JLOG(journal_.trace()) << "Sending TX root request to "
695 << (peer ? "selected peer" : "all peers");
696 mPeerSet->sendRequest(tmGL, peer);
697 return;
698 }
699 else
700 {
701 TransactionStateSF filter(
702 mLedger->txMap().family().db(), app_.getLedgerMaster());
703
704 auto nodes =
705 mLedger->txMap().getMissingNodes(missingNodesFind, &filter);
706
707 if (nodes.empty())
708 {
709 if (!mLedger->txMap().isValid())
710 failed_ = true;
711 else
712 {
713 mHaveTransactions = true;
714
715 if (mHaveState)
716 complete_ = true;
717 }
718 }
719 else
720 {
721 filterNodes(nodes, reason);
722
723 if (!nodes.empty())
724 {
725 tmGL.set_itype(protocol::liTX_NODE);
726 for (auto const& n : nodes)
727 {
728 *(tmGL.add_nodeids()) = n.first.getRawString();
729 }
730 JLOG(journal_.trace())
731 << "Sending TX node request (" << nodes.size()
732 << ") to " << (peer ? "selected peer" : "all peers");
733 mPeerSet->sendRequest(tmGL, peer);
734 return;
735 }
736 else
737 {
738 JLOG(journal_.trace()) << "All TX nodes filtered";
739 }
740 }
741 }
742 }
743
744 if (complete_ || failed_)
745 {
746 JLOG(journal_.debug())
747 << "Done:" << (complete_ ? " complete" : "")
748 << (failed_ ? " failed " : " ") << mLedger->header().seq;
749 sl.unlock();
750 done();
751 }
752}
753
// filterNodes(nodes, reason): prune a candidate missing-node list before
// sending a request.  Drops nodes we requested recently (unless this is a
// timeout, where everyone is re-queried), caps the list at
// reqNodesReply / reqNodes, and records what we are about to request.
// NOTE(review): the first parameter line was lost in extraction; per the
// getMissingNodes return type it is
// std::vector<std::pair<SHAMapNodeID, uint256>>& nodes.
754void
755InboundLedger::filterNodes(
757 TriggerReason reason)
758{
759 // Sort nodes so that the ones we haven't recently
760 // requested come before the ones we have.
761 auto dup = std::stable_partition(
762 nodes.begin(), nodes.end(), [this](auto const& item) {
763 return mRecentNodes.count(item.second) == 0;
764 });
765
766 // If everything is a duplicate we don't want to send
767 // any query at all except on a timeout where we need
768 // to query everyone:
769 if (dup == nodes.begin())
770 {
771 JLOG(journal_.trace()) << "filterNodes: all duplicates";
772
773 if (reason != TriggerReason::timeout)
774 {
775 nodes.clear();
776 return;
777 }
778 }
779 else
780 {
781 JLOG(journal_.trace()) << "filterNodes: pruning duplicates";
782
783 nodes.erase(dup, nodes.end());
784 }
785
786 std::size_t const limit =
787 (reason == TriggerReason::reply) ? reqNodesReply : reqNodes;
788
789 if (nodes.size() > limit)
790 nodes.resize(limit);
791
792 for (auto const& n : nodes)
793 mRecentNodes.insert(n.second);
794}
795
799// data must not have hash prefix
800bool
801InboundLedger::takeHeader(std::string const& data)
802{
803 // Return value: true=normal, false=bad data
804 JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
805
806 if (complete_ || failed_ || mHaveHeader)
807 return true;
808
809 auto* f = &app_.getNodeFamily();
810 mLedger = std::make_shared<Ledger>(
811 deserializeHeader(makeSlice(data)), app_.config(), *f);
812 if (mLedger->header().hash != hash_ ||
813 (mSeq != 0 && mSeq != mLedger->header().seq))
814 {
815 JLOG(journal_.warn())
816 << "Acquire hash mismatch: " << mLedger->header().hash
817 << "!=" << hash_;
818 mLedger.reset();
819 return false;
820 }
821 if (mSeq == 0)
822 mSeq = mLedger->header().seq;
823 mLedger->stateMap().setLedgerSeq(mSeq);
824 mLedger->txMap().setLedgerSeq(mSeq);
825 mHaveHeader = true;
826
827 Serializer s(data.size() + 4);
828 s.add32(HashPrefix::ledgerMaster);
829 s.addRaw(data.data(), data.size());
830 f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);
831
832 if (mLedger->header().txHash.isZero())
833 mHaveTransactions = true;
834
835 if (mLedger->header().accountHash.isZero())
836 mHaveState = true;
837
838 mLedger->txMap().setSynching();
839 mLedger->stateMap().setSynching();
840
841 return true;
842}
843
// receiveNode(packet, san): apply a batch of TX or AS map nodes from a
// TMLedgerData message to the matching SHAMap, accumulating statistics in
// `san`.  Marks the relevant have-flag (and completes the acquire) when
// the map leaves the synching state.
// NOTE(review): inside the map/rootHash/filter selection lambda the
// trailing-return-type line and the two filter-construction lines
// (presumably std::make_unique<TransactionStateSF>/<AccountStateSF>(...)
// given the later filter.get()) were lost in extraction.  Restore from
// upstream before compiling.
847void
848InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
849{
850 if (!mHaveHeader)
851 {
852 JLOG(journal_.warn()) << "Missing ledger header";
853 san.incInvalid();
854 return;
855 }
856 if (packet.type() == protocol::liTX_NODE)
857 {
858 if (mHaveTransactions || failed_)
859 {
860 san.incDuplicate();
861 return;
862 }
863 }
864 else if (mHaveState || failed_)
865 {
866 san.incDuplicate();
867 return;
868 }
869
    // Select target map, expected root hash, and sync filter by node type.
870 auto [map, rootHash, filter] = [&]()
872 if (packet.type() == protocol::liTX_NODE)
873 return {
874 mLedger->txMap(),
875 SHAMapHash{mLedger->header().txHash},
877 mLedger->txMap().family().db(), app_.getLedgerMaster())};
878 return {
879 mLedger->stateMap(),
880 SHAMapHash{mLedger->header().accountHash},
882 mLedger->stateMap().family().db(), app_.getLedgerMaster())};
883 }();
884
885 try
886 {
887 auto const f = filter.get();
888
889 for (auto const& node : packet.nodes())
890 {
891 auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
892
893 if (!nodeID)
894 throw std::runtime_error("data does not properly deserialize");
895
896 if (nodeID->isRoot())
897 {
898 san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
899 }
900 else
901 {
902 san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
903 }
904
905 if (!san.isGood())
906 {
907 JLOG(journal_.warn()) << "Received bad node data";
908 return;
909 }
910 }
911 }
912 catch (std::exception const& e)
913 {
914 JLOG(journal_.error()) << "Received bad node data: " << e.what();
915 san.incInvalid();
916 return;
917 }
918
919 if (!map.isSynching())
920 {
921 if (packet.type() == protocol::liTX_NODE)
922 mHaveTransactions = true;
923 else
924 mHaveState = true;
925
926 if (mHaveTransactions && mHaveState)
927 {
928 complete_ = true;
929 done();
930 }
931 }
932}
933
937bool
938InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
939{
940 if (failed_ || mHaveState)
941 {
942 san.incDuplicate();
943 return true;
944 }
945
946 if (!mHaveHeader)
947 {
948 // LCOV_EXCL_START
949 UNREACHABLE("ripple::InboundLedger::takeAsRootNode : no ledger header");
950 return false;
951 // LCOV_EXCL_STOP
952 }
953
954 AccountStateSF filter(
955 mLedger->stateMap().family().db(), app_.getLedgerMaster());
956 san += mLedger->stateMap().addRootNode(
957 SHAMapHash{mLedger->header().accountHash}, data, &filter);
958 return san.isGood();
959}
960
964bool
965InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
966{
967 if (failed_ || mHaveTransactions)
968 {
969 san.incDuplicate();
970 return true;
971 }
972
973 if (!mHaveHeader)
974 {
975 // LCOV_EXCL_START
976 UNREACHABLE("ripple::InboundLedger::takeTxRootNode : no ledger header");
977 return false;
978 // LCOV_EXCL_STOP
979 }
980
981 TransactionStateSF filter(
982 mLedger->txMap().family().db(), app_.getLedgerMaster());
983 san += mLedger->txMap().addRootNode(
984 SHAMapHash{mLedger->header().txHash}, data, &filter);
985 return san.isGood();
986}
987
// getNeededHashes(): build the list of (object type, hash) pairs still
// needed -- the ledger header itself when unknown, else a small sample
// (up to 4 each) of missing state and transaction node hashes.  Used by
// trigger() to issue TMGetObjectByHash queries.
// NOTE(review): the std::vector<neededHash_t> return-type line, the
// declaration of the local `ret` vector, and the ret.push_back(
// std::make_pair( head in the transactions branch were lost in
// extraction.  Restore from upstream before compiling.
989InboundLedger::getNeededHashes()
990{
992
993 if (!mHaveHeader)
994 {
995 ret.push_back(
996 std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
997 return ret;
998 }
999
1000 if (!mHaveState)
1001 {
1002 AccountStateSF filter(
1003 mLedger->stateMap().family().db(), app_.getLedgerMaster());
1004 for (auto const& h : neededStateHashes(4, &filter))
1005 {
1006 ret.push_back(
1007 std::make_pair(protocol::TMGetObjectByHash::otSTATE_NODE, h));
1008 }
1009 }
1010
1011 if (!mHaveTransactions)
1012 {
1013 TransactionStateSF filter(
1014 mLedger->txMap().family().db(), app_.getLedgerMaster());
1015 for (auto const& h : neededTxHashes(4, &filter))
1016 {
1018 protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
1019 }
1020 }
1021
1022 return ret;
1023}
1024
// gotData(peer, data): queue a received TMLedgerData packet for later
// processing.  Returns true only when the caller should dispatch a
// processing job (i.e. this call flipped mReceiveDispatched); returns
// false if already done or a dispatch is pending.
// NOTE(review): the two parameter lines were lost in extraction; per
// mReceivedData's element type they are a std::weak_ptr<Peer> and a
// std::shared_ptr<protocol::TMLedgerData>.
1028bool
1029InboundLedger::gotData(
1032{
1033 std::lock_guard sl(mReceivedDataLock);
1034
1035 if (isDone())
1036 return false;
1037
1038 mReceivedData.emplace_back(peer, data);
1039
1040 if (mReceiveDispatched)
1041 return false;
1042
    // First packet since the last drain: tell the caller to dispatch.
1043 mReceiveDispatched = true;
1044 return true;
1045}
1046
1050// VFALCO NOTE, it is not necessary to pass the entire Peer,
1051// we can get away with just a Resource::Consumer endpoint.
1052//
1053// TODO Change peer to Consumer
1054//
// processData(peer, packet): apply one TMLedgerData message.  liBASE
// packets carry the header plus optional AS/TX root nodes; liTX_NODE /
// liAS_NODE packets carry map nodes.  Charges the peer for malformed
// data.  Returns the number of good nodes processed, or -1 on a
// malformed/unrecognized packet.
// NOTE(review): the first parameter line (std::shared_ptr<Peer>, per the
// declaration) was lost in extraction.
1055int
1056InboundLedger::processData(
1058 protocol::TMLedgerData& packet)
1059{
1060 if (packet.type() == protocol::liBASE)
1061 {
1062 if (packet.nodes().empty())
1063 {
1064 JLOG(journal_.warn()) << peer->id() << ": empty header data";
1065 peer->charge(
1066 Resource::feeMalformedRequest, "ledger_data empty header");
1067 return -1;
1068 }
1069
1070 SHAMapAddNode san;
1071
1072 ScopedLockType sl(mtx_);
1073
1074 try
1075 {
1076 if (!mHaveHeader)
1077 {
1078 if (!takeHeader(packet.nodes(0).nodedata()))
1079 {
1080 JLOG(journal_.warn()) << "Got invalid header data";
1081 peer->charge(
1082 Resource::feeMalformedRequest,
1083 "ledger_data invalid header");
1084 return -1;
1085 }
1086
1087 san.incUseful();
1088 }
1089
            // Optional piggybacked AS (index 1) and TX (index 2) roots.
1090 if (!mHaveState && (packet.nodes().size() > 1) &&
1091 !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
1092 {
1093 JLOG(journal_.warn()) << "Included AS root invalid";
1094 }
1095
1096 if (!mHaveTransactions && (packet.nodes().size() > 2) &&
1097 !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
1098 {
1099 JLOG(journal_.warn()) << "Included TX root invalid";
1100 }
1101 }
1102 catch (std::exception const& ex)
1103 {
1104 JLOG(journal_.warn())
1105 << "Included AS/TX root invalid: " << ex.what();
1106 using namespace std::string_literals;
1107 peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
1108 return -1;
1109 }
1110
1111 if (san.isUseful())
1112 progress_ = true;
1113
1114 mStats += san;
1115 return san.getGood();
1116 }
1117
1118 if ((packet.type() == protocol::liTX_NODE) ||
1119 (packet.type() == protocol::liAS_NODE))
1120 {
1121 std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: "
1122 : "liAS_NODE: ";
1123
1124 if (packet.nodes().empty())
1125 {
1126 JLOG(journal_.info()) << peer->id() << ": response with no nodes";
1127 peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
1128 return -1;
1129 }
1130
1131 ScopedLockType sl(mtx_);
1132
1133 // Verify node IDs and data are complete
1134 for (auto const& node : packet.nodes())
1135 {
1136 if (!node.has_nodeid() || !node.has_nodedata())
1137 {
1138 JLOG(journal_.warn()) << "Got bad node";
1139 peer->charge(
1140 Resource::feeMalformedRequest, "ledger_data bad node");
1141 return -1;
1142 }
1143 }
1144
1145 SHAMapAddNode san;
1146 receiveNode(packet, san);
1147
1148 JLOG(journal_.debug())
1149 << "Ledger "
1150 << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
1151 << " node stats: " << san.get();
1152
1153 if (san.isUseful())
1154 progress_ = true;
1155
1156 mStats += san;
1157 return san.getGood();
1158 }
1159
1160 return -1;
1161}
1162
1163namespace detail {
1164// Track the amount of useful data that each peer returns
// NOTE(review): several lines were lost in extraction -- the struct name
// line (the index names it PeerDataCounts), the declaration of the
// `counts` member (std::unordered_map<std::shared_ptr<Peer>, int> per the
// index), the prune() signature line, the sampleN(std::size_t n, F&& f)
// signature line, and the rng / sample-vector setup lines inside sampleN.
// Restore from upstream before compiling.
1166{
1167 // Map from peer to amount of useful the peer returned
1169 // The largest amount of useful data that any peer returned
1170 int maxCount = 0;
1171
1172 // Update the data count for a peer
1173 void
1174 update(std::shared_ptr<Peer>&& peer, int dataCount)
1175 {
1176 if (dataCount <= 0)
1177 return;
1178 maxCount = std::max(maxCount, dataCount);
1179 auto i = counts.find(peer);
1180 if (i == counts.end())
1181 {
1182 counts.emplace(std::move(peer), dataCount);
1183 return;
1184 }
        // Keep the best count seen for a peer we already track.
1185 i->second = std::max(i->second, dataCount);
1186 }
1187
1188 // Prune all the peers that didn't return enough data.
1189 void
1191 {
1192 // Remove all the peers that didn't return at least half as much data as
1193 // the best peer
1194 auto const thresh = maxCount / 2;
1195 auto i = counts.begin();
1196 while (i != counts.end())
1197 {
1198 if (i->second < thresh)
1199 i = counts.erase(i);
1200 else
1201 ++i;
1202 }
1203 }
1204
1205 // call F with the `peer` parameter with a random sample of at most n values
1206 // of the counts vector.
1207 template <class F>
1208 void
1210 {
1211 if (counts.empty())
1212 return;
1213
1214 auto outFunc = [&f](auto&& v) { f(v.first); };
     // MSVC's std::sample rejects the function-output-iterator path used
     // below, hence the materialize-then-iterate workaround.
1216#if _MSC_VER
1218 s.reserve(n);
1220 counts.begin(), counts.end(), std::back_inserter(s), n, rng);
1221 for (auto& v : s)
1222 {
1223 outFunc(v);
1224 }
1225#else
1227 counts.begin(),
1228 counts.end(),
1229 boost::make_function_output_iterator(outFunc),
1230 n,
1231 rng);
1232#endif
1233 }
1234};
1235} // namespace detail
1236
1240void
1241InboundLedger::runData()
1242{
1243 // Maximum number of peers to request data from
1244 constexpr std::size_t maxUsefulPeers = 6;
1245
1246 decltype(mReceivedData) data;
1247
1248 // Reserve some memory so the first couple iterations don't reallocate
1249 data.reserve(8);
1250
1251 detail::PeerDataCounts dataCounts;
1252
1253 for (;;)
1254 {
1255 data.clear();
1256
1257 {
1258 std::lock_guard sl(mReceivedDataLock);
1259
1260 if (mReceivedData.empty())
1261 {
1262 mReceiveDispatched = false;
1263 break;
1264 }
1265
1266 data.swap(mReceivedData);
1267 }
1268
1269 for (auto& entry : data)
1270 {
1271 if (auto peer = entry.first.lock())
1272 {
1273 int count = processData(peer, *(entry.second));
1274 dataCounts.update(std::move(peer), count);
1275 }
1276 }
1277 }
1278
1279 // Select a random sample of the peers that gives us the most nodes that are
1280 // useful
1281 dataCounts.prune();
1282 dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
1283 trigger(peer, TriggerReason::reply);
1284 });
1285}
1286
// getJson(int): report acquisition status as a JSON object -- hash,
// complete/failed flags, peer count (while in flight), which parts we
// have, timeout count, and samples (up to 16 each) of still-needed state
// and transaction node hashes.
// NOTE(review): the Json::Value return-type line, the declaration of the
// local `ret` object, and the two Json::Value array declarations (`hv`)
// were lost in extraction.  Restore from upstream before compiling.
1288InboundLedger::getJson(int)
1289{
1291
1292 ScopedLockType sl(mtx_);
1293
1294 ret[jss::hash] = to_string(hash_);
1295
1296 if (complete_)
1297 ret[jss::complete] = true;
1298
1299 if (failed_)
1300 ret[jss::failed] = true;
1301
1302 if (!complete_ && !failed_)
1303 ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());
1304
1305 ret[jss::have_header] = mHaveHeader;
1306
1307 if (mHaveHeader)
1308 {
1309 ret[jss::have_state] = mHaveState;
1310 ret[jss::have_transactions] = mHaveTransactions;
1311 }
1312
1313 ret[jss::timeouts] = timeouts_;
1314
1315 if (mHaveHeader && !mHaveState)
1316 {
1318 for (auto const& h : neededStateHashes(16, nullptr))
1319 {
1320 hv.append(to_string(h));
1321 }
1322 ret[jss::needed_state_hashes] = hv;
1323 }
1324
1325 if (mHaveHeader && !mHaveTransactions)
1326 {
1328 for (auto const& h : neededTxHashes(16, nullptr))
1329 {
1330 hv.append(to_string(h));
1331 }
1332 ret[jss::needed_transaction_hashes] = hv;
1333 }
1334
1335 return ret;
1336}
1337
1338} // namespace ripple
T addressof(T... args)
T back_inserter(T... args)
T begin(T... args)
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
ValueType type() const
Stream fatal() const
Definition Journal.h:333
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual Config & config()=0
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual Family & getNodeFamily()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual NodeStore::Database & db()=0
std::size_t getPeerCount() const
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
void init(ScopedLockType &collectionLock)
std::set< uint256 > mRecentNodes
void addPeers()
Add more peers to the set, if possible.
std::shared_ptr< Ledger > mLedger
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter *filter) const
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
void tryDB(NodeStore::Database &srcDB)
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter *filter) const
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > mReceivedData
std::vector< neededHash_t > getNeededHashes()
void update(std::uint32_t seq)
std::unique_ptr< PeerSet > mPeerSet
virtual void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet)=0
virtual void onLedgerFetched()=0
Called when a complete ledger is obtained.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
void checkAccept(std::shared_ptr< Ledger const > const &ledger)
std::optional< Blob > getFetchPack(uint256 const &hash) override
Retrieves partial ledger data of the coresponding hash from peers.
bool storeLedger(std::shared_ptr< Ledger const > ledger)
Persistency layer for NodeObject.
Definition Database.h:32
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:221
std::string get() const
bool isZero() const
Definition SHAMapHash.h:35
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:78
SHAMapHash getHash() const
Definition SHAMap.cpp:871
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter *filter)
Check for nodes in the SHAMap not available.
int addRaw(Blob const &vector)
An immutable linear range of bytes.
Definition Slice.h:27
This class is an "active" object.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
iterator begin()
Definition base_uint.h:117
static constexpr std::size_t size()
Definition base_uint.h:507
T count_if(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T is_same_v
T make_pair(T... args)
T max(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
Definition Indexes.cpp:205
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
auto constexpr ledgerAcquireTimeout
@ hotLEDGER
Definition NodeObject.h:15
@ ledgerBecomeAggressiveThreshold
@ ledgerTimeoutRetriesMax
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:225
Number root(Number f, unsigned d)
Definition Number.cpp:645
@ jtLEDGER_DATA
Definition Job.h:47
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter *filter)
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T sample(T... args)
T stable_partition(T... args)
T str(T... args)
std::unordered_map< std::shared_ptr< Peer >, int > counts
void sampleN(std::size_t n, F &&f)
void update(std::shared_ptr< Peer > &&peer, int dataCount)
T to_string(T... args)
T unlock(T... args)
T what(T... args)