rippled
Loading...
Searching...
No Matches
InboundLedger.cpp
#include <xrpld/app/ledger/AccountStateSF.h>
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/ledger/TransactionStateSF.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/overlay/Overlay.h>

#include <xrpl/basics/Log.h>
#include <xrpl/protocol/HashPrefix.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/shamap/SHAMapNodeID.h>

#include <boost/iterator/function_output_iterator.hpp>

#include <algorithm>
#include <chrono>
#include <iterator>
#include <memory>
#include <random>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
20
21namespace ripple {
22
23using namespace std::chrono_literals;
24
// Tuning constants for ledger acquisition (peer fan-out, retry limits and
// per-request node counts).
// NOTE(review): this extraction dropped four enumerator lines below. Based on
// identifiers referenced elsewhere in this file they are, in order: a starting
// peer count, ledgerTimeoutRetriesMax, ledgerBecomeAggressiveThreshold and
// missingNodesFind. Their numeric values are not recoverable from this view —
// confirm against upstream before use.
25enum {
26 // Number of peers to start with
// NOTE(review): missing enumerator (starting peer count) — confirm
28

29 // Number of peers to add on a timeout
30 ,
31 peerCountAdd = 3
32
33 // how many timeouts before we give up
34 ,
// NOTE(review): missing enumerator (ledgerTimeoutRetriesMax) — confirm
36

37 // how many timeouts before we get aggressive
38 ,
// NOTE(review): missing enumerator (ledgerBecomeAggressiveThreshold) — confirm
40

41 // Number of nodes to find initially
42 ,
// NOTE(review): missing enumerator (missingNodesFind) — confirm
44

45 // Number of nodes to request for a reply
46 ,
47 reqNodesReply = 128
48
49 // Number of nodes to request blindly
50 ,
51 reqNodes = 12
52};
53
// Interval between acquisition-timer expirations for a ledger; each
// expiration counts as one timeout against the retry limit.
constexpr auto ledgerAcquireTimeout = std::chrono::milliseconds{3000};
56
58 Application& app,
59 uint256 const& hash,
60 std::uint32_t seq,
61 Reason reason,
62 clock_type& clock,
65 app,
66 hash,
68 {jtLEDGER_DATA, "InboundLedger", 5},
69 app.journal("InboundLedger"))
70 , m_clock(clock)
71 , mHaveHeader(false)
72 , mHaveState(false)
73 , mHaveTransactions(false)
74 , mSignaled(false)
75 , mByHash(true)
76 , mSeq(seq)
77 , mReason(reason)
78 , mReceiveDispatched(false)
79 , mPeerSet(std::move(peerSet))
80{
81 JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
82 touch();
83}
84
85void
87{
89 collectionLock.unlock();
90
92 if (failed_)
93 return;
94
95 if (!complete_)
96 {
97 addPeers();
98 queueJob(sl);
99 return;
100 }
101
102 JLOG(journal_.debug()) << "Acquiring ledger we already have in "
103 << " local store. " << hash_;
104 XRPL_ASSERT(
105 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
106 mLedger->read(keylet::fees()),
107 "ripple::InboundLedger::init : valid ledger fees");
108 mLedger->setImmutable();
109
111 return;
112
114
115 // Check if this could be a newer fully-validated ledger
118}
119
122{
123 auto const& peerIds = mPeerSet->getPeerIds();
124 return std::count_if(peerIds.begin(), peerIds.end(), [this](auto id) {
125 return (app_.overlay().findPeerByShortID(id) != nullptr);
126 });
127}
128
129void
131{
133
134 // If we didn't know the sequence number, but now do, save it
135 if ((seq != 0) && (mSeq == 0))
136 mSeq = seq;
137
138 // Prevent this from being swept
139 touch();
140}
141
142bool
144{
146 if (!isDone())
147 {
148 if (mLedger)
149 tryDB(mLedger->stateMap().family().db());
150 else
152 if (failed_ || complete_)
153 {
154 done();
155 return true;
156 }
157 }
158 return false;
159}
160
162{
163 // Save any received AS data not processed. It could be useful
164 // for populating a different ledger
165 for (auto& entry : mReceivedData)
166 {
167 if (entry.second->type() == protocol::liAS_NODE)
168 app_.getInboundLedgers().gotStaleData(entry.second);
169 }
170 if (!isDone())
171 {
172 JLOG(journal_.debug())
173 << "Acquire " << hash_ << " abort "
174 << ((timeouts_ == 0) ? std::string()
175 : (std::string("timeouts:") +
177 << mStats.get();
178 }
179}
180
183 uint256 const& root,
184 SHAMap& map,
185 int max,
186 SHAMapSyncFilter* filter)
187{
189
190 if (!root.isZero())
191 {
192 if (map.getHash().isZero())
193 ret.push_back(root);
194 else
195 {
196 auto mn = map.getMissingNodes(max, filter);
197 ret.reserve(mn.size());
198 for (auto const& n : mn)
199 ret.push_back(n.second);
200 }
201 }
202
203 return ret;
204}
205
208{
209 return neededHashes(mLedger->info().txHash, mLedger->txMap(), max, filter);
210}
211
214{
215 return neededHashes(
216 mLedger->info().accountHash, mLedger->stateMap(), max, filter);
217}
218
219// See how much of the ledger data is stored locally
220// Data found in a fetch pack will be stored
221void
223{
224 if (!mHaveHeader)
225 {
226 auto makeLedger = [&, this](Blob const& data) {
227 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
230 app_.config(),
232 if (mLedger->info().hash != hash_ ||
233 (mSeq != 0 && mSeq != mLedger->info().seq))
234 {
235 // We know for a fact the ledger can never be acquired
236 JLOG(journal_.warn())
237 << "hash " << hash_ << " seq " << std::to_string(mSeq)
238 << " cannot be a ledger";
239 mLedger.reset();
240 failed_ = true;
241 }
242 };
243
244 // Try to fetch the ledger header from the DB
245 if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
246 {
247 JLOG(journal_.trace()) << "Ledger header found in local store";
248
249 makeLedger(nodeObject->getData());
250 if (failed_)
251 return;
252
253 // Store the ledger header if the source and destination differ
254 auto& dstDB{mLedger->stateMap().family().db()};
255 if (std::addressof(dstDB) != std::addressof(srcDB))
256 {
257 Blob blob{nodeObject->getData()};
258 dstDB.store(
259 hotLEDGER, std::move(blob), hash_, mLedger->info().seq);
260 }
261 }
262 else
263 {
264 // Try to fetch the ledger header from a fetch pack
265 auto data = app_.getLedgerMaster().getFetchPack(hash_);
266 if (!data)
267 return;
268
269 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
270
271 makeLedger(*data);
272 if (failed_)
273 return;
274
275 // Store the ledger header in the ledger's database
276 mLedger->stateMap().family().db().store(
277 hotLEDGER, std::move(*data), hash_, mLedger->info().seq);
278 }
279
280 if (mSeq == 0)
281 mSeq = mLedger->info().seq;
282 mLedger->stateMap().setLedgerSeq(mSeq);
283 mLedger->txMap().setLedgerSeq(mSeq);
284 mHaveHeader = true;
285 }
286
288 {
289 if (mLedger->info().txHash.isZero())
290 {
291 JLOG(journal_.trace()) << "No TXNs to fetch";
292 mHaveTransactions = true;
293 }
294 else
295 {
296 TransactionStateSF filter(
297 mLedger->txMap().family().db(), app_.getLedgerMaster());
298 if (mLedger->txMap().fetchRoot(
299 SHAMapHash{mLedger->info().txHash}, &filter))
300 {
301 if (neededTxHashes(1, &filter).empty())
302 {
303 JLOG(journal_.trace()) << "Had full txn map locally";
304 mHaveTransactions = true;
305 }
306 }
307 }
308 }
309
310 if (!mHaveState)
311 {
312 if (mLedger->info().accountHash.isZero())
313 {
314 JLOG(journal_.fatal())
315 << "We are acquiring a ledger with a zero account hash";
316 failed_ = true;
317 return;
318 }
319 AccountStateSF filter(
320 mLedger->stateMap().family().db(), app_.getLedgerMaster());
321 if (mLedger->stateMap().fetchRoot(
322 SHAMapHash{mLedger->info().accountHash}, &filter))
323 {
324 if (neededStateHashes(1, &filter).empty())
325 {
326 JLOG(journal_.trace()) << "Had full AS map locally";
327 mHaveState = true;
328 }
329 }
330 }
331
333 {
334 JLOG(journal_.debug()) << "Had everything locally";
335 complete_ = true;
336 XRPL_ASSERT(
337 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
338 mLedger->read(keylet::fees()),
339 "ripple::InboundLedger::tryDB : valid ledger fees");
340 mLedger->setImmutable();
341 }
342}
343
346void
348{
349 mRecentNodes.clear();
350
351 if (isDone())
352 {
353 JLOG(journal_.info()) << "Already done " << hash_;
354 return;
355 }
356
358 {
359 if (mSeq != 0)
360 {
361 JLOG(journal_.warn())
362 << timeouts_ << " timeouts for ledger " << mSeq;
363 }
364 else
365 {
366 JLOG(journal_.warn())
367 << timeouts_ << " timeouts for ledger " << hash_;
368 }
369 failed_ = true;
370 done();
371 return;
372 }
373
374 if (!wasProgress)
375 {
376 checkLocal();
377
378 mByHash = true;
379
381 JLOG(journal_.debug())
382 << "No progress(" << pc << ") for ledger " << hash_;
383
384 // addPeers triggers if the reason is not HISTORY
385 // So if the reason IS HISTORY, need to trigger after we add
386 // otherwise, we need to trigger before we add
387 // so each peer gets triggered once
390 addPeers();
393 }
394}
395
// Ask the PeerSet to add peers that claim to have this ledger; newly-added
// peers are handed to the second lambda.
// NOTE(review): this extraction dropped the function-name line, the
// peer-count argument to PeerSet::addPeers, and the body of the second
// lambda (a conditional trigger call, per the retained comment) — confirm
// against upstream before relying on this text.
397void
399{
400 mPeerSet->addPeers(
402 [this](auto peer) { return peer->hasLedger(hash_, mSeq); },
403 [this](auto peer) {
404 // For historical nodes, do not trigger too soon
405 // since a fetch pack is probably coming
408 });
409}
410
416
417void
419{
420 if (mSignaled)
421 return;
422
423 mSignaled = true;
424 touch();
425
426 JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
427 << ((timeouts_ == 0)
428 ? std::string()
429 : (std::string("timeouts:") +
431 << mStats.get();
432
433 XRPL_ASSERT(
435 "ripple::InboundLedger::done : complete or failed");
436
437 if (complete_ && !failed_ && mLedger)
438 {
439 XRPL_ASSERT(
440 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
441 mLedger->read(keylet::fees()),
442 "ripple::InboundLedger::done : valid ledger fees");
443 mLedger->setImmutable();
444 switch (mReason)
445 {
446 case Reason::HISTORY:
448 break;
449 default:
451 break;
452 }
453 }
454
455 // We hold the PeerSet lock, so must dispatch
457 jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
458 if (self->complete_ && !self->failed_)
459 {
460 self->app_.getLedgerMaster().checkAccept(self->getLedger());
461 self->app_.getLedgerMaster().tryAdvance();
462 }
463 else
464 self->app_.getInboundLedgers().logFailure(
465 self->hash_, self->mSeq);
466 });
467}
468
471void
473{
475
476 if (isDone())
477 {
478 JLOG(journal_.debug())
479 << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
480 << (failed_ ? " failed" : "");
481 return;
482 }
483
484 if (auto stream = journal_.debug())
485 {
487 ss << "Trigger acquiring ledger " << hash_;
488 if (peer)
489 ss << " from " << peer;
490
491 if (complete_ || failed_)
492 ss << " complete=" << complete_ << " failed=" << failed_;
493 else
494 ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
495 << " as=" << mHaveState;
496 stream << ss.str();
497 }
498
499 if (!mHaveHeader)
500 {
502 if (failed_)
503 {
504 JLOG(journal_.warn()) << " failed local for " << hash_;
505 return;
506 }
507 }
508
509 protocol::TMGetLedger tmGL;
510 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
511
512 if (timeouts_ != 0)
513 {
514 // Be more aggressive if we've timed out at least once
515 tmGL.set_querytype(protocol::qtINDIRECT);
516
517 if (!progress_ && !failed_ && mByHash &&
519 {
520 auto need = getNeededHashes();
521
522 if (!need.empty())
523 {
524 protocol::TMGetObjectByHash tmBH;
525 bool typeSet = false;
526 tmBH.set_query(true);
527 tmBH.set_ledgerhash(hash_.begin(), hash_.size());
528 for (auto const& p : need)
529 {
530 JLOG(journal_.debug()) << "Want: " << p.second;
531
532 if (!typeSet)
533 {
534 tmBH.set_type(p.first);
535 typeSet = true;
536 }
537
538 if (p.first == tmBH.type())
539 {
540 protocol::TMIndexedObject* io = tmBH.add_objects();
541 io->set_hash(p.second.begin(), p.second.size());
542 if (mSeq != 0)
543 io->set_ledgerseq(mSeq);
544 }
545 }
546
547 auto packet =
548 std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
549 auto const& peerIds = mPeerSet->getPeerIds();
551 peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
552 if (auto p = app_.overlay().findPeerByShortID(id))
553 {
554 mByHash = false;
555 p->send(packet);
556 }
557 });
558 }
559 else
560 {
561 JLOG(journal_.info())
562 << "getNeededHashes says acquire is complete";
563 mHaveHeader = true;
564 mHaveTransactions = true;
565 mHaveState = true;
566 complete_ = true;
567 }
568 }
569 }
570
571 // We can't do much without the header data because we don't know the
572 // state or transaction root hashes.
573 if (!mHaveHeader && !failed_)
574 {
575 tmGL.set_itype(protocol::liBASE);
576 if (mSeq != 0)
577 tmGL.set_ledgerseq(mSeq);
578 JLOG(journal_.trace()) << "Sending header request to "
579 << (peer ? "selected peer" : "all peers");
580 mPeerSet->sendRequest(tmGL, peer);
581 return;
582 }
583
584 if (mLedger)
585 tmGL.set_ledgerseq(mLedger->info().seq);
586
587 if (reason != TriggerReason::reply)
588 {
589 // If we're querying blind, don't query deep
590 tmGL.set_querydepth(0);
591 }
592 else if (peer && peer->isHighLatency())
593 {
594 // If the peer has high latency, query extra deep
595 tmGL.set_querydepth(2);
596 }
597 else
598 tmGL.set_querydepth(1);
599
600 // Get the state data first because it's the most likely to be useful
601 // if we wind up abandoning this fetch.
602 if (mHaveHeader && !mHaveState && !failed_)
603 {
604 XRPL_ASSERT(
605 mLedger,
606 "ripple::InboundLedger::trigger : non-null ledger to read state "
607 "from");
608
609 if (!mLedger->stateMap().isValid())
610 {
611 failed_ = true;
612 }
613 else if (mLedger->stateMap().getHash().isZero())
614 {
615 // we need the root node
616 tmGL.set_itype(protocol::liAS_NODE);
617 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
618 JLOG(journal_.trace()) << "Sending AS root request to "
619 << (peer ? "selected peer" : "all peers");
620 mPeerSet->sendRequest(tmGL, peer);
621 return;
622 }
623 else
624 {
625 AccountStateSF filter(
626 mLedger->stateMap().family().db(), app_.getLedgerMaster());
627
628 // Release the lock while we process the large state map
629 sl.unlock();
630 auto nodes =
631 mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
632 sl.lock();
633
634 // Make sure nothing happened while we released the lock
635 if (!failed_ && !complete_ && !mHaveState)
636 {
637 if (nodes.empty())
638 {
639 if (!mLedger->stateMap().isValid())
640 failed_ = true;
641 else
642 {
643 mHaveState = true;
644
645 if (mHaveTransactions)
646 complete_ = true;
647 }
648 }
649 else
650 {
651 filterNodes(nodes, reason);
652
653 if (!nodes.empty())
654 {
655 tmGL.set_itype(protocol::liAS_NODE);
656 for (auto const& id : nodes)
657 {
658 *(tmGL.add_nodeids()) = id.first.getRawString();
659 }
660
661 JLOG(journal_.trace())
662 << "Sending AS node request (" << nodes.size()
663 << ") to "
664 << (peer ? "selected peer" : "all peers");
665 mPeerSet->sendRequest(tmGL, peer);
666 return;
667 }
668 else
669 {
670 JLOG(journal_.trace()) << "All AS nodes filtered";
671 }
672 }
673 }
674 }
675 }
676
677 if (mHaveHeader && !mHaveTransactions && !failed_)
678 {
679 XRPL_ASSERT(
680 mLedger,
681 "ripple::InboundLedger::trigger : non-null ledger to read "
682 "transactions from");
683
684 if (!mLedger->txMap().isValid())
685 {
686 failed_ = true;
687 }
688 else if (mLedger->txMap().getHash().isZero())
689 {
690 // we need the root node
691 tmGL.set_itype(protocol::liTX_NODE);
692 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
693 JLOG(journal_.trace()) << "Sending TX root request to "
694 << (peer ? "selected peer" : "all peers");
695 mPeerSet->sendRequest(tmGL, peer);
696 return;
697 }
698 else
699 {
700 TransactionStateSF filter(
701 mLedger->txMap().family().db(), app_.getLedgerMaster());
702
703 auto nodes =
704 mLedger->txMap().getMissingNodes(missingNodesFind, &filter);
705
706 if (nodes.empty())
707 {
708 if (!mLedger->txMap().isValid())
709 failed_ = true;
710 else
711 {
712 mHaveTransactions = true;
713
714 if (mHaveState)
715 complete_ = true;
716 }
717 }
718 else
719 {
720 filterNodes(nodes, reason);
721
722 if (!nodes.empty())
723 {
724 tmGL.set_itype(protocol::liTX_NODE);
725 for (auto const& n : nodes)
726 {
727 *(tmGL.add_nodeids()) = n.first.getRawString();
728 }
729 JLOG(journal_.trace())
730 << "Sending TX node request (" << nodes.size()
731 << ") to " << (peer ? "selected peer" : "all peers");
732 mPeerSet->sendRequest(tmGL, peer);
733 return;
734 }
735 else
736 {
737 JLOG(journal_.trace()) << "All TX nodes filtered";
738 }
739 }
740 }
741 }
742
743 if (complete_ || failed_)
744 {
745 JLOG(journal_.debug())
746 << "Done:" << (complete_ ? " complete" : "")
747 << (failed_ ? " failed " : " ") << mLedger->info().seq;
748 sl.unlock();
749 done();
750 }
751}
752
753void
754InboundLedger::filterNodes(
756 TriggerReason reason)
757{
758 // Sort nodes so that the ones we haven't recently
759 // requested come before the ones we have.
760 auto dup = std::stable_partition(
761 nodes.begin(), nodes.end(), [this](auto const& item) {
762 return mRecentNodes.count(item.second) == 0;
763 });
764
765 // If everything is a duplicate we don't want to send
766 // any query at all except on a timeout where we need
767 // to query everyone:
768 if (dup == nodes.begin())
769 {
770 JLOG(journal_.trace()) << "filterNodes: all duplicates";
771
772 if (reason != TriggerReason::timeout)
773 {
774 nodes.clear();
775 return;
776 }
777 }
778 else
779 {
780 JLOG(journal_.trace()) << "filterNodes: pruning duplicates";
781
782 nodes.erase(dup, nodes.end());
783 }
784
785 std::size_t const limit =
786 (reason == TriggerReason::reply) ? reqNodesReply : reqNodes;
787
788 if (nodes.size() > limit)
789 nodes.resize(limit);
790
791 for (auto const& n : nodes)
792 mRecentNodes.insert(n.second);
793}
794
798// data must not have hash prefix
799bool
800InboundLedger::takeHeader(std::string const& data)
801{
802 // Return value: true=normal, false=bad data
803 JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
804
805 if (complete_ || failed_ || mHaveHeader)
806 return true;
807
808 auto* f = &app_.getNodeFamily();
809 mLedger = std::make_shared<Ledger>(
810 deserializeHeader(makeSlice(data)), app_.config(), *f);
811 if (mLedger->info().hash != hash_ ||
812 (mSeq != 0 && mSeq != mLedger->info().seq))
813 {
814 JLOG(journal_.warn())
815 << "Acquire hash mismatch: " << mLedger->info().hash
816 << "!=" << hash_;
817 mLedger.reset();
818 return false;
819 }
820 if (mSeq == 0)
821 mSeq = mLedger->info().seq;
822 mLedger->stateMap().setLedgerSeq(mSeq);
823 mLedger->txMap().setLedgerSeq(mSeq);
824 mHaveHeader = true;
825
826 Serializer s(data.size() + 4);
827 s.add32(HashPrefix::ledgerMaster);
828 s.addRaw(data.data(), data.size());
829 f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);
830
831 if (mLedger->info().txHash.isZero())
832 mHaveTransactions = true;
833
834 if (mLedger->info().accountHash.isZero())
835 mHaveState = true;
836
837 mLedger->txMap().setSynching();
838 mLedger->stateMap().setSynching();
839
840 return true;
841}
842
846void
847InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
848{
849 if (!mHaveHeader)
850 {
851 JLOG(journal_.warn()) << "Missing ledger header";
852 san.incInvalid();
853 return;
854 }
855 if (packet.type() == protocol::liTX_NODE)
856 {
857 if (mHaveTransactions || failed_)
858 {
859 san.incDuplicate();
860 return;
861 }
862 }
863 else if (mHaveState || failed_)
864 {
865 san.incDuplicate();
866 return;
867 }
868
869 auto [map, rootHash, filter] = [&]()
871 if (packet.type() == protocol::liTX_NODE)
872 return {
873 mLedger->txMap(),
874 SHAMapHash{mLedger->info().txHash},
876 mLedger->txMap().family().db(), app_.getLedgerMaster())};
877 return {
878 mLedger->stateMap(),
879 SHAMapHash{mLedger->info().accountHash},
881 mLedger->stateMap().family().db(), app_.getLedgerMaster())};
882 }();
883
884 try
885 {
886 auto const f = filter.get();
887
888 for (auto const& node : packet.nodes())
889 {
890 auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
891
892 if (!nodeID)
893 throw std::runtime_error("data does not properly deserialize");
894
895 if (nodeID->isRoot())
896 {
897 san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
898 }
899 else
900 {
901 san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
902 }
903
904 if (!san.isGood())
905 {
906 JLOG(journal_.warn()) << "Received bad node data";
907 return;
908 }
909 }
910 }
911 catch (std::exception const& e)
912 {
913 JLOG(journal_.error()) << "Received bad node data: " << e.what();
914 san.incInvalid();
915 return;
916 }
917
918 if (!map.isSynching())
919 {
920 if (packet.type() == protocol::liTX_NODE)
921 mHaveTransactions = true;
922 else
923 mHaveState = true;
924
925 if (mHaveTransactions && mHaveState)
926 {
927 complete_ = true;
928 done();
929 }
930 }
931}
932
936bool
937InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
938{
939 if (failed_ || mHaveState)
940 {
941 san.incDuplicate();
942 return true;
943 }
944
945 if (!mHaveHeader)
946 {
947 // LCOV_EXCL_START
948 UNREACHABLE("ripple::InboundLedger::takeAsRootNode : no ledger header");
949 return false;
950 // LCOV_EXCL_STOP
951 }
952
953 AccountStateSF filter(
954 mLedger->stateMap().family().db(), app_.getLedgerMaster());
955 san += mLedger->stateMap().addRootNode(
956 SHAMapHash{mLedger->info().accountHash}, data, &filter);
957 return san.isGood();
958}
959
963bool
964InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
965{
966 if (failed_ || mHaveTransactions)
967 {
968 san.incDuplicate();
969 return true;
970 }
971
972 if (!mHaveHeader)
973 {
974 // LCOV_EXCL_START
975 UNREACHABLE("ripple::InboundLedger::takeTxRootNode : no ledger header");
976 return false;
977 // LCOV_EXCL_STOP
978 }
979
980 TransactionStateSF filter(
981 mLedger->txMap().family().db(), app_.getLedgerMaster());
982 san += mLedger->txMap().addRootNode(
983 SHAMapHash{mLedger->info().txHash}, data, &filter);
984 return san.isGood();
985}
986
988InboundLedger::getNeededHashes()
989{
991
992 if (!mHaveHeader)
993 {
994 ret.push_back(
995 std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
996 return ret;
997 }
998
999 if (!mHaveState)
1000 {
1001 AccountStateSF filter(
1002 mLedger->stateMap().family().db(), app_.getLedgerMaster());
1003 for (auto const& h : neededStateHashes(4, &filter))
1004 {
1005 ret.push_back(
1006 std::make_pair(protocol::TMGetObjectByHash::otSTATE_NODE, h));
1007 }
1008 }
1009
1010 if (!mHaveTransactions)
1011 {
1012 TransactionStateSF filter(
1013 mLedger->txMap().family().db(), app_.getLedgerMaster());
1014 for (auto const& h : neededTxHashes(4, &filter))
1015 {
1017 protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
1018 }
1019 }
1020
1021 return ret;
1022}
1023
1027bool
1028InboundLedger::gotData(
1031{
1032 std::lock_guard sl(mReceivedDataLock);
1033
1034 if (isDone())
1035 return false;
1036
1037 mReceivedData.emplace_back(peer, data);
1038
1039 if (mReceiveDispatched)
1040 return false;
1041
1042 mReceiveDispatched = true;
1043 return true;
1044}
1045
1049// VFALCO NOTE, it is not necessary to pass the entire Peer,
1050// we can get away with just a Resource::Consumer endpoint.
1051//
1052// TODO Change peer to Consumer
1053//
1054int
1055InboundLedger::processData(
1057 protocol::TMLedgerData& packet)
1058{
1059 if (packet.type() == protocol::liBASE)
1060 {
1061 if (packet.nodes().empty())
1062 {
1063 JLOG(journal_.warn()) << peer->id() << ": empty header data";
1064 peer->charge(
1065 Resource::feeMalformedRequest, "ledger_data empty header");
1066 return -1;
1067 }
1068
1069 SHAMapAddNode san;
1070
1071 ScopedLockType sl(mtx_);
1072
1073 try
1074 {
1075 if (!mHaveHeader)
1076 {
1077 if (!takeHeader(packet.nodes(0).nodedata()))
1078 {
1079 JLOG(journal_.warn()) << "Got invalid header data";
1080 peer->charge(
1081 Resource::feeMalformedRequest,
1082 "ledger_data invalid header");
1083 return -1;
1084 }
1085
1086 san.incUseful();
1087 }
1088
1089 if (!mHaveState && (packet.nodes().size() > 1) &&
1090 !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
1091 {
1092 JLOG(journal_.warn()) << "Included AS root invalid";
1093 }
1094
1095 if (!mHaveTransactions && (packet.nodes().size() > 2) &&
1096 !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
1097 {
1098 JLOG(journal_.warn()) << "Included TX root invalid";
1099 }
1100 }
1101 catch (std::exception const& ex)
1102 {
1103 JLOG(journal_.warn())
1104 << "Included AS/TX root invalid: " << ex.what();
1105 using namespace std::string_literals;
1106 peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
1107 return -1;
1108 }
1109
1110 if (san.isUseful())
1111 progress_ = true;
1112
1113 mStats += san;
1114 return san.getGood();
1115 }
1116
1117 if ((packet.type() == protocol::liTX_NODE) ||
1118 (packet.type() == protocol::liAS_NODE))
1119 {
1120 std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: "
1121 : "liAS_NODE: ";
1122
1123 if (packet.nodes().empty())
1124 {
1125 JLOG(journal_.info()) << peer->id() << ": response with no nodes";
1126 peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
1127 return -1;
1128 }
1129
1130 ScopedLockType sl(mtx_);
1131
1132 // Verify node IDs and data are complete
1133 for (auto const& node : packet.nodes())
1134 {
1135 if (!node.has_nodeid() || !node.has_nodedata())
1136 {
1137 JLOG(journal_.warn()) << "Got bad node";
1138 peer->charge(
1139 Resource::feeMalformedRequest, "ledger_data bad node");
1140 return -1;
1141 }
1142 }
1143
1144 SHAMapAddNode san;
1145 receiveNode(packet, san);
1146
1147 JLOG(journal_.debug())
1148 << "Ledger "
1149 << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
1150 << " node stats: " << san.get();
1151
1152 if (san.isUseful())
1153 progress_ = true;
1154
1155 mStats += san;
1156 return san.getGood();
1157 }
1158
1159 return -1;
1160}
1161
1162namespace detail {
1163// Track the amount of useful data that each peer returns
1165{
1166 // Map from peer to amount of useful the peer returned
1168 // The largest amount of useful data that any peer returned
1169 int maxCount = 0;
1170
1171 // Update the data count for a peer
1172 void
1173 update(std::shared_ptr<Peer>&& peer, int dataCount)
1174 {
1175 if (dataCount <= 0)
1176 return;
1177 maxCount = std::max(maxCount, dataCount);
1178 auto i = counts.find(peer);
1179 if (i == counts.end())
1180 {
1181 counts.emplace(std::move(peer), dataCount);
1182 return;
1183 }
1184 i->second = std::max(i->second, dataCount);
1185 }
1186
1187 // Prune all the peers that didn't return enough data.
1188 void
1190 {
1191 // Remove all the peers that didn't return at least half as much data as
1192 // the best peer
1193 auto const thresh = maxCount / 2;
1194 auto i = counts.begin();
1195 while (i != counts.end())
1196 {
1197 if (i->second < thresh)
1198 i = counts.erase(i);
1199 else
1200 ++i;
1201 }
1202 }
1203
1204 // call F with the `peer` parameter with a random sample of at most n values
1205 // of the counts vector.
1206 template <class F>
1207 void
1209 {
1210 if (counts.empty())
1211 return;
1212
1213 auto outFunc = [&f](auto&& v) { f(v.first); };
1215#if _MSC_VER
1217 s.reserve(n);
1219 counts.begin(), counts.end(), std::back_inserter(s), n, rng);
1220 for (auto& v : s)
1221 {
1222 outFunc(v);
1223 }
1224#else
1226 counts.begin(),
1227 counts.end(),
1228 boost::make_function_output_iterator(outFunc),
1229 n,
1230 rng);
1231#endif
1232 }
1233};
1234} // namespace detail
1235
1239void
1240InboundLedger::runData()
1241{
1242 // Maximum number of peers to request data from
1243 constexpr std::size_t maxUsefulPeers = 6;
1244
1245 decltype(mReceivedData) data;
1246
1247 // Reserve some memory so the first couple iterations don't reallocate
1248 data.reserve(8);
1249
1250 detail::PeerDataCounts dataCounts;
1251
1252 for (;;)
1253 {
1254 data.clear();
1255
1256 {
1257 std::lock_guard sl(mReceivedDataLock);
1258
1259 if (mReceivedData.empty())
1260 {
1261 mReceiveDispatched = false;
1262 break;
1263 }
1264
1265 data.swap(mReceivedData);
1266 }
1267
1268 for (auto& entry : data)
1269 {
1270 if (auto peer = entry.first.lock())
1271 {
1272 int count = processData(peer, *(entry.second));
1273 dataCounts.update(std::move(peer), count);
1274 }
1275 }
1276 }
1277
1278 // Select a random sample of the peers that gives us the most nodes that are
1279 // useful
1280 dataCounts.prune();
1281 dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
1282 trigger(peer, TriggerReason::reply);
1283 });
1284}
1285
1287InboundLedger::getJson(int)
1288{
1290
1291 ScopedLockType sl(mtx_);
1292
1293 ret[jss::hash] = to_string(hash_);
1294
1295 if (complete_)
1296 ret[jss::complete] = true;
1297
1298 if (failed_)
1299 ret[jss::failed] = true;
1300
1301 if (!complete_ && !failed_)
1302 ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());
1303
1304 ret[jss::have_header] = mHaveHeader;
1305
1306 if (mHaveHeader)
1307 {
1308 ret[jss::have_state] = mHaveState;
1309 ret[jss::have_transactions] = mHaveTransactions;
1310 }
1311
1312 ret[jss::timeouts] = timeouts_;
1313
1314 if (mHaveHeader && !mHaveState)
1315 {
1317 for (auto const& h : neededStateHashes(16, nullptr))
1318 {
1319 hv.append(to_string(h));
1320 }
1321 ret[jss::needed_state_hashes] = hv;
1322 }
1323
1324 if (mHaveHeader && !mHaveTransactions)
1325 {
1327 for (auto const& h : neededTxHashes(16, nullptr))
1328 {
1329 hv.append(to_string(h));
1330 }
1331 ret[jss::needed_transaction_hashes] = hv;
1332 }
1333
1334 return ret;
1335}
1336
1337} // namespace ripple
T addressof(T... args)
T back_inserter(T... args)
T begin(T... args)
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
ValueType type() const
Stream fatal() const
Definition Journal.h:333
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual Config & config()=0
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual Family & getNodeFamily()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual NodeStore::Database & db()=0
std::size_t getPeerCount() const
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
void init(ScopedLockType &collectionLock)
std::set< uint256 > mRecentNodes
void addPeers()
Add more peers to the set, if possible.
std::shared_ptr< Ledger > mLedger
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter *filter) const
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
void tryDB(NodeStore::Database &srcDB)
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter *filter) const
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > mReceivedData
std::vector< neededHash_t > getNeededHashes()
void update(std::uint32_t seq)
std::unique_ptr< PeerSet > mPeerSet
virtual void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet)=0
virtual void onLedgerFetched()=0
Called when a complete ledger is obtained.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
void checkAccept(std::shared_ptr< Ledger const > const &ledger)
std::optional< Blob > getFetchPack(uint256 const &hash) override
Retrieves partial ledger data of the coresponding hash from peers.
bool storeLedger(std::shared_ptr< Ledger const > ledger)
Persistency layer for NodeObject.
Definition Database.h:32
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:221
std::string get() const
bool isZero() const
Definition SHAMapHash.h:35
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition SHAMap.h:78
SHAMapHash getHash() const
Definition SHAMap.cpp:871
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter *filter)
Check for nodes in the SHAMap not available.
int addRaw(Blob const &vector)
An immutable linear range of bytes.
Definition Slice.h:27
This class is an "active" object.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
iterator begin()
Definition base_uint.h:117
static constexpr std::size_t size()
Definition base_uint.h:507
T count_if(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T is_same_v
T make_pair(T... args)
T max(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
Definition Indexes.cpp:203
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
auto constexpr ledgerAcquireTimeout
@ hotLEDGER
Definition NodeObject.h:15
@ ledgerBecomeAggressiveThreshold
@ ledgerTimeoutRetriesMax
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition Slice.h:225
Number root(Number f, unsigned d)
Definition Number.cpp:617
@ jtLEDGER_DATA
Definition Job.h:47
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter *filter)
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T sample(T... args)
T stable_partition(T... args)
T str(T... args)
std::unordered_map< std::shared_ptr< Peer >, int > counts
void sampleN(std::size_t n, F &&f)
void update(std::shared_ptr< Peer > &&peer, int dataCount)
T to_string(T... args)
T unlock(T... args)
T what(T... args)