#include <xrpld/app/consensus/RCLValidations.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/InboundTransactions.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/TransactionMaster.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/app/misc/LoadFeeTrack.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/Transaction.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/overlay/predicates.h>
#include <xrpld/perflog/PerfLog.h>

#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/digest.h>

#include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/core/ostream.hpp>

using namespace std::chrono_literals;
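// PeerImp is the concrete Peer representing a single overlay connection.
// A recurring pattern in the handlers below: each onMessage overload
// validates its protocol buffer first, charges the peer's
// Resource::Consumer for malformed or useless data, and hands any heavy
// work to the JobQueue so the I/O strand stays responsive.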
    , sink_(app_.journal("Peer"), makePrefix(id))
    , p_sink_(app_.journal("Protocol"), makePrefix(id))
    , stream_ptr_(std::move(stream_ptr))
    , socket_(stream_ptr_->next_layer().socket())
    , stream_(*stream_ptr_)
    , strand_(socket_.get_executor())
    , remote_address_(slot->remote_endpoint())
    , publicKey_(publicKey)
    , squelch_(app_.journal("Squelch"))
    , fee_{Resource::feeTrivialPeer, ""}
    , request_(std::move(request))
    , compressionEnabled_(
          peerFeatureEnabled(
              headers_,
              FEATURE_COMPR,
              "lz4",
              app_.config().COMPRESSION)
              ? Compressed::On
              : Compressed::Off)
    , txReduceRelayEnabled_(peerFeatureEnabled(
          headers_,
          FEATURE_TXRR,
          app_.config().TX_REDUCE_RELAY_ENABLE))
    , vpReduceRelayEnabled_(peerFeatureEnabled(
          headers_,
          FEATURE_VPRR,
          app_.config().VP_REDUCE_RELAY_ENABLE))
    , ledgerReplayEnabled_(peerFeatureEnabled(
          headers_,
          FEATURE_LEDGER_REPLAY,
          app_.config().LEDGER_REPLAY))
    , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
    JLOG(journal_.info()) << "compression enabled "
                          << (compressionEnabled_ == Compressed::On)
                          << " vp reduce-relay enabled " << vpReduceRelayEnabled_
                          << " tx reduce-relay enabled " << txReduceRelayEnabled_;
}
    const bool inCluster{cluster()};

    // PeerImp::run
    if (!strand_.running_in_this_thread())
        return post(strand_, std::bind(&PeerImp::run, shared_from_this()));

    auto parseLedgerHash =
        [](std::string_view value) -> std::optional<uint256> {
        if (uint256 ret; ret.parseHex(value))
            return ret;
        if (auto const s = base64_decode(value); s.size() == uint256::size())
            return uint256{s};
        return std::nullopt;
    };

    std::optional<uint256> closed;
    std::optional<uint256> previous;

    if (auto const iter = headers_.find("Closed-Ledger");
        iter != headers_.end())
    {
        closed = parseLedgerHash(iter->value());
        if (!closed)
            fail("Malformed handshake data (1)");
    }

    if (auto const iter = headers_.find("Previous-Ledger");
        iter != headers_.end())
    {
        previous = parseLedgerHash(iter->value());
        if (!previous)
            fail("Malformed handshake data (2)");
    }

    if (previous && !closed)
        fail("Malformed handshake data (3)");
    // PeerImp::stop
    if (!strand_.running_in_this_thread())
        return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));

    // PeerImp::send
    if (!strand_.running_in_this_thread())
        return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));

    auto validator = m->getValidatorKey();
    if (validator && !squelch_.expireSquelch(*validator))
        return;

    overlay_.reportTraffic(
        safe_cast<TrafficCount::category>(m->getCategory()),
        false,
        static_cast<int>(m->getBuffer(compressionEnabled_).size()));

    auto sendq_size = send_queue_.size();
    if (sendq_size % Tuning::sendQueueLogFreq == 0)
        JLOG(journal_.debug()) << remote_address_.to_string()
                               << " sendq: " << sendq_size;

    boost::asio::async_write(
        stream_,
        boost::asio::buffer(send_queue_.front()->getBuffer(compressionEnabled_)),
        bind_executor(
            strand_,
            std::bind(
                &PeerImp::onWriteMessage,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2)));
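// send() always runs on the strand: a message whose validator key is
// currently squelched is dropped before queueing, traffic is attributed to
// its TrafficCount category, and writes are chained so at most one
// async_write is outstanding per peer.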
    // PeerImp::sendTxQueue
    if (!strand_.running_in_this_thread())
        return post(
            strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this()));

    protocol::TMHaveTransactions ht;
    for (auto const& hash : txQueue_)
        ht.add_hashes(hash.data(), hash.size());
    txQueue_.clear();
    send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));

    // PeerImp::addTxQueue
    if (!strand_.running_in_this_thread())
        return post(
            strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash));

    // PeerImp::removeTxQueue
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));

    auto removed = txQueue_.erase(hash);
        // PeerImp::charge: disconnect a peer that imposes excessive load.
        fail("charge: Resources");

    // PeerImp::crawl: true if this connection will publicly share its IP.
    auto const iter = headers_.find("Crawl");
    if (iter == headers_.end())
        return false;
    return boost::iequals(iter->value(), "public");
    // PeerImp::json
    if (inbound_)
        ret[jss::inbound] = true;

    if (cluster())
        ret[jss::cluster] = true;

    if (auto const nid = headers_["Network-ID"]; !nid.empty())
        ret[jss::network_id] = std::string(nid);

    ret[jss::uptime] = static_cast<Json::UInt>(
        std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());

    {
        std::uint32_t minSeq, maxSeq;
        ledgerRange(minSeq, maxSeq);
        if ((minSeq != 0) || (maxSeq != 0))
            ret[jss::complete_ledgers] =
                std::to_string(minSeq) + " - " + std::to_string(maxSeq);
    }

    switch (tracking_.load())
    {
        case Tracking::diverged:
            ret[jss::track] = "diverged";
            break;
        case Tracking::unknown:
            ret[jss::track] = "unknown";
            break;
        case Tracking::converged:
            // Nothing to report
            break;
    }

    uint256 closedLedgerHash;
    protocol::TMStatusChange last_status;
    {
        std::lock_guard sl(recentLock_);
        closedLedgerHash = closedLedgerHash_;
        last_status = last_status_;
    }

    if (closedLedgerHash != beast::zero)
        ret[jss::ledger] = to_string(closedLedgerHash);

    if (last_status.has_newstatus())
    {
        switch (last_status.newstatus())
        {
            case protocol::nsCONNECTING:
                ret[jss::status] = "connecting";
                break;
            case protocol::nsCONNECTED:
                ret[jss::status] = "connected";
                break;
            case protocol::nsMONITORING:
                ret[jss::status] = "monitoring";
                break;
            case protocol::nsVALIDATING:
                ret[jss::status] = "validating";
                break;
            case protocol::nsSHUTTING:
                ret[jss::status] = "shutting";
                break;
            default:
                JLOG(p_journal_.warn())
                    << "Unknown status: " << last_status.newstatus();
        }
    }

    ret[jss::metrics][jss::total_bytes_recv] =
        std::to_string(metrics_.recv.total_bytes());
    ret[jss::metrics][jss::total_bytes_sent] =
        std::to_string(metrics_.sent.total_bytes());
    ret[jss::metrics][jss::avg_bps_recv] =
        std::to_string(metrics_.recv.average_bytes());
    ret[jss::metrics][jss::avg_bps_sent] =
        std::to_string(metrics_.sent.average_bytes());
    // PeerImp::close
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::close : strand in this thread");

    // PeerImp::fail
    if (!strand_.running_in_this_thread())
        return post(
            strand_, std::bind(&PeerImp::fail, shared_from_this(), reason));

    JLOG(journal_.warn()) << name() << " failed: " << reason;

    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::fail : strand in this thread");

    // PeerImp::gracefulClose
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::PeerImp::gracefulClose : strand in this thread");
    XRPL_ASSERT(
        socket_.is_open(),
        "ripple::PeerImp::gracefulClose : socket is open");
    XRPL_ASSERT(
        !gracefulClose_,
        "ripple::PeerImp::gracefulClose : socket is not closing");

    stream_.async_shutdown(bind_executor(
        strand_,
        std::bind(
            &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));

    // PeerImp::setTimer
    error_code ec;
    timer_.expires_from_now(peerTimerInterval, ec);
    timer_.async_wait(bind_executor(
        strand_,
        std::bind(&PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
    // PeerImp::onTimer
    if (ec == boost::asio::error::operation_aborted)
        return;

    if (large_sendq_++ >= Tuning::sendqIntervals)
    {
        fail("Large send queue");
        return;
    }

    clock_type::duration duration;

    if (lastPingSeq_)
    {
        // The previous ping was never answered.
        fail("Ping Timeout");
        return;
    }

    protocol::TMPing message;
    message.set_type(protocol::TMPing::ptPING);
    send(std::make_shared<Message>(message, protocol::mtPING));

    // PeerImp::onShutdown
    if (!ec)
    {
        JLOG(journal_.error()) << "onShutdown: expected error condition";
        return close();
    }
    if (ec != boost::asio::error::eof)
        return fail("onShutdown", ec);
    close();
764 "ripple::PeerImp::doAccept : empty read buffer");
773 return fail(
"makeSharedValue: Unexpected failure");
794 auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
807 boost::asio::async_write(
809 write_buffer->data(),
810 boost::asio::transfer_all(),
817 if (ec == boost::asio::error::operation_aborted)
820 return fail(
"onWriteResponse", ec);
821 if (write_buffer->size() == bytes_transferred)
823 return fail(
"Failed to write header");
    // PeerImp::onReadMessage
    if (ec == boost::asio::error::operation_aborted)
        return;
    if (ec == boost::asio::error::eof)
        return gracefulClose();
    if (ec)
        return fail("onReadMessage", ec);

    if (auto stream = journal_.trace())
    {
        if (bytes_transferred > 0)
            stream << "onReadMessage: " << bytes_transferred << " bytes";
        else
            stream << "onReadMessage";
    }

    metrics_.recv.add_message(bytes_transferred);
    read_buffer_.commit(bytes_transferred);

    using namespace std::chrono_literals;
    std::size_t hint = Tuning::readBufferBytes;
    while (read_buffer_.size() > 0)
    {
        auto const [bytes_consumed, err] =
            invokeProtocolMessage(read_buffer_.data(), *this, hint);
        if (err)
            return fail("onReadMessage", err);
        if (bytes_consumed == 0)
            break;
        read_buffer_.consume(bytes_consumed);
    }

    stream_.async_read_some(
        read_buffer_.prepare(Tuning::readBufferBytes),
        bind_executor(
            strand_,
            std::bind(
                &PeerImp::onReadMessage,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2)));
    // PeerImp::onWriteMessage
    if (ec == boost::asio::error::operation_aborted)
        return;
    if (ec)
        return fail("onWriteMessage", ec);

    if (auto stream = journal_.trace())
    {
        if (bytes_transferred > 0)
            stream << "onWriteMessage: " << bytes_transferred << " bytes";
        else
            stream << "onWriteMessage";
    }

    metrics_.sent.add_message(bytes_transferred);

    XRPL_ASSERT(
        !send_queue_.empty(),
        "ripple::PeerImp::onWriteMessage : non-empty send buffer");
    send_queue_.pop();

    if (!send_queue_.empty())
        return boost::asio::async_write(
            stream_,
            boost::asio::buffer(
                send_queue_.front()->getBuffer(compressionEnabled_)),
            bind_executor(
                strand_,
                std::bind(
                    &PeerImp::onWriteMessage,
                    shared_from_this(),
                    std::placeholders::_1,
                    std::placeholders::_2)));

    if (gracefulClose_)
        return stream_.async_shutdown(bind_executor(
            strand_,
            std::bind(
                &PeerImp::onShutdown,
                shared_from_this(),
                std::placeholders::_1)));
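// Writes are chained: when one message completes, the next entry in
// send_queue_ is written, and once the queue drains a requested graceful
// close proceeds with the TLS shutdown.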
    // PeerImp::onMessageBegin
    if (type == MessageType::mtTRANSACTION ||
        type == MessageType::mtHAVE_TRANSACTIONS ||
        type == MessageType::mtTRANSACTIONS)
        overlay_.addTxMetrics(
            static_cast<MessageType>(type), static_cast<std::uint64_t>(size));

    JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
                           << uncompressed_size << " " << isCompressed;
    // onMessage(TMManifests)
    auto const s = m->list_size();

    // onMessage(TMPing)
    if (m->type() == protocol::TMPing::ptPING)
    {
        m->set_type(protocol::TMPing::ptPONG);
        send(std::make_shared<Message>(*m, protocol::mtPING));
        return;
    }

    if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
    {
        // Measure the round-trip time from our outstanding ping.
        auto const rtt = std::chrono::round<std::chrono::milliseconds>(
            clock_type::now() - lastPingTime_);
        latency_ = rtt;
    }
    // onMessage(TMCluster)
    for (int i = 0; i < m->clusternodes().size(); ++i)
    {
        protocol::TMClusterNode const& node = m->clusternodes(i);

        std::string name;
        if (node.has_nodename())
            name = node.nodename();

        auto const publicKey =
            parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());
        auto const reportTime =
            NetClock::time_point{NetClock::duration{node.reporttime()}};

        if (publicKey)
            app_.cluster().update(
                *publicKey, name, node.nodeload(), reportTime);
    }

    int loadSources = m->loadsources().size();
    if (loadSources != 0)
    {
        Resource::Gossip gossip;
        gossip.items.reserve(loadSources);
        for (int i = 0; i < m->loadsources().size(); ++i)
        {
            protocol::TMLoadSource const& node = m->loadsources(i);
            Resource::Gossip::Item item;
            item.address = beast::IP::Endpoint::from_string(node.name());
            item.balance = node.cost();
            gossip.items.push_back(item);
        }
        overlay_.resourceManager().importConsumers(name(), gossip);
    }

    // The cluster fee is the median of recently reported load fees.
    std::vector<std::uint32_t> fees;
    app_.cluster().for_each([&](ClusterNode const& status) {
        if (status.getReportTime() >= thresh)
            fees.push_back(status.getLoadFee());
    });

    std::uint32_t clusterFee = 0;
    if (!fees.empty())
    {
        auto const index = fees.size() / 2;
        std::nth_element(fees.begin(), fees.begin() + index, fees.end());
        clusterFee = fees[index];
    }

    app_.getFeeTrack().setClusterFee(clusterFee);
    // onMessage(TMEndpoints)
    // Cap the number of endpoints a peer may send in one message.
    if (m->endpoints_v2().size() >= 1024)
    {
        charge(Resource::feeUselessData, "endpoints too large");
        return;
    }

    std::vector<PeerFinder::Endpoint> endpoints;
    endpoints.reserve(m->endpoints_v2().size());

    for (auto const& tm : m->endpoints_v2())
    {
        if (auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint()))
            endpoints.emplace_back(*result, tm.hops());
        else
            JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
                                     << tm.endpoint() << "}";
    }

    if (!endpoints.empty())
        overlay_.peerFinder().on_endpoints(slot_, endpoints);
    // PeerImp::handleTransaction
    XRPL_ASSERT(
        eraseTxQueue != batch,
        ("ripple::PeerImp::handleTransaction correct function params"));

    if (app_.getOPs().isNeedNetworkLedger())
    {
        // If we've never been in sync, we can't do anything useful with
        // the transaction yet.
        JLOG(p_journal_.debug()) << "Ignoring incoming transaction: "
                                 << "Need network ledger";
        return;
    }

    SerialIter sit(makeSlice(m->rawtransaction()));

    try
    {
        auto stx = std::make_shared<STTx const>(sit);
        uint256 txID = stx->getTransactionID();

        int flags;
        constexpr std::chrono::seconds tx_interval = 10s;
        if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
        {
            // We have already processed this transaction recently.
            charge(Resource::feeUselessData, "duplicate tx");
            return;
        }

        bool checkSignature = true;
        if (cluster())
        {
            if (!m->has_deferred() || !m->deferred())
            {
                // Skip local checks if a trusted cluster peer relayed it.
                flags |= SF_TRUSTED;
            }
            if (app_.getValidationPublicKey())
                checkSignature = false;
        }

        if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
        {
            JLOG(p_journal_.trace()) << "No new transactions until synchronized";
        }
        else
        {
            std::weak_ptr<PeerImp> weak = shared_from_this();
            app_.getJobQueue().addJob(
                jtTRANSACTION,
                "recvTransaction->checkTransaction",
                [weak, flags, checkSignature, stx, batch]() {
                    if (auto peer = weak.lock())
                        peer->checkTransaction(
                            flags, checkSignature, stx, batch);
                });
        }
    }
    catch (std::exception const& ex)
    {
        JLOG(p_journal_.warn())
            << "Transaction invalid: " << strHex(m->rawtransaction())
            << ". Exception: " << ex.what();
    }
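// Incoming transactions are deduplicated through the HashRouter, optionally
// marked SF_TRUSTED when relayed by a cluster peer, and then verified
// asynchronously via checkTransaction() on the job queue.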
    // onMessage(TMGetLedger): validate the request before queuing any work.
    auto const itype{m->itype()};

    // Verify ledger info type
    if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
        return badData("Invalid ledger info type");

    auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
        if (m->has_ltype())
            return m->ltype();
        return std::nullopt;
    }();

    if (itype == protocol::liTS_CANDIDATE)
    {
        if (!m->has_ledgerhash())
            return badData("Invalid TX candidate set, missing TX set hash");
    }
    else if (
        !m->has_ledgerhash() && !m->has_ledgerseq() &&
        !(ltype && *ltype == protocol::ltCLOSED))
    {
        return badData("Invalid request");
    }

    // Verify ledger type
    if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
        return badData("Invalid ledger type");

    // Verify ledger hash
    if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
        return badData("Invalid ledger hash");

    // Verify ledger sequence
    if (m->has_ledgerseq())
    {
        auto const ledgerSeq{m->ledgerseq()};
        using namespace std::chrono_literals;
    }

    if (itype != protocol::liBASE)
    {
        // Verify SHAMap node IDs
        if (m->nodeids_size() <= 0)
            return badData("Invalid ledger node IDs");

        for (auto const& nodeId : m->nodeids())
        {
            if (deserializeSHAMapNodeID(nodeId.data(), nodeId.size()) ==
                std::nullopt)
                return badData("Invalid SHAMap node ID");
        }
    }

    // Verify query type
    if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
        return badData("Invalid query type");

    // Verify query depth
    if (m->has_querydepth())
    {
        if (m->querydepth() > Tuning::maxQueryDepth ||
            itype == protocol::liBASE)
            return badData("Invalid query depth");
    }

    // Queue the request
    std::weak_ptr<PeerImp> weak = shared_from_this();
    app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
        if (auto peer = weak.lock())
            peer->processLedgerRequest(m);
    });
    // onMessage(TMProofPathRequest)
    app_.getJobQueue().addJob(
        jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
            if (auto peer = weak.lock())
            {
                auto reply =
                    peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
                if (reply.has_error())
                {
                    if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                        peer->charge(
                            Resource::feeMalformedRequest,
                            "proof_path_request");
                    else
                        peer->charge(
                            Resource::feeRequestNoReply, "proof_path_request");
                }
                else
                {
                    peer->send(std::make_shared<Message>(
                        reply, protocol::mtPROOF_PATH_RESPONSE));
                }
            }
        });

    // onMessage(TMProofPathResponse)
    if (!ledgerReplayEnabled_)
    {
        charge(Resource::feeMalformedRequest, "proof_path_response disabled");
        return;
    }
    if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
        charge(Resource::feeInvalidData, "proof_path_response");

    // onMessage(TMReplayDeltaRequest)
    JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
    if (!ledgerReplayEnabled_)
    {
        charge(Resource::feeMalformedRequest, "replay_delta_request disabled");
        return;
    }
    fee_.fee = Resource::feeModerateBurdenPeer;

    app_.getJobQueue().addJob(
        jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
            if (auto peer = weak.lock())
            {
                auto reply =
                    peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
                if (reply.has_error())
                {
                    if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                        peer->charge(
                            Resource::feeMalformedRequest,
                            "replay_delta_request");
                    else
                        peer->charge(
                            Resource::feeRequestNoReply,
                            "replay_delta_request");
                }
                else
                {
                    peer->send(std::make_shared<Message>(
                        reply, protocol::mtREPLAY_DELTA_RESPONSE));
                }
            }
        });

    // onMessage(TMReplayDeltaResponse)
    if (!ledgerReplayEnabled_)
    {
        charge(Resource::feeMalformedRequest, "replay_delta_response disabled");
        return;
    }
    if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
        charge(Resource::feeInvalidData, "replay_delta_response");
    // onMessage(TMLedgerData)
    auto badData = [&](std::string const& msg) {
        fee_.update(Resource::feeInvalidData, msg);
        JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
    };

    // Verify ledger hash
    if (!stringIsUint256Sized(m->ledgerhash()))
        return badData("Invalid ledger hash");

    // Verify ledger sequence
    {
        auto const ledgerSeq{m->ledgerseq()};
        if (m->type() == protocol::liTS_CANDIDATE)
        {
            using namespace std::chrono_literals;
            if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
                ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
                return badData("Invalid ledger sequence");
        }
    }

    // Verify ledger info type
    if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
        return badData("Invalid ledger info type");

    // Verify reply error
    if (m->has_error() &&
        (m->error() < protocol::reNO_LEDGER ||
         m->error() > protocol::reBAD_REQUEST))
        return badData("Invalid reply error");

    // Verify ledger nodes
    if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
        return badData(
            "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));

    // If there is a request cookie, attempt to relay the message
    if (m->has_requestcookie())
    {
        if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
        {
            m->clear_requestcookie();
            peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
        }
        else
        {
            JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
        }
        return;
    }

    uint256 const ledgerHash{m->ledgerhash()};

    // Check if this is data for a candidate transaction set
    if (m->type() == protocol::liTS_CANDIDATE)
    {
        std::weak_ptr<PeerImp> weak = shared_from_this();
        app_.getJobQueue().addJob(
            jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
                if (auto peer = weak.lock())
                    peer->app_.getInboundTransactions().gotData(
                        ledgerHash, peer, m);
            });
        return;
    }

    // Consume the message
    app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
    // onMessage(TMProposeSet)
    protocol::TMProposeSet& set = *m;

    auto const sig = makeSlice(set.signature());

    // Preliminary check of the signature: a DER-encoded signature can't be
    // longer than 72 bytes.
    if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
        (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
    {
        JLOG(p_journal_.warn()) << "Proposal: malformed";
        fee_.update(
            Resource::feeInvalidSignature,
            " signature can't be longer than 72 bytes");
        return;
    }

    if (!stringIsUint256Sized(set.currenttxhash()) ||
        !stringIsUint256Sized(set.previousledger()))
    {
        JLOG(p_journal_.warn()) << "Proposal: malformed";
        fee_.update(Resource::feeMalformedRequest, "bad hashes");
        return;
    }

    PublicKey const publicKey{makeSlice(set.nodepubkey())};
    auto const isTrusted = app_.validators().trusted(publicKey);

    // If the operator has specified that untrusted proposals be dropped,
    // this happens here.
    if (!isTrusted && app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
        return;

    uint256 const proposeHash{set.currenttxhash()};
    uint256 const prevLedger{set.previousledger()};

    NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};

    uint256 const suppression = proposalUniqueId(
        proposeHash,
        prevLedger,
        set.proposeseq(),
        closeTime,
        publicKey.slice(),
        sig);

    if (auto [added, relayed] =
            app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
        !added)
    {
        // Count unique messages (Slots has its own 'HashRouter') which a
        // peer receives within IDLED seconds of the message being relayed.
        if (reduceRelayReady() && relayed &&
            (stopwatch().now() - *relayed) < reduce_relay::IDLED)
            overlay_.updateSlotAndSquelch(
                suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
        JLOG(p_journal_.trace()) << "Proposal: duplicate";
        return;
    }

    if (!isTrusted)
    {
        if (tracking_.load() == Tracking::diverged)
        {
            JLOG(p_journal_.debug())
                << "Proposal: Dropping untrusted (peer divergence)";
            return;
        }

        if (!cluster() && app_.getFeeTrack().isLoadedLocal())
        {
            JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
            return;
        }
    }

    JLOG(p_journal_.trace())
        << "Proposal: " << (isTrusted ? "trusted" : "untrusted");

    auto proposal = RCLCxPeerPos(
        publicKey,
        sig,
        suppression,
        RCLCxPeerPos::Proposal{
            prevLedger,
            set.proposeseq(),
            proposeHash,
            closeTime,
            app_.timeKeeper().closeTime(),
            calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});

    std::weak_ptr<PeerImp> weak = shared_from_this();
    app_.getJobQueue().addJob(
        isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
        "recvPropose->checkPropose",
        [weak, isTrusted, m, proposal]() {
            if (auto peer = weak.lock())
                peer->checkPropose(isTrusted, m, proposal);
        });
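// Proposal relaying is gated by a HashRouter suppression entry keyed on
// proposalUniqueId(); duplicates seen within reduce_relay::IDLED of the
// original relay still feed the squelch slot accounting so per-validator
// message counts stay accurate.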
    // onMessage(TMStatusChange)
    JLOG(p_journal_.trace()) << "Status: Change";

    if (!m->has_networktime())
        m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());

    {
        std::lock_guard sl(recentLock_);
        if (!last_status_.has_newstatus() || m->has_newstatus())
            last_status_ = *m;
        else
        {
            // Preserve old status
            protocol::NodeStatus status = last_status_.newstatus();
            last_status_ = *m;
            m->set_newstatus(status);
        }
    }

    if (m->newevent() == protocol::neLOST_SYNC)
    {
        bool outOfSync{false};
        {
            // Operations on closedLedgerHash_ and previousLedgerHash_ must
            // be guarded by recentLock_.
            std::lock_guard sl(recentLock_);
            if (!closedLedgerHash_.isZero())
            {
                outOfSync = true;
                closedLedgerHash_.zero();
            }
            previousLedgerHash_.zero();
        }
        if (outOfSync)
        {
            JLOG(p_journal_.debug()) << "Status: Out of sync";
        }
        return;
    }

    {
        uint256 closedLedgerHash;
        bool const peerChangedLedgers{
            m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};

        {
            std::lock_guard sl(recentLock_);
            if (peerChangedLedgers)
            {
                closedLedgerHash_ = m->ledgerhash();
                closedLedgerHash = closedLedgerHash_;
                addLedger(closedLedgerHash, sl);
            }
            else
            {
                closedLedgerHash_.zero();
            }

            if (m->has_ledgerhashprevious() &&
                stringIsUint256Sized(m->ledgerhashprevious()))
            {
                previousLedgerHash_ = m->ledgerhashprevious();
                addLedger(previousLedgerHash_, sl);
            }
            else
            {
                previousLedgerHash_.zero();
            }
        }

        if (peerChangedLedgers)
        {
            JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
        }
        else
        {
            JLOG(p_journal_.debug()) << "Status: No ledger";
        }
    }

    if (m->has_firstseq() && m->has_lastseq())
    {
        std::lock_guard sl(recentLock_);
        minLedger_ = m->firstseq();
        maxLedger_ = m->lastseq();
        if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
            minLedger_ = maxLedger_ = 0;
    }

    if (m->has_ledgerseq() &&
        app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
    {
        checkTracking(
            m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
    }

    app_.getOPs().pubPeerStatus([=, this]() -> Json::Value {
        Json::Value j = Json::objectValue;

        if (m->has_newstatus())
        {
            switch (m->newstatus())
            {
                case protocol::nsCONNECTING:
                    j[jss::status] = "CONNECTING";
                    break;
                case protocol::nsCONNECTED:
                    j[jss::status] = "CONNECTED";
                    break;
                case protocol::nsMONITORING:
                    j[jss::status] = "MONITORING";
                    break;
                case protocol::nsVALIDATING:
                    j[jss::status] = "VALIDATING";
                    break;
                case protocol::nsSHUTTING:
                    j[jss::status] = "SHUTTING";
                    break;
            }
        }

        if (m->has_newevent())
        {
            switch (m->newevent())
            {
                case protocol::neCLOSING_LEDGER:
                    j[jss::action] = "CLOSING_LEDGER";
                    break;
                case protocol::neACCEPTED_LEDGER:
                    j[jss::action] = "ACCEPTED_LEDGER";
                    break;
                case protocol::neSWITCHED_LEDGER:
                    j[jss::action] = "SWITCHED_LEDGER";
                    break;
                case protocol::neLOST_SYNC:
                    j[jss::action] = "LOST_SYNC";
                    break;
            }
        }

        if (m->has_ledgerseq())
        {
            j[jss::ledger_index] = m->ledgerseq();
        }

        if (m->has_ledgerhash())
        {
            uint256 closedLedgerHash{};
            {
                std::lock_guard sl(recentLock_);
                closedLedgerHash = closedLedgerHash_;
            }
            j[jss::ledger_hash] = to_string(closedLedgerHash);
        }

        if (m->has_networktime())
        {
            j[jss::date] = Json::UInt(m->networktime());
        }

        if (m->has_firstseq() && m->has_lastseq())
        {
            j[jss::ledger_index_min] = Json::UInt(m->firstseq());
            j[jss::ledger_index_max] = Json::UInt(m->lastseq());
        }

        return j;
    });
    // PeerImp::checkTracking
    serverSeq = maxLedger_;
    checkTracking(serverSeq, validationSeq);

    auto const diff = std::max(seq1, seq2) - std::min(seq1, seq2);

    if (diff < Tuning::convergedLedgerLimit)
    {
        // The peer's ledger sequence is close to the validation's.
        tracking_ = Tracking::converged;
    }

    if ((diff > Tuning::divergedLedgerLimit) &&
        (tracking_.load() != Tracking::diverged))
    {
        // The peer's ledger sequence is far from the validation's.
        tracking_ = Tracking::diverged;
        trackingTime_ = clock_type::now();
    }
    // onMessage(TMHaveTransactionSet)
    if (!stringIsUint256Sized(m->hash()))
    {
        fee_.update(Resource::feeMalformedRequest, "bad hash");
        return;
    }

    uint256 const hash{m->hash()};

    if (m->status() == protocol::tsHAVE)
    {
        std::lock_guard sl(recentLock_);

        if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
            recentTxSets_.end())
        {
            fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");
            return;
        }

        recentTxSets_.push_back(hash);
    }
void
PeerImp::onValidatorListMessage(
    std::string const& messageType,
    std::string const& manifest,
    std::uint64_t version,
    std::vector<ValidatorBlobInfo> const& blobs)
{
    // If there are no blobs, the message is malformed (possibly because of
    // ValidatorList class rules), so charge accordingly and skip processing.
    if (blobs.empty())
    {
        JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
                                << " from peer " << remote_address_;
        // This shouldn't ever happen with a well-behaved peer
        fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");
        return;
    }

    auto const hash = sha512Half(manifest, blobs, version);

    JLOG(p_journal_.debug())
        << "Received " << messageType << " from "
        << remote_address_.to_string() << " (" << id_ << ")";

    if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
    {
        JLOG(p_journal_.debug())
            << messageType << ": received duplicate " << messageType;
        // Charging this fee here won't hurt the peer in the normal case,
        // because it will be offset when the peer is charged for the
        // original message.
        fee_.update(Resource::feeUselessData, "duplicate");
        return;
    }

    auto const applyResult = app_.validators().applyListsAndBroadcast(
        manifest,
        version,
        blobs,
        remote_address_.to_string(),
        hash,
        app_.overlay(),
        app_.getHashRouter(),
        app_.getOPs());

    JLOG(p_journal_.debug())
        << "Processed " << messageType << " version " << version << " from "
        << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
                                     : "unknown or invalid publisher")
        << " from " << remote_address_.to_string() << " (" << id_
        << ") with best result " << to_string(applyResult.bestDisposition());

    // Act based on the best result
    switch (applyResult.bestDisposition())
    {
        // New list
        case ListDisposition::accepted:
        // Newest list is expired, and that needs to be broadcast, too
        case ListDisposition::expired:
        // Future list
        case ListDisposition::pending: {
            std::lock_guard<std::mutex> sl(recentLock_);

            XRPL_ASSERT(
                applyResult.publisherKey,
                "ripple::PeerImp::onValidatorListMessage : publisher key is "
                "set");
            auto const& pubKey = *applyResult.publisherKey;
            if (auto const iter = publisherListSequences_.find(pubKey);
                iter != publisherListSequences_.end())
            {
                XRPL_ASSERT(
                    iter->second < applyResult.sequence,
                    "ripple::PeerImp::onValidatorListMessage : lower sequence");
            }
            publisherListSequences_[pubKey] = applyResult.sequence;
        }
        break;
        case ListDisposition::same_sequence:
        case ListDisposition::known_sequence: {
            std::lock_guard<std::mutex> sl(recentLock_);
            XRPL_ASSERT(
                applyResult.sequence && applyResult.publisherKey,
                "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
                "and set publisher key");
            XRPL_ASSERT(
                publisherListSequences_[*applyResult.publisherKey] <=
                    applyResult.sequence,
                "ripple::PeerImp::onValidatorListMessage : maximum sequence");
        }
        break;
        case ListDisposition::stale:
        case ListDisposition::untrusted:
        case ListDisposition::invalid:
        case ListDisposition::unsupported_version:
            break;
        default:
            UNREACHABLE(
                "ripple::PeerImp::onValidatorListMessage : invalid best list "
                "disposition");
    }

    // Charge based on the worst result
    switch (applyResult.worstDisposition())
    {
        case ListDisposition::accepted:
        case ListDisposition::expired:
        case ListDisposition::pending:
            // No charges for good data
            break;
        case ListDisposition::same_sequence:
        case ListDisposition::known_sequence:
            // Charging this fee here won't hurt the peer in the normal
            // case, because it will be offset when the peer is charged for
            // the original message.
            fee_.update(
                Resource::feeUselessData,
                " duplicate (same_sequence or known_sequence)");
            break;
        case ListDisposition::stale:
            fee_.update(Resource::feeInvalidData, "expired");
            break;
        case ListDisposition::untrusted:
            // There are very few good reasons for a peer to send an
            // untrusted list.
            fee_.update(Resource::feeUselessData, "untrusted");
            break;
        case ListDisposition::invalid:
            // This shouldn't ever happen with a well-behaved peer
            fee_.update(
                Resource::feeInvalidSignature, "invalid list disposition");
            break;
        case ListDisposition::unsupported_version:
            fee_.update(Resource::feeInvalidData, "version");
            break;
        default:
            UNREACHABLE(
                "ripple::PeerImp::onValidatorListMessage : invalid worst list "
                "disposition");
    }

    // Log based on all the results
    for (auto const& [disp, count] : applyResult.dispositions)
    {
        switch (disp)
        {
            // New list
            case ListDisposition::accepted:
                JLOG(p_journal_.debug())
                    << "Applied " << count << " new " << messageType
                    << "(s) from peer " << remote_address_;
                break;
            // Newest list is expired, and that needs to be broadcast, too
            case ListDisposition::expired:
                JLOG(p_journal_.debug())
                    << "Applied " << count << " expired " << messageType
                    << "(s) from peer " << remote_address_;
                break;
            // Future list
            case ListDisposition::pending:
                JLOG(p_journal_.debug())
                    << "Processed " << count << " future " << messageType
                    << "(s) from peer " << remote_address_;
                break;
            case ListDisposition::same_sequence:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " " << messageType
                    << "(s) with current sequence from peer "
                    << remote_address_;
                break;
            case ListDisposition::known_sequence:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " " << messageType
                    << "(s) with future sequence from peer " << remote_address_;
                break;
            case ListDisposition::stale:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " stale " << messageType
                    << "(s) from peer " << remote_address_;
                break;
            case ListDisposition::untrusted:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " untrusted " << messageType
                    << "(s) from peer " << remote_address_;
                break;
            case ListDisposition::unsupported_version:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " unsupported version "
                    << messageType << "(s) from peer " << remote_address_;
                break;
            case ListDisposition::invalid:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " invalid " << messageType
                    << "(s) from peer " << remote_address_;
                break;
            default:
                UNREACHABLE(
                    "ripple::PeerImp::onValidatorListMessage : invalid list "
                    "disposition");
        }
    }
}
    // onMessage(TMValidatorList)
    try
    {
        if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
        {
            JLOG(p_journal_.debug())
                << "ValidatorList: received validator list from peer using "
                << "protocol version " << to_string(protocol_)
                << " which shouldn't support this feature.";
            fee_.update(Resource::feeUselessData, "unsupported peer");
            return;
        }
        onValidatorListMessage(
            "ValidatorList",
            m->manifest(),
            m->version(),
            ValidatorList::parseBlobs(*m));
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what()
                                << " from peer " << remote_address_;
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());
    }
    // onMessage(TMValidatorListCollection)
    try
    {
        if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
        {
            JLOG(p_journal_.debug())
                << "ValidatorListCollection: received validator list from peer "
                << "using protocol version " << to_string(protocol_)
                << " which shouldn't support this feature.";
            fee_.update(Resource::feeUselessData, "unsupported peer");
            return;
        }
        else if (m->version() < 2)
        {
            JLOG(p_journal_.debug())
                << "ValidatorListCollection: received invalid validator list "
                   "version "
                << m->version() << " from peer using protocol version "
                << to_string(protocol_);
            fee_.update(Resource::feeInvalidData, "wrong version");
            return;
        }
        onValidatorListMessage(
            "ValidatorListCollection",
            m->manifest(),
            m->version(),
            ValidatorList::parseBlobs(*m));
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn()) << "ValidatorListCollection: Exception, "
                                << e.what() << " from peer " << remote_address_;
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());
    }
    // onMessage(TMValidation)
    if (m->validation().size() < 50)
    {
        JLOG(p_journal_.warn()) << "Validation: Too small";
        fee_.update(Resource::feeMalformedRequest, "too small");
        return;
    }

    try
    {
        auto const closeTime = app_.timeKeeper().closeTime();

        std::shared_ptr<STValidation> val;
        {
            SerialIter sit(makeSlice(m->validation()));
            val = std::make_shared<STValidation>(
                std::ref(sit), [this](PublicKey const& pk) {
                    return calcNodeID(
                        app_.validatorManifests().getMasterKey(pk));
                });
            val->setSeen(closeTime);
        }

        if (!isCurrent(
                app_.getValidations().parms(),
                app_.timeKeeper().closeTime(),
                val->getSignTime(),
                val->getSeenTime()))
        {
            JLOG(p_journal_.trace()) << "Validation: Not current";
            fee_.update(Resource::feeUselessData, "not current");
            return;
        }

        auto const isTrusted =
            app_.validators().trusted(val->getSignerPublic());

        // If the operator has specified that untrusted validations be
        // dropped, this happens here.
        if (!isTrusted && app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
            return;

        auto key = sha512Half(makeSlice(m->validation()));

        if (auto [added, relayed] =
                app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
            !added)
        {
            // Count unique messages (Slots has its own 'HashRouter') which
            // a peer receives within IDLED seconds of the message being
            // relayed.
            if (reduceRelayReady() && relayed &&
                (stopwatch().now() - *relayed) < reduce_relay::IDLED)
                overlay_.updateSlotAndSquelch(
                    key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
            JLOG(p_journal_.trace()) << "Validation: duplicate";
            return;
        }

        if (!isTrusted && (tracking_.load() == Tracking::diverged))
        {
            JLOG(p_journal_.debug())
                << "Dropping untrusted validation from diverged peer";
        }
        else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
        {
            std::string const name = [isTrusted, val]() {
                std::string ret =
                    isTrusted ? "Trusted validation" : "Untrusted validation";
                return ret + " " + to_string(val->getNodeID());
            }();

            std::weak_ptr<PeerImp> weak = shared_from_this();
            app_.getJobQueue().addJob(
                isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
                name,
                [weak, val, m, key]() {
                    if (auto peer = weak.lock())
                        peer->checkValidation(val, key, m);
                });
        }
        else
        {
            JLOG(p_journal_.debug())
                << "Dropping untrusted validation for load";
        }
    }
    catch (std::exception const& e)
    {
        JLOG(p_journal_.warn())
            << "Exception processing validation: " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeMalformedRequest, e.what());
    }
    // onMessage(TMGetObjectByHash)
    protocol::TMGetObjectByHash& packet = *m;

    JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
                             << " " << packet.objects_size();

    if (packet.query())
    {
        // This is a query
        if (send_queue_.size() >= Tuning::dropSendQueue)
        {
            JLOG(p_journal_.debug()) << "GetObject: Large send queue";
            return;
        }

        if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
        {
            doFetchPack(m);
            return;
        }

        if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
        {
            if (!txReduceRelayEnabled())
            {
                JLOG(p_journal_.error())
                    << "TMGetObjectByHash: tx reduce-relay is disabled";
                fee_.update(Resource::feeMalformedRequest, "disabled");
                return;
            }

            std::weak_ptr<PeerImp> weak = shared_from_this();
            app_.getJobQueue().addJob(
                jtREQUESTED_TXN, "doTransactions", [weak, m]() {
                    if (auto peer = weak.lock())
                        peer->doTransactions(m);
                });
            return;
        }

        fee_.update(
            Resource::feeModerateBurdenPeer,
            " received a get object by hash request");

        protocol::TMGetObjectByHash reply;

        reply.set_query(false);

        if (packet.has_seq())
            reply.set_seq(packet.seq());

        reply.set_type(packet.type());

        if (packet.has_ledgerhash())
        {
            if (!stringIsUint256Sized(packet.ledgerhash()))
            {
                fee_.update(Resource::feeMalformedRequest, "ledger hash");
                return;
            }

            reply.set_ledgerhash(packet.ledgerhash());
        }

        for (int i = 0; i < packet.objects_size(); ++i)
        {
            auto const& obj = packet.objects(i);
            if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
            {
                uint256 const hash{obj.hash()};
                std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
                auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
                if (nodeObject)
                {
                    protocol::TMIndexedObject& newObj = *reply.add_objects();
                    newObj.set_hash(hash.begin(), hash.size());
                    newObj.set_data(
                        &nodeObject->getData().front(),
                        nodeObject->getData().size());

                    if (obj.has_nodeid())
                        newObj.set_index(obj.nodeid());
                    if (obj.has_ledgerseq())
                        newObj.set_ledgerseq(obj.ledgerseq());
                }
            }
        }

        JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
                                 << packet.objects_size();
        send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
    }
    else
    {
        // This is a reply
        std::uint32_t pLSeq = 0;
        bool pLDo = true;
        bool progress = false;

        for (int i = 0; i < packet.objects_size(); ++i)
        {
            const protocol::TMIndexedObject& obj = packet.objects(i);

            if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
            {
                if (obj.has_ledgerseq())
                {
                    if (obj.ledgerseq() != pLSeq)
                    {
                        if (pLDo && (pLSeq != 0))
                        {
                            JLOG(p_journal_.debug())
                                << "GetObj: Full fetch pack for " << pLSeq;
                        }
                        pLSeq = obj.ledgerseq();
                        pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);

                        if (!pLDo)
                        {
                            JLOG(p_journal_.debug())
                                << "GetObj: Late fetch pack for " << pLSeq;
                        }
                        else
                            progress = true;
                    }
                }

                if (pLDo)
                {
                    uint256 const hash{obj.hash()};

                    app_.getLedgerMaster().addFetchPack(
                        hash,
                        std::make_shared<Blob>(
                            obj.data().begin(), obj.data().end()));
                }
            }
        }

        if (pLDo && (pLSeq != 0))
        {
            JLOG(p_journal_.debug())
                << "GetObj: Partial fetch pack for " << pLSeq;
        }
        if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
            app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
    }
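// TMGetObjectByHash serves double duty: queries are answered from the node
// store (or routed to doFetchPack/doTransactions), while replies carrying a
// fetch pack are fed to the LedgerMaster as they arrive.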
    // onMessage(TMHaveTransactions)
    if (!txReduceRelayEnabled())
    {
        JLOG(p_journal_.error())
            << "TMHaveTransactions: tx reduce-relay is disabled";
        fee_.update(Resource::feeMalformedRequest, "disabled");
        return;
    }

    std::weak_ptr<PeerImp> weak = shared_from_this();
    app_.getJobQueue().addJob(
        jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
            if (auto peer = weak.lock())
                peer->handleHaveTransactions(m);
        });
void
PeerImp::handleHaveTransactions(
    std::shared_ptr<protocol::TMHaveTransactions> const& m)
{
    protocol::TMGetObjectByHash tmBH;
    tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
    tmBH.set_query(true);

    JLOG(p_journal_.trace())
        << "received TMHaveTransactions " << m->hashes_size();

    for (std::uint32_t i = 0; i < m->hashes_size(); i++)
    {
        if (!stringIsUint256Sized(m->hashes(i)))
        {
            JLOG(p_journal_.error())
                << "TMHaveTransactions with invalid hash size";
            fee_.update(Resource::feeMalformedRequest, "hash size");
            return;
        }

        uint256 hash(m->hashes(i));

        auto txn = app_.getMasterTransaction().fetch_from_cache(hash);

        JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;

        if (!txn)
        {
            JLOG(p_journal_.debug()) << "adding transaction to request";

            auto obj = tmBH.add_objects();
            obj->set_hash(hash.data(), hash.size());
        }
        else
        {
            // We already have the transaction; stop tracking its hash.
            removeTxQueue(hash);
        }
    }

    JLOG(p_journal_.trace())
        << "transaction request object is " << tmBH.objects_size();

    if (tmBH.objects_size() > 0)
        send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
}
    // onMessage(TMTransactions)
    if (!txReduceRelayEnabled())
    {
        JLOG(p_journal_.error())
            << "TMTransactions: tx reduce-relay is disabled";
        fee_.update(Resource::feeMalformedRequest, "disabled");
        return;
    }

    JLOG(p_journal_.trace())
        << "received TMTransactions " << m->transactions_size();

    overlay_.addTxMetrics(m->transactions_size());

    for (int i = 0; i < m->transactions_size(); ++i)
        handleTransaction(
            std::shared_ptr<protocol::TMTransaction>(
                m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
            false,
            true);
    // onMessage(TMSquelch)
    using on_message_fn =
        void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(
                (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));

    if (!m->has_validatorpubkey())
    {
        charge(Resource::feeInvalidData, "squelch no pubkey");
        return;
    }
    auto validator = m->validatorpubkey();
    auto const slice{makeSlice(validator)};
    if (!publicKeyType(slice))
    {
        charge(Resource::feeInvalidData, "squelch bad pubkey");
        return;
    }
    PublicKey key(slice);

    if (!app_.validators().listed(key))
    {
        charge(Resource::feeInvalidData, "squelch non-validator");
        JLOG(p_journal_.debug())
            << "onMessage: TMSquelch discarding non-validator squelch "
            << slice;
        return;
    }

    // Ignore squelches for our own validation messages.
    if (key == app_.getValidationPublicKey())
    {
        JLOG(p_journal_.debug())
            << "onMessage: TMSquelch discarding validator's squelch " << slice;
        return;
    }

    std::uint32_t duration =
        m->has_squelchduration() ? m->squelchduration() : 0;
    if (!m->squelch())
        squelch_.removeSquelch(key);
    else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
        charge(Resource::feeInvalidData, "squelch duration");

    JLOG(p_journal_.debug())
        << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
    // PeerImp::addLedger
    (void)lockedRecentLock;

    if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
        recentLedgers_.end())
        return;

    recentLedgers_.push_back(hash);
    // PeerImp::doFetchPack
    if (app_.getFeeTrack().isLoadedLocal() ||
        (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
        (app_.getJobQueue().getJobCount(jtPACK) > 10))
    {
        JLOG(p_journal_.info()) << "Too busy to make fetch pack";
        return;
    }

    if (!stringIsUint256Sized(packet->ledgerhash()))
    {
        JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
        fee_.update(Resource::feeMalformedRequest, "hash size");
        return;
    }

    fee_.fee = Resource::feeHeavyBurdenPeer;

    uint256 const hash{packet->ledgerhash()};

    std::weak_ptr<PeerImp> weak = shared_from_this();
    auto elapsed = UptimeClock::now();
    auto const pap = &app_;
    app_.getJobQueue().addJob(
        jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
            pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
        });
void
PeerImp::doTransactions(
    std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
{
    protocol::TMTransactions reply;

    JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
                             << packet->objects_size();

    if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
    {
        JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
        fee_.update(Resource::feeMalformedRequest, "too big");
        return;
    }

    for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
    {
        auto const& obj = packet->objects(i);

        if (!stringIsUint256Sized(obj.hash()))
        {
            fee_.update(Resource::feeMalformedRequest, "hash size");
            return;
        }

        uint256 hash(obj.hash());

        auto txn = app_.getMasterTransaction().fetch_from_cache(hash);

        if (!txn)
        {
            JLOG(p_journal_.error()) << "doTransactions, transaction not found "
                                     << hash;
            fee_.update(Resource::feeMalformedRequest, "tx not found");
            return;
        }

        Serializer s;
        auto tx = reply.add_transactions();
        auto sttx = txn->getSTransaction();
        sttx->add(s);
        tx->set_rawtransaction(s.data(), s.size());
        tx->set_status(
            txn->getStatus() == INCLUDED ? protocol::tsCURRENT
                                         : protocol::tsNEW);
        tx->set_receivetimestamp(
            app_.timeKeeper().now().time_since_epoch().count());
        tx->set_deferred(txn->getSubmitResult().queued);
    }

    if (reply.transactions_size() > 0)
        send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
}
void
PeerImp::checkTransaction(
    int flags,
    bool checkSignature,
    std::shared_ptr<STTx const> const& stx,
    bool batch)
{
    try
    {
        // Expired transactions can never be included in a ledger, so we can
        // drop them immediately.
        if (stx->isFieldPresent(sfLastLedgerSequence) &&
            (stx->getFieldU32(sfLastLedgerSequence) <
             app_.getLedgerMaster().getValidLedgerIndex()))
        {
            app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
            charge(Resource::feeUselessData, "expired tx");
            return;
        }

        if (isPseudoTx(*stx))
        {
            std::string reason;
            auto tx = std::make_shared<Transaction>(stx, reason, app_);
            XRPL_ASSERT(
                tx->getStatus() == NEW,
                "ripple::PeerImp::checkTransaction Transaction created "
                "correctly");
            if (tx->getStatus() == NEW)
            {
                JLOG(p_journal_.debug())
                    << "Processing " << (batch ? "batch" : "unsolicited")
                    << " pseudo-transaction tx " << tx->getID();
                app_.getMasterTransaction().canonicalize(&tx);

                auto const toSkip =
                    app_.getHashRouter().shouldRelay(tx->getID());
                if (toSkip)
                {
                    JLOG(p_journal_.debug())
                        << "Passing skipped pseudo-transaction tx "
                        << tx->getID();
                    app_.overlay().relay(tx->getID(), {}, *toSkip);
                }
                return;
            }
            JLOG(p_journal_.debug())
                << "Charging for pseudo-transaction tx " << tx->getID();
            charge(Resource::feeUselessData, "pseudo tx");
            return;
        }

        if (checkSignature)
        {
            // Check the signature before handing off to the job queue.
            if (auto [valid, validReason] = checkValidity(
                    app_.getHashRouter(),
                    *stx,
                    app_.getLedgerMaster().getValidatedRules(),
                    app_.config());
                valid != Validity::Valid)
            {
                if (!validReason.empty())
                {
                    JLOG(p_journal_.trace())
                        << "Exception checking transaction: " << validReason;
                }

                // Probably not necessary to set SF_BAD, but doesn't hurt.
                app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
                charge(
                    Resource::feeInvalidSignature,
                    "check transaction signature failure");
                return;
            }
        }
        else
        {
            forceValidity(
                app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
        }

        std::string reason;
        auto tx = std::make_shared<Transaction>(stx, reason, app_);

        if (tx->getStatus() == INVALID)
        {
            if (!reason.empty())
            {
                JLOG(p_journal_.trace())
                    << "Exception checking transaction: " << reason;
            }
            app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
            charge(Resource::feeInvalidSignature, "tx (impossible)");
            return;
        }

        bool const trusted(flags & SF_TRUSTED);
        app_.getOPs().processTransaction(
            tx, trusted, false, NetworkOPs::FailHard::no);
    }
    catch (std::exception const& ex)
    {
        JLOG(p_journal_.warn())
            << "Exception in " << __func__ << ": " << ex.what();
        app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
        using namespace std::string_literals;
        charge(Resource::feeInvalidData, "tx "s + ex.what());
    }
}
void
PeerImp::checkPropose(
    bool isTrusted,
    std::shared_ptr<protocol::TMProposeSet> const& packet,
    RCLCxPeerPos peerPos)
{
    JLOG(p_journal_.trace())
        << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";

    XRPL_ASSERT(packet, "ripple::PeerImp::checkPropose : non-null packet");

    if (!cluster() && !peerPos.checkSign())
    {
        std::string const desc{"Proposal fails signature check"};
        JLOG(p_journal_.warn()) << desc;
        charge(Resource::feeInvalidSignature, desc);
        return;
    }

    bool relay;

    if (isTrusted)
        relay = app_.getOPs().processTrustedProposal(peerPos);
    else
        relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();

    if (relay)
    {
        // haveMessage contains the peers the message came from; the message
        // is not relayed back to them, but it is still counted by the
        // squelch logic.
        auto haveMessage = app_.overlay().relay(
            *packet, peerPos.suppressionID(), peerPos.publicKey());
        if (reduceRelayReady() && !haveMessage.empty())
            overlay_.updateSlotAndSquelch(
                peerPos.suppressionID(),
                peerPos.publicKey(),
                std::move(haveMessage),
                protocol::mtPROPOSE_LEDGER);
    }
}
void
PeerImp::checkValidation(
    std::shared_ptr<STValidation> const& val,
    uint256 const& key,
    std::shared_ptr<protocol::TMValidation> const& packet)
{
    if (!val->isValid())
    {
        std::string const desc{"Validation forwarded by peer is invalid"};
        JLOG(p_journal_.debug()) << desc;
        charge(Resource::feeInvalidSignature, desc);
        return;
    }

    try
    {
        if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
            cluster())
        {
            auto haveMessage =
                overlay_.relay(*packet, key, val->getSignerPublic());
            if (reduceRelayReady() && !haveMessage.empty())
            {
                overlay_.updateSlotAndSquelch(
                    key,
                    val->getSignerPublic(),
                    std::move(haveMessage),
                    protocol::mtVALIDATION);
            }
        }
    }
    catch (std::exception const& ex)
    {
        JLOG(p_journal_.trace())
            << "Exception processing validation: " << ex.what();
        using namespace std::string_literals;
        charge(Resource::feeMalformedRequest, "validation "s + ex.what());
    }
}
        // getPeerWithTree
        if (p->hasTxSet(rootHash) && p.get() != skip)
        {
            auto score = p->getScore(true);
            if (!ret || (score > retScore))
            {
                ret = p;
                retScore = score;
            }
        }

        // getPeerWithLedger
        if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
        {
            auto score = p->getScore(true);
            if (!ret || (score > retScore))
            {
                ret = p;
                retScore = score;
            }
        }
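// Peer selection for relayed fetches prefers peers that already have the
// item and the lowest observed latency, via getScore(true).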
void
PeerImp::sendLedgerBase(
    std::shared_ptr<Ledger const> const& ledger,
    protocol::TMLedgerData& ledgerData)
{
    JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";

    Serializer s(sizeof(LedgerInfo));
    addRaw(ledger->info(), s);
    ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());

    auto const& stateMap{ledger->stateMap()};
    if (stateMap.getHash() != beast::zero)
    {
        // Return account state root node if possible
        Serializer root(768);

        stateMap.serializeRoot(root);
        ledgerData.add_nodes()->set_nodedata(
            root.getDataPtr(), root.getLength());

        if (ledger->info().txHash != beast::zero)
        {
            auto const& txMap{ledger->txMap()};
            if (txMap.getHash() != beast::zero)
            {
                // Return the transaction root node if possible
                Serializer root(768);
                txMap.serializeRoot(root);
                ledgerData.add_nodes()->set_nodedata(
                    root.getDataPtr(), root.getLength());
            }
        }
    }

    auto message{
        std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
    send(message);
}
    // PeerImp::getLedger
    JLOG(p_journal_.trace()) << "getLedger: Ledger";

    std::shared_ptr<Ledger const> ledger;

    if (m->has_ledgerhash())
    {
        // Attempt to find the ledger by hash
        uint256 const ledgerHash{m->ledgerhash()};
        ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
        if (!ledger)
        {
            JLOG(p_journal_.trace())
                << "getLedger: Don't have ledger with hash " << ledgerHash;

            if (m->has_querytype() && !m->has_requestcookie())
            {
                // Attempt to relay the request to a peer that has the ledger
                if (auto const peer = getPeerWithLedger(
                        overlay_,
                        ledgerHash,
                        m->has_ledgerseq() ? m->ledgerseq() : 0,
                        this))
                {
                    m->set_requestcookie(id());
                    peer->send(
                        std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
                    JLOG(p_journal_.debug())
                        << "getLedger: Request relayed to peer";
                    return ledger;
                }
                JLOG(p_journal_.trace())
                    << "getLedger: Failed to find peer to relay request";
            }
        }
    }
    else if (m->has_ledgerseq())
    {
        // Attempt to find the ledger by sequence
        if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
        {
            JLOG(p_journal_.debug())
                << "getLedger: Early ledger sequence request";
        }
        else
        {
            ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
            if (!ledger)
            {
                JLOG(p_journal_.debug())
                    << "getLedger: Don't have ledger with sequence "
                    << m->ledgerseq();
            }
        }
    }
    else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
    {
        ledger = app_.getLedgerMaster().getClosedLedger();
    }

    if (ledger)
    {
        // Validate the retrieved ledger sequence
        auto const ledgerSeq{ledger->info().seq};
        if (m->has_ledgerseq())
        {
            if (ledgerSeq != m->ledgerseq())
            {
                // Do not resource charge a peer responding to a relay
                if (!m->has_requestcookie())
                    charge(
                        Resource::feeMalformedRequest, "get_ledger ledgerSeq");

                ledger.reset();
                JLOG(p_journal_.warn())
                    << "getLedger: Invalid ledger sequence " << ledgerSeq;
            }
        }
        else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
        {
            ledger.reset();
            JLOG(p_journal_.debug())
                << "getLedger: Early ledger sequence request " << ledgerSeq;
        }
    }
    else
    {
        JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
    }

    return ledger;
    // PeerImp::getTxSet
    JLOG(p_journal_.trace()) << "getTxSet: TX set";

    uint256 const txSetHash{m->ledgerhash()};
    std::shared_ptr<SHAMap> shaMap{
        app_.getInboundTransactions().getSet(txSetHash, false)};
    if (!shaMap)
    {
        if (m->has_querytype() && !m->has_requestcookie())
        {
            // Attempt to relay the request to a peer that has the set
            if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
            {
                m->set_requestcookie(id());
                peer->send(
                    std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
                JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
            }
            else
            {
                JLOG(p_journal_.debug())
                    << "getTxSet: Failed to find relay peer";
            }
        }
        else
        {
            JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
        }
    }

    return shaMap;
    // PeerImp::processLedgerRequest
    // Do not resource charge a peer responding to a relay
    if (!m->has_requestcookie())
        charge(
            Resource::feeModerateBurdenPeer, "received a get ledger request");

    std::shared_ptr<Ledger const> ledger;
    std::shared_ptr<SHAMap const> sharedMap;
    SHAMap const* map{nullptr};
    protocol::TMLedgerData ledgerData;
    bool fatLeaves{true};
    auto const itype{m->itype()};

    if (itype == protocol::liTS_CANDIDATE)
    {
        // The request is for a transaction candidate set
        if (sharedMap = getTxSet(m); !sharedMap)
            return;
        map = sharedMap.get();

        // Fill out the reply
        ledgerData.set_ledgerseq(0);
        ledgerData.set_ledgerhash(m->ledgerhash());
        ledgerData.set_type(protocol::liTS_CANDIDATE);
        if (m->has_requestcookie())
            ledgerData.set_requestcookie(m->requestcookie());

        // We'll already have most transactions
        fatLeaves = false;
    }
    else
    {
        if (send_queue_.size() >= Tuning::dropSendQueue)
        {
            JLOG(p_journal_.debug())
                << "processLedgerRequest: Large send queue";
            return;
        }

        if (app_.getFeeTrack().isLoadedLocal() && !cluster())
        {
            JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
            return;
        }

        if (ledger = getLedger(m); !ledger)
            return;

        // Fill out the reply
        auto const ledgerHash{ledger->info().hash};
        ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
        ledgerData.set_ledgerseq(ledger->info().seq);
        ledgerData.set_type(itype);
        if (m->has_requestcookie())
            ledgerData.set_requestcookie(m->requestcookie());

        switch (itype)
        {
            case protocol::liBASE:
                sendLedgerBase(ledger, ledgerData);
                return;

            case protocol::liTX_NODE:
                map = &ledger->txMap();
                JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
                                         << to_string(map->getHash());
                break;

            case protocol::liAS_NODE:
                map = &ledger->stateMap();
                JLOG(p_journal_.trace())
                    << "processLedgerRequest: Account state map hash "
                    << to_string(map->getHash());
                break;

            default:
                // This case should not be possible here
                JLOG(p_journal_.error())
                    << "processLedgerRequest: Invalid ledger info type";
                return;
        }
    }

    if (!map)
    {
        JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
        return;
    }

    // Add the requested node data to the reply
    if (m->nodeids_size() > 0)
    {
        auto const queryDepth{
            m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};

        std::vector<std::pair<SHAMapNodeID, Blob>> data;

        for (int i = 0; i < m->nodeids_size() &&
             ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
             ++i)
        {
            auto const shaMapNodeId{deserializeSHAMapNodeID(
                m->nodeids(i).data(), m->nodeids(i).size())};

            data.clear();
            data.reserve(Tuning::softMaxReplyNodes);

            try
            {
                if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
                {
                    JLOG(p_journal_.trace())
                        << "processLedgerRequest: getNodeFat got "
                        << data.size() << " nodes";

                    for (auto const& d : data)
                    {
                        if (ledgerData.nodes_size() >=
                            Tuning::hardMaxReplyNodes)
                            break;

                        protocol::TMLedgerNode* node{ledgerData.add_nodes()};
                        node->set_nodeid(d.first.getRawString());
                        node->set_nodedata(d.second.data(), d.second.size());
                    }
                }
                else
                {
                    JLOG(p_journal_.warn())
                        << "processLedgerRequest: getNodeFat returns false";
                }
            }
            catch (std::exception const& e)
            {
                std::string info;
                switch (itype)
                {
                    case protocol::liBASE:
                        // This case should not be possible here
                        info = "Ledger base";
                        break;

                    case protocol::liTX_NODE:
                        info = "TX node";
                        break;

                    case protocol::liAS_NODE:
                        info = "AS node";
                        break;

                    case protocol::liTS_CANDIDATE:
                        info = "TS candidate";
                        break;

                    default:
                        info = "Invalid";
                        break;
                }

                if (!m->has_ledgerhash())
                    info += ", no hash specified";

                JLOG(p_journal_.error())
                    << "processLedgerRequest: getNodeFat with nodeId "
                    << *shaMapNodeId << " and ledger info type " << info
                    << " throws exception: " << e.what();
            }
        }

        JLOG(p_journal_.info())
            << "processLedgerRequest: Got request for " << m->nodeids_size()
            << " nodes at depth " << queryDepth << ", return "
            << ledgerData.nodes_size() << " nodes";
    }

    if (ledgerData.nodes_size() == 0)
        return;

    send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
int
PeerImp::getScore(bool haveItem) const
{
    // Random component of the score, used to break ties.
    static const int spRandomMax = 9999;
    // Bonus for being likely to have the item we are looking for.
    static const int spHaveItem = 10000;
    // Score reduction for each millisecond of latency.
    static const int spLatency = 30;
    // Penalty when the peer's latency is unknown.
    static const int spNoLatency = 8000;

    int score = rand_int(spRandomMax);

    if (haveItem)
        score += spHaveItem;

    std::optional<std::chrono::milliseconds> latency;
    {
        std::lock_guard sl(recentLock_);
        latency = latency_;
    }
    if (latency)
        score -= latency->count() * spLatency;
    else
        score -= spNoLatency;

    return score;
}

bool
PeerImp::isHighLatency() const
{
    std::lock_guard sl(recentLock_);
    return latency_ >= peerHighLatency;
}

bool
PeerImp::reduceRelayReady()
{
    if (!reduceRelayReady_)
        reduceRelayReady_ =
            reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
            reduce_relay::WAIT_ON_BOOTUP;
    return vpReduceRelayEnabled_ && reduceRelayReady_;
}
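// getScore() mixes a random tie-breaker with a large bonus for having the
// requested item and a latency penalty, so equally good peers are load
// balanced. reduceRelayReady() additionally waits WAIT_ON_BOOTUP minutes
// after startup before validation/proposal squelching takes effect.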
void
PeerImp::Metrics::add_message(std::uint64_t bytes)
{
    using namespace std::chrono_literals;
    std::unique_lock lock{mutex_};

    totalBytes_ += bytes;
    accumBytes_ += bytes;
    auto const timeElapsed = clock_type::now() - intervalStart_;
    auto const timeElapsedInSecs =
        std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);

    if (timeElapsedInSecs >= 1s)
    {
        auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
        rollingAvg_.push_back(avgBytes);

        auto const totalBytes =
            std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
        rollingAvgBytes_ = totalBytes / rollingAvg_.size();

        intervalStart_ = clock_type::now();
        accumBytes_ = 0;
    }
}

std::uint64_t
PeerImp::Metrics::average_bytes() const
{
    std::shared_lock lock{mutex_};
    return rollingAvgBytes_;
}

std::uint64_t
PeerImp::Metrics::total_bytes() const
{
    std::shared_lock lock{mutex_};
    return totalBytes_;
}