1#include <xrpld/app/consensus/RCLValidations.h>
2#include <xrpld/app/ledger/InboundLedgers.h>
3#include <xrpld/app/ledger/InboundTransactions.h>
4#include <xrpld/app/ledger/LedgerMaster.h>
5#include <xrpld/app/ledger/TransactionMaster.h>
6#include <xrpld/app/misc/HashRouter.h>
7#include <xrpld/app/misc/LoadFeeTrack.h>
8#include <xrpld/app/misc/NetworkOPs.h>
9#include <xrpld/app/misc/Transaction.h>
10#include <xrpld/app/misc/ValidatorList.h>
11#include <xrpld/app/tx/apply.h>
12#include <xrpld/overlay/Cluster.h>
13#include <xrpld/overlay/detail/PeerImp.h>
14#include <xrpld/overlay/detail/Tuning.h>
16#include <xrpl/basics/UptimeClock.h>
17#include <xrpl/basics/base64.h>
18#include <xrpl/basics/random.h>
19#include <xrpl/basics/safe_cast.h>
20#include <xrpl/core/PerfLog.h>
21#include <xrpl/protocol/TxFlags.h>
22#include <xrpl/protocol/digest.h>
24#include <boost/algorithm/string/predicate.hpp>
25#include <boost/beast/core/ostream.hpp>
34using namespace std::chrono_literals;
67 , prefix_(makePrefix(fingerprint_))
68 , sink_(app_.journal(
"Peer"), prefix_)
69 , p_sink_(app_.journal(
"Protocol"), prefix_)
72 , stream_ptr_(
std::move(stream_ptr))
73 , socket_(stream_ptr_->next_layer().socket())
74 , stream_(*stream_ptr_)
75 , strand_(
boost::asio::make_strand(socket_.get_executor()))
77 , remote_address_(slot->remote_endpoint())
83 , publicKey_(publicKey)
86 , squelch_(app_.journal(
"Squelch"))
88 , fee_{Resource::feeTrivialPeer,
""}
90 , request_(
std::move(request))
92 , compressionEnabled_(
97 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
100 <<
" vp reduce-relay base squelch enabled "
108 bool const inCluster{
cluster()};
131 if (!
strand_.running_in_this_thread())
147 if (
auto const iter =
headers_.find(
"Closed-Ledger"); iter !=
headers_.end())
149 closed = parseLedgerHash(iter->value());
152 fail(
"Malformed handshake data (1)");
155 if (
auto const iter =
headers_.find(
"Previous-Ledger"); iter !=
headers_.end())
157 previous = parseLedgerHash(iter->value());
160 fail(
"Malformed handshake data (2)");
163 if (previous && !closed)
164 fail(
"Malformed handshake data (3)");
186 if (!
strand_.running_in_this_thread())
206 if (!
strand_.running_in_this_thread())
216 auto validator = m->getValidatorKey();
217 if (validator && !
squelch_.expireSquelch(*validator))
226 safe_cast<TrafficCount::category>(m->getCategory()),
245 sink << n <<
" sendq: " << sendq_size;
254 boost::asio::async_write(
265 if (!
strand_.running_in_this_thread())
270 protocol::TMHaveTransactions ht;
272 txQueue_.begin(),
txQueue_.end(), [&](
auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
282 if (!
strand_.running_in_this_thread())
298 if (!
strand_.running_in_this_thread())
301 auto removed =
txQueue_.erase(hash);
309 strand_.running_in_this_thread())
313 fail(
"charge: Resources");
322 auto const iter =
headers_.find(
"Crawl");
325 return boost::iequals(iter->value(),
"public");
351 ret[jss::inbound] =
true;
355 ret[jss::cluster] =
true;
365 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
381 ret[jss::uptime] =
static_cast<Json::UInt>(std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
386 if ((minSeq != 0) || (maxSeq != 0))
392 ret[jss::track] =
"diverged";
396 ret[jss::track] =
"unknown";
405 protocol::TMStatusChange last_status;
412 if (closedLedgerHash != beast::zero)
413 ret[jss::ledger] =
to_string(closedLedgerHash);
415 if (last_status.has_newstatus())
417 switch (last_status.newstatus())
419 case protocol::nsCONNECTING:
420 ret[jss::status] =
"connecting";
423 case protocol::nsCONNECTED:
424 ret[jss::status] =
"connected";
427 case protocol::nsMONITORING:
428 ret[jss::status] =
"monitoring";
431 case protocol::nsVALIDATING:
432 ret[jss::status] =
"validating";
435 case protocol::nsSHUTTING:
436 ret[jss::status] =
"shutting";
440 JLOG(
p_journal_.
warn()) <<
"Unknown status: " << last_status.newstatus();
521 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::fail : strand in this thread");
534 if (!
strand_.running_in_this_thread())
554 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::tryAsyncShutdown : strand in this thread");
574 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::shutdown: strand in this thread");
581 boost::beast::get_lowest_layer(
stream_).cancel();
598 (ec != boost::asio::error::eof && ec != boost::asio::error::operation_aborted &&
599 ec.message().find(
"application data after close notify") == std::string::npos);
613 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::close : strand in this thread");
639 timer_.expires_after(interval);
663 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::onTimer : strand in this thread");
671 if (ec == boost::asio::error::operation_aborted)
688 return fail(
"Large send queue");
692 clock_type::duration duration;
703 return fail(
"Not useful");
709 return fail(
"Ping Timeout");
714 protocol::TMPing message;
715 message.set_type(protocol::TMPing::ptPING);
740 XRPL_ASSERT(
read_buffer_.size() == 0,
"xrpl::PeerImp::doAccept : empty read buffer");
753 return fail(
"makeSharedValue: Unexpected failure");
785 boost::asio::async_write(
787 write_buffer->data(),
788 boost::asio::transfer_all(),
793 if (ec == boost::asio::error::operation_aborted)
796 return fail(
"onWriteResponse", ec);
797 if (write_buffer->size() == bytes_transferred)
799 return fail(
"Failed to write header");
856 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::onReadMessage : strand in this thread");
865 if (ec == boost::asio::error::eof)
871 if (ec == boost::asio::error::operation_aborted)
874 return fail(
"onReadMessage", ec);
882 stream <<
"onReadMessage: " << (bytes_transferred > 0 ?
to_string(bytes_transferred) +
" bytes" :
"");
885 metrics_.recv.add_message(bytes_transferred);
895 using namespace std::chrono_literals;
898 "invokeProtocolMessage",
908 return fail(
"onReadMessage", ec);
910 if (bytes_consumed == 0)
922 XRPL_ASSERT(!
shutdownStarted_,
"xrpl::PeerImp::onReadMessage : shutdown started");
935 XRPL_ASSERT(
strand_.running_in_this_thread(),
"xrpl::PeerImp::onWriteMessage : strand in this thread");
944 if (ec == boost::asio::error::operation_aborted)
947 return fail(
"onWriteMessage", ec);
952 stream <<
"onWriteMessage: " << (bytes_transferred > 0 ?
to_string(bytes_transferred) +
" bytes" :
"");
955 metrics_.sent.add_message(bytes_transferred);
957 XRPL_ASSERT(!
send_queue_.empty(),
"xrpl::PeerImp::onWriteMessage : non-empty send buffer");
966 XRPL_ASSERT(!
shutdownStarted_,
"xrpl::PeerImp::onWriteMessage : shutdown started");
969 return boost::asio::async_write(
1011 if ((type == MessageType::mtTRANSACTION || type == MessageType::mtHAVE_TRANSACTIONS ||
1012 type == MessageType::mtTRANSACTIONS ||
1023 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" " << uncompressed_size <<
" "
1037 auto const s = m->list_size();
1055 if (m->type() == protocol::TMPing::ptPING)
1059 m->set_type(protocol::TMPing::ptPONG);
1064 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1098 for (
int i = 0; i < m->clusternodes().size(); ++i)
1100 protocol::TMClusterNode
const& node = m->clusternodes(i);
1103 if (node.has_nodename())
1104 name = node.nodename();
1118 int loadSources = m->loadsources().size();
1119 if (loadSources != 0)
1122 gossip.
items.reserve(loadSources);
1123 for (
int i = 0; i < m->loadsources().size(); ++i)
1125 protocol::TMLoadSource
const& node = m->loadsources(i);
1130 gossip.
items.push_back(item);
1143 if (status.getReportTime() >= thresh)
1144 fees.push_back(status.getLoadFee());
1149 auto const index = fees.size() / 2;
1151 clusterFee = fees[index];
1167 if (m->endpoints_v2().size() >= 1024)
1174 endpoints.
reserve(m->endpoints_v2().size());
1177 for (
auto const& tm : m->endpoints_v2())
1183 JLOG(
p_journal_.
error()) <<
"failed to parse incoming endpoint: {" << tm.endpoint() <<
"}";
1207 if (!endpoints.
empty())
1220 XRPL_ASSERT(eraseTxQueue !=
batch, (
"xrpl::PeerImp::handleTransaction : valid inputs"));
1228 JLOG(
p_journal_.
debug()) <<
"Ignoring incoming transaction: Need network ledger";
1237 uint256 txID = stx->getTransactionID();
1259 JLOG(
p_journal_.
warn()) <<
"Ignoring Network relayed Tx containing "
1260 "tfInnerBatchTxn (handleTransaction).";
1290 bool checkSignature =
true;
1293 if (!m->has_deferred() || !m->deferred())
1306 checkSignature =
false;
1325 if (
auto peer = weak.lock())
1326 peer->checkTransaction(flags, checkSignature, stx,
batch);
1333 <<
". Exception: " << ex.
what();
1344 auto const itype{m->itype()};
1347 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1348 return badData(
"Invalid ledger info type");
1356 if (itype == protocol::liTS_CANDIDATE)
1358 if (!m->has_ledgerhash())
1359 return badData(
"Invalid TX candidate set, missing TX set hash");
1361 else if (!m->has_ledgerhash() && !m->has_ledgerseq() && !(ltype && *ltype == protocol::ltCLOSED))
1363 return badData(
"Invalid request");
1367 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1368 return badData(
"Invalid ledger type");
1372 return badData(
"Invalid ledger hash");
1375 if (m->has_ledgerseq())
1377 auto const ledgerSeq{m->ledgerseq()};
1380 using namespace std::chrono_literals;
1384 return badData(
"Invalid ledger sequence " +
std::to_string(ledgerSeq));
1389 if (itype != protocol::liBASE)
1391 if (m->nodeids_size() <= 0)
1392 return badData(
"Invalid ledger node IDs");
1394 for (
auto const& nodeId : m->nodeids())
1397 return badData(
"Invalid SHAMap node ID");
1402 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1403 return badData(
"Invalid query type");
1406 if (m->has_querydepth())
1410 return badData(
"Invalid query depth");
1417 if (
auto peer = weak.
lock())
1418 peer->processLedgerRequest(m);
1435 if (
auto peer = weak.
lock())
1437 auto reply = peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1438 if (reply.has_error())
1440 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1441 peer->charge(Resource::feeMalformedRequest,
"proof_path_request");
1443 peer->charge(Resource::feeRequestNoReply,
"proof_path_request");
1447 peer->send(std::make_shared<Message>(reply, protocol::mtPROOF_PATH_RESPONSE));
1456 if (!ledgerReplayEnabled_)
1458 fee_.update(Resource::feeMalformedRequest,
"proof_path_response disabled");
1462 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1464 fee_.update(Resource::feeInvalidData,
"proof_path_response");
1471 JLOG(p_journal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1472 if (!ledgerReplayEnabled_)
1474 fee_.update(Resource::feeMalformedRequest,
"replay_delta_request disabled");
1478 fee_.fee = Resource::feeModerateBurdenPeer;
1480 app_.getJobQueue().addJob(
jtREPLAY_REQ,
"RcvReplDReq", [weak, m]() {
1481 if (
auto peer = weak.
lock())
1483 auto reply = peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1484 if (reply.has_error())
1486 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1487 peer->charge(Resource::feeMalformedRequest,
"replay_delta_request");
1489 peer->charge(Resource::feeRequestNoReply,
"replay_delta_request");
1493 peer->send(std::make_shared<Message>(reply, protocol::mtREPLAY_DELTA_RESPONSE));
1502 if (!ledgerReplayEnabled_)
1504 fee_.update(Resource::feeMalformedRequest,
"replay_delta_response disabled");
1508 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1510 fee_.update(Resource::feeInvalidData,
"replay_delta_response");
1518 fee_.update(Resource::feeInvalidData, msg);
1519 JLOG(p_journal_.warn()) <<
"TMLedgerData: " << msg;
1524 return badData(
"Invalid ledger hash");
1528 auto const ledgerSeq{m->ledgerseq()};
1529 if (m->type() == protocol::liTS_CANDIDATE)
1533 return badData(
"Invalid ledger sequence " +
std::to_string(ledgerSeq));
1539 using namespace std::chrono_literals;
1540 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1541 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1543 return badData(
"Invalid ledger sequence " +
std::to_string(ledgerSeq));
1549 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1550 return badData(
"Invalid ledger info type");
1553 if (m->has_error() && (m->error() < protocol::reNO_LEDGER || m->error() > protocol::reBAD_REQUEST))
1555 return badData(
"Invalid reply error");
1559 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1561 return badData(
"Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1565 if (m->has_requestcookie())
1567 if (
auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1569 m->clear_requestcookie();
1574 JLOG(p_journal_.info()) <<
"Unable to route TX/ledger data reply";
1579 uint256 const ledgerHash{m->ledgerhash()};
1582 if (m->type() == protocol::liTS_CANDIDATE)
1585 app_.getJobQueue().addJob(
jtTXN_DATA,
"RcvPeerData", [weak, ledgerHash, m]() {
1586 if (
auto peer = weak.lock())
1588 peer->app_.getInboundTransactions().gotData(ledgerHash, peer, m);
1595 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1601 protocol::TMProposeSet&
set = *m;
1610 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1611 fee_.update(Resource::feeInvalidSignature,
" signature can't be longer than 72 bytes");
1617 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1618 fee_.update(Resource::feeMalformedRequest,
"bad hashes");
1626 auto const isTrusted = app_.validators().trusted(publicKey);
1634 overlay_.reportInboundTraffic(TrafficCount::category::proposal_untrusted, Message::messageSize(*m));
1636 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1640 uint256 const proposeHash{
set.currenttxhash()};
1641 uint256 const prevLedger{
set.previousledger()};
1646 proposalUniqueId(proposeHash, prevLedger,
set.proposeseq(), closeTime, publicKey.slice(), sig);
1648 if (
auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_); !added)
1652 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
1653 overlay_.updateSlotAndSquelch(suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1656 overlay_.reportInboundTraffic(TrafficCount::category::proposal_duplicate, Message::messageSize(*m));
1658 JLOG(p_journal_.trace()) <<
"Proposal: duplicate";
1665 if (tracking_.load() == Tracking::diverged)
1667 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (peer divergence)";
1671 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1673 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1678 JLOG(p_journal_.trace()) <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1689 app_.timeKeeper().closeTime(),
1690 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1693 app_.getJobQueue().addJob(
1695 if (
auto peer = weak.lock())
1696 peer->checkPropose(isTrusted, m,
proposal);
1703 JLOG(p_journal_.trace()) <<
"Status: Change";
1705 if (!m->has_networktime())
1706 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1710 if (!last_status_.has_newstatus() || m->has_newstatus())
1715 protocol::NodeStatus status = last_status_.newstatus();
1717 m->set_newstatus(status);
1721 if (m->newevent() == protocol::neLOST_SYNC)
1723 bool outOfSync{
false};
1728 if (!closedLedgerHash_.isZero())
1731 closedLedgerHash_.zero();
1733 previousLedgerHash_.zero();
1737 JLOG(p_journal_.debug()) <<
"Status: Out of sync";
1750 if (peerChangedLedgers)
1752 closedLedgerHash_ = m->ledgerhash();
1753 closedLedgerHash = closedLedgerHash_;
1754 addLedger(closedLedgerHash, sl);
1758 closedLedgerHash_.zero();
1763 previousLedgerHash_ = m->ledgerhashprevious();
1764 addLedger(previousLedgerHash_, sl);
1768 previousLedgerHash_.zero();
1771 if (peerChangedLedgers)
1773 JLOG(p_journal_.debug()) <<
"LCL is " << closedLedgerHash;
1777 JLOG(p_journal_.debug()) <<
"Status: No ledger";
1781 if (m->has_firstseq() && m->has_lastseq())
1785 minLedger_ = m->firstseq();
1786 maxLedger_ = m->lastseq();
1788 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1789 minLedger_ = maxLedger_ = 0;
1792 if (m->has_ledgerseq() && app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1794 checkTracking(m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1797 app_.getOPs().pubPeerStatus([=,
this]() ->
Json::Value {
1800 if (m->has_newstatus())
1802 switch (m->newstatus())
1804 case protocol::nsCONNECTING:
1805 j[jss::status] =
"CONNECTING";
1807 case protocol::nsCONNECTED:
1808 j[jss::status] =
"CONNECTED";
1810 case protocol::nsMONITORING:
1811 j[jss::status] =
"MONITORING";
1813 case protocol::nsVALIDATING:
1814 j[jss::status] =
"VALIDATING";
1816 case protocol::nsSHUTTING:
1817 j[jss::status] =
"SHUTTING";
1822 if (m->has_newevent())
1824 switch (m->newevent())
1826 case protocol::neCLOSING_LEDGER:
1827 j[jss::action] =
"CLOSING_LEDGER";
1829 case protocol::neACCEPTED_LEDGER:
1830 j[jss::action] =
"ACCEPTED_LEDGER";
1832 case protocol::neSWITCHED_LEDGER:
1833 j[jss::action] =
"SWITCHED_LEDGER";
1835 case protocol::neLOST_SYNC:
1836 j[jss::action] =
"LOST_SYNC";
1841 if (m->has_ledgerseq())
1843 j[jss::ledger_index] = m->ledgerseq();
1846 if (m->has_ledgerhash())
1848 uint256 closedLedgerHash{};
1851 closedLedgerHash = closedLedgerHash_;
1853 j[jss::ledger_hash] = to_string(closedLedgerHash);
1856 if (m->has_networktime())
1861 if (m->has_firstseq() && m->has_lastseq())
1863 j[jss::ledger_index_min] =
Json::UInt(m->firstseq());
1864 j[jss::ledger_index_max] =
Json::UInt(m->lastseq());
1880 serverSeq = maxLedger_;
1886 checkTracking(serverSeq, validationSeq);
1895 if (diff < Tuning::convergedLedgerLimit)
1898 tracking_ = Tracking::converged;
1901 if ((diff > Tuning::divergedLedgerLimit) && (tracking_.load() != Tracking::diverged))
1906 tracking_ = Tracking::diverged;
1907 trackingTime_ = clock_type::now();
1916 fee_.update(Resource::feeMalformedRequest,
"bad hash");
1920 uint256 const hash{m->hash()};
1922 if (m->status() == protocol::tsHAVE)
1926 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) != recentTxSets_.end())
1928 fee_.update(Resource::feeUselessData,
"duplicate (tsHAVE)");
1932 recentTxSets_.push_back(hash);
1937PeerImp::onValidatorListMessage(
1947 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType;
1949 fee_.update(Resource::feeHeavyBurdenPeer,
"no blobs");
1955 JLOG(p_journal_.debug()) <<
"Received " << messageType;
1957 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
1959 JLOG(p_journal_.debug()) << messageType <<
": received duplicate " << messageType;
1963 fee_.update(Resource::feeUselessData,
"duplicate");
1967 auto const applyResult = app_.validators().applyListsAndBroadcast(
1971 remote_address_.to_string(),
1974 app_.getHashRouter(),
1977 JLOG(p_journal_.debug()) <<
"Processed " << messageType <<
" version " << version <<
" from "
1978 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
1979 :
"unknown or invalid publisher")
1980 <<
" with best result " << to_string(applyResult.bestDisposition());
1983 switch (applyResult.bestDisposition())
1986 case ListDisposition::accepted:
1988 case ListDisposition::expired:
1990 case ListDisposition::pending: {
1994 applyResult.publisherKey,
1995 "xrpl::PeerImp::onValidatorListMessage : publisher key is "
1997 auto const& pubKey = *applyResult.publisherKey;
1999 if (
auto const iter = publisherListSequences_.find(pubKey); iter != publisherListSequences_.end())
2002 iter->second < applyResult.sequence,
"xrpl::PeerImp::onValidatorListMessage : lower sequence");
2005 publisherListSequences_[pubKey] = applyResult.sequence;
2008 case ListDisposition::same_sequence:
2009 case ListDisposition::known_sequence:
2014 applyResult.sequence && applyResult.publisherKey,
2015 "xrpl::PeerImp::onValidatorListMessage : nonzero sequence "
2016 "and set publisher key");
2018 publisherListSequences_[*applyResult.publisherKey] <= applyResult.sequence,
2019 "xrpl::PeerImp::onValidatorListMessage : maximum sequence");
2024 case ListDisposition::stale:
2025 case ListDisposition::untrusted:
2026 case ListDisposition::invalid:
2027 case ListDisposition::unsupported_version:
2032 "xrpl::PeerImp::onValidatorListMessage : invalid best list "
2038 switch (applyResult.worstDisposition())
2040 case ListDisposition::accepted:
2041 case ListDisposition::expired:
2042 case ListDisposition::pending:
2045 case ListDisposition::same_sequence:
2046 case ListDisposition::known_sequence:
2050 fee_.update(Resource::feeUselessData,
" duplicate (same_sequence or known_sequence)");
2052 case ListDisposition::stale:
2055 fee_.update(Resource::feeInvalidData,
"expired");
2057 case ListDisposition::untrusted:
2061 fee_.update(Resource::feeUselessData,
"untrusted");
2063 case ListDisposition::invalid:
2065 fee_.update(Resource::feeInvalidSignature,
"invalid list disposition");
2067 case ListDisposition::unsupported_version:
2070 fee_.update(Resource::feeInvalidData,
"version");
2075 "xrpl::PeerImp::onValidatorListMessage : invalid worst list "
2081 for (
auto const& [disp, count] : applyResult.dispositions)
2086 case ListDisposition::accepted:
2087 JLOG(p_journal_.debug()) <<
"Applied " << count <<
" new " << messageType;
2090 case ListDisposition::expired:
2091 JLOG(p_journal_.debug()) <<
"Applied " << count <<
" expired " << messageType;
2094 case ListDisposition::pending:
2095 JLOG(p_journal_.debug()) <<
"Processed " << count <<
" future " << messageType;
2097 case ListDisposition::same_sequence:
2098 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
" " << messageType <<
"(s) with current sequence";
2100 case ListDisposition::known_sequence:
2101 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
" " << messageType <<
"(s) with future sequence";
2103 case ListDisposition::stale:
2104 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
"stale " << messageType;
2106 case ListDisposition::untrusted:
2107 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
" untrusted " << messageType;
2109 case ListDisposition::unsupported_version:
2110 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
"unsupported version " << messageType;
2112 case ListDisposition::invalid:
2113 JLOG(p_journal_.warn()) <<
"Ignored " << count <<
"invalid " << messageType;
2118 "xrpl::PeerImp::onValidatorListMessage : invalid list "
2130 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2132 JLOG(p_journal_.debug()) <<
"ValidatorList: received validator list from peer using "
2133 <<
"protocol version " << to_string(protocol_)
2134 <<
" which shouldn't support this feature.";
2135 fee_.update(Resource::feeUselessData,
"unsupported peer");
2138 onValidatorListMessage(
"ValidatorList", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
2142 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what();
2143 using namespace std::string_literals;
2144 fee_.update(Resource::feeInvalidData, e.
what());
2153 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2155 JLOG(p_journal_.debug()) <<
"ValidatorListCollection: received validator list from peer "
2156 <<
"using protocol version " << to_string(protocol_)
2157 <<
" which shouldn't support this feature.";
2158 fee_.update(Resource::feeUselessData,
"unsupported peer");
2161 else if (m->version() < 2)
2163 JLOG(p_journal_.debug()) <<
"ValidatorListCollection: received invalid validator list "
2165 << m->version() <<
" from peer using protocol version " << to_string(protocol_);
2166 fee_.update(Resource::feeInvalidData,
"wrong version");
2169 onValidatorListMessage(
"ValidatorListCollection", m->manifest(), m->version(), ValidatorList::parseBlobs(*m));
2173 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, " << e.
what();
2174 using namespace std::string_literals;
2175 fee_.update(Resource::feeInvalidData, e.
what());
2182 if (m->validation().size() < 50)
2184 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2185 fee_.update(Resource::feeMalformedRequest,
"too small");
2191 auto const closeTime = app_.timeKeeper().closeTime();
2198 [
this](
PublicKey const& pk) {
return calcNodeID(app_.validatorManifests().getMasterKey(pk)); },
2200 val->setSeen(closeTime);
2204 app_.getValidations().parms(), app_.timeKeeper().closeTime(), val->getSignTime(), val->getSeenTime()))
2206 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2207 fee_.update(Resource::feeUselessData,
"not current");
2214 auto const isTrusted = app_.validators().trusted(val->getSignerPublic());
2222 overlay_.reportInboundTraffic(TrafficCount::category::validation_untrusted, Message::messageSize(*m));
2224 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2230 auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2237 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2238 overlay_.updateSlotAndSquelch(key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2241 overlay_.reportInboundTraffic(TrafficCount::category::validation_duplicate, Message::messageSize(*m));
2243 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2247 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2249 JLOG(p_journal_.debug()) <<
"Dropping untrusted validation from diverged peer";
2251 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2253 std::string const name = isTrusted ?
"ChkTrust" :
"ChkUntrust";
2257 if (
auto peer = weak.
lock())
2258 peer->checkValidation(val, key, m);
2263 JLOG(p_journal_.debug()) <<
"Dropping untrusted validation for load";
2268 JLOG(p_journal_.warn()) <<
"Exception processing validation: " << e.
what();
2269 using namespace std::string_literals;
2270 fee_.update(Resource::feeMalformedRequest, e.
what());
2277 protocol::TMGetObjectByHash& packet = *m;
2279 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type() <<
" " << packet.objects_size();
2284 if (send_queue_.size() >= Tuning::dropSendQueue)
2286 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2290 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2296 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2298 if (!txReduceRelayEnabled())
2300 JLOG(p_journal_.error()) <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2301 fee_.update(Resource::feeMalformedRequest,
"disabled");
2307 if (
auto peer = weak.
lock())
2308 peer->doTransactions(m);
2313 protocol::TMGetObjectByHash reply;
2315 reply.set_query(
false);
2317 if (packet.has_seq())
2318 reply.set_seq(packet.seq());
2320 reply.set_type(packet.type());
2322 if (packet.has_ledgerhash())
2326 fee_.update(Resource::feeMalformedRequest,
"ledger hash");
2330 reply.set_ledgerhash(packet.ledgerhash());
2333 fee_.update(Resource::feeModerateBurdenPeer,
" received a get object by hash request");
2336 for (
int i = 0; i < packet.objects_size(); ++i)
2338 auto const& obj = packet.objects(i);
2341 uint256 const hash{obj.hash()};
2344 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2345 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2348 protocol::TMIndexedObject& newObj = *reply.add_objects();
2349 newObj.set_hash(hash.begin(), hash.size());
2350 newObj.set_data(&nodeObject->getData().front(), nodeObject->getData().size());
2352 if (obj.has_nodeid())
2353 newObj.set_index(obj.nodeid());
2354 if (obj.has_ledgerseq())
2355 newObj.set_ledgerseq(obj.ledgerseq());
2361 if (reply.objects_size() >= Tuning::hardMaxReplyNodes)
2363 fee_.update(Resource::feeModerateBurdenPeer,
" Reply limit reached. Truncating reply.");
2370 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of " << packet.objects_size();
2378 bool progress =
false;
2380 for (
int i = 0; i < packet.objects_size(); ++i)
2382 protocol::TMIndexedObject
const& obj = packet.objects(i);
2386 if (obj.has_ledgerseq())
2388 if (obj.ledgerseq() != pLSeq)
2390 if (pLDo && (pLSeq != 0))
2392 JLOG(p_journal_.debug()) <<
"GetObj: Full fetch pack for " << pLSeq;
2394 pLSeq = obj.ledgerseq();
2395 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2399 JLOG(p_journal_.debug()) <<
"GetObj: Late fetch pack for " << pLSeq;
2408 uint256 const hash{obj.hash()};
2410 app_.getLedgerMaster().addFetchPack(
2416 if (pLDo && (pLSeq != 0))
2418 JLOG(p_journal_.debug()) <<
"GetObj: Partial fetch pack for " << pLSeq;
2420 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2421 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2428 if (!txReduceRelayEnabled())
2430 JLOG(p_journal_.error()) <<
"TMHaveTransactions: tx reduce-relay is disabled";
2431 fee_.update(Resource::feeMalformedRequest,
"disabled");
2436 app_.getJobQueue().addJob(
jtMISSING_TXN,
"HandleHaveTxs", [weak, m]() {
2437 if (
auto peer = weak.
lock())
2438 peer->handleHaveTransactions(m);
2445 protocol::TMGetObjectByHash tmBH;
2446 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2447 tmBH.set_query(
true);
2449 JLOG(p_journal_.trace()) <<
"received TMHaveTransactions " << m->hashes_size();
2455 JLOG(p_journal_.error()) <<
"TMHaveTransactions with invalid hash size";
2456 fee_.update(Resource::feeMalformedRequest,
"hash size");
2462 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2464 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2468 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2470 auto obj = tmBH.add_objects();
2471 obj->set_hash(hash.
data(), hash.
size());
2478 removeTxQueue(hash);
2482 JLOG(p_journal_.trace()) <<
"transaction request object is " << tmBH.objects_size();
2484 if (tmBH.objects_size() > 0)
2491 if (!txReduceRelayEnabled())
2493 JLOG(p_journal_.error()) <<
"TMTransactions: tx reduce-relay is disabled";
2494 fee_.update(Resource::feeMalformedRequest,
"disabled");
2498 JLOG(p_journal_.trace()) <<
"received TMTransactions " << m->transactions_size();
2500 overlay_.addTxMetrics(m->transactions_size());
2513 if (!strand_.running_in_this_thread())
2514 return post(strand_,
std::bind((on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2516 if (!m->has_validatorpubkey())
2518 fee_.update(Resource::feeInvalidData,
"squelch no pubkey");
2521 auto validator = m->validatorpubkey();
2525 fee_.update(Resource::feeInvalidData,
"squelch bad pubkey");
2531 if (key == app_.getValidationPublicKey())
2533 JLOG(p_journal_.debug()) <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2537 std::uint32_t duration = m->has_squelchduration() ? m->squelchduration() : 0;
2539 squelch_.removeSquelch(key);
2541 fee_.update(Resource::feeInvalidData,
"squelch duration");
2543 JLOG(p_journal_.debug()) <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2553 (void)lockedRecentLock;
2555 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) != recentLedgers_.end())
2558 recentLedgers_.push_back(hash);
2567 if (app_.getFeeTrack().isLoadedLocal() || (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2568 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2570 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2576 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2577 fee_.update(Resource::feeMalformedRequest,
"hash size");
2581 fee_.fee = Resource::feeHeavyBurdenPeer;
2583 uint256 const hash{packet->ledgerhash()};
2586 auto elapsed = UptimeClock::now();
2587 auto const pap = &app_;
2588 app_.getJobQueue().addJob(
jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2589 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2596 protocol::TMTransactions reply;
2598 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx " << packet->objects_size();
2600 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2602 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
2603 fee_.update(Resource::feeMalformedRequest,
"too big");
2609 auto const& obj = packet->objects(i);
2613 fee_.update(Resource::feeMalformedRequest,
"hash size");
2619 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2623 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found " <<
Slice(hash.
data(), hash.
size());
2624 fee_.update(Resource::feeMalformedRequest,
"tx not found");
2629 auto tx = reply.add_transactions();
2630 auto sttx = txn->getSTransaction();
2632 tx->set_rawtransaction(s.
data(), s.
size());
2633 tx->set_status(txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT : protocol::tsNEW);
2634 tx->set_receivetimestamp(app_.timeKeeper().now().time_since_epoch().count());
2635 tx->set_deferred(txn->getSubmitResult().queued);
2638 if (reply.transactions_size() > 0)
2643PeerImp::checkTransaction(
2645 bool checkSignature,
2672 JLOG(p_journal_.warn()) <<
"Ignoring Network relayed Tx containing "
2673 "tfInnerBatchTxn (checkSignature).";
2674 charge(Resource::feeModerateBurdenPeer,
"inner batch txn");
2680 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2681 (stx->getFieldU32(sfLastLedgerSequence) < app_.getLedgerMaster().getValidLedgerIndex()))
2683 JLOG(p_journal_.info()) <<
"Marking transaction " << stx->getTransactionID()
2684 <<
"as BAD because it's expired";
2685 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2686 charge(Resource::feeUselessData,
"expired tx");
2697 tx->getStatus() ==
NEW,
2698 "xrpl::PeerImp::checkTransaction Transaction created "
2700 if (tx->getStatus() ==
NEW)
2702 JLOG(p_journal_.debug()) <<
"Processing " << (
batch ?
"batch" :
"unsolicited")
2703 <<
" pseudo-transaction tx " << tx->getID();
2705 app_.getMasterTransaction().canonicalize(&tx);
2707 auto const toSkip = app_.getHashRouter().shouldRelay(tx->getID());
2710 JLOG(p_journal_.debug()) <<
"Passing skipped pseudo pseudo-transaction tx " << tx->getID();
2711 app_.overlay().relay(tx->getID(), {}, *toSkip);
2715 JLOG(p_journal_.debug()) <<
"Charging for pseudo-transaction tx " << tx->getID();
2716 charge(Resource::feeUselessData,
"pseudo tx");
2727 app_.getHashRouter(), *stx, app_.getLedgerMaster().getValidatedRules(), app_.config());
2728 valid != Validity::Valid)
2730 if (!validReason.empty())
2732 JLOG(p_journal_.debug()) <<
"Exception checking transaction: " << validReason;
2737 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2738 charge(Resource::feeInvalidSignature,
"check transaction signature failure");
2744 forceValidity(app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2750 if (tx->getStatus() ==
INVALID)
2752 if (!reason.
empty())
2754 JLOG(p_journal_.debug()) <<
"Exception checking transaction: " << reason;
2756 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2757 charge(Resource::feeInvalidSignature,
"tx (impossible)");
2761 bool const trusted = any(flags & HashRouterFlags::TRUSTED);
2762 app_.getOPs().processTransaction(tx, trusted,
false, NetworkOPs::FailHard::no);
2766 JLOG(p_journal_.warn()) <<
"Exception in " << __func__ <<
": " << ex.
what();
2767 app_.getHashRouter().setFlags(stx->getTransactionID(), HashRouterFlags::BAD);
2768 using namespace std::string_literals;
2769 charge(Resource::feeInvalidData,
"tx "s + ex.
what());
2777 JLOG(p_journal_.trace()) <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
2779 XRPL_ASSERT(packet,
"xrpl::PeerImp::checkPropose : non-null packet");
2784 JLOG(p_journal_.warn()) << desc;
2785 charge(Resource::feeInvalidSignature, desc);
2792 relay = app_.getOPs().processTrustedProposal(peerPos);
2794 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
2803 if (!haveMessage.empty())
2804 overlay_.updateSlotAndSquelch(
2810PeerImp::checkValidation(
2815 if (!val->isValid())
2817 std::string desc{
"Validation forwarded by peer is invalid"};
2818 JLOG(p_journal_.debug()) << desc;
2819 charge(Resource::feeInvalidSignature, desc);
2826 if (app_.getOPs().recvValidation(val,
std::to_string(
id())) || cluster())
2832 auto haveMessage = overlay_.relay(*packet, key, val->getSignerPublic());
2833 if (!haveMessage.empty())
2835 overlay_.updateSlotAndSquelch(
2836 key, val->getSignerPublic(), std::move(haveMessage), protocol::mtVALIDATION);
2842 JLOG(p_journal_.trace()) <<
"Exception processing validation: " << ex.
what();
2843 using namespace std::string_literals;
2844 charge(Resource::feeMalformedRequest,
"validation "s + ex.
what());
2858 if (p->hasTxSet(rootHash) && p.get() != skip)
2860 auto score = p->getScore(true);
2861 if (!ret || (score > retScore))
2882 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
2884 auto score = p->getScore(true);
2885 if (!ret || (score > retScore))
2899 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
2902 addRaw(ledger->header(), s);
2905 auto const& stateMap{ledger->stateMap()};
2906 if (stateMap.getHash() != beast::zero)
2911 stateMap.serializeRoot(
root);
2912 ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(),
root.getLength());
2914 if (ledger->header().txHash != beast::zero)
2916 auto const& txMap{ledger->txMap()};
2917 if (txMap.getHash() != beast::zero)
2921 txMap.serializeRoot(
root);
2922 ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(),
root.getLength());
2934 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
2938 if (m->has_ledgerhash())
2941 uint256 const ledgerHash{m->ledgerhash()};
2942 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
2945 JLOG(p_journal_.trace()) <<
"getLedger: Don't have ledger with hash " << ledgerHash;
2947 if (m->has_querytype() && !m->has_requestcookie())
2950 if (
auto const peer =
2951 getPeerWithLedger(overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0,
this))
2953 m->set_requestcookie(
id());
2955 JLOG(p_journal_.debug()) <<
"getLedger: Request relayed to peer";
2959 JLOG(p_journal_.trace()) <<
"getLedger: Failed to find peer to relay request";
2963 else if (m->has_ledgerseq())
2966 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
2968 JLOG(p_journal_.debug()) <<
"getLedger: Early ledger sequence request";
2972 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
2975 JLOG(p_journal_.debug()) <<
"getLedger: Don't have ledger with sequence " << m->ledgerseq();
2979 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
2981 ledger = app_.getLedgerMaster().getClosedLedger();
2987 auto const ledgerSeq{ledger->header().seq};
2988 if (m->has_ledgerseq())
2990 if (ledgerSeq != m->ledgerseq())
2993 if (!m->has_requestcookie())
2994 charge(Resource::feeMalformedRequest,
"get_ledger ledgerSeq");
2997 JLOG(p_journal_.warn()) <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3000 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3003 JLOG(p_journal_.debug()) <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3008 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3017 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3019 uint256 const txSetHash{m->ledgerhash()};
3023 if (m->has_querytype() && !m->has_requestcookie())
3028 m->set_requestcookie(
id());
3030 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3034 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find relay peer";
3039 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3050 if (!m->has_requestcookie())
3051 charge(Resource::feeModerateBurdenPeer,
"received a get ledger request");
3055 SHAMap const* map{
nullptr};
3056 protocol::TMLedgerData ledgerData;
3057 bool fatLeaves{
true};
3058 auto const itype{m->itype()};
3060 if (itype == protocol::liTS_CANDIDATE)
3062 if (sharedMap = getTxSet(m); !sharedMap)
3064 map = sharedMap.
get();
3067 ledgerData.set_ledgerseq(0);
3068 ledgerData.set_ledgerhash(m->ledgerhash());
3069 ledgerData.set_type(protocol::liTS_CANDIDATE);
3070 if (m->has_requestcookie())
3071 ledgerData.set_requestcookie(m->requestcookie());
3078 if (send_queue_.size() >= Tuning::dropSendQueue)
3080 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Large send queue";
3083 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3085 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3089 if (ledger = getLedger(m); !ledger)
3093 auto const ledgerHash{ledger->header().hash};
3094 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3095 ledgerData.set_ledgerseq(ledger->header().seq);
3096 ledgerData.set_type(itype);
3097 if (m->has_requestcookie())
3098 ledgerData.set_requestcookie(m->requestcookie());
3102 case protocol::liBASE:
3103 sendLedgerBase(ledger, ledgerData);
3106 case protocol::liTX_NODE:
3107 map = &ledger->txMap();
3108 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash " << to_string(map->getHash());
3111 case protocol::liAS_NODE:
3112 map = &ledger->stateMap();
3113 JLOG(p_journal_.trace()) <<
"processLedgerRequest: Account state map hash "
3114 << to_string(map->getHash());
3119 JLOG(p_journal_.error()) <<
"processLedgerRequest: Invalid ledger info type";
3126 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3131 if (m->nodeids_size() > 0)
3133 auto const queryDepth{m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3137 for (
int i = 0; i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::softMaxReplyNodes; ++i)
3142 data.reserve(Tuning::softMaxReplyNodes);
3146 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3148 JLOG(p_journal_.trace()) <<
"processLedgerRequest: getNodeFat got " << data.size() <<
" nodes";
3150 for (
auto const& d : data)
3152 if (ledgerData.nodes_size() >= Tuning::hardMaxReplyNodes)
3154 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3155 node->set_nodeid(d.first.getRawString());
3156 node->set_nodedata(d.second.data(), d.second.size());
3161 JLOG(p_journal_.warn()) <<
"processLedgerRequest: getNodeFat returns false";
3169 case protocol::liBASE:
3171 info =
"Ledger base";
3174 case protocol::liTX_NODE:
3178 case protocol::liAS_NODE:
3182 case protocol::liTS_CANDIDATE:
3183 info =
"TS candidate";
3191 if (!m->has_ledgerhash())
3192 info +=
", no hash specified";
3194 JLOG(p_journal_.warn()) <<
"processLedgerRequest: getNodeFat with nodeId " << *shaMapNodeId
3195 <<
" and ledger info type " << info <<
" throws exception: " << e.
what();
3199 JLOG(p_journal_.info()) <<
"processLedgerRequest: Got request for " << m->nodeids_size() <<
" nodes at depth "
3200 << queryDepth <<
", return " << ledgerData.nodes_size() <<
" nodes";
3203 if (ledgerData.nodes_size() == 0)
3210PeerImp::getScore(
bool haveItem)
const
3214 static int const spRandomMax = 9999;
3218 static int const spHaveItem = 10000;
3223 static int const spLatency = 30;
3226 static int const spNoLatency = 8000;
3231 score += spHaveItem;
3240 score -= latency->count() * spLatency;
3242 score -= spNoLatency;
3248PeerImp::isHighLatency()
const
3251 return latency_ >= peerHighLatency;
3257 using namespace std::chrono_literals;
3260 totalBytes_ += bytes;
3261 accumBytes_ += bytes;
3262 auto const timeElapsed = clock_type::now() - intervalStart_;
3263 auto const timeElapsedInSecs = std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3265 if (timeElapsedInSecs >= 1s)
3267 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3268 rollingAvg_.push_back(avgBytes);
3270 auto const totalBytes =
std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3271 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3273 intervalStart_ = clock_type::now();
3279PeerImp::Metrics::average_bytes()
const
3282 return rollingAvgBytes_;
3286PeerImp::Metrics::total_bytes()
const
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Stream trace() const
Severity stream access functions.
virtual Config & config()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
std::size_t size() const
The number of nodes in the cluster list.
std::chrono::seconds MAX_DIVERGED_TIME
bool TX_REDUCE_RELAY_METRICS
std::chrono::seconds MAX_UNKNOWN_TIME
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
bool shouldProcess(uint256 const &key, PeerShortID peer, HashRouterFlags &flags, std::chrono::seconds tx_interval)
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
int getJobCount(JobType t) const
Jobs waiting at this priority.
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
std::chrono::seconds getValidatedLedgerAge()
LedgerIndex getValidLedgerIndex()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
virtual bool isNeedNetworkLedger()=0
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void incPeerDisconnectCharges() override
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been completed...
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
PeerFinder::Manager & peerFinder()
Resource::Manager & resourceManager()
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void onPeerDeactivate(Peer::id_t id)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
This class manages established peer-to-peer connections, handles message exchange,...
std::optional< std::chrono::milliseconds > latency_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
std::unique_ptr< LoadEvent > load_event_
beast::Journal const p_journal_
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
ProtocolVersion protocol_
bool txReduceRelayEnabled_
void close()
Forcibly closes the underlying socket connection.
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
http_request_type request_
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
bool txReduceRelayEnabled() const override
std::shared_ptr< PeerFinder::Slot > const slot_
boost::beast::http::fields const & headers_
Compressed compressionEnabled_
void onTimer(error_code const &ec)
Handles the expiration of the peer activity timer.
boost::system::error_code error_code
void tryAsyncShutdown()
Attempts to perform a graceful SSL shutdown if conditions are met.
std::string const & fingerprint() const override
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
bool ledgerReplayEnabled_
void sendTxQueue() override
Send aggregated transactions' hashes.
uint256 closedLedgerHash_
reduce_relay::Squelch< UptimeClock > squelch_
PeerImp(PeerImp const &)=delete
void cycleStatus() override
std::shared_mutex nameMutex_
beast::IP::Endpoint const remote_address_
std::string domain() const
std::atomic< Tracking > tracking_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
clock_type::duration uptime() const
PublicKey const publicKey_
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Json::Value json() override
bool cluster() const override
Returns true if this connection is a member of the cluster.
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
std::queue< std::shared_ptr< Message > > send_queue_
static std::string makePrefix(std::string const &fingerprint)
void onShutdown(error_code ec)
Handles the completion of the asynchronous SSL shutdown.
void onMessageUnknown(std::uint16_t type)
protocol::TMStatusChange last_status_
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
std::unique_ptr< stream_type > stream_ptr_
struct xrpl::PeerImp::@22 metrics_
boost::beast::multi_buffer read_buffer_
void send(std::shared_ptr< Message > const &m) override
hash_set< uint256 > txQueue_
bool supportsFeature(ProtocolFeature f) const override
void shutdown()
Initiates the peer disconnection sequence.
boost::asio::strand< boost::asio::executor > strand_
clock_type::time_point lastPingTime_
boost::circular_buffer< uint256 > recentLedgers_
bool hasTxSet(uint256 const &hash) const override
boost::circular_buffer< uint256 > recentTxSets_
clock_type::time_point trackingTime_
beast::Journal const journal_
void cancelTimer() noexcept
Cancels any pending wait on the peer activity timer.
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
uint256 previousLedgerHash_
Resource::Consumer usage_
std::optional< std::uint32_t > lastPingSeq_
bool crawl() const
Returns true if this connection will publicly share its IP address.
void setTimer(std::chrono::seconds interval)
Sets and starts the peer timer.
void fail(std::string const &name, error_code ec)
Handles a failure associated with a specific error code.
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Represents a peer connection in the overlay.
A peer's signed, proposed position for use in RCLConsensus.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
bool checkSign() const
Verify the signing hash of the proposal.
An endpoint that consumes resources.
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
int balance()
Returns the credit balance representing consumption.
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
void const * getDataPtr() const
std::size_t size() const noexcept
void const * data() const noexcept
virtual JobQueue & getJobQueue()=0
virtual ValidatorList & validators()=0
virtual NetworkOPs & getOPs()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual Cluster & cluster()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual HashRouter & getHashRouter()=0
virtual TimeKeeper & timeKeeper()=0
An immutable linear range of bytes.
time_point now() const override
Returns the current time, using the server's clock.
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
static constexpr std::size_t size()
T emplace_back(T... args)
@ objectValue
object value (collection of name/value pairs).
Charge const feeInvalidData
Charge const feeModerateBurdenPeer
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeTrivialPeer
Charge const feeUselessData
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ sendQueueLogFreq
How often to log send queue size.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ maxQueryDepth
The maximum number of levels to search.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
static constexpr char FEATURE_COMPR[]
Stopwatch & stopwatch()
Returns an instance of a wall clock.
constexpr std::uint32_t tfInnerBatchTxn
std::string to_string(base_uint< Bits, Tag > const &a)
std::string strHex(FwdIt begin, FwdIt end)
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
std::string base64_decode(std::string_view data)
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
@ ValidatorListPropagation
@ ValidatorList2Propagation
Number root(Number f, unsigned d)
static bool stringIsUint256Sized(std::string const &pBuffStr)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
std::string getFingerprint(beast::IP::Endpoint const &address, std::optional< PublicKey > const &publicKey=std::nullopt, std::optional< std::string > const &id=std::nullopt)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
@ proposal
proposal for signing
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
static constexpr char FEATURE_LEDGER_REPLAY[]
static constexpr char FEATURE_VPRR[]
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
static constexpr char FEATURE_TXRR[]
T shared_from_this(T... args)
std::optional< std::uint32_t > networkID
beast::IP::Address public_ip
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items