rippled
InboundLedger.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/ledger/AccountStateSF.h>
21#include <xrpld/app/ledger/InboundLedger.h>
22#include <xrpld/app/ledger/InboundLedgers.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionStateSF.h>
25#include <xrpld/app/main/Application.h>
26#include <xrpld/app/misc/NetworkOPs.h>
27#include <xrpld/core/JobQueue.h>
28#include <xrpld/overlay/Overlay.h>
29#include <xrpld/shamap/SHAMapNodeID.h>
30#include <xrpl/basics/Log.h>
31#include <xrpl/protocol/HashPrefix.h>
32#include <xrpl/protocol/jss.h>
33#include <xrpl/resource/Fees.h>
34
35#include <boost/iterator/function_output_iterator.hpp>
36
37#include <algorithm>
38#include <random>
39
40namespace ripple {
41
42using namespace std::chrono_literals;
43
44enum {
45 // Number of peers to start with
 46 peerCountStart = 5
 47
48 // Number of peers to add on a timeout
49 ,
50 peerCountAdd = 3
51
52 // how many timeouts before we give up
53 ,
 54 ledgerTimeoutRetriesMax = 6
 55
56 // how many timeouts before we get aggressive
57 ,
 58 ledgerBecomeAggressiveThreshold = 4
 59
60 // Number of nodes to find initially
61 ,
 62 missingNodesFind = 256
 63
64 // Number of nodes to request for a reply
65 ,
66 reqNodesReply = 128
67
68 // Number of nodes to request blindly
69 ,
70 reqNodes = 12
71};
72
73// milliseconds for each ledger timeout
74auto constexpr ledgerAcquireTimeout = 3000ms;
75
76InboundLedger::InboundLedger(
 77 Application& app,
78 uint256 const& hash,
79 std::uint32_t seq,
80 Reason reason,
81 clock_type& clock,
 82 std::unique_ptr<PeerSet> peerSet)
 83 : TimeoutCounter(
 84 app,
85 hash,
 86 ledgerAcquireTimeout,
 87 {jtLEDGER_DATA, "InboundLedger", 5},
88 app.journal("InboundLedger"))
89 , m_clock(clock)
90 , mHaveHeader(false)
91 , mHaveState(false)
92 , mHaveTransactions(false)
93 , mSignaled(false)
94 , mByHash(true)
95 , mSeq(seq)
96 , mReason(reason)
97 , mReceiveDispatched(false)
98 , mPeerSet(std::move(peerSet))
99{
100 JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
101 touch();
102}
103
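// Start the acquisition: try the local store first, and only reach out to
// peers (addPeers + queueJob) if the ledger is not already complete locally.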
104void
105InboundLedger::init(ScopedLockType& collectionLock)
106{
 107 ScopedLockType sl(mtx_);
 108 collectionLock.unlock();
109
 110 tryDB(app_.getNodeFamily().db());
 111 if (failed_)
112 return;
113
114 if (!complete_)
115 {
116 addPeers();
117 queueJob(sl);
118 return;
119 }
120
121 JLOG(journal_.debug()) << "Acquiring ledger we already have in "
122 << " local store. " << hash_;
123 XRPL_ASSERT(
124 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
125 mLedger->read(keylet::fees()),
126 "ripple::InboundLedger::init : valid ledger fees");
127 mLedger->setImmutable();
128
 129 if (mReason == Reason::HISTORY)
 130 return;
131
 132 app_.getLedgerMaster().storeLedger(mLedger);
 133
134 // Check if this could be a newer fully-validated ledger
 135 if (mReason == Reason::CONSENSUS)
 136 app_.getLedgerMaster().checkAccept(mLedger);
137}
138
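// Count how many peers in this acquisition's peer set are still connected.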
139std::size_t
140InboundLedger::getPeerCount() const
141{
142 auto const& peerIds = mPeerSet->getPeerIds();
143 return std::count_if(peerIds.begin(), peerIds.end(), [this](auto id) {
144 return (app_.overlay().findPeerByShortID(id) != nullptr);
145 });
146}
147
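// Record a newly learned sequence number and touch the timer so this
// acquisition is not swept.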
148void
149InboundLedger::update(std::uint32_t seq)
150{
 151 ScopedLockType sl(mtx_);
 152
153 // If we didn't know the sequence number, but now do, save it
154 if ((seq != 0) && (mSeq == 0))
155 mSeq = seq;
156
157 // Prevent this from being swept
158 touch();
159}
160
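// Try to complete the acquisition from data already held locally; returns
// true if that finished it (either complete or failed).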
161bool
162InboundLedger::checkLocal()
163{
 164 ScopedLockType sl(mtx_);
165 if (!isDone())
166 {
167 if (mLedger)
168 tryDB(mLedger->stateMap().family().db());
169 else
 170 tryDB(app_.getNodeFamily().db());
 171 if (failed_ || complete_)
172 {
173 done();
174 return true;
175 }
176 }
177 return false;
178}
179
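// On destruction, recycle any unprocessed account-state data through
// InboundLedgers and log if the acquisition is being abandoned.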
180InboundLedger::~InboundLedger()
181{
182 // Save any received AS data not processed. It could be useful
183 // for populating a different ledger
184 for (auto& entry : mReceivedData)
185 {
186 if (entry.second->type() == protocol::liAS_NODE)
187 app_.getInboundLedgers().gotStaleData(entry.second);
188 }
189 if (!isDone())
190 {
191 JLOG(journal_.debug())
192 << "Acquire " << hash_ << " abort "
193 << ((timeouts_ == 0) ? std::string()
194 : (std::string("timeouts:") +
 195 std::to_string(timeouts_) + " "))
 196 << mStats.get();
197 }
198}
199
200static std::vector<uint256>
201neededHashes(
 202 uint256 const& root,
203 SHAMap& map,
204 int max,
205 SHAMapSyncFilter* filter)
206{
 207 std::vector<uint256> ret;
 208
209 if (!root.isZero())
210 {
211 if (map.getHash().isZero())
212 ret.push_back(root);
213 else
214 {
215 auto mn = map.getMissingNodes(max, filter);
216 ret.reserve(mn.size());
217 for (auto const& n : mn)
218 ret.push_back(n.second);
219 }
220 }
221
222 return ret;
223}
224
225std::vector<uint256>
226InboundLedger::neededTxHashes(int max, SHAMapSyncFilter* filter) const
227{
228 return neededHashes(mLedger->info().txHash, mLedger->txMap(), max, filter);
229}
230
231std::vector<uint256>
232InboundLedger::neededStateHashes(int max, SHAMapSyncFilter* filter) const
233{
234 return neededHashes(
235 mLedger->info().accountHash, mLedger->stateMap(), max, filter);
236}
237
238// See how much of the ledger data is stored locally
239// Data found in a fetch pack will be stored
240void
241InboundLedger::tryDB(NodeStore::Database& srcDB)
242{
243 if (!mHaveHeader)
244 {
245 auto makeLedger = [&, this](Blob const& data) {
246 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
247 mLedger = std::make_shared<Ledger>(
 248 deserializePrefixedHeader(makeSlice(data)),
 249 app_.config(),
 250 app_.getNodeFamily());
251 if (mLedger->info().hash != hash_ ||
252 (mSeq != 0 && mSeq != mLedger->info().seq))
253 {
254 // We know for a fact the ledger can never be acquired
255 JLOG(journal_.warn())
256 << "hash " << hash_ << " seq " << std::to_string(mSeq)
257 << " cannot be a ledger";
258 mLedger.reset();
259 failed_ = true;
260 }
261 };
262
263 // Try to fetch the ledger header from the DB
264 if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
265 {
266 JLOG(journal_.trace()) << "Ledger header found in local store";
267
268 makeLedger(nodeObject->getData());
269 if (failed_)
270 return;
271
272 // Store the ledger header if the source and destination differ
273 auto& dstDB{mLedger->stateMap().family().db()};
274 if (std::addressof(dstDB) != std::addressof(srcDB))
275 {
276 Blob blob{nodeObject->getData()};
277 dstDB.store(
278 hotLEDGER, std::move(blob), hash_, mLedger->info().seq);
279 }
280 }
281 else
282 {
283 // Try to fetch the ledger header from a fetch pack
284 auto data = app_.getLedgerMaster().getFetchPack(hash_);
285 if (!data)
286 return;
287
288 JLOG(journal_.trace()) << "Ledger header found in fetch pack";
289
290 makeLedger(*data);
291 if (failed_)
292 return;
293
294 // Store the ledger header in the ledger's database
295 mLedger->stateMap().family().db().store(
296 hotLEDGER, std::move(*data), hash_, mLedger->info().seq);
297 }
298
299 if (mSeq == 0)
300 mSeq = mLedger->info().seq;
301 mLedger->stateMap().setLedgerSeq(mSeq);
302 mLedger->txMap().setLedgerSeq(mSeq);
303 mHaveHeader = true;
304 }
305
 306 if (!mHaveTransactions)
 307 {
308 if (mLedger->info().txHash.isZero())
309 {
310 JLOG(journal_.trace()) << "No TXNs to fetch";
311 mHaveTransactions = true;
312 }
313 else
314 {
315 TransactionStateSF filter(
316 mLedger->txMap().family().db(), app_.getLedgerMaster());
317 if (mLedger->txMap().fetchRoot(
318 SHAMapHash{mLedger->info().txHash}, &filter))
319 {
320 if (neededTxHashes(1, &filter).empty())
321 {
322 JLOG(journal_.trace()) << "Had full txn map locally";
323 mHaveTransactions = true;
324 }
325 }
326 }
327 }
328
329 if (!mHaveState)
330 {
331 if (mLedger->info().accountHash.isZero())
332 {
333 JLOG(journal_.fatal())
334 << "We are acquiring a ledger with a zero account hash";
335 failed_ = true;
336 return;
337 }
338 AccountStateSF filter(
339 mLedger->stateMap().family().db(), app_.getLedgerMaster());
340 if (mLedger->stateMap().fetchRoot(
341 SHAMapHash{mLedger->info().accountHash}, &filter))
342 {
343 if (neededStateHashes(1, &filter).empty())
344 {
345 JLOG(journal_.trace()) << "Had full AS map locally";
346 mHaveState = true;
347 }
348 }
349 }
350
 351 if (mHaveTransactions && mHaveState)
 352 {
353 JLOG(journal_.debug()) << "Had everything locally";
354 complete_ = true;
355 XRPL_ASSERT(
356 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
357 mLedger->read(keylet::fees()),
358 "ripple::InboundLedger::tryDB : valid ledger fees");
359 mLedger->setImmutable();
360 }
361}
362
 362
363/** Called with a lock by the PeerSet when the timer expires
 364 */
365void
366InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
367{
368 mRecentNodes.clear();
369
370 if (isDone())
371 {
372 JLOG(journal_.info()) << "Already done " << hash_;
373 return;
374 }
375
 376 if (timeouts_ > ledgerTimeoutRetriesMax)
 377 {
378 if (mSeq != 0)
379 {
380 JLOG(journal_.warn())
381 << timeouts_ << " timeouts for ledger " << mSeq;
382 }
383 else
384 {
385 JLOG(journal_.warn())
386 << timeouts_ << " timeouts for ledger " << hash_;
387 }
388 failed_ = true;
389 done();
390 return;
391 }
392
393 if (!wasProgress)
394 {
395 if (checkLocal())
396 {
397 // Done. Something else (probably consensus) built the ledger
398 // locally while waiting for data (or possibly before requesting)
399 XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
400 JLOG(journal_.info()) << "Finished while waiting " << hash_;
401 return;
402 }
403
404 mByHash = true;
405
 406 auto pc = getPeerCount();
 407 JLOG(journal_.debug())
408 << "No progress(" << pc << ") for ledger " << hash_;
409
410 // addPeers triggers if the reason is not HISTORY
411 // So if the reason IS HISTORY, need to trigger after we add
412 // otherwise, we need to trigger before we add
413 // so each peer gets triggered once
 414 if (mReason != Reason::HISTORY)
 415 trigger(nullptr, TriggerReason::timeout);
 416 addPeers();
 417 if (mReason == Reason::HISTORY)
 418 trigger(nullptr, TriggerReason::timeout);
 419 }
420}
421
422/** Add more peers to the set, if possible */
423void
424InboundLedger::addPeers()
425{
426 mPeerSet->addPeers(
 427 (getPeerCount() == 0) ? peerCountStart : peerCountAdd,
 428 [this](auto peer) { return peer->hasLedger(hash_, mSeq); },
429 [this](auto peer) {
430 // For historical nodes, do not trigger too soon
431 // since a fetch pack is probably coming
 432 if (mReason != Reason::HISTORY)
 433 trigger(peer, TriggerReason::added);
 434 });
435}
436
437std::weak_ptr<TimeoutCounter>
438InboundLedger::pmDowncast()
439{
440 return shared_from_this();
441}
442
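// Finalize the acquisition: log the outcome, store or report a complete
// ledger, and dispatch the remaining bookkeeping to the job queue.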
443void
444InboundLedger::done()
445{
446 if (mSignaled)
447 return;
448
449 mSignaled = true;
450 touch();
451
452 JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
453 << ((timeouts_ == 0)
454 ? std::string()
455 : (std::string("timeouts:") +
 456 std::to_string(timeouts_) + " "))
 457 << mStats.get();
458
459 XRPL_ASSERT(
461 "ripple::InboundLedger::done : complete or failed");
462
463 if (complete_ && !failed_ && mLedger)
464 {
465 XRPL_ASSERT(
466 mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
467 mLedger->read(keylet::fees()),
468 "ripple::InboundLedger::done : valid ledger fees");
469 mLedger->setImmutable();
470 switch (mReason)
471 {
472 case Reason::HISTORY:
 473 app_.getInboundLedgers().onLedgerFetched();
 474 break;
475 default:
 476 app_.getLedgerMaster().storeLedger(mLedger);
 477 break;
478 }
479 }
480
481 // We hold the PeerSet lock, so must dispatch
 482 app_.getJobQueue().addJob(
 483 jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
484 if (self->complete_ && !self->failed_)
485 {
486 self->app_.getLedgerMaster().checkAccept(self->getLedger());
487 self->app_.getLedgerMaster().tryAdvance();
488 }
489 else
490 self->app_.getInboundLedgers().logFailure(
491 self->hash_, self->mSeq);
492 });
493}
494
495/** Request more nodes, perhaps from a specific peer
 496 */
497void
498InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
499{
 500 ScopedLockType sl(mtx_);
 501
502 if (isDone())
503 {
504 JLOG(journal_.debug())
505 << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
506 << (failed_ ? " failed" : "");
507 return;
508 }
509
510 if (auto stream = journal_.debug())
511 {
 512 std::stringstream ss;
 513 ss << "Trigger acquiring ledger " << hash_;
514 if (peer)
515 ss << " from " << peer;
516
517 if (complete_ || failed_)
518 ss << " complete=" << complete_ << " failed=" << failed_;
519 else
520 ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
521 << " as=" << mHaveState;
522 stream << ss.str();
523 }
524
525 if (!mHaveHeader)
526 {
 527 tryDB(app_.getNodeFamily().db());
 528 if (failed_)
529 {
530 JLOG(journal_.warn()) << " failed local for " << hash_;
531 return;
532 }
533 }
534
535 protocol::TMGetLedger tmGL;
536 tmGL.set_ledgerhash(hash_.begin(), hash_.size());
537
538 if (timeouts_ != 0)
539 {
540 // Be more aggressive if we've timed out at least once
541 tmGL.set_querytype(protocol::qtINDIRECT);
542
543 if (!progress_ && !failed_ && mByHash &&
 544 (timeouts_ > ledgerBecomeAggressiveThreshold))
 545 {
546 auto need = getNeededHashes();
547
548 if (!need.empty())
549 {
550 protocol::TMGetObjectByHash tmBH;
551 bool typeSet = false;
552 tmBH.set_query(true);
553 tmBH.set_ledgerhash(hash_.begin(), hash_.size());
554 for (auto const& p : need)
555 {
556 JLOG(journal_.debug()) << "Want: " << p.second;
557
558 if (!typeSet)
559 {
560 tmBH.set_type(p.first);
561 typeSet = true;
562 }
563
564 if (p.first == tmBH.type())
565 {
566 protocol::TMIndexedObject* io = tmBH.add_objects();
567 io->set_hash(p.second.begin(), p.second.size());
568 if (mSeq != 0)
569 io->set_ledgerseq(mSeq);
570 }
571 }
572
573 auto packet =
574 std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
575 auto const& peerIds = mPeerSet->getPeerIds();
 576 std::for_each(
 577 peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
578 if (auto p = app_.overlay().findPeerByShortID(id))
579 {
580 mByHash = false;
581 p->send(packet);
582 }
583 });
584 }
585 else
586 {
587 JLOG(journal_.info())
588 << "getNeededHashes says acquire is complete";
589 mHaveHeader = true;
590 mHaveTransactions = true;
591 mHaveState = true;
592 complete_ = true;
593 }
594 }
595 }
596
597 // We can't do much without the header data because we don't know the
598 // state or transaction root hashes.
599 if (!mHaveHeader && !failed_)
600 {
601 tmGL.set_itype(protocol::liBASE);
602 if (mSeq != 0)
603 tmGL.set_ledgerseq(mSeq);
604 JLOG(journal_.trace()) << "Sending header request to "
605 << (peer ? "selected peer" : "all peers");
606 mPeerSet->sendRequest(tmGL, peer);
607 return;
608 }
609
610 if (mLedger)
611 tmGL.set_ledgerseq(mLedger->info().seq);
612
613 if (reason != TriggerReason::reply)
614 {
615 // If we're querying blind, don't query deep
616 tmGL.set_querydepth(0);
617 }
618 else if (peer && peer->isHighLatency())
619 {
620 // If the peer has high latency, query extra deep
621 tmGL.set_querydepth(2);
622 }
623 else
624 tmGL.set_querydepth(1);
625
626 // Get the state data first because it's the most likely to be useful
627 // if we wind up abandoning this fetch.
628 if (mHaveHeader && !mHaveState && !failed_)
629 {
630 XRPL_ASSERT(
631 mLedger,
632 "ripple::InboundLedger::trigger : non-null ledger to read state "
633 "from");
634
635 if (!mLedger->stateMap().isValid())
636 {
637 failed_ = true;
638 }
639 else if (mLedger->stateMap().getHash().isZero())
640 {
641 // we need the root node
642 tmGL.set_itype(protocol::liAS_NODE);
643 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
644 JLOG(journal_.trace()) << "Sending AS root request to "
645 << (peer ? "selected peer" : "all peers");
646 mPeerSet->sendRequest(tmGL, peer);
647 return;
648 }
649 else
650 {
651 AccountStateSF filter(
652 mLedger->stateMap().family().db(), app_.getLedgerMaster());
653
654 // Release the lock while we process the large state map
655 sl.unlock();
656 auto nodes =
657 mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
658 sl.lock();
659
660 // Make sure nothing happened while we released the lock
661 if (!failed_ && !complete_ && !mHaveState)
662 {
663 if (nodes.empty())
664 {
665 if (!mLedger->stateMap().isValid())
666 failed_ = true;
667 else
668 {
669 mHaveState = true;
670
671 if (mHaveTransactions)
672 complete_ = true;
673 }
674 }
675 else
676 {
677 filterNodes(nodes, reason);
678
679 if (!nodes.empty())
680 {
681 tmGL.set_itype(protocol::liAS_NODE);
682 for (auto const& id : nodes)
683 {
684 *(tmGL.add_nodeids()) = id.first.getRawString();
685 }
686
687 JLOG(journal_.trace())
688 << "Sending AS node request (" << nodes.size()
689 << ") to "
690 << (peer ? "selected peer" : "all peers");
691 mPeerSet->sendRequest(tmGL, peer);
692 return;
693 }
694 else
695 {
696 JLOG(journal_.trace()) << "All AS nodes filtered";
697 }
698 }
699 }
700 }
701 }
702
703 if (mHaveHeader && !mHaveTransactions && !failed_)
704 {
705 XRPL_ASSERT(
706 mLedger,
707 "ripple::InboundLedger::trigger : non-null ledger to read "
708 "transactions from");
709
710 if (!mLedger->txMap().isValid())
711 {
712 failed_ = true;
713 }
714 else if (mLedger->txMap().getHash().isZero())
715 {
716 // we need the root node
717 tmGL.set_itype(protocol::liTX_NODE);
718 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
719 JLOG(journal_.trace()) << "Sending TX root request to "
720 << (peer ? "selected peer" : "all peers");
721 mPeerSet->sendRequest(tmGL, peer);
722 return;
723 }
724 else
725 {
726 TransactionStateSF filter(
727 mLedger->txMap().family().db(), app_.getLedgerMaster());
728
729 auto nodes =
730 mLedger->txMap().getMissingNodes(missingNodesFind, &filter);
731
732 if (nodes.empty())
733 {
734 if (!mLedger->txMap().isValid())
735 failed_ = true;
736 else
737 {
738 mHaveTransactions = true;
739
740 if (mHaveState)
741 complete_ = true;
742 }
743 }
744 else
745 {
746 filterNodes(nodes, reason);
747
748 if (!nodes.empty())
749 {
750 tmGL.set_itype(protocol::liTX_NODE);
751 for (auto const& n : nodes)
752 {
753 *(tmGL.add_nodeids()) = n.first.getRawString();
754 }
755 JLOG(journal_.trace())
756 << "Sending TX node request (" << nodes.size()
757 << ") to " << (peer ? "selected peer" : "all peers");
758 mPeerSet->sendRequest(tmGL, peer);
759 return;
760 }
761 else
762 {
763 JLOG(journal_.trace()) << "All TX nodes filtered";
764 }
765 }
766 }
767 }
768
769 if (complete_ || failed_)
770 {
771 JLOG(journal_.debug())
772 << "Done:" << (complete_ ? " complete" : "")
773 << (failed_ ? " failed " : " ") << mLedger->info().seq;
774 sl.unlock();
775 done();
776 }
777}
778
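// Trim the list of nodes to request: drop nodes asked for recently (unless we
// timed out) and cap the request at reqNodesReply or reqNodes.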
779void
780InboundLedger::filterNodes(
 781 std::vector<std::pair<SHAMapNodeID, uint256>>& nodes,
 782 TriggerReason reason)
783{
784 // Sort nodes so that the ones we haven't recently
785 // requested come before the ones we have.
786 auto dup = std::stable_partition(
787 nodes.begin(), nodes.end(), [this](auto const& item) {
788 return mRecentNodes.count(item.second) == 0;
789 });
790
791 // If everything is a duplicate we don't want to send
792 // any query at all except on a timeout where we need
793 // to query everyone:
794 if (dup == nodes.begin())
795 {
796 JLOG(journal_.trace()) << "filterNodes: all duplicates";
797
798 if (reason != TriggerReason::timeout)
799 {
800 nodes.clear();
801 return;
802 }
803 }
804 else
805 {
806 JLOG(journal_.trace()) << "filterNodes: pruning duplicates";
807
808 nodes.erase(dup, nodes.end());
809 }
810
811 std::size_t const limit =
812 (reason == TriggerReason::reply) ? reqNodesReply : reqNodes;
813
814 if (nodes.size() > limit)
815 nodes.resize(limit);
816
817 for (auto const& n : nodes)
818 mRecentNodes.insert(n.second);
819}
820
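// Process the ledger header received from a peer: build the ledger, verify
// its hash and sequence, and store the header locally.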
824// data must not have hash prefix
825bool
826InboundLedger::takeHeader(std::string const& data)
827{
828 // Return value: true=normal, false=bad data
829 JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;
830
831 if (complete_ || failed_ || mHaveHeader)
832 return true;
833
834 auto* f = &app_.getNodeFamily();
835 mLedger = std::make_shared<Ledger>(
836 deserializeHeader(makeSlice(data)), app_.config(), *f);
837 if (mLedger->info().hash != hash_ ||
838 (mSeq != 0 && mSeq != mLedger->info().seq))
839 {
840 JLOG(journal_.warn())
841 << "Acquire hash mismatch: " << mLedger->info().hash
842 << "!=" << hash_;
843 mLedger.reset();
844 return false;
845 }
846 if (mSeq == 0)
847 mSeq = mLedger->info().seq;
848 mLedger->stateMap().setLedgerSeq(mSeq);
849 mLedger->txMap().setLedgerSeq(mSeq);
850 mHaveHeader = true;
851
852 Serializer s(data.size() + 4);
853 s.add32(HashPrefix::ledgerMaster);
854 s.addRaw(data.data(), data.size());
855 f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);
856
857 if (mLedger->info().txHash.isZero())
858 mHaveTransactions = true;
859
860 if (mLedger->info().accountHash.isZero())
861 mHaveState = true;
862
863 mLedger->txMap().setSynching();
864 mLedger->stateMap().setSynching();
865
866 return true;
867}
868
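// Add received transaction or account-state nodes to the corresponding
// SHAMap, tracking good/duplicate/invalid counts in 'san'.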
872void
873InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
874{
875 if (!mHaveHeader)
876 {
877 JLOG(journal_.warn()) << "Missing ledger header";
878 san.incInvalid();
879 return;
880 }
881 if (packet.type() == protocol::liTX_NODE)
882 {
883 if (mHaveTransactions || failed_)
884 {
885 san.incDuplicate();
886 return;
887 }
888 }
889 else if (mHaveState || failed_)
890 {
891 san.incDuplicate();
892 return;
893 }
894
895 auto [map, rootHash, filter] = [&]()
 896 -> std::tuple<SHAMap&, SHAMapHash, std::unique_ptr<SHAMapSyncFilter>> {
 897 if (packet.type() == protocol::liTX_NODE)
898 return {
899 mLedger->txMap(),
900 SHAMapHash{mLedger->info().txHash},
901 std::make_unique<TransactionStateSF>(
902 mLedger->txMap().family().db(), app_.getLedgerMaster())};
903 return {
904 mLedger->stateMap(),
905 SHAMapHash{mLedger->info().accountHash},
906 std::make_unique<AccountStateSF>(
907 mLedger->stateMap().family().db(), app_.getLedgerMaster())};
908 }();
909
910 try
911 {
912 auto const f = filter.get();
913
914 for (auto const& node : packet.nodes())
915 {
916 auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
917
918 if (!nodeID)
919 throw std::runtime_error("data does not properly deserialize");
920
921 if (nodeID->isRoot())
922 {
923 san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
924 }
925 else
926 {
927 san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
928 }
929
930 if (!san.isGood())
931 {
932 JLOG(journal_.warn()) << "Received bad node data";
933 return;
934 }
935 }
936 }
937 catch (std::exception const& e)
938 {
939 JLOG(journal_.error()) << "Received bad node data: " << e.what();
940 san.incInvalid();
941 return;
942 }
943
944 if (!map.isSynching())
945 {
946 if (packet.type() == protocol::liTX_NODE)
947 mHaveTransactions = true;
948 else
949 mHaveState = true;
950
951 if (mHaveTransactions && mHaveState)
952 {
953 complete_ = true;
954 done();
955 }
956 }
957}
958
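// Process the account-state map's root node included in a header (liBASE) reply.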
962bool
963InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
964{
965 if (failed_ || mHaveState)
966 {
967 san.incDuplicate();
968 return true;
969 }
970
971 if (!mHaveHeader)
972 {
973 UNREACHABLE("ripple::InboundLedger::takeAsRootNode : no ledger header");
974 return false;
975 }
976
977 AccountStateSF filter(
978 mLedger->stateMap().family().db(), app_.getLedgerMaster());
979 san += mLedger->stateMap().addRootNode(
980 SHAMapHash{mLedger->info().accountHash}, data, &filter);
981 return san.isGood();
982}
983
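// Process the transaction map's root node included in a header (liBASE) reply.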
987bool
988InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
989{
990 if (failed_ || mHaveTransactions)
991 {
992 san.incDuplicate();
993 return true;
994 }
995
996 if (!mHaveHeader)
997 {
998 UNREACHABLE("ripple::InboundLedger::takeTxRootNode : no ledger header");
999 return false;
1000 }
1001
1002 TransactionStateSF filter(
1003 mLedger->txMap().family().db(), app_.getLedgerMaster());
1004 san += mLedger->txMap().addRootNode(
1005 SHAMapHash{mLedger->info().txHash}, data, &filter);
1006 return san.isGood();
1007}
1008
1009std::vector<InboundLedger::neededHash_t>
1010InboundLedger::getNeededHashes()
1011{
 1012 std::vector<neededHash_t> ret;
 1013
1014 if (!mHaveHeader)
1015 {
1016 ret.push_back(
1017 std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
1018 return ret;
1019 }
1020
1021 if (!mHaveState)
1022 {
1023 AccountStateSF filter(
1024 mLedger->stateMap().family().db(), app_.getLedgerMaster());
1025 for (auto const& h : neededStateHashes(4, &filter))
1026 {
1027 ret.push_back(
1028 std::make_pair(protocol::TMGetObjectByHash::otSTATE_NODE, h));
1029 }
1030 }
1031
1032 if (!mHaveTransactions)
1033 {
1034 TransactionStateSF filter(
1035 mLedger->txMap().family().db(), app_.getLedgerMaster());
1036 for (auto const& h : neededTxHashes(4, &filter))
1037 {
 1038 ret.push_back(std::make_pair(
 1039 protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
1040 }
1041 }
1042
1043 return ret;
1044}
1045
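// Queue a TMLedgerData message for later processing; returns true if the
// caller should dispatch a job to drain the queue.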
1049bool
1050InboundLedger::gotData(
 1051 std::weak_ptr<Peer> peer,
 1052 std::shared_ptr<protocol::TMLedgerData> const& data)
1053{
1054 std::lock_guard sl(mReceivedDataLock);
1055
1056 if (isDone())
1057 return false;
1058
1059 mReceivedData.emplace_back(peer, data);
1060
1061 if (mReceiveDispatched)
1062 return false;
1063
1064 mReceiveDispatched = true;
1065 return true;
1066}
1067
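// Process one TMLedgerData message; returns the number of useful nodes it
// contained, or -1 if the data was bad.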
1071// VFALCO NOTE, it is not necessary to pass the entire Peer,
1072// we can get away with just a Resource::Consumer endpoint.
1073//
1074// TODO Change peer to Consumer
1075//
1076int
1077InboundLedger::processData(
 1078 std::shared_ptr<Peer> peer,
 1079 protocol::TMLedgerData& packet)
1080{
1081 if (packet.type() == protocol::liBASE)
1082 {
1083 if (packet.nodes().empty())
1084 {
1085 JLOG(journal_.warn()) << peer->id() << ": empty header data";
1086 peer->charge(
1087 Resource::feeMalformedRequest, "ledger_data empty header");
1088 return -1;
1089 }
1090
1091 SHAMapAddNode san;
1092
1093 ScopedLockType sl(mtx_);
1094
1095 try
1096 {
1097 if (!mHaveHeader)
1098 {
1099 if (!takeHeader(packet.nodes(0).nodedata()))
1100 {
1101 JLOG(journal_.warn()) << "Got invalid header data";
1102 peer->charge(
1103 Resource::feeMalformedRequest,
1104 "ledger_data invalid header");
1105 return -1;
1106 }
1107
1108 san.incUseful();
1109 }
1110
1111 if (!mHaveState && (packet.nodes().size() > 1) &&
1112 !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
1113 {
1114 JLOG(journal_.warn()) << "Included AS root invalid";
1115 }
1116
1117 if (!mHaveTransactions && (packet.nodes().size() > 2) &&
1118 !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
1119 {
1120 JLOG(journal_.warn()) << "Included TX root invalid";
1121 }
1122 }
1123 catch (std::exception const& ex)
1124 {
1125 JLOG(journal_.warn())
1126 << "Included AS/TX root invalid: " << ex.what();
1127 using namespace std::string_literals;
1128 peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
1129 return -1;
1130 }
1131
1132 if (san.isUseful())
1133 progress_ = true;
1134
1135 mStats += san;
1136 return san.getGood();
1137 }
1138
1139 if ((packet.type() == protocol::liTX_NODE) ||
1140 (packet.type() == protocol::liAS_NODE))
1141 {
1142 std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: "
1143 : "liAS_NODE: ";
1144
1145 if (packet.nodes().empty())
1146 {
1147 JLOG(journal_.info()) << peer->id() << ": response with no nodes";
1148 peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
1149 return -1;
1150 }
1151
1152 ScopedLockType sl(mtx_);
1153
1154 // Verify node IDs and data are complete
1155 for (auto const& node : packet.nodes())
1156 {
1157 if (!node.has_nodeid() || !node.has_nodedata())
1158 {
1159 JLOG(journal_.warn()) << "Got bad node";
1160 peer->charge(
1161 Resource::feeMalformedRequest, "ledger_data bad node");
1162 return -1;
1163 }
1164 }
1165
1166 SHAMapAddNode san;
1167 receiveNode(packet, san);
1168
1169 JLOG(journal_.debug())
1170 << "Ledger "
1171 << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
1172 << " node stats: " << san.get();
1173
1174 if (san.isUseful())
1175 progress_ = true;
1176
1177 mStats += san;
1178 return san.getGood();
1179 }
1180
1181 return -1;
1182}
1183
1184namespace detail {
1185// Track the amount of useful data that each peer returns
1186struct PeerDataCounts
1187{
 1188 // Map from peer to amount of useful data the peer returned
 1189 std::unordered_map<std::shared_ptr<Peer>, int> counts;
1190 // The largest amount of useful data that any peer returned
1191 int maxCount = 0;
1192
1193 // Update the data count for a peer
1194 void
1195 update(std::shared_ptr<Peer>&& peer, int dataCount)
1196 {
1197 if (dataCount <= 0)
1198 return;
1199 maxCount = std::max(maxCount, dataCount);
1200 auto i = counts.find(peer);
1201 if (i == counts.end())
1202 {
1203 counts.emplace(std::move(peer), dataCount);
1204 return;
1205 }
1206 i->second = std::max(i->second, dataCount);
1207 }
1208
1209 // Prune all the peers that didn't return enough data.
1210 void
 1211 prune()
 1212 {
1213 // Remove all the peers that didn't return at least half as much data as
1214 // the best peer
1215 auto const thresh = maxCount / 2;
1216 auto i = counts.begin();
1217 while (i != counts.end())
1218 {
1219 if (i->second < thresh)
1220 i = counts.erase(i);
1221 else
1222 ++i;
1223 }
1224 }
1225
1226 // call F with the `peer` parameter with a random sample of at most n values
1227 // of the counts vector.
1228 template <class F>
1229 void
 1230 sampleN(std::size_t n, F&& f)
 1231 {
1232 if (counts.empty())
1233 return;
1234
1235 auto outFunc = [&f](auto&& v) { f(v.first); };
 1236 std::minstd_rand rng{std::random_device{}()};
 1237#if _MSC_VER
 1238 std::vector<std::pair<std::shared_ptr<Peer>, int>> s;
 1239 s.reserve(n);
 1240 std::sample(
 1241 counts.begin(), counts.end(), std::back_inserter(s), n, rng);
1242 for (auto& v : s)
1243 {
1244 outFunc(v);
1245 }
1246#else
 1247 std::sample(
 1248 counts.begin(),
1249 counts.end(),
1250 boost::make_function_output_iterator(outFunc),
1251 n,
1252 rng);
1253#endif
1254 }
1255};
1256} // namespace detail
1257
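// Drain the queue of received data, process each packet, then re-trigger a
// random sample of the peers that returned the most useful nodes.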
1261void
1262InboundLedger::runData()
1263{
1264 // Maximum number of peers to request data from
1265 constexpr std::size_t maxUsefulPeers = 6;
1266
1267 decltype(mReceivedData) data;
1268
1269 // Reserve some memory so the first couple iterations don't reallocate
1270 data.reserve(8);
1271
1272 detail::PeerDataCounts dataCounts;
1273
1274 for (;;)
1275 {
1276 data.clear();
1277
1278 {
1279 std::lock_guard sl(mReceivedDataLock);
1280
1281 if (mReceivedData.empty())
1282 {
1283 mReceiveDispatched = false;
1284 break;
1285 }
1286
1287 data.swap(mReceivedData);
1288 }
1289
1290 for (auto& entry : data)
1291 {
1292 if (auto peer = entry.first.lock())
1293 {
1294 int count = processData(peer, *(entry.second));
1295 dataCounts.update(std::move(peer), count);
1296 }
1297 }
1298 }
1299
1300 // Select a random sample of the peers that gives us the most nodes that are
1301 // useful
1302 dataCounts.prune();
1303 dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
1304 trigger(peer, TriggerReason::reply);
1305 });
1306}
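// Summarize the state of this acquisition as JSON: completion flags, peer
// count, timeouts, and any hashes still needed.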
1307
1308Json::Value
1309InboundLedger::getJson(int)
1310{
 1311 Json::Value ret(Json::objectValue);
 1312
1313 ScopedLockType sl(mtx_);
1314
1315 ret[jss::hash] = to_string(hash_);
1316
1317 if (complete_)
1318 ret[jss::complete] = true;
1319
1320 if (failed_)
1321 ret[jss::failed] = true;
1322
1323 if (!complete_ && !failed_)
1324 ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());
1325
1326 ret[jss::have_header] = mHaveHeader;
1327
1328 if (mHaveHeader)
1329 {
1330 ret[jss::have_state] = mHaveState;
1331 ret[jss::have_transactions] = mHaveTransactions;
1332 }
1333
1334 ret[jss::timeouts] = timeouts_;
1335
1336 if (mHaveHeader && !mHaveState)
1337 {
 1338 Json::Value hv(Json::arrayValue);
 1339 for (auto const& h : neededStateHashes(16, nullptr))
1340 {
1341 hv.append(to_string(h));
1342 }
1343 ret[jss::needed_state_hashes] = hv;
1344 }
1345
1346 if (mHaveHeader && !mHaveTransactions)
1347 {
 1348 Json::Value hv(Json::arrayValue);
 1349 for (auto const& h : neededTxHashes(16, nullptr))
1350 {
1351 hv.append(to_string(h));
1352 }
1353 ret[jss::needed_transaction_hashes] = hv;
1354 }
1355
1356 return ret;
1357}
1358
1359} // namespace ripple