20#include <xrpld/app/consensus/RCLValidations.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionMaster.h>
25#include <xrpld/app/misc/HashRouter.h>
26#include <xrpld/app/misc/LoadFeeTrack.h>
27#include <xrpld/app/misc/NetworkOPs.h>
28#include <xrpld/app/misc/Transaction.h>
29#include <xrpld/app/misc/ValidatorList.h>
30#include <xrpld/app/tx/apply.h>
31#include <xrpld/overlay/Cluster.h>
32#include <xrpld/overlay/detail/PeerImp.h>
33#include <xrpld/overlay/detail/Tuning.h>
34#include <xrpld/perflog/PerfLog.h>
35#include <xrpl/basics/UptimeClock.h>
36#include <xrpl/basics/base64.h>
37#include <xrpl/basics/random.h>
38#include <xrpl/basics/safe_cast.h>
39#include <xrpl/protocol/digest.h>
41#include <boost/algorithm/string/predicate.hpp>
42#include <boost/beast/core/ostream.hpp>
50using namespace std::chrono_literals;
78 , sink_(app_.journal(
"Peer"), makePrefix(id))
79 , p_sink_(app_.journal(
"Protocol"), makePrefix(id))
82 , stream_ptr_(
std::move(stream_ptr))
83 , socket_(stream_ptr_->next_layer().socket())
84 , stream_(*stream_ptr_)
85 , strand_(socket_.get_executor())
87 , remote_address_(slot->remote_endpoint())
93 , publicKey_(publicKey)
96 , squelch_(app_.journal(
"Squelch"))
98 , fee_{Resource::feeTrivialPeer,
""}
100 , request_(
std::move(request))
102 , compressionEnabled_(
107 app_.config().COMPRESSION)
113 app_.config().TX_REDUCE_RELAY_ENABLE))
117 app_.config().VP_REDUCE_RELAY_ENABLE))
121 app_.config().LEDGER_REPLAY))
122 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
126 <<
" vp reduce-relay enabled "
128 <<
" tx reduce-relay enabled "
135 const bool inCluster{
cluster()};
158 if (!
strand_.running_in_this_thread())
161 auto parseLedgerHash =
175 if (
auto const iter =
headers_.find(
"Closed-Ledger");
178 closed = parseLedgerHash(iter->value());
181 fail(
"Malformed handshake data (1)");
184 if (
auto const iter =
headers_.find(
"Previous-Ledger");
187 previous = parseLedgerHash(iter->value());
190 fail(
"Malformed handshake data (2)");
193 if (previous && !closed)
194 fail(
"Malformed handshake data (3)");
216 if (!
strand_.running_in_this_thread())
242 if (!
strand_.running_in_this_thread())
249 auto validator = m->getValidatorKey();
250 if (validator && !
squelch_.expireSquelch(*validator))
254 safe_cast<TrafficCount::category>(m->getCategory()),
272 <<
" sendq: " << sendq_size;
280 boost::asio::async_write(
289 std::placeholders::_1,
290 std::placeholders::_2)));
296 if (!
strand_.running_in_this_thread())
302 protocol::TMHaveTransactions ht;
304 ht.add_hashes(hash.data(), hash.size());
308 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
315 if (!
strand_.running_in_this_thread())
332 if (!
strand_.running_in_this_thread())
337 auto removed =
txQueue_.erase(hash);
349 fail(
"charge: Resources");
358 auto const iter =
headers_.find(
"Crawl");
361 return boost::iequals(iter->value(),
"public");
387 ret[jss::inbound] =
true;
391 ret[jss::cluster] =
true;
401 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
418 std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
423 if ((minSeq != 0) || (maxSeq != 0))
424 ret[jss::complete_ledgers] =
430 ret[jss::track] =
"diverged";
434 ret[jss::track] =
"unknown";
443 protocol::TMStatusChange last_status;
450 if (closedLedgerHash != beast::zero)
451 ret[jss::ledger] =
to_string(closedLedgerHash);
453 if (last_status.has_newstatus())
455 switch (last_status.newstatus())
457 case protocol::nsCONNECTING:
458 ret[jss::status] =
"connecting";
461 case protocol::nsCONNECTED:
462 ret[jss::status] =
"connected";
465 case protocol::nsMONITORING:
466 ret[jss::status] =
"monitoring";
469 case protocol::nsVALIDATING:
470 ret[jss::status] =
"validating";
473 case protocol::nsSHUTTING:
474 ret[jss::status] =
"shutting";
479 <<
"Unknown status: " << last_status.newstatus();
484 ret[jss::metrics][jss::total_bytes_recv] =
486 ret[jss::metrics][jss::total_bytes_sent] =
488 ret[jss::metrics][jss::avg_bps_recv] =
490 ret[jss::metrics][jss::avg_bps_sent] =
569 strand_.running_in_this_thread(),
570 "ripple::PeerImp::close : strand in this thread");
592 if (!
strand_.running_in_this_thread())
603 <<
" failed: " << reason;
612 strand_.running_in_this_thread(),
613 "ripple::PeerImp::fail : strand in this thread");
627 strand_.running_in_this_thread(),
628 "ripple::PeerImp::gracefulClose : strand in this thread");
630 socket_.is_open(),
"ripple::PeerImp::gracefulClose : socket is open");
633 "ripple::PeerImp::gracefulClose : socket is not closing");
638 stream_.async_shutdown(bind_executor(
648 timer_.expires_from_now(peerTimerInterval, ec);
655 timer_.async_wait(bind_executor(
685 if (ec == boost::asio::error::operation_aborted)
697 fail(
"Large send queue");
703 clock_type::duration duration;
724 fail(
"Ping Timeout");
731 protocol::TMPing message;
732 message.set_type(protocol::TMPing::ptPING);
735 send(std::make_shared<Message>(message, protocol::mtPING));
747 JLOG(
journal_.
error()) <<
"onShutdown: expected error condition";
750 if (ec != boost::asio::error::eof)
751 return fail(
"onShutdown", ec);
761 "ripple::PeerImp::doAccept : empty read buffer");
770 return fail(
"makeSharedValue: Unexpected failure");
791 auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
804 boost::asio::async_write(
806 write_buffer->data(),
807 boost::asio::transfer_all(),
814 if (ec == boost::asio::error::operation_aborted)
817 return fail(
"onWriteResponse", ec);
818 if (write_buffer->size() == bytes_transferred)
820 return fail(
"Failed to write header");
884 if (ec == boost::asio::error::operation_aborted)
886 if (ec == boost::asio::error::eof)
892 return fail(
"onReadMessage", ec);
895 if (bytes_transferred > 0)
896 stream <<
"onReadMessage: " << bytes_transferred <<
" bytes";
898 stream <<
"onReadMessage";
901 metrics_.recv.add_message(bytes_transferred);
911 using namespace std::chrono_literals;
916 "invokeProtocolMessage",
921 return fail(
"onReadMessage", ec);
926 if (bytes_consumed == 0)
939 std::placeholders::_1,
940 std::placeholders::_2)));
948 if (ec == boost::asio::error::operation_aborted)
951 return fail(
"onWriteMessage", ec);
954 if (bytes_transferred > 0)
955 stream <<
"onWriteMessage: " << bytes_transferred <<
" bytes";
957 stream <<
"onWriteMessage";
960 metrics_.sent.add_message(bytes_transferred);
964 "ripple::PeerImp::onWriteMessage : non-empty send buffer");
969 return boost::asio::async_write(
978 std::placeholders::_1,
979 std::placeholders::_2)));
984 return stream_.async_shutdown(bind_executor(
989 std::placeholders::_1)));
1019 if ((type == MessageType::mtTRANSACTION ||
1020 type == MessageType::mtHAVE_TRANSACTIONS ||
1021 type == MessageType::mtTRANSACTIONS ||
1033 static_cast<MessageType
>(type),
static_cast<std::uint64_t>(size));
1035 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" "
1036 << uncompressed_size <<
" " << isCompressed;
1051 auto const s = m->list_size();
1071 if (m->type() == protocol::TMPing::ptPING)
1075 m->set_type(protocol::TMPing::ptPONG);
1076 send(std::make_shared<Message>(*m, protocol::mtPING));
1080 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1090 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1115 for (
int i = 0; i < m->clusternodes().size(); ++i)
1117 protocol::TMClusterNode
const& node = m->clusternodes(i);
1120 if (node.has_nodename())
1121 name = node.nodename();
1123 auto const publicKey =
1130 auto const reportTime =
1134 *publicKey,
name, node.nodeload(), reportTime);
1138 int loadSources = m->loadsources().size();
1139 if (loadSources != 0)
1142 gossip.
items.reserve(loadSources);
1143 for (
int i = 0; i < m->loadsources().size(); ++i)
1145 protocol::TMLoadSource
const& node = m->loadsources(i);
1150 gossip.
items.push_back(item);
1163 if (status.getReportTime() >= thresh)
1164 fees.push_back(status.getLoadFee());
1169 auto const index = fees.size() / 2;
1171 clusterFee = fees[index];
1187 if (m->endpoints_v2().size() >= 1024)
1194 endpoints.
reserve(m->endpoints_v2().size());
1197 for (
auto const& tm : m->endpoints_v2())
1204 << tm.endpoint() <<
"}";
1230 if (!endpoints.
empty())
1247 eraseTxQueue != batch,
1248 (
"ripple::PeerImp::handleTransaction correct function params"));
1257 <<
"Ignoring incoming transaction: " <<
"Need network ledger";
1265 auto stx = std::make_shared<STTx const>(sit);
1266 uint256 txID = stx->getTransactionID();
1290 bool checkSignature =
true;
1293 if (!m->has_deferred() || !m->deferred())
1297 flags |= SF_TRUSTED;
1306 checkSignature =
false;
1313 <<
"No new transactions until synchronized";
1326 "recvTransaction->checkTransaction",
1332 if (
auto peer = weak.lock())
1333 peer->checkTransaction(
1334 flags, checkSignature, stx, batch);
1341 <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1342 <<
". Exception: " << ex.
what();
1353 auto const itype{m->itype()};
1356 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1357 return badData(
"Invalid ledger info type");
1362 return std::nullopt;
1365 if (itype == protocol::liTS_CANDIDATE)
1367 if (!m->has_ledgerhash())
1368 return badData(
"Invalid TX candidate set, missing TX set hash");
1371 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1372 !(ltype && *ltype == protocol::ltCLOSED))
1374 return badData(
"Invalid request");
1378 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1379 return badData(
"Invalid ledger type");
1383 return badData(
"Invalid ledger hash");
1386 if (m->has_ledgerseq())
1388 auto const ledgerSeq{m->ledgerseq()};
1391 using namespace std::chrono_literals;
1401 if (itype != protocol::liBASE)
1403 if (m->nodeids_size() <= 0)
1404 return badData(
"Invalid ledger node IDs");
1406 for (
auto const& nodeId : m->nodeids())
1409 return badData(
"Invalid SHAMap node ID");
1414 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1415 return badData(
"Invalid query type");
1418 if (m->has_querydepth())
1421 itype == protocol::liBASE)
1423 return badData(
"Invalid query depth");
1430 if (
auto peer = weak.
lock())
1431 peer->processLedgerRequest(m);
1451 if (
auto peer = weak.
lock())
1454 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1455 if (reply.has_error())
1457 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1459 Resource::feeMalformedRequest,
1460 "proof_path_request");
1463 Resource::feeRequestNoReply,
"proof_path_request");
1467 peer->send(std::make_shared<Message>(
1468 reply, protocol::mtPROOF_PATH_RESPONSE));
1477 if (!ledgerReplayEnabled_)
1480 Resource::feeMalformedRequest,
"proof_path_response disabled");
1484 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1486 fee_.update(Resource::feeInvalidData,
"proof_path_response");
1493 JLOG(p_journal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1494 if (!ledgerReplayEnabled_)
1497 Resource::feeMalformedRequest,
"replay_delta_request disabled");
1501 fee_.fee = Resource::feeModerateBurdenPeer;
1503 app_.getJobQueue().addJob(
1505 if (
auto peer = weak.
lock())
1508 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1509 if (reply.has_error())
1511 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1513 Resource::feeMalformedRequest,
1514 "replay_delta_request");
1517 Resource::feeRequestNoReply,
1518 "replay_delta_request");
1522 peer->send(std::make_shared<Message>(
1523 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1532 if (!ledgerReplayEnabled_)
1535 Resource::feeMalformedRequest,
"replay_delta_response disabled");
1539 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1541 fee_.update(Resource::feeInvalidData,
"replay_delta_response");
1549 fee_.update(Resource::feeInvalidData, msg);
1550 JLOG(p_journal_.warn()) <<
"TMLedgerData: " << msg;
1555 return badData(
"Invalid ledger hash");
1559 auto const ledgerSeq{m->ledgerseq()};
1560 if (m->type() == protocol::liTS_CANDIDATE)
1571 using namespace std::chrono_literals;
1572 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1573 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1582 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1583 return badData(
"Invalid ledger info type");
1586 if (m->has_error() &&
1587 (m->error() < protocol::reNO_LEDGER ||
1588 m->error() > protocol::reBAD_REQUEST))
1590 return badData(
"Invalid reply error");
1594 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1597 "Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1601 if (m->has_requestcookie())
1603 if (
auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1605 m->clear_requestcookie();
1606 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1610 JLOG(p_journal_.info()) <<
"Unable to route TX/ledger data reply";
1615 uint256 const ledgerHash{m->ledgerhash()};
1618 if (m->type() == protocol::liTS_CANDIDATE)
1621 app_.getJobQueue().addJob(
1622 jtTXN_DATA,
"recvPeerData", [weak, ledgerHash, m]() {
1623 if (
auto peer = weak.lock())
1625 peer->app_.getInboundTransactions().gotData(
1626 ledgerHash, peer, m);
1633 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1639 protocol::TMProposeSet&
set = *m;
1645 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1648 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1650 Resource::feeInvalidSignature,
1651 " signature can't be longer than 72 bytes");
1658 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1659 fee_.update(Resource::feeMalformedRequest,
"bad hashes");
1667 auto const isTrusted = app_.validators().trusted(publicKey);
1672 if (!isTrusted && app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1675 uint256 const proposeHash{
set.currenttxhash()};
1676 uint256 const prevLedger{
set.previousledger()};
1688 if (
auto [added, relayed] =
1689 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1694 if (reduceRelayReady() && relayed &&
1695 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
1696 overlay_.updateSlotAndSquelch(
1697 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1698 JLOG(p_journal_.trace()) <<
"Proposal: duplicate";
1704 if (tracking_.load() == Tracking::diverged)
1706 JLOG(p_journal_.debug())
1707 <<
"Proposal: Dropping untrusted (peer divergence)";
1711 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1713 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1718 JLOG(p_journal_.trace())
1719 <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1730 app_.timeKeeper().closeTime(),
1731 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1734 app_.getJobQueue().addJob(
1736 "recvPropose->checkPropose",
1738 if (
auto peer = weak.lock())
1739 peer->checkPropose(isTrusted, m,
proposal);
1746 JLOG(p_journal_.trace()) <<
"Status: Change";
1748 if (!m->has_networktime())
1749 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1753 if (!last_status_.has_newstatus() || m->has_newstatus())
1758 protocol::NodeStatus status = last_status_.newstatus();
1760 m->set_newstatus(status);
1764 if (m->newevent() == protocol::neLOST_SYNC)
1766 bool outOfSync{
false};
1771 if (!closedLedgerHash_.isZero())
1774 closedLedgerHash_.zero();
1776 previousLedgerHash_.zero();
1780 JLOG(p_journal_.debug()) <<
"Status: Out of sync";
1787 bool const peerChangedLedgers{
1794 if (peerChangedLedgers)
1796 closedLedgerHash_ = m->ledgerhash();
1797 closedLedgerHash = closedLedgerHash_;
1798 addLedger(closedLedgerHash, sl);
1802 closedLedgerHash_.zero();
1805 if (m->has_ledgerhashprevious() &&
1808 previousLedgerHash_ = m->ledgerhashprevious();
1809 addLedger(previousLedgerHash_, sl);
1813 previousLedgerHash_.zero();
1816 if (peerChangedLedgers)
1818 JLOG(p_journal_.debug()) <<
"LCL is " << closedLedgerHash;
1822 JLOG(p_journal_.debug()) <<
"Status: No ledger";
1826 if (m->has_firstseq() && m->has_lastseq())
1830 minLedger_ = m->firstseq();
1831 maxLedger_ = m->lastseq();
1833 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1834 minLedger_ = maxLedger_ = 0;
1837 if (m->has_ledgerseq() &&
1838 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1841 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1844 app_.getOPs().pubPeerStatus([=,
this]() ->
Json::Value {
1847 if (m->has_newstatus())
1849 switch (m->newstatus())
1851 case protocol::nsCONNECTING:
1852 j[jss::status] =
"CONNECTING";
1854 case protocol::nsCONNECTED:
1855 j[jss::status] =
"CONNECTED";
1857 case protocol::nsMONITORING:
1858 j[jss::status] =
"MONITORING";
1860 case protocol::nsVALIDATING:
1861 j[jss::status] =
"VALIDATING";
1863 case protocol::nsSHUTTING:
1864 j[jss::status] =
"SHUTTING";
1869 if (m->has_newevent())
1871 switch (m->newevent())
1873 case protocol::neCLOSING_LEDGER:
1874 j[jss::action] =
"CLOSING_LEDGER";
1876 case protocol::neACCEPTED_LEDGER:
1877 j[jss::action] =
"ACCEPTED_LEDGER";
1879 case protocol::neSWITCHED_LEDGER:
1880 j[jss::action] =
"SWITCHED_LEDGER";
1882 case protocol::neLOST_SYNC:
1883 j[jss::action] =
"LOST_SYNC";
1888 if (m->has_ledgerseq())
1890 j[jss::ledger_index] = m->ledgerseq();
1893 if (m->has_ledgerhash())
1895 uint256 closedLedgerHash{};
1897 std::lock_guard sl(recentLock_);
1898 closedLedgerHash = closedLedgerHash_;
1900 j[jss::ledger_hash] = to_string(closedLedgerHash);
1903 if (m->has_networktime())
1905 j[jss::date] = Json::UInt(m->networktime());
1908 if (m->has_firstseq() && m->has_lastseq())
1910 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
1911 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
1927 serverSeq = maxLedger_;
1933 checkTracking(serverSeq, validationSeq);
1942 if (diff < Tuning::convergedLedgerLimit)
1945 tracking_ = Tracking::converged;
1948 if ((diff > Tuning::divergedLedgerLimit) &&
1949 (tracking_.load() != Tracking::diverged))
1954 tracking_ = Tracking::diverged;
1955 trackingTime_ = clock_type::now();
1964 fee_.update(Resource::feeMalformedRequest,
"bad hash");
1968 uint256 const hash{m->hash()};
1970 if (m->status() == protocol::tsHAVE)
1974 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
1975 recentTxSets_.end())
1977 fee_.update(Resource::feeUselessData,
"duplicate (tsHAVE)");
1981 recentTxSets_.push_back(hash);
1986PeerImp::onValidatorListMessage(
1996 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType
1997 <<
" from peer " << remote_address_;
1999 fee_.update(Resource::feeHeavyBurdenPeer,
"no blobs");
2005 JLOG(p_journal_.debug())
2006 <<
"Received " << messageType <<
" from " << remote_address_.to_string()
2007 <<
" (" << id_ <<
")";
2009 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2011 JLOG(p_journal_.debug())
2012 << messageType <<
": received duplicate " << messageType;
2016 fee_.update(Resource::feeUselessData,
"duplicate");
2020 auto const applyResult = app_.validators().applyListsAndBroadcast(
2024 remote_address_.to_string(),
2027 app_.getHashRouter(),
2030 JLOG(p_journal_.debug())
2031 <<
"Processed " << messageType <<
" version " << version <<
" from "
2032 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2033 :
"unknown or invalid publisher")
2034 <<
" from " << remote_address_.to_string() <<
" (" << id_
2035 <<
") with best result " << to_string(applyResult.bestDisposition());
2038 switch (applyResult.bestDisposition())
2041 case ListDisposition::accepted:
2043 case ListDisposition::expired:
2045 case ListDisposition::pending: {
2049 applyResult.publisherKey,
2050 "ripple::PeerImp::onValidatorListMessage : publisher key is "
2052 auto const& pubKey = *applyResult.publisherKey;
2054 if (
auto const iter = publisherListSequences_.find(pubKey);
2055 iter != publisherListSequences_.end())
2058 iter->second < applyResult.sequence,
2059 "ripple::PeerImp::onValidatorListMessage : lower sequence");
2062 publisherListSequences_[pubKey] = applyResult.sequence;
2065 case ListDisposition::same_sequence:
2066 case ListDisposition::known_sequence:
2071 applyResult.sequence && applyResult.publisherKey,
2072 "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
2073 "and set publisher key");
2075 publisherListSequences_[*applyResult.publisherKey] <=
2076 applyResult.sequence,
2077 "ripple::PeerImp::onValidatorListMessage : maximum sequence");
2082 case ListDisposition::stale:
2083 case ListDisposition::untrusted:
2084 case ListDisposition::invalid:
2085 case ListDisposition::unsupported_version:
2089 "ripple::PeerImp::onValidatorListMessage : invalid best list "
2094 switch (applyResult.worstDisposition())
2096 case ListDisposition::accepted:
2097 case ListDisposition::expired:
2098 case ListDisposition::pending:
2101 case ListDisposition::same_sequence:
2102 case ListDisposition::known_sequence:
2107 Resource::feeUselessData,
2108 " duplicate (same_sequence or known_sequence)");
2110 case ListDisposition::stale:
2113 fee_.update(Resource::feeInvalidData,
"expired");
2115 case ListDisposition::untrusted:
2119 fee_.update(Resource::feeUselessData,
"untrusted");
2121 case ListDisposition::invalid:
2124 Resource::feeInvalidSignature,
"invalid list disposition");
2126 case ListDisposition::unsupported_version:
2129 fee_.update(Resource::feeInvalidData,
"version");
2133 "ripple::PeerImp::onValidatorListMessage : invalid worst list "
2138 for (
auto const& [disp, count] : applyResult.dispositions)
2143 case ListDisposition::accepted:
2144 JLOG(p_journal_.debug())
2145 <<
"Applied " << count <<
" new " << messageType
2146 <<
"(s) from peer " << remote_address_;
2149 case ListDisposition::expired:
2150 JLOG(p_journal_.debug())
2151 <<
"Applied " << count <<
" expired " << messageType
2152 <<
"(s) from peer " << remote_address_;
2155 case ListDisposition::pending:
2156 JLOG(p_journal_.debug())
2157 <<
"Processed " << count <<
" future " << messageType
2158 <<
"(s) from peer " << remote_address_;
2160 case ListDisposition::same_sequence:
2161 JLOG(p_journal_.warn())
2162 <<
"Ignored " << count <<
" " << messageType
2163 <<
"(s) with current sequence from peer "
2166 case ListDisposition::known_sequence:
2167 JLOG(p_journal_.warn())
2168 <<
"Ignored " << count <<
" " << messageType
2169 <<
"(s) with future sequence from peer " << remote_address_;
2171 case ListDisposition::stale:
2172 JLOG(p_journal_.warn())
2173 <<
"Ignored " << count <<
"stale " << messageType
2174 <<
"(s) from peer " << remote_address_;
2176 case ListDisposition::untrusted:
2177 JLOG(p_journal_.warn())
2178 <<
"Ignored " << count <<
" untrusted " << messageType
2179 <<
"(s) from peer " << remote_address_;
2181 case ListDisposition::unsupported_version:
2182 JLOG(p_journal_.warn())
2183 <<
"Ignored " << count <<
"unsupported version "
2184 << messageType <<
"(s) from peer " << remote_address_;
2186 case ListDisposition::invalid:
2187 JLOG(p_journal_.warn())
2188 <<
"Ignored " << count <<
"invalid " << messageType
2189 <<
"(s) from peer " << remote_address_;
2193 "ripple::PeerImp::onValidatorListMessage : invalid list "
2204 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2206 JLOG(p_journal_.debug())
2207 <<
"ValidatorList: received validator list from peer using "
2208 <<
"protocol version " << to_string(protocol_)
2209 <<
" which shouldn't support this feature.";
2210 fee_.update(Resource::feeUselessData,
"unsupported peer");
2213 onValidatorListMessage(
2217 ValidatorList::parseBlobs(*m));
2221 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what()
2222 <<
" from peer " << remote_address_;
2223 using namespace std::string_literals;
2224 fee_.update(Resource::feeInvalidData, e.
what());
2234 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2236 JLOG(p_journal_.debug())
2237 <<
"ValidatorListCollection: received validator list from peer "
2238 <<
"using protocol version " << to_string(protocol_)
2239 <<
" which shouldn't support this feature.";
2240 fee_.update(Resource::feeUselessData,
"unsupported peer");
2243 else if (m->version() < 2)
2245 JLOG(p_journal_.debug())
2246 <<
"ValidatorListCollection: received invalid validator list "
2248 << m->version() <<
" from peer using protocol version "
2249 << to_string(protocol_);
2250 fee_.update(Resource::feeInvalidData,
"wrong version");
2253 onValidatorListMessage(
2254 "ValidatorListCollection",
2257 ValidatorList::parseBlobs(*m));
2261 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, "
2262 << e.
what() <<
" from peer " << remote_address_;
2263 using namespace std::string_literals;
2264 fee_.update(Resource::feeInvalidData, e.
what());
2271 if (m->validation().size() < 50)
2273 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2274 fee_.update(Resource::feeMalformedRequest,
"too small");
2280 auto const closeTime = app_.timeKeeper().closeTime();
2285 val = std::make_shared<STValidation>(
2289 app_.validatorManifests().getMasterKey(pk));
2292 val->setSeen(closeTime);
2296 app_.getValidations().parms(),
2297 app_.timeKeeper().closeTime(),
2299 val->getSeenTime()))
2301 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2302 fee_.update(Resource::feeUselessData,
"not current");
2309 auto const isTrusted =
2310 app_.validators().trusted(val->getSignerPublic());
2315 if (!isTrusted && app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2320 if (
auto [added, relayed] =
2321 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2328 if (reduceRelayReady() && relayed &&
2329 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2330 overlay_.updateSlotAndSquelch(
2331 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2332 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2336 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2338 JLOG(p_journal_.debug())
2339 <<
"Dropping untrusted validation from diverged peer";
2341 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2345 isTrusted ?
"Trusted validation" :
"Untrusted validation";
2350 to_string(val->getNodeID());
2357 app_.getJobQueue().addJob(
2360 [weak, val, m, key]() {
2361 if (
auto peer = weak.
lock())
2362 peer->checkValidation(val, key, m);
2367 JLOG(p_journal_.debug())
2368 <<
"Dropping untrusted validation for load";
2373 JLOG(p_journal_.warn())
2374 <<
"Exception processing validation: " << e.
what();
2375 using namespace std::string_literals;
2376 fee_.update(Resource::feeMalformedRequest, e.
what());
2383 protocol::TMGetObjectByHash& packet = *m;
2385 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type()
2386 <<
" " << packet.objects_size();
2391 if (send_queue_.size() >= Tuning::dropSendQueue)
2393 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2397 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2403 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2405 if (!txReduceRelayEnabled())
2407 JLOG(p_journal_.error())
2408 <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2409 fee_.update(Resource::feeMalformedRequest,
"disabled");
2414 app_.getJobQueue().addJob(
2416 if (
auto peer = weak.
lock())
2417 peer->doTransactions(m);
2422 protocol::TMGetObjectByHash reply;
2424 reply.set_query(
false);
2426 if (packet.has_seq())
2427 reply.set_seq(packet.seq());
2429 reply.set_type(packet.type());
2431 if (packet.has_ledgerhash())
2435 fee_.update(Resource::feeMalformedRequest,
"ledger hash");
2439 reply.set_ledgerhash(packet.ledgerhash());
2443 Resource::feeModerateBurdenPeer,
2444 " received a get object by hash request");
2447 for (
int i = 0; i < packet.objects_size(); ++i)
2449 auto const& obj = packet.objects(i);
2452 uint256 const hash{obj.hash()};
2455 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2456 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2459 protocol::TMIndexedObject& newObj = *reply.add_objects();
2460 newObj.set_hash(hash.begin(), hash.size());
2462 &nodeObject->getData().front(),
2463 nodeObject->getData().size());
2465 if (obj.has_nodeid())
2466 newObj.set_index(obj.nodeid());
2467 if (obj.has_ledgerseq())
2468 newObj.set_ledgerseq(obj.ledgerseq());
2475 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2476 << packet.objects_size();
2477 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2484 bool progress =
false;
2486 for (
int i = 0; i < packet.objects_size(); ++i)
2488 const protocol::TMIndexedObject& obj = packet.objects(i);
2492 if (obj.has_ledgerseq())
2494 if (obj.ledgerseq() != pLSeq)
2496 if (pLDo && (pLSeq != 0))
2498 JLOG(p_journal_.debug())
2499 <<
"GetObj: Full fetch pack for " << pLSeq;
2501 pLSeq = obj.ledgerseq();
2502 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2506 JLOG(p_journal_.debug())
2507 <<
"GetObj: Late fetch pack for " << pLSeq;
2516 uint256 const hash{obj.hash()};
2518 app_.getLedgerMaster().addFetchPack(
2520 std::make_shared<Blob>(
2521 obj.data().begin(), obj.data().end()));
2526 if (pLDo && (pLSeq != 0))
2528 JLOG(p_journal_.debug())
2529 <<
"GetObj: Partial fetch pack for " << pLSeq;
2531 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2532 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2539 if (!txReduceRelayEnabled())
2541 JLOG(p_journal_.error())
2542 <<
"TMHaveTransactions: tx reduce-relay is disabled";
2543 fee_.update(Resource::feeMalformedRequest,
"disabled");
2548 app_.getJobQueue().addJob(
2550 if (
auto peer = weak.
lock())
2551 peer->handleHaveTransactions(m);
2556PeerImp::handleHaveTransactions(
2559 protocol::TMGetObjectByHash tmBH;
2560 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2561 tmBH.set_query(
true);
2563 JLOG(p_journal_.trace())
2564 <<
"received TMHaveTransactions " << m->hashes_size();
2570 JLOG(p_journal_.error())
2571 <<
"TMHaveTransactions with invalid hash size";
2572 fee_.update(Resource::feeMalformedRequest,
"hash size");
2578 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2580 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2584 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2586 auto obj = tmBH.add_objects();
2587 obj->set_hash(hash.
data(), hash.
size());
2594 removeTxQueue(hash);
2598 JLOG(p_journal_.trace())
2599 <<
"transaction request object is " << tmBH.objects_size();
2601 if (tmBH.objects_size() > 0)
2602 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2608 if (!txReduceRelayEnabled())
2610 JLOG(p_journal_.error())
2611 <<
"TMTransactions: tx reduce-relay is disabled";
2612 fee_.update(Resource::feeMalformedRequest,
"disabled");
2616 JLOG(p_journal_.trace())
2617 <<
"received TMTransactions " << m->transactions_size();
2619 overlay_.addTxMetrics(m->transactions_size());
2624 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2632 using on_message_fn =
2634 if (!strand_.running_in_this_thread())
2638 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2640 if (!m->has_validatorpubkey())
2642 fee_.update(Resource::feeInvalidData,
"squelch no pubkey");
2645 auto validator = m->validatorpubkey();
2649 fee_.update(Resource::feeInvalidData,
"squelch bad pubkey");
2655 if (!app_.validators().listed(key))
2657 fee_.update(Resource::feeInvalidData,
"squelch non-validator");
2658 JLOG(p_journal_.debug())
2659 <<
"onMessage: TMSquelch discarding non-validator squelch "
2665 if (key == app_.getValidationPublicKey())
2667 JLOG(p_journal_.debug())
2668 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2673 m->has_squelchduration() ? m->squelchduration() : 0;
2675 squelch_.removeSquelch(key);
2677 fee_.update(Resource::feeInvalidData,
"squelch duration");
2679 JLOG(p_journal_.debug())
2680 <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2692 (void)lockedRecentLock;
2694 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2695 recentLedgers_.end())
2698 recentLedgers_.push_back(hash);
2707 if (app_.getFeeTrack().isLoadedLocal() ||
2708 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2709 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2711 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2717 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2718 fee_.update(Resource::feeMalformedRequest,
"hash size");
2722 fee_.fee = Resource::feeHeavyBurdenPeer;
2724 uint256 const hash{packet->ledgerhash()};
2727 auto elapsed = UptimeClock::now();
2728 auto const pap = &app_;
2729 app_.getJobQueue().addJob(
2730 jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2731 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2736PeerImp::doTransactions(
2739 protocol::TMTransactions reply;
2741 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
2742 << packet->objects_size();
2744 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2746 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
2747 fee_.update(Resource::feeMalformedRequest,
"too big");
2753 auto const& obj = packet->objects(i);
2757 fee_.update(Resource::feeMalformedRequest,
"hash size");
2763 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2767 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found "
2769 fee_.update(Resource::feeMalformedRequest,
"tx not found");
2774 auto tx = reply.add_transactions();
2775 auto sttx = txn->getSTransaction();
2777 tx->set_rawtransaction(s.
data(), s.
size());
2779 txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT
2781 tx->set_receivetimestamp(
2782 app_.timeKeeper().now().time_since_epoch().count());
2783 tx->set_deferred(txn->getSubmitResult().queued);
2786 if (reply.transactions_size() > 0)
2787 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2791PeerImp::checkTransaction(
2793 bool checkSignature,
2801 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2802 (stx->getFieldU32(sfLastLedgerSequence) <
2803 app_.getLedgerMaster().getValidLedgerIndex()))
2805 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2806 charge(Resource::feeUselessData,
"expired tx");
2815 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2817 tx->getStatus() ==
NEW,
2818 "ripple::PeerImp::checkTransaction Transaction created "
2820 if (tx->getStatus() ==
NEW)
2822 JLOG(p_journal_.debug())
2823 <<
"Processing " << (batch ?
"batch" :
"unsolicited")
2824 <<
" pseudo-transaction tx " << tx->getID();
2826 app_.getMasterTransaction().canonicalize(&tx);
2829 app_.getHashRouter().shouldRelay(tx->getID());
2832 JLOG(p_journal_.debug())
2833 <<
"Passing skipped pseudo pseudo-transaction tx "
2835 app_.overlay().relay(tx->getID(), {}, *toSkip);
2839 JLOG(p_journal_.debug())
2840 <<
"Charging for pseudo-transaction tx " << tx->getID();
2841 charge(Resource::feeUselessData,
"pseudo tx");
2852 app_.getHashRouter(),
2854 app_.getLedgerMaster().getValidatedRules(),
2856 valid != Validity::Valid)
2858 if (!validReason.empty())
2860 JLOG(p_journal_.trace())
2861 <<
"Exception checking transaction: " << validReason;
2865 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2867 Resource::feeInvalidSignature,
2868 "check transaction signature failure");
2875 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2879 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2881 if (tx->getStatus() ==
INVALID)
2883 if (!reason.
empty())
2885 JLOG(p_journal_.trace())
2886 <<
"Exception checking transaction: " << reason;
2888 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2889 charge(Resource::feeInvalidSignature,
"tx (impossible)");
2893 bool const trusted(flags & SF_TRUSTED);
2894 app_.getOPs().processTransaction(
2895 tx, trusted,
false, NetworkOPs::FailHard::no);
2899 JLOG(p_journal_.warn())
2900 <<
"Exception in " << __func__ <<
": " << ex.
what();
2901 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2902 using namespace std::string_literals;
2903 charge(Resource::feeInvalidData,
"tx "s + ex.
what());
2909PeerImp::checkPropose(
2914 JLOG(p_journal_.trace())
2915 <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
2917 XRPL_ASSERT(packet,
"ripple::PeerImp::checkPropose : non-null packet");
2922 JLOG(p_journal_.warn()) << desc;
2923 charge(Resource::feeInvalidSignature, desc);
2930 relay = app_.getOPs().processTrustedProposal(peerPos);
2932 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
2940 auto haveMessage = app_.overlay().relay(
2942 if (reduceRelayReady() && !haveMessage.empty())
2943 overlay_.updateSlotAndSquelch(
2946 std::move(haveMessage),
2947 protocol::mtPROPOSE_LEDGER);
2952PeerImp::checkValidation(
2957 if (!val->isValid())
2959 std::string desc{
"Validation forwarded by peer is invalid"};
2960 JLOG(p_journal_.debug()) << desc;
2961 charge(Resource::feeInvalidSignature, desc);
2976 overlay_.relay(*packet, key, val->getSignerPublic());
2977 if (reduceRelayReady() && !haveMessage.empty())
2979 overlay_.updateSlotAndSquelch(
2981 val->getSignerPublic(),
2982 std::move(haveMessage),
2983 protocol::mtVALIDATION);
2989 JLOG(p_journal_.trace())
2990 <<
"Exception processing validation: " << ex.
what();
2991 using namespace std::string_literals;
2992 charge(Resource::feeMalformedRequest,
"validation "s + ex.
what());
3006 if (p->hasTxSet(rootHash) && p.get() != skip)
3008 auto score = p->getScore(true);
3009 if (!ret || (score > retScore))
3034 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3036 auto score = p->getScore(true);
3037 if (!ret || (score > retScore))
3049PeerImp::sendLedgerBase(
3051 protocol::TMLedgerData& ledgerData)
3053 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
3056 addRaw(ledger->info(), s);
3059 auto const& stateMap{ledger->stateMap()};
3060 if (stateMap.getHash() != beast::zero)
3065 stateMap.serializeRoot(
root);
3066 ledgerData.add_nodes()->set_nodedata(
3067 root.getDataPtr(),
root.getLength());
3069 if (ledger->info().txHash != beast::zero)
3071 auto const& txMap{ledger->txMap()};
3072 if (txMap.getHash() != beast::zero)
3076 txMap.serializeRoot(
root);
3077 ledgerData.add_nodes()->set_nodedata(
3078 root.getDataPtr(),
root.getLength());
3084 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3091 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
3095 if (m->has_ledgerhash())
3098 uint256 const ledgerHash{m->ledgerhash()};
3099 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3102 JLOG(p_journal_.trace())
3103 <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3105 if (m->has_querytype() && !m->has_requestcookie())
3111 m->has_ledgerseq() ? m->ledgerseq() : 0,
3114 m->set_requestcookie(
id());
3116 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3117 JLOG(p_journal_.debug())
3118 <<
"getLedger: Request relayed to peer";
3122 JLOG(p_journal_.trace())
3123 <<
"getLedger: Failed to find peer to relay request";
3127 else if (m->has_ledgerseq())
3130 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3132 JLOG(p_journal_.debug())
3133 <<
"getLedger: Early ledger sequence request";
3137 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3140 JLOG(p_journal_.debug())
3141 <<
"getLedger: Don't have ledger with sequence "
3146 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3148 ledger = app_.getLedgerMaster().getClosedLedger();
3154 auto const ledgerSeq{ledger->info().seq};
3155 if (m->has_ledgerseq())
3157 if (ledgerSeq != m->ledgerseq())
3160 if (!m->has_requestcookie())
3162 Resource::feeMalformedRequest,
"get_ledger ledgerSeq");
3165 JLOG(p_journal_.warn())
3166 <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3169 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3172 JLOG(p_journal_.debug())
3173 <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3178 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3187 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3189 uint256 const txSetHash{m->ledgerhash()};
3191 app_.getInboundTransactions().getSet(txSetHash,
false)};
3194 if (m->has_querytype() && !m->has_requestcookie())
3199 m->set_requestcookie(
id());
3201 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3202 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3206 JLOG(p_journal_.debug())
3207 <<
"getTxSet: Failed to find relay peer";
3212 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3223 if (!m->has_requestcookie())
3225 Resource::feeModerateBurdenPeer,
"received a get ledger request");
3229 SHAMap const* map{
nullptr};
3230 protocol::TMLedgerData ledgerData;
3231 bool fatLeaves{
true};
3232 auto const itype{m->itype()};
3234 if (itype == protocol::liTS_CANDIDATE)
3236 if (sharedMap = getTxSet(m); !sharedMap)
3238 map = sharedMap.
get();
3241 ledgerData.set_ledgerseq(0);
3242 ledgerData.set_ledgerhash(m->ledgerhash());
3243 ledgerData.set_type(protocol::liTS_CANDIDATE);
3244 if (m->has_requestcookie())
3245 ledgerData.set_requestcookie(m->requestcookie());
3252 if (send_queue_.size() >= Tuning::dropSendQueue)
3254 JLOG(p_journal_.debug())
3255 <<
"processLedgerRequest: Large send queue";
3258 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3260 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3264 if (ledger = getLedger(m); !ledger)
3268 auto const ledgerHash{ledger->info().hash};
3269 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3270 ledgerData.set_ledgerseq(ledger->info().seq);
3271 ledgerData.set_type(itype);
3272 if (m->has_requestcookie())
3273 ledgerData.set_requestcookie(m->requestcookie());
3277 case protocol::liBASE:
3278 sendLedgerBase(ledger, ledgerData);
3281 case protocol::liTX_NODE:
3282 map = &ledger->txMap();
3283 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash "
3284 << to_string(map->getHash());
3287 case protocol::liAS_NODE:
3288 map = &ledger->stateMap();
3289 JLOG(p_journal_.trace())
3290 <<
"processLedgerRequest: Account state map hash "
3291 << to_string(map->getHash());
3296 JLOG(p_journal_.error())
3297 <<
"processLedgerRequest: Invalid ledger info type";
3304 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3309 if (m->nodeids_size() > 0)
3311 auto const queryDepth{
3312 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3316 for (
int i = 0; i < m->nodeids_size() &&
3317 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3323 data.reserve(Tuning::softMaxReplyNodes);
3327 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3329 JLOG(p_journal_.trace())
3330 <<
"processLedgerRequest: getNodeFat got "
3331 << data.size() <<
" nodes";
3333 for (
auto const& d : data)
3335 if (ledgerData.nodes_size() >=
3336 Tuning::hardMaxReplyNodes)
3338 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3339 node->set_nodeid(d.first.getRawString());
3340 node->set_nodedata(d.second.data(), d.second.size());
3345 JLOG(p_journal_.warn())
3346 <<
"processLedgerRequest: getNodeFat returns false";
3354 case protocol::liBASE:
3356 info =
"Ledger base";
3359 case protocol::liTX_NODE:
3363 case protocol::liAS_NODE:
3367 case protocol::liTS_CANDIDATE:
3368 info =
"TS candidate";
3376 if (!m->has_ledgerhash())
3377 info +=
", no hash specified";
3379 JLOG(p_journal_.error())
3380 <<
"processLedgerRequest: getNodeFat with nodeId "
3381 << *shaMapNodeId <<
" and ledger info type " << info
3382 <<
" throws exception: " << e.
what();
3386 JLOG(p_journal_.info())
3387 <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3388 <<
" nodes at depth " << queryDepth <<
", return "
3389 << ledgerData.nodes_size() <<
" nodes";
3392 if (ledgerData.nodes_size() == 0)
3395 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3399PeerImp::getScore(
bool haveItem)
const
3403 static const int spRandomMax = 9999;
3407 static const int spHaveItem = 10000;
3412 static const int spLatency = 30;
3415 static const int spNoLatency = 8000;
3420 score += spHaveItem;
3429 score -= latency->count() * spLatency;
3431 score -= spNoLatency;
3437PeerImp::isHighLatency()
const
3440 return latency_ >= peerHighLatency;
3444PeerImp::reduceRelayReady()
3446 if (!reduceRelayReady_)
3448 reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
3449 reduce_relay::WAIT_ON_BOOTUP;
3450 return vpReduceRelayEnabled_ && reduceRelayReady_;
3456 using namespace std::chrono_literals;
3459 totalBytes_ += bytes;
3460 accumBytes_ += bytes;
3461 auto const timeElapsed = clock_type::now() - intervalStart_;
3462 auto const timeElapsedInSecs =
3463 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3465 if (timeElapsedInSecs >= 1s)
3467 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3468 rollingAvg_.push_back(avgBytes);
3470 auto const totalBytes =
3472 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3474 intervalStart_ = clock_type::now();
3480PeerImp::Metrics::average_bytes()
const
3483 return rollingAvgBytes_;
3487PeerImp::Metrics::total_bytes()
const
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Stream trace() const
Severity stream access functions.
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual TimeKeeper & timeKeeper()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual Cluster & cluster()=0
virtual HashRouter & getHashRouter()=0
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
std::size_t size() const
The number of nodes in the cluster list.
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
bool TX_REDUCE_RELAY_METRICS
std::chrono::seconds MAX_DIVERGED_TIME
std::chrono::seconds MAX_UNKNOWN_TIME
bool shouldProcess(uint256 const &key, PeerShortID peer, int &flags, std::chrono::seconds tx_interval)
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
int getJobCount(JobType t) const
Jobs waiting at this priority.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
LedgerIndex getValidLedgerIndex()
std::chrono::seconds getValidatedLedgerAge()
void setClusterFee(std::uint32_t fee)
virtual bool isNeedNetworkLedger()=0
PeerFinder::Manager & peerFinder()
void reportTraffic(TrafficCount::category cat, bool isInbound, int bytes)
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void onPeerDeactivate(Peer::id_t id)
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void for_each(UnaryFunc &&f) const
Resource::Manager & resourceManager()
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void incPeerDisconnectCharges() override
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
std::queue< std::shared_ptr< Message > > send_queue_
bool vpReduceRelayEnabled_
std::unique_ptr< LoadEvent > load_event_
boost::beast::http::fields const & headers_
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
clock_type::duration uptime() const
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
protocol::TMStatusChange last_status_
boost::shared_mutex nameMutex_
boost::circular_buffer< uint256 > recentTxSets_
std::unique_ptr< stream_type > stream_ptr_
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Compressed compressionEnabled_
uint256 closedLedgerHash_
std::string domain() const
std::optional< std::uint32_t > lastPingSeq_
void onTimer(boost::system::error_code const &ec)
beast::Journal const journal_
beast::Journal const p_journal_
PeerImp(PeerImp const &)=delete
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
bool hasTxSet(uint256 const &hash) const override
clock_type::time_point lastPingTime_
void onMessageUnknown(std::uint16_t type)
std::shared_ptr< PeerFinder::Slot > const slot_
boost::circular_buffer< uint256 > recentLedgers_
std::optional< std::chrono::milliseconds > latency_
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
beast::IP::Endpoint const remote_address_
Json::Value json() override
PublicKey const publicKey_
hash_set< uint256 > txQueue_
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
bool txReduceRelayEnabled_
clock_type::time_point trackingTime_
ProtocolVersion protocol_
reduce_relay::Squelch< UptimeClock > squelch_
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
struct ripple::PeerImp::@22 metrics_
uint256 previousLedgerHash_
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
void send(std::shared_ptr< Message > const &m) override
static std::string makePrefix(id_t id)
boost::system::error_code error_code
void onReadMessage(error_code ec, std::size_t bytes_transferred)
bool ledgerReplayEnabled_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
bool crawl() const
Returns true if this connection will publicly share its IP address.
void sendTxQueue() override
Send aggregated transactions' hashes.
bool txReduceRelayEnabled() const override
bool supportsFeature(ProtocolFeature f) const override
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
http_request_type request_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
bool cluster() const override
Returns true if this connection is a member of the cluster.
void onShutdown(error_code ec)
boost::asio::strand< boost::asio::executor > strand_
void cycleStatus() override
boost::beast::multi_buffer read_buffer_
Resource::Consumer usage_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
void fail(std::string const &reason)
std::atomic< Tracking > tracking_
Represents a peer connection in the overlay.
A peer's signed, proposed position for use in RCLConsensus.
bool checkSign() const
Verify the signing hash of the proposal.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
An endpoint that consumes resources.
int balance()
Returns the credit balance representing consumption.
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
std::size_t size() const noexcept
void const * data() const noexcept
const void * getDataPtr() const
An immutable linear range of bytes.
time_point now() const override
Returns the current time, using the server's clock.
static category categorize(::google::protobuf::Message const &message, int type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
static constexpr std::size_t size()
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
T emplace_back(T... args)
@ objectValue
object value (collection of name/value pairs).
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeInvalidData
Charge const feeUselessData
Charge const feeTrivialPeer
Charge const feeModerateBurdenPeer
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ maxQueryDepth
The maximum number of levels to search.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ sendQueueLogFreq
How often to log send queue size.
TER valid(PreclaimContext const &ctx, AccountID const &src)
auto measureDurationAndLog(Func &&func, const std::string &actionDescription, std::chrono::duration< Rep, Period > maxDelay, const beast::Journal &journal)
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
static constexpr char FEATURE_COMPR[]
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
@ ValidatorListPropagation
@ ValidatorList2Propagation
std::string base64_decode(std::string_view data)
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
static bool stringIsUint256Sized(std::string const &pBuffStr)
static constexpr char FEATURE_LEDGER_REPLAY[]
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
std::string strHex(FwdIt begin, FwdIt end)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Stopwatch & stopwatch()
Returns an instance of a wall clock.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
static constexpr char FEATURE_TXRR[]
std::string to_string(base_uint< Bits, Tag > const &a)
Number root(Number f, unsigned d)
@ proposal
proposal for signing
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
static constexpr char FEATURE_VPRR[]
T shared_from_this(T... args)
beast::IP::Address public_ip
std::optional< std::uint32_t > networkID
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items