20#include <xrpld/app/ledger/AccountStateSF.h>
21#include <xrpld/app/ledger/InboundLedger.h>
22#include <xrpld/app/ledger/InboundLedgers.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionStateSF.h>
25#include <xrpld/app/main/Application.h>
26#include <xrpld/core/JobQueue.h>
27#include <xrpld/overlay/Overlay.h>
28#include <xrpld/shamap/SHAMapNodeID.h>
29#include <xrpl/basics/Log.h>
30#include <xrpl/protocol/HashPrefix.h>
31#include <xrpl/protocol/jss.h>
32#include <xrpl/resource/Fees.h>
34#include <boost/iterator/function_output_iterator.hpp>
41using namespace std::chrono_literals;
87 app.journal(
"InboundLedger"))
91 , mHaveTransactions(
false)
96 , mReceiveDispatched(
false)
97 , mPeerSet(std::move(peerSet))
99 JLOG(journal_.trace()) <<
"Acquiring ledger " << hash_;
120 JLOG(
journal_.
debug()) <<
"Acquiring ledger we already have in "
121 <<
" local store. " <<
hash_;
125 "ripple::InboundLedger::init : valid ledger fees");
141 auto const& peerIds =
mPeerSet->getPeerIds();
142 return std::count_if(peerIds.begin(), peerIds.end(), [
this](
auto id) {
143 return (app_.overlay().findPeerByShortID(id) != nullptr);
153 if ((seq != 0) && (
mSeq == 0))
185 if (entry.second->type() == protocol::liAS_NODE)
191 <<
"Acquire " <<
hash_ <<
" abort "
216 for (
auto const& n : mn)
244 auto makeLedger = [&,
this](
Blob const& data) {
245 JLOG(
journal_.
trace()) <<
"Ledger header found in fetch pack";
246 mLedger = std::make_shared<Ledger>(
256 <<
" cannot be a ledger";
265 JLOG(
journal_.
trace()) <<
"Ledger header found in local store";
267 makeLedger(nodeObject->getData());
272 auto& dstDB{
mLedger->stateMap().family().db()};
275 Blob blob{nodeObject->getData()};
287 JLOG(
journal_.
trace()) <<
"Ledger header found in fetch pack";
294 mLedger->stateMap().family().db().store(
307 if (
mLedger->info().txHash.isZero())
316 if (
mLedger->txMap().fetchRoot(
330 if (
mLedger->info().accountHash.isZero())
333 <<
"We are acquiring a ledger with a zero account hash";
339 if (
mLedger->stateMap().fetchRoot(
340 SHAMapHash{mLedger->info().accountHash}, &filter))
357 "ripple::InboundLedger::tryDB : valid ledger fees");
400 <<
"No progress(" << pc <<
") for ledger " <<
hash_;
420 [
this](
auto peer) {
return peer->hasLedger(
hash_,
mSeq); },
453 "ripple::InboundLedger::done : complete or failed");
460 "ripple::InboundLedger::done : valid ledger fees");
476 if (self->complete_ && !self->failed_)
478 self->app_.getLedgerMaster().checkAccept(self->getLedger());
479 self->app_.getLedgerMaster().tryAdvance();
482 self->app_.getInboundLedgers().logFailure(
483 self->hash_, self->mSeq);
497 <<
"Trigger on ledger: " <<
hash_ << (
complete_ ?
" completed" :
"")
504 stream <<
"Trigger acquiring ledger " <<
hash_;
506 stream <<
" from " << peer;
525 protocol::TMGetLedger tmGL;
531 tmGL.set_querytype(protocol::qtINDIRECT);
540 protocol::TMGetObjectByHash tmBH;
541 bool typeSet =
false;
542 tmBH.set_query(
true);
544 for (
auto const& p : need)
550 tmBH.set_type(p.first);
554 if (p.first == tmBH.type())
556 protocol::TMIndexedObject* io = tmBH.add_objects();
557 io->set_hash(p.second.begin(), p.second.size());
559 io->set_ledgerseq(
mSeq);
564 std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
565 auto const& peerIds =
mPeerSet->getPeerIds();
567 peerIds.begin(), peerIds.end(), [
this, &packet](
auto id) {
568 if (auto p = app_.overlay().findPeerByShortID(id))
578 <<
"getNeededHashes says acquire is complete";
589 if (!mHaveHeader && !failed_)
591 tmGL.set_itype(protocol::liBASE);
593 tmGL.set_ledgerseq(mSeq);
594 JLOG(journal_.trace()) <<
"Sending header request to "
595 << (peer ?
"selected peer" :
"all peers");
596 mPeerSet->sendRequest(tmGL, peer);
601 tmGL.set_ledgerseq(mLedger->info().seq);
603 if (reason != TriggerReason::reply)
606 tmGL.set_querydepth(0);
608 else if (peer && peer->isHighLatency())
611 tmGL.set_querydepth(2);
614 tmGL.set_querydepth(1);
618 if (mHaveHeader && !mHaveState && !failed_)
622 "ripple::InboundLedger::trigger : non-null ledger to read state "
625 if (!mLedger->stateMap().isValid())
629 else if (mLedger->stateMap().getHash().isZero())
632 tmGL.set_itype(protocol::liAS_NODE);
633 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
634 JLOG(journal_.trace()) <<
"Sending AS root request to "
635 << (peer ?
"selected peer" :
"all peers");
636 mPeerSet->sendRequest(tmGL, peer);
641 AccountStateSF filter(
642 mLedger->stateMap().family().db(), app_.getLedgerMaster());
651 if (!failed_ && !complete_ && !mHaveState)
655 if (!mLedger->stateMap().isValid())
661 if (mHaveTransactions)
667 filterNodes(nodes, reason);
671 tmGL.set_itype(protocol::liAS_NODE);
672 for (
auto const&
id : nodes)
674 *(tmGL.add_nodeids()) =
id.first.getRawString();
677 JLOG(journal_.trace())
678 <<
"Sending AS node request (" << nodes.size()
680 << (peer ?
"selected peer" :
"all peers");
681 mPeerSet->sendRequest(tmGL, peer);
686 JLOG(journal_.trace()) <<
"All AS nodes filtered";
693 if (mHaveHeader && !mHaveTransactions && !failed_)
697 "ripple::InboundLedger::trigger : non-null ledger to read "
698 "transactions from");
700 if (!mLedger->txMap().isValid())
704 else if (mLedger->txMap().getHash().isZero())
707 tmGL.set_itype(protocol::liTX_NODE);
708 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
709 JLOG(journal_.trace()) <<
"Sending TX root request to "
710 << (peer ?
"selected peer" :
"all peers");
711 mPeerSet->sendRequest(tmGL, peer);
716 TransactionStateSF filter(
717 mLedger->txMap().family().db(), app_.getLedgerMaster());
724 if (!mLedger->txMap().isValid())
728 mHaveTransactions =
true;
736 filterNodes(nodes, reason);
740 tmGL.set_itype(protocol::liTX_NODE);
741 for (
auto const& n : nodes)
743 *(tmGL.add_nodeids()) = n.first.getRawString();
745 JLOG(journal_.trace())
746 <<
"Sending TX node request (" << nodes.size()
747 <<
") to " << (peer ?
"selected peer" :
"all peers");
748 mPeerSet->sendRequest(tmGL, peer);
753 JLOG(journal_.trace()) <<
"All TX nodes filtered";
759 if (complete_ || failed_)
761 JLOG(journal_.debug())
762 <<
"Done:" << (complete_ ?
" complete" :
"")
763 << (failed_ ?
" failed " :
" ") << mLedger->info().seq;
770InboundLedger::filterNodes(
777 nodes.begin(), nodes.end(), [
this](
auto const& item) {
778 return mRecentNodes.count(item.second) == 0;
784 if (dup == nodes.begin())
786 JLOG(journal_.trace()) <<
"filterNodes: all duplicates";
788 if (reason != TriggerReason::timeout)
796 JLOG(journal_.trace()) <<
"filterNodes: pruning duplicates";
798 nodes.erase(dup, nodes.end());
804 if (nodes.size() > limit)
807 for (
auto const& n : nodes)
808 mRecentNodes.insert(n.second);
819 JLOG(journal_.trace()) <<
"got header acquiring ledger " << hash_;
821 if (complete_ || failed_ || mHaveHeader)
824 auto* f = &app_.getNodeFamily();
825 mLedger = std::make_shared<Ledger>(
827 if (mLedger->info().hash != hash_ ||
828 (mSeq != 0 && mSeq != mLedger->info().seq))
830 JLOG(journal_.warn())
831 <<
"Acquire hash mismatch: " << mLedger->info().hash
837 mSeq = mLedger->info().seq;
838 mLedger->stateMap().setLedgerSeq(mSeq);
839 mLedger->txMap().setLedgerSeq(mSeq);
843 s.
add32(HashPrefix::ledgerMaster);
844 s.
addRaw(data.data(), data.size());
847 if (mLedger->info().txHash.isZero())
848 mHaveTransactions =
true;
850 if (mLedger->info().accountHash.isZero())
853 mLedger->txMap().setSynching();
854 mLedger->stateMap().setSynching();
863InboundLedger::receiveNode(protocol::TMLedgerData& packet,
SHAMapAddNode& san)
867 JLOG(journal_.warn()) <<
"Missing ledger header";
871 if (packet.type() == protocol::liTX_NODE)
873 if (mHaveTransactions || failed_)
879 else if (mHaveState || failed_)
885 auto [map, rootHash, filter] = [&]()
887 if (packet.type() == protocol::liTX_NODE)
891 std::make_unique<TransactionStateSF>(
892 mLedger->txMap().family().db(), app_.getLedgerMaster())};
896 std::make_unique<AccountStateSF>(
897 mLedger->stateMap().family().db(), app_.getLedgerMaster())};
902 auto const f = filter.get();
904 for (
auto const& node : packet.nodes())
911 if (nodeID->isRoot())
913 san += map.addRootNode(rootHash,
makeSlice(node.nodedata()), f);
917 san += map.addKnownNode(*nodeID,
makeSlice(node.nodedata()), f);
922 JLOG(journal_.warn()) <<
"Received bad node data";
929 JLOG(journal_.error()) <<
"Received bad node data: " << e.
what();
934 if (!map.isSynching())
936 if (packet.type() == protocol::liTX_NODE)
937 mHaveTransactions =
true;
941 if (mHaveTransactions && mHaveState)
955 if (failed_ || mHaveState)
963 UNREACHABLE(
"ripple::InboundLedger::takeAsRootNode : no ledger header");
968 mLedger->stateMap().family().db(), app_.getLedgerMaster());
969 san += mLedger->stateMap().addRootNode(
970 SHAMapHash{mLedger->info().accountHash}, data, &filter);
980 if (failed_ || mHaveTransactions)
988 UNREACHABLE(
"ripple::InboundLedger::takeTxRootNode : no ledger header");
993 mLedger->txMap().family().db(), app_.getLedgerMaster());
994 san += mLedger->txMap().addRootNode(
995 SHAMapHash{mLedger->info().txHash}, data, &filter);
1000InboundLedger::getNeededHashes()
1014 mLedger->stateMap().family().db(), app_.getLedgerMaster());
1015 for (
auto const& h : neededStateHashes(4, &filter))
1022 if (!mHaveTransactions)
1025 mLedger->txMap().family().db(), app_.getLedgerMaster());
1026 for (
auto const& h : neededTxHashes(4, &filter))
1029 protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
1040InboundLedger::gotData(
1049 mReceivedData.emplace_back(peer, data);
1051 if (mReceiveDispatched)
1054 mReceiveDispatched =
true;
1067InboundLedger::processData(
1069 protocol::TMLedgerData& packet)
1071 if (packet.type() == protocol::liBASE)
1073 if (packet.nodes().empty())
1075 JLOG(journal_.warn()) << peer->id() <<
": empty header data";
1077 Resource::feeMalformedRequest,
"ledger_data empty header");
1089 if (!takeHeader(packet.nodes(0).nodedata()))
1091 JLOG(journal_.warn()) <<
"Got invalid header data";
1093 Resource::feeMalformedRequest,
1094 "ledger_data invalid header");
1101 if (!mHaveState && (packet.nodes().size() > 1) &&
1102 !takeAsRootNode(
makeSlice(packet.nodes(1).nodedata()), san))
1104 JLOG(journal_.warn()) <<
"Included AS root invalid";
1107 if (!mHaveTransactions && (packet.nodes().size() > 2) &&
1108 !takeTxRootNode(
makeSlice(packet.nodes(2).nodedata()), san))
1110 JLOG(journal_.warn()) <<
"Included TX root invalid";
1115 JLOG(journal_.warn())
1116 <<
"Included AS/TX root invalid: " << ex.
what();
1117 using namespace std::string_literals;
1118 peer->charge(Resource::feeInvalidData,
"ledger_data "s + ex.
what());
1129 if ((packet.type() == protocol::liTX_NODE) ||
1130 (packet.type() == protocol::liAS_NODE))
1132 std::string type = packet.type() == protocol::liTX_NODE ?
"liTX_NODE: "
1135 if (packet.nodes().empty())
1137 JLOG(journal_.info()) << peer->id() <<
": response with no nodes";
1138 peer->charge(Resource::feeMalformedRequest,
"ledger_data no nodes");
1145 for (
auto const& node : packet.nodes())
1147 if (!node.has_nodeid() || !node.has_nodedata())
1149 JLOG(journal_.warn()) <<
"Got bad node";
1151 Resource::feeMalformedRequest,
"ledger_data bad node");
1157 receiveNode(packet, san);
1159 JLOG(journal_.debug())
1161 << ((packet.type() == protocol::liTX_NODE) ?
"TX" :
"AS")
1162 <<
" node stats: " << san.
get();
1189 maxCount =
std::max(maxCount, dataCount);
1190 auto i = counts.
find(peer);
1191 if (i == counts.
end())
1193 counts.
emplace(std::move(peer), dataCount);
1196 i->second =
std::max(i->second, dataCount);
1205 auto const thresh = maxCount / 2;
1206 auto i = counts.
begin();
1207 while (i != counts.
end())
1209 if (i->second < thresh)
1210 i = counts.
erase(i);
1225 auto outFunc = [&f](
auto&& v) { f(v.first); };
1240 boost::make_function_output_iterator(outFunc),
1252InboundLedger::runData()
1257 decltype(mReceivedData) data;
1271 if (mReceivedData.empty())
1273 mReceiveDispatched =
false;
1277 data.swap(mReceivedData);
1280 for (
auto& entry : data)
1282 if (
auto peer = entry.first.lock())
1284 int count = processData(peer, *(entry.second));
1285 dataCounts.
update(std::move(peer), count);
1294 trigger(peer, TriggerReason::reply);
1299InboundLedger::getJson(
int)
1305 ret[jss::hash] = to_string(hash_);
1308 ret[jss::complete] =
true;
1311 ret[jss::failed] =
true;
1313 if (!complete_ && !failed_)
1314 ret[jss::peers] =
static_cast<int>(mPeerSet->getPeerIds().size());
1316 ret[jss::have_header] = mHaveHeader;
1320 ret[jss::have_state] = mHaveState;
1321 ret[jss::have_transactions] = mHaveTransactions;
1324 ret[jss::timeouts] = timeouts_;
1326 if (mHaveHeader && !mHaveState)
1329 for (
auto const& h : neededStateHashes(16,
nullptr))
1333 ret[jss::needed_state_hashes] = hv;
1336 if (mHaveHeader && !mHaveTransactions)
1339 for (
auto const& h : neededTxHashes(16,
nullptr))
1343 ret[jss::needed_transaction_hashes] = hv;
T back_inserter(T... args)
Value & append(const Value &value)
Append value to array at the end.
Stream trace() const
Severity stream access functions.
virtual Config & config()=0
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual Family & getNodeFamily()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual NodeStore::Database & db()=0
std::size_t getPeerCount() const
void trigger(std::shared_ptr< Peer > const &, TriggerReason)
Request more nodes, perhaps from a specific peer.
void init(ScopedLockType &collectionLock)
std::set< uint256 > mRecentNodes
void addPeers()
Add more peers to the set, if possible.
std::shared_ptr< Ledger > mLedger
std::vector< uint256 > neededTxHashes(int max, SHAMapSyncFilter *filter) const
InboundLedger(Application &app, uint256 const &hash, std::uint32_t seq, Reason reason, clock_type &, std::unique_ptr< PeerSet > peerSet)
void tryDB(NodeStore::Database &srcDB)
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Called with a lock by the PeerSet when the timer expires.
std::vector< uint256 > neededStateHashes(int max, SHAMapSyncFilter *filter) const
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
std::vector< std::pair< std::weak_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData > > > mReceivedData
std::vector< neededHash_t > getNeededHashes()
void update(std::uint32_t seq)
std::unique_ptr< PeerSet > mPeerSet
virtual void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet)=0
virtual void onLedgerFetched()=0
Called when a complete ledger is obtained.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
void checkAccept(std::shared_ptr< Ledger const > const &ledger)
std::optional< Blob > getFetchPack(uint256 const &hash) override
Retrieves partial ledger data of the corresponding hash from peers.
bool storeLedger(std::shared_ptr< Ledger const > ledger)
Persistency layer for NodeObject.
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
SHAMapHash getHash() const
std::vector< std::pair< SHAMapNodeID, uint256 > > getMissingNodes(int maxNodes, SHAMapSyncFilter *filter)
Check for nodes in the SHAMap not available.
int addRaw(Blob const &vector)
An immutable linear range of bytes.
This class is an "active" object.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
std::recursive_mutex mtx_
static constexpr std::size_t size()
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
LedgerHeader deserializePrefixedHeader(Slice data, bool hasHash=false)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
@ ledgerBecomeAggressiveThreshold
@ ledgerTimeoutRetriesMax
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
auto constexpr ledgerAcquireTimeout
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Number root(Number f, unsigned d)
LedgerHeader deserializeHeader(Slice data, bool hasHash=false)
Deserialize a ledger header from a byte array.
static std::vector< uint256 > neededHashes(uint256 const &root, SHAMap &map, int max, SHAMapSyncFilter *filter)
T shared_from_this(T... args)
T stable_partition(T... args)
std::unordered_map< std::shared_ptr< Peer >, int > counts
void sampleN(std::size_t n, F &&f)
void update(std::shared_ptr< Peer > &&peer, int dataCount)