#include <xrpld/app/consensus/RCLValidations.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/InboundTransactions.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/TransactionMaster.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/app/misc/LoadFeeTrack.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/Transaction.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/perflog/PerfLog.h>

#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/protocol/digest.h>

#include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/core/ostream.hpp>

using namespace std::chrono_literals;
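// PeerImp constructor (excerpt): the initializer list wires up the per-peer
// journals, the SSL stream/socket and its strand, the remote endpoint and
// public key taken from the PeerFinder slot, and the feature toggles taken
// from the application config (COMPRESSION, TX/VP_REDUCE_RELAY_ENABLE,
// LEDGER_REPLAY).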
    , sink_(app_.journal("Peer"), makePrefix(id))
    , p_sink_(app_.journal("Protocol"), makePrefix(id))
    , stream_ptr_(std::move(stream_ptr))
    , socket_(stream_ptr_->next_layer().socket())
    , stream_(*stream_ptr_)
    , strand_(socket_.get_executor())
    , remote_address_(slot->remote_endpoint())
    , publicKey_(publicKey)
    , squelch_(app_.journal("Squelch"))
    , fee_{Resource::feeTrivialPeer, ""}
    , request_(std::move(request))
    , compressionEnabled_(
          app_.config().COMPRESSION)
          app_.config().TX_REDUCE_RELAY_ENABLE))
          app_.config().VP_REDUCE_RELAY_ENABLE))
          app_.config().LEDGER_REPLAY))
    , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())

        << " vp reduce-relay enabled "
        << " tx reduce-relay enabled "

    const bool inCluster{cluster()};
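// The following fragment (presumably PeerImp::run()) must execute on the
// strand_; it parses the optional Closed-Ledger and Previous-Ledger handshake
// headers into 256-bit ledger hashes before the protocol loop starts.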
    if (!strand_.running_in_this_thread())

    auto parseLedgerHash =

    if (auto const iter = headers_.find("Closed-Ledger");
        closed = parseLedgerHash(iter->value());
            fail("Malformed handshake data (1)");

    if (auto const iter = headers_.find("Previous-Ledger");
        previous = parseLedgerHash(iter->value());
            fail("Malformed handshake data (2)");

    if (previous && !closed)
        fail("Malformed handshake data (3)");
    if (!strand_.running_in_this_thread())

    if (!strand_.running_in_this_thread())

    auto validator = m->getValidatorKey();
    if (validator && !squelch_.expireSquelch(*validator))

        safe_cast<TrafficCount::category>(m->getCategory()),

        << " sendq: " << sendq_size;

    boost::asio::async_write(
            std::placeholders::_1,
            std::placeholders::_2)));
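// send() fragment: messages keyed to a validator whose squelch has not yet
// expired are dropped, outbound traffic is reported per TrafficCount
// category, large send queues are logged, and the actual write is an
// async_write bound to the strand.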
    if (!strand_.running_in_this_thread())

    protocol::TMHaveTransactions ht;
        ht.add_hashes(hash.data(), hash.size());
    send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
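// sendTxQueue(): with tx reduce-relay, the queued transaction hashes are
// announced in one TMHaveTransactions message rather than relaying each
// transaction individually.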
    if (!strand_.running_in_this_thread())

    if (!strand_.running_in_this_thread())

    auto removed = txQueue_.erase(hash);

        fail("charge: Resources");

    auto const iter = headers_.find("Crawl");
    return boost::iequals(iter->value(), "public");

    ret[jss::inbound] = true;

        ret[jss::cluster] = true;

    if (auto const nid = headers_["Network-ID"]; !nid.empty())

        std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());

    if ((minSeq != 0) || (maxSeq != 0))
        ret[jss::complete_ledgers] =

        ret[jss::track] = "diverged";

        ret[jss::track] = "unknown";

    protocol::TMStatusChange last_status;

    if (closedLedgerHash != beast::zero)
        ret[jss::ledger] = to_string(closedLedgerHash);

    if (last_status.has_newstatus())
        switch (last_status.newstatus())
            case protocol::nsCONNECTING:
                ret[jss::status] = "connecting";
            case protocol::nsCONNECTED:
                ret[jss::status] = "connected";
            case protocol::nsMONITORING:
                ret[jss::status] = "monitoring";
            case protocol::nsVALIDATING:
                ret[jss::status] = "validating";
            case protocol::nsSHUTTING:
                ret[jss::status] = "shutting";

                    << "Unknown status: " << last_status.newstatus();

    ret[jss::metrics][jss::total_bytes_recv] =
    ret[jss::metrics][jss::total_bytes_sent] =
    ret[jss::metrics][jss::avg_bps_recv] =
    ret[jss::metrics][jss::avg_bps_sent] =

        strand_.running_in_this_thread(),
        "ripple::PeerImp::close : strand in this thread");

    if (!strand_.running_in_this_thread())

        << " failed: " << reason;

        strand_.running_in_this_thread(),
        "ripple::PeerImp::fail : strand in this thread");

        strand_.running_in_this_thread(),
        "ripple::PeerImp::gracefulClose : strand in this thread");
        socket_.is_open(),
        "ripple::PeerImp::gracefulClose : socket is open");
        "ripple::PeerImp::gracefulClose : socket is not closing");

    stream_.async_shutdown(bind_executor(

    timer_.expires_from_now(peerTimerInterval, ec);

    timer_.async_wait(bind_executor(

    if (ec == boost::asio::error::operation_aborted)

        fail("Large send queue");

    clock_type::duration duration;

        fail("Ping Timeout");

    protocol::TMPing message;
    message.set_type(protocol::TMPing::ptPING);

    send(std::make_shared<Message>(message, protocol::mtPING));
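// onTimer() fragment: a persistently large send queue or an unanswered ping
// fails the connection; otherwise a TMPing of type ptPING is sent each timer
// interval to measure liveness.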
    JLOG(journal_.error()) << "onShutdown: expected error condition";

    if (ec != boost::asio::error::eof)
        return fail("onShutdown", ec);

        "ripple::PeerImp::doAccept : empty read buffer");

        return fail("makeSharedValue: Unexpected failure");

    auto write_buffer = std::make_shared<boost::beast::multi_buffer>();

    boost::asio::async_write(
        write_buffer->data(),
        boost::asio::transfer_all(),

    if (ec == boost::asio::error::operation_aborted)
        return fail("onWriteResponse", ec);
    if (write_buffer->size() == bytes_transferred)
        return fail("Failed to write header");

    if (ec == boost::asio::error::operation_aborted)
    if (ec == boost::asio::error::eof)
        return fail("onReadMessage", ec);

    if (bytes_transferred > 0)
        stream << "onReadMessage: " << bytes_transferred << " bytes";
        stream << "onReadMessage";

    metrics_.recv.add_message(bytes_transferred);

    using namespace std::chrono_literals;

        "invokeProtocolMessage",

        return fail("onReadMessage", ec);

    if (bytes_consumed == 0)

            std::placeholders::_1,
            std::placeholders::_2)));

    if (ec == boost::asio::error::operation_aborted)
        return fail("onWriteMessage", ec);

    if (bytes_transferred > 0)
        stream << "onWriteMessage: " << bytes_transferred << " bytes";
        stream << "onWriteMessage";

    metrics_.sent.add_message(bytes_transferred);

        "ripple::PeerImp::onWriteMessage : non-empty send buffer");

    return boost::asio::async_write(
            std::placeholders::_1,
            std::placeholders::_2)));

    return stream_.async_shutdown(bind_executor(
        std::placeholders::_1)));
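// onReadMessage()/onWriteMessage() drive the protocol loop: received bytes
// are handed to invokeProtocolMessage() and counted in metrics_.recv, queued
// writes are counted in metrics_.sent, and a graceful close ends with
// stream_.async_shutdown().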
    if ((type == MessageType::mtTRANSACTION ||
         type == MessageType::mtHAVE_TRANSACTIONS ||
         type == MessageType::mtTRANSACTIONS ||

        static_cast<MessageType>(type), static_cast<std::uint64_t>(size));

    JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
                           << uncompressed_size << " " << isCompressed;

    auto const s = m->list_size();

    if (m->type() == protocol::TMPing::ptPING)

        m->set_type(protocol::TMPing::ptPONG);
        send(std::make_shared<Message>(*m, protocol::mtPING));

    if (m->type() == protocol::TMPing::ptPONG && m->has_seq())

        auto const rtt = std::chrono::round<std::chrono::milliseconds>(
    for (int i = 0; i < m->clusternodes().size(); ++i)

        protocol::TMClusterNode const& node = m->clusternodes(i);

        if (node.has_nodename())
            name = node.nodename();

        auto const publicKey =

        auto const reportTime =

            *publicKey, name, node.nodeload(), reportTime);

    int loadSources = m->loadsources().size();
    if (loadSources != 0)

        gossip.items.reserve(loadSources);
        for (int i = 0; i < m->loadsources().size(); ++i)

            protocol::TMLoadSource const& node = m->loadsources(i);

            gossip.items.push_back(item);

        if (status.getReportTime() >= thresh)
            fees.push_back(status.getLoadFee());

        auto const index = fees.size() / 2;
        clusterFee = fees[index];

    if (m->endpoints_v2().size() >= 1024)

    endpoints.reserve(m->endpoints_v2().size());

    for (auto const& tm : m->endpoints_v2())

            << tm.endpoint() << "}";

    if (!endpoints.empty())
        eraseTxQueue != batch,
        ("ripple::PeerImp::handleTransaction : valid inputs"));

        << "Ignoring incoming transaction: " << "Need network ledger";

    auto stx = std::make_shared<STTx const>(sit);
    uint256 txID = stx->getTransactionID();

    bool checkSignature = true;

        if (!m->has_deferred() || !m->deferred())

            flags |= SF_TRUSTED;

            checkSignature = false;

            << "No new transactions until synchronized";

        "recvTransaction->checkTransaction",

            if (auto peer = weak.lock())
                peer->checkTransaction(
                    flags, checkSignature, stx, batch);

        << "Transaction invalid: " << strHex(m->rawtransaction())
        << ". Exception: " << ex.what();
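// handleTransaction() fragment: incoming transactions are dropped while the
// node still needs a network ledger or is not synchronized; otherwise the
// parsed STTx is dispatched to checkTransaction() on the job queue with the
// SF_TRUSTED flag and checkSignature decided above.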
    auto const itype{m->itype()};

    if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
        return badData("Invalid ledger info type");

        return std::nullopt;

    if (itype == protocol::liTS_CANDIDATE)

        if (!m->has_ledgerhash())
            return badData("Invalid TX candidate set, missing TX set hash");

        !m->has_ledgerhash() && !m->has_ledgerseq() &&
        !(ltype && *ltype == protocol::ltCLOSED))

        return badData("Invalid request");

    if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
        return badData("Invalid ledger type");

        return badData("Invalid ledger hash");

    if (m->has_ledgerseq())

        auto const ledgerSeq{m->ledgerseq()};

        using namespace std::chrono_literals;

    if (itype != protocol::liBASE)

        if (m->nodeids_size() <= 0)
            return badData("Invalid ledger node IDs");

        for (auto const& nodeId : m->nodeids())

                return badData("Invalid SHAMap node ID");

    if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
        return badData("Invalid query type");

    if (m->has_querydepth())

            itype == protocol::liBASE)

            return badData("Invalid query depth");

        if (auto peer = weak.lock())
            peer->processLedgerRequest(m);
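// onMessage(TMGetLedger) fragment: the request is validated first (ledger
// info type, ledger hash/sequence, SHAMap node IDs, query type and depth);
// anything out of range is charged via badData(), and a well-formed request
// is queued to processLedgerRequest().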
        if (auto peer = weak.lock())

            peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
            if (reply.has_error())

                if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                        Resource::feeMalformedRequest,
                        "proof_path_request");
                        Resource::feeRequestNoReply, "proof_path_request");

                peer->send(std::make_shared<Message>(
                    reply, protocol::mtPROOF_PATH_RESPONSE));

    if (!ledgerReplayEnabled_)

            Resource::feeMalformedRequest, "proof_path_response disabled");

    if (!ledgerReplayMsgHandler_.processProofPathResponse(m))

        fee_.update(Resource::feeInvalidData, "proof_path_response");

    JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
    if (!ledgerReplayEnabled_)

            Resource::feeMalformedRequest, "replay_delta_request disabled");

    fee_.fee = Resource::feeModerateBurdenPeer;

    app_.getJobQueue().addJob(

        if (auto peer = weak.lock())

            peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
            if (reply.has_error())

                if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
                        Resource::feeMalformedRequest,
                        "replay_delta_request");
                        Resource::feeRequestNoReply,
                        "replay_delta_request");

                peer->send(std::make_shared<Message>(
                    reply, protocol::mtREPLAY_DELTA_RESPONSE));

    if (!ledgerReplayEnabled_)

            Resource::feeMalformedRequest, "replay_delta_response disabled");

    if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))

        fee_.update(Resource::feeInvalidData, "replay_delta_response");
        fee_.update(Resource::feeInvalidData, msg);
        JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;

        return badData("Invalid ledger hash");

    auto const ledgerSeq{m->ledgerseq()};
    if (m->type() == protocol::liTS_CANDIDATE)

        using namespace std::chrono_literals;
        if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
            ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)

    if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
        return badData("Invalid ledger info type");

    if (m->has_error() &&
        (m->error() < protocol::reNO_LEDGER ||
         m->error() > protocol::reBAD_REQUEST))

        return badData("Invalid reply error");

    if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)

            "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));

    if (m->has_requestcookie())

        if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))

            m->clear_requestcookie();
            peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));

            JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";

    uint256 const ledgerHash{m->ledgerhash()};

    if (m->type() == protocol::liTS_CANDIDATE)

        app_.getJobQueue().addJob(
            jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
                if (auto peer = weak.lock())

                    peer->app_.getInboundTransactions().gotData(
                        ledgerHash, peer, m);

    app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
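// onMessage(TMLedgerData) fragment: replies carrying a request cookie are
// routed back to the requesting peer; otherwise liTS_CANDIDATE data goes to
// InboundTransactions::gotData() on the job queue and everything else to
// InboundLedgers::gotLedgerData().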
    protocol::TMProposeSet& set = *m;

    if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||

        JLOG(p_journal_.warn()) << "Proposal: malformed";
            Resource::feeInvalidSignature,
            " signature can't be longer than 72 bytes");

        JLOG(p_journal_.warn()) << "Proposal: malformed";
        fee_.update(Resource::feeMalformedRequest, "bad hashes");

    auto const isTrusted = app_.validators().trusted(publicKey);

    if (!isTrusted && app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)

    uint256 const proposeHash{set.currenttxhash()};
    uint256 const prevLedger{set.previousledger()};

    if (auto [added, relayed] =
            app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);

        if (reduceRelayReady() && relayed &&
            (stopwatch().now() - *relayed) < reduce_relay::IDLED)
            overlay_.updateSlotAndSquelch(
                suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
        JLOG(p_journal_.trace()) << "Proposal: duplicate";

    if (tracking_.load() == Tracking::diverged)

        JLOG(p_journal_.debug())
            << "Proposal: Dropping untrusted (peer divergence)";

    if (!cluster() && app_.getFeeTrack().isLoadedLocal())

        JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";

    JLOG(p_journal_.trace())
        << "Proposal: " << (isTrusted ? "trusted" : "untrusted");

        app_.timeKeeper().closeTime(),
        calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});

    app_.getJobQueue().addJob(
        "recvPropose->checkPropose",

            if (auto peer = weak.lock())
                peer->checkPropose(isTrusted, m, proposal);
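// onMessage(TMProposeSet) fragment: malformed signatures or hashes are
// charged, duplicates detected through the HashRouter may update the squelch
// slot, untrusted proposals are dropped under peer divergence or local load,
// and surviving proposals are checked in checkPropose() on the job queue.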
    JLOG(p_journal_.trace()) << "Status: Change";

    if (!m->has_networktime())
        m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());

    if (!last_status_.has_newstatus() || m->has_newstatus())

        protocol::NodeStatus status = last_status_.newstatus();

        m->set_newstatus(status);

    if (m->newevent() == protocol::neLOST_SYNC)

        bool outOfSync{false};

            if (!closedLedgerHash_.isZero())

                closedLedgerHash_.zero();

            previousLedgerHash_.zero();

            JLOG(p_journal_.debug()) << "Status: Out of sync";

        bool const peerChangedLedgers{

        if (peerChangedLedgers)

            closedLedgerHash_ = m->ledgerhash();
            closedLedgerHash = closedLedgerHash_;
            addLedger(closedLedgerHash, sl);

            closedLedgerHash_.zero();

        if (m->has_ledgerhashprevious() &&

            previousLedgerHash_ = m->ledgerhashprevious();
            addLedger(previousLedgerHash_, sl);

            previousLedgerHash_.zero();

    if (peerChangedLedgers)

        JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;

        JLOG(p_journal_.debug()) << "Status: No ledger";

    if (m->has_firstseq() && m->has_lastseq())

        minLedger_ = m->firstseq();
        maxLedger_ = m->lastseq();

        if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
            minLedger_ = maxLedger_ = 0;

    if (m->has_ledgerseq() &&
        app_.getLedgerMaster().getValidatedLedgerAge() < 2min)

            m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());

    app_.getOPs().pubPeerStatus([=, this]() -> Json::Value {

        if (m->has_newstatus())

            switch (m->newstatus())

                case protocol::nsCONNECTING:
                    j[jss::status] = "CONNECTING";
                case protocol::nsCONNECTED:
                    j[jss::status] = "CONNECTED";
                case protocol::nsMONITORING:
                    j[jss::status] = "MONITORING";
                case protocol::nsVALIDATING:
                    j[jss::status] = "VALIDATING";
                case protocol::nsSHUTTING:
                    j[jss::status] = "SHUTTING";

        if (m->has_newevent())

            switch (m->newevent())

                case protocol::neCLOSING_LEDGER:
                    j[jss::action] = "CLOSING_LEDGER";
                case protocol::neACCEPTED_LEDGER:
                    j[jss::action] = "ACCEPTED_LEDGER";
                case protocol::neSWITCHED_LEDGER:
                    j[jss::action] = "SWITCHED_LEDGER";
                case protocol::neLOST_SYNC:
                    j[jss::action] = "LOST_SYNC";

        if (m->has_ledgerseq())

            j[jss::ledger_index] = m->ledgerseq();

        if (m->has_ledgerhash())

            uint256 closedLedgerHash{};

                std::lock_guard sl(recentLock_);
                closedLedgerHash = closedLedgerHash_;

            j[jss::ledger_hash] = to_string(closedLedgerHash);

        if (m->has_networktime())

            j[jss::date] = Json::UInt(m->networktime());

        if (m->has_firstseq() && m->has_lastseq())

            j[jss::ledger_index_min] = Json::UInt(m->firstseq());
            j[jss::ledger_index_max] = Json::UInt(m->lastseq());
        serverSeq = maxLedger_;

    checkTracking(serverSeq, validationSeq);

    if (diff < Tuning::convergedLedgerLimit)

        tracking_ = Tracking::converged;

    if ((diff > Tuning::divergedLedgerLimit) &&
        (tracking_.load() != Tracking::diverged))

        tracking_ = Tracking::diverged;
        trackingTime_ = clock_type::now();
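// checkTracking(): the peer is considered converged when its reported ledger
// sequence is within Tuning::convergedLedgerLimit of ours, and marked
// diverged (with trackingTime_ reset) once the difference exceeds
// Tuning::divergedLedgerLimit.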
        fee_.update(Resource::feeMalformedRequest, "bad hash");

    uint256 const hash{m->hash()};

    if (m->status() == protocol::tsHAVE)

        if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
            recentTxSets_.end())

            fee_.update(Resource::feeUselessData, "duplicate (tsHAVE)");

        recentTxSets_.push_back(hash);
PeerImp::onValidatorListMessage(

        JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
                                << " from peer " << remote_address_;

        fee_.update(Resource::feeHeavyBurdenPeer, "no blobs");

    JLOG(p_journal_.debug())
        << "Received " << messageType << " from " << remote_address_.to_string()
        << " (" << id_ << ")";

    if (!app_.getHashRouter().addSuppressionPeer(hash, id_))

        JLOG(p_journal_.debug())
            << messageType << ": received duplicate " << messageType;

        fee_.update(Resource::feeUselessData, "duplicate");

    auto const applyResult = app_.validators().applyListsAndBroadcast(
        remote_address_.to_string(),
        app_.getHashRouter(),

    JLOG(p_journal_.debug())
        << "Processed " << messageType << " version " << version << " from "
        << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
                                     : "unknown or invalid publisher")
        << " from " << remote_address_.to_string() << " (" << id_
        << ") with best result " << to_string(applyResult.bestDisposition());

    switch (applyResult.bestDisposition())

        case ListDisposition::accepted:
        case ListDisposition::expired:
        case ListDisposition::pending: {

                applyResult.publisherKey,
                "ripple::PeerImp::onValidatorListMessage : publisher key is "

            auto const& pubKey = *applyResult.publisherKey;

            if (auto const iter = publisherListSequences_.find(pubKey);
                iter != publisherListSequences_.end())

                    iter->second < applyResult.sequence,
                    "ripple::PeerImp::onValidatorListMessage : lower sequence");

            publisherListSequences_[pubKey] = applyResult.sequence;

        case ListDisposition::same_sequence:
        case ListDisposition::known_sequence:

                applyResult.sequence && applyResult.publisherKey,
                "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
                "and set publisher key");

                publisherListSequences_[*applyResult.publisherKey] <=
                    applyResult.sequence,
                "ripple::PeerImp::onValidatorListMessage : maximum sequence");

        case ListDisposition::stale:
        case ListDisposition::untrusted:
        case ListDisposition::invalid:
        case ListDisposition::unsupported_version:

                "ripple::PeerImp::onValidatorListMessage : invalid best list "

    switch (applyResult.worstDisposition())

        case ListDisposition::accepted:
        case ListDisposition::expired:
        case ListDisposition::pending:

        case ListDisposition::same_sequence:
        case ListDisposition::known_sequence:

                Resource::feeUselessData,
                " duplicate (same_sequence or known_sequence)");

        case ListDisposition::stale:

            fee_.update(Resource::feeInvalidData, "expired");

        case ListDisposition::untrusted:

            fee_.update(Resource::feeUselessData, "untrusted");

        case ListDisposition::invalid:

                Resource::feeInvalidSignature, "invalid list disposition");

        case ListDisposition::unsupported_version:

            fee_.update(Resource::feeInvalidData, "version");

                "ripple::PeerImp::onValidatorListMessage : invalid worst list "

    for (auto const& [disp, count] : applyResult.dispositions)

            case ListDisposition::accepted:
                JLOG(p_journal_.debug())
                    << "Applied " << count << " new " << messageType
                    << "(s) from peer " << remote_address_;

            case ListDisposition::expired:
                JLOG(p_journal_.debug())
                    << "Applied " << count << " expired " << messageType
                    << "(s) from peer " << remote_address_;

            case ListDisposition::pending:
                JLOG(p_journal_.debug())
                    << "Processed " << count << " future " << messageType
                    << "(s) from peer " << remote_address_;

            case ListDisposition::same_sequence:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " " << messageType
                    << "(s) with current sequence from peer "

            case ListDisposition::known_sequence:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " " << messageType
                    << "(s) with future sequence from peer " << remote_address_;

            case ListDisposition::stale:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " stale " << messageType
                    << "(s) from peer " << remote_address_;

            case ListDisposition::untrusted:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " untrusted " << messageType
                    << "(s) from peer " << remote_address_;

            case ListDisposition::unsupported_version:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " unsupported version "
                    << messageType << "(s) from peer " << remote_address_;

            case ListDisposition::invalid:
                JLOG(p_journal_.warn())
                    << "Ignored " << count << " invalid " << messageType
                    << "(s) from peer " << remote_address_;

                "ripple::PeerImp::onValidatorListMessage : invalid list "
    if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))

        JLOG(p_journal_.debug())
            << "ValidatorList: received validator list from peer using "
            << "protocol version " << to_string(protocol_)
            << " which shouldn't support this feature.";
        fee_.update(Resource::feeUselessData, "unsupported peer");

        onValidatorListMessage(

            ValidatorList::parseBlobs(*m));

        JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what()
                                << " from peer " << remote_address_;
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());

    if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))

        JLOG(p_journal_.debug())
            << "ValidatorListCollection: received validator list from peer "
            << "using protocol version " << to_string(protocol_)
            << " which shouldn't support this feature.";
        fee_.update(Resource::feeUselessData, "unsupported peer");

    else if (m->version() < 2)

        JLOG(p_journal_.debug())
            << "ValidatorListCollection: received invalid validator list "
            << m->version() << " from peer using protocol version "
            << to_string(protocol_);
        fee_.update(Resource::feeInvalidData, "wrong version");

        onValidatorListMessage(
            "ValidatorListCollection",

            ValidatorList::parseBlobs(*m));

        JLOG(p_journal_.warn()) << "ValidatorListCollection: Exception, "
                                << e.what() << " from peer " << remote_address_;
        using namespace std::string_literals;
        fee_.update(Resource::feeInvalidData, e.what());
    if (m->validation().size() < 50)

        JLOG(p_journal_.warn()) << "Validation: Too small";
        fee_.update(Resource::feeMalformedRequest, "too small");

    auto const closeTime = app_.timeKeeper().closeTime();

        val = std::make_shared<STValidation>(

            app_.validatorManifests().getMasterKey(pk));

        val->setSeen(closeTime);

            app_.getValidations().parms(),
            app_.timeKeeper().closeTime(),
            val->getSeenTime()))

            JLOG(p_journal_.trace()) << "Validation: Not current";
            fee_.update(Resource::feeUselessData, "not current");

        auto const isTrusted =
            app_.validators().trusted(val->getSignerPublic());

        if (!isTrusted && app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)

        if (auto [added, relayed] =
                app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);

            if (reduceRelayReady() && relayed &&
                (stopwatch().now() - *relayed) < reduce_relay::IDLED)
                overlay_.updateSlotAndSquelch(
                    key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
            JLOG(p_journal_.trace()) << "Validation: duplicate";

        if (!isTrusted && (tracking_.load() == Tracking::diverged))

            JLOG(p_journal_.debug())
                << "Dropping untrusted validation from diverged peer";

        else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())

                isTrusted ? "Trusted validation" : "Untrusted validation";

                to_string(val->getNodeID());

            app_.getJobQueue().addJob(

                [weak, val, m, key]() {
                    if (auto peer = weak.lock())
                        peer->checkValidation(val, key, m);

            JLOG(p_journal_.debug())
                << "Dropping untrusted validation for load";

        JLOG(p_journal_.warn())
            << "Exception processing validation: " << e.what();
        using namespace std::string_literals;
        fee_.update(Resource::feeMalformedRequest, e.what());
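// onMessage(TMValidation) fragment: undersized or stale validations are
// charged, duplicates may update the squelch slot, untrusted validations are
// dropped when the peer has diverged or the server is loaded, and the rest
// are verified in checkValidation() on the job queue.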
    protocol::TMGetObjectByHash& packet = *m;

    JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
                             << " " << packet.objects_size();

    if (send_queue_.size() >= Tuning::dropSendQueue)

        JLOG(p_journal_.debug()) << "GetObject: Large send queue";

    if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)

    if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)

        if (!txReduceRelayEnabled())

            JLOG(p_journal_.error())
                << "TMGetObjectByHash: tx reduce-relay is disabled";
            fee_.update(Resource::feeMalformedRequest, "disabled");

        app_.getJobQueue().addJob(

            if (auto peer = weak.lock())
                peer->doTransactions(m);

        protocol::TMGetObjectByHash reply;

        reply.set_query(false);

        if (packet.has_seq())
            reply.set_seq(packet.seq());

        reply.set_type(packet.type());

        if (packet.has_ledgerhash())

                fee_.update(Resource::feeMalformedRequest, "ledger hash");

            reply.set_ledgerhash(packet.ledgerhash());

            Resource::feeModerateBurdenPeer,
            " received a get object by hash request");

        for (int i = 0; i < packet.objects_size(); ++i)

            auto const& obj = packet.objects(i);

                uint256 const hash{obj.hash()};

                std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
                auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};

                    protocol::TMIndexedObject& newObj = *reply.add_objects();
                    newObj.set_hash(hash.begin(), hash.size());

                        &nodeObject->getData().front(),
                        nodeObject->getData().size());

                    if (obj.has_nodeid())
                        newObj.set_index(obj.nodeid());
                    if (obj.has_ledgerseq())
                        newObj.set_ledgerseq(obj.ledgerseq());

        JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
                                 << packet.objects_size();
        send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));

        bool progress = false;

        for (int i = 0; i < packet.objects_size(); ++i)

            const protocol::TMIndexedObject& obj = packet.objects(i);

                if (obj.has_ledgerseq())

                    if (obj.ledgerseq() != pLSeq)

                        if (pLDo && (pLSeq != 0))

                            JLOG(p_journal_.debug())
                                << "GetObj: Full fetch pack for " << pLSeq;

                        pLSeq = obj.ledgerseq();
                        pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);

                            JLOG(p_journal_.debug())
                                << "GetObj: Late fetch pack for " << pLSeq;

                    uint256 const hash{obj.hash()};

                    app_.getLedgerMaster().addFetchPack(

                        std::make_shared<Blob>(
                            obj.data().begin(), obj.data().end()));

        if (pLDo && (pLSeq != 0))

            JLOG(p_journal_.debug())
                << "GetObj: Partial fetch pack for " << pLSeq;

        if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
            app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
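// onMessage(TMGetObjectByHash) serves both directions: queries are answered
// from the NodeStore (otTRANSACTIONS queries go to doTransactions()), while
// replies of type otFETCH_PACK are fed into the LedgerMaster as fetch-pack
// data.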
    if (!txReduceRelayEnabled())

        JLOG(p_journal_.error())
            << "TMHaveTransactions: tx reduce-relay is disabled";
        fee_.update(Resource::feeMalformedRequest, "disabled");

    app_.getJobQueue().addJob(

        if (auto peer = weak.lock())
            peer->handleHaveTransactions(m);

PeerImp::handleHaveTransactions(

    protocol::TMGetObjectByHash tmBH;
    tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
    tmBH.set_query(true);

    JLOG(p_journal_.trace())
        << "received TMHaveTransactions " << m->hashes_size();

            JLOG(p_journal_.error())
                << "TMHaveTransactions with invalid hash size";
            fee_.update(Resource::feeMalformedRequest, "hash size");

        auto txn = app_.getMasterTransaction().fetch_from_cache(hash);

        JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;

            JLOG(p_journal_.debug()) << "adding transaction to request";

            auto obj = tmBH.add_objects();
            obj->set_hash(hash.data(), hash.size());

            removeTxQueue(hash);

    JLOG(p_journal_.trace())
        << "transaction request object is " << tmBH.objects_size();

    if (tmBH.objects_size() > 0)
        send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
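// handleHaveTransactions(): for each advertised hash, transactions already in
// the local master-transaction cache are dropped from the per-peer hash queue
// via removeTxQueue(), while unknown ones are requested back with a
// TMGetObjectByHash of type otTRANSACTIONS.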
    if (!txReduceRelayEnabled())

        JLOG(p_journal_.error())
            << "TMTransactions: tx reduce-relay is disabled";
        fee_.update(Resource::feeMalformedRequest, "disabled");

    JLOG(p_journal_.trace())
        << "received TMTransactions " << m->transactions_size();

    overlay_.addTxMetrics(m->transactions_size());

            m->mutable_transactions(i), [](protocol::TMTransaction*) {}),

    using on_message_fn =

    if (!strand_.running_in_this_thread())

            (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));

    if (!m->has_validatorpubkey())

        fee_.update(Resource::feeInvalidData, "squelch no pubkey");

    auto validator = m->validatorpubkey();

        fee_.update(Resource::feeInvalidData, "squelch bad pubkey");

    if (!app_.validators().listed(key))

        fee_.update(Resource::feeInvalidData, "squelch non-validator");
        JLOG(p_journal_.debug())
            << "onMessage: TMSquelch discarding non-validator squelch "

    if (key == app_.getValidationPublicKey())

        JLOG(p_journal_.debug())
            << "onMessage: TMSquelch discarding validator's squelch " << slice;

        m->has_squelchduration() ? m->squelchduration() : 0;

        squelch_.removeSquelch(key);

        fee_.update(Resource::feeInvalidData, "squelch duration");

    JLOG(p_journal_.debug())
        << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
    (void)lockedRecentLock;

    if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
        recentLedgers_.end())

    recentLedgers_.push_back(hash);

    if (app_.getFeeTrack().isLoadedLocal() ||
        (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
        (app_.getJobQueue().getJobCount(jtPACK) > 10))

        JLOG(p_journal_.info()) << "Too busy to make fetch pack";

        JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
        fee_.update(Resource::feeMalformedRequest, "hash size");

    fee_.fee = Resource::feeHeavyBurdenPeer;

    uint256 const hash{packet->ledgerhash()};

    auto elapsed = UptimeClock::now();
    auto const pap = &app_;
    app_.getJobQueue().addJob(
        jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
            pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);

PeerImp::doTransactions(

    protocol::TMTransactions reply;

    JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
                             << packet->objects_size();

    if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)

        JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
        fee_.update(Resource::feeMalformedRequest, "too big");

        auto const& obj = packet->objects(i);

            fee_.update(Resource::feeMalformedRequest, "hash size");

        auto txn = app_.getMasterTransaction().fetch_from_cache(hash);

            JLOG(p_journal_.error()) << "doTransactions, transaction not found "
            fee_.update(Resource::feeMalformedRequest, "tx not found");

        auto tx = reply.add_transactions();
        auto sttx = txn->getSTransaction();

        tx->set_rawtransaction(s.data(), s.size());

            txn->getStatus() == INCLUDED ? protocol::tsCURRENT

        tx->set_receivetimestamp(
            app_.timeKeeper().now().time_since_epoch().count());
        tx->set_deferred(txn->getSubmitResult().queued);

    if (reply.transactions_size() > 0)
        send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
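// doTransactions(): answers a tx reduce-relay TMGetObjectByHash by loading
// each requested transaction from the master transaction cache and returning
// the raw transactions, their status and receive timestamp in a
// TMTransactions reply.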
PeerImp::checkTransaction(

    bool checkSignature,

    if (stx->isFieldPresent(sfLastLedgerSequence) &&
        (stx->getFieldU32(sfLastLedgerSequence) <
         app_.getLedgerMaster().getValidLedgerIndex()))

        app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
        charge(Resource::feeUselessData, "expired tx");

        auto tx = std::make_shared<Transaction>(stx, reason, app_);

            tx->getStatus() == NEW,
            "ripple::PeerImp::checkTransaction Transaction created "

        if (tx->getStatus() == NEW)

            JLOG(p_journal_.debug())
                << "Processing " << (batch ? "batch" : "unsolicited")
                << " pseudo-transaction tx " << tx->getID();

            app_.getMasterTransaction().canonicalize(&tx);

                app_.getHashRouter().shouldRelay(tx->getID());

                JLOG(p_journal_.debug())
                    << "Passing skipped pseudo-transaction tx "

                app_.overlay().relay(tx->getID(), {}, *toSkip);

            JLOG(p_journal_.debug())
                << "Charging for pseudo-transaction tx " << tx->getID();
            charge(Resource::feeUselessData, "pseudo tx");

            app_.getHashRouter(),

            app_.getLedgerMaster().getValidatedRules(),

            valid != Validity::Valid)

            if (!validReason.empty())

                JLOG(p_journal_.trace())
                    << "Exception checking transaction: " << validReason;

            app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);

                Resource::feeInvalidSignature,
                "check transaction signature failure");

            app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);

        auto tx = std::make_shared<Transaction>(stx, reason, app_);

        if (tx->getStatus() == INVALID)

            if (!reason.empty())

                JLOG(p_journal_.trace())
                    << "Exception checking transaction: " << reason;

            app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
            charge(Resource::feeInvalidSignature, "tx (impossible)");

        bool const trusted(flags & SF_TRUSTED);
        app_.getOPs().processTransaction(
            tx, trusted, false, NetworkOPs::FailHard::no);

        JLOG(p_journal_.warn())
            << "Exception in " << __func__ << ": " << ex.what();
        app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
        using namespace std::string_literals;
        charge(Resource::feeInvalidData, "tx "s + ex.what());
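// checkTransaction() fragment: transactions past their LastLedgerSequence are
// flagged SF_BAD and charged, pseudo-transactions received from peers are
// relayed (or charged) rather than processed locally, signature failures are
// flagged and charged, and valid transactions are handed to
// NetworkOPs::processTransaction() with the trusted flag taken from
// SF_TRUSTED.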
PeerImp::checkPropose(

    JLOG(p_journal_.trace())
        << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";

    XRPL_ASSERT(packet, "ripple::PeerImp::checkPropose : non-null packet");

        JLOG(p_journal_.warn()) << desc;
        charge(Resource::feeInvalidSignature, desc);

        relay = app_.getOPs().processTrustedProposal(peerPos);

        relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();

        auto haveMessage = app_.overlay().relay(

        if (reduceRelayReady() && !haveMessage.empty())
            overlay_.updateSlotAndSquelch(

                std::move(haveMessage),
                protocol::mtPROPOSE_LEDGER);

PeerImp::checkValidation(

    if (!val->isValid())

        std::string desc{"Validation forwarded by peer is invalid"};
        JLOG(p_journal_.debug()) << desc;
        charge(Resource::feeInvalidSignature, desc);

            overlay_.relay(*packet, key, val->getSignerPublic());
        if (reduceRelayReady() && !haveMessage.empty())

            overlay_.updateSlotAndSquelch(

                val->getSignerPublic(),
                std::move(haveMessage),
                protocol::mtVALIDATION);

        JLOG(p_journal_.trace())
            << "Exception processing validation: " << ex.what();
        using namespace std::string_literals;
        charge(Resource::feeMalformedRequest, "validation "s + ex.what());
        if (p->hasTxSet(rootHash) && p.get() != skip)

            auto score = p->getScore(true);
            if (!ret || (score > retScore))

        if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)

            auto score = p->getScore(true);
            if (!ret || (score > retScore))

PeerImp::sendLedgerBase(
    protocol::TMLedgerData& ledgerData)

    JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";

    addRaw(ledger->info(), s);

    auto const& stateMap{ledger->stateMap()};
    if (stateMap.getHash() != beast::zero)

        stateMap.serializeRoot(root);
        ledgerData.add_nodes()->set_nodedata(
            root.getDataPtr(), root.getLength());

    if (ledger->info().txHash != beast::zero)

        auto const& txMap{ledger->txMap()};
        if (txMap.getHash() != beast::zero)

            txMap.serializeRoot(root);
            ledgerData.add_nodes()->set_nodedata(
                root.getDataPtr(), root.getLength());

        std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
    JLOG(p_journal_.trace()) << "getLedger: Ledger";

    if (m->has_ledgerhash())

        uint256 const ledgerHash{m->ledgerhash()};
        ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);

            JLOG(p_journal_.trace())
                << "getLedger: Don't have ledger with hash " << ledgerHash;

            if (m->has_querytype() && !m->has_requestcookie())

                    m->has_ledgerseq() ? m->ledgerseq() : 0,

                    m->set_requestcookie(id());

                        std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
                    JLOG(p_journal_.debug())
                        << "getLedger: Request relayed to peer";

                JLOG(p_journal_.trace())
                    << "getLedger: Failed to find peer to relay request";

    else if (m->has_ledgerseq())

        if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())

            JLOG(p_journal_.debug())
                << "getLedger: Early ledger sequence request";

        ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());

            JLOG(p_journal_.debug())
                << "getLedger: Don't have ledger with sequence "

    else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)

        ledger = app_.getLedgerMaster().getClosedLedger();

        auto const ledgerSeq{ledger->info().seq};
        if (m->has_ledgerseq())

            if (ledgerSeq != m->ledgerseq())

                if (!m->has_requestcookie())
                        Resource::feeMalformedRequest, "get_ledger ledgerSeq");

                JLOG(p_journal_.warn())
                    << "getLedger: Invalid ledger sequence " << ledgerSeq;

        else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())

            JLOG(p_journal_.debug())
                << "getLedger: Early ledger sequence request " << ledgerSeq;

        JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";

    JLOG(p_journal_.trace()) << "getTxSet: TX set";

    uint256 const txSetHash{m->ledgerhash()};

        app_.getInboundTransactions().getSet(txSetHash, false)};

        if (m->has_querytype() && !m->has_requestcookie())

                m->set_requestcookie(id());

                    std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
                JLOG(p_journal_.debug()) << "getTxSet: Request relayed";

                JLOG(p_journal_.debug())
                    << "getTxSet: Failed to find relay peer";

            JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
    if (!m->has_requestcookie())
            Resource::feeModerateBurdenPeer, "received a get ledger request");

    SHAMap const* map{nullptr};
    protocol::TMLedgerData ledgerData;
    bool fatLeaves{true};
    auto const itype{m->itype()};

    if (itype == protocol::liTS_CANDIDATE)

        if (sharedMap = getTxSet(m); !sharedMap)

        map = sharedMap.get();

        ledgerData.set_ledgerseq(0);
        ledgerData.set_ledgerhash(m->ledgerhash());
        ledgerData.set_type(protocol::liTS_CANDIDATE);
        if (m->has_requestcookie())
            ledgerData.set_requestcookie(m->requestcookie());

        if (send_queue_.size() >= Tuning::dropSendQueue)

            JLOG(p_journal_.debug())
                << "processLedgerRequest: Large send queue";

        if (app_.getFeeTrack().isLoadedLocal() && !cluster())

            JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";

        if (ledger = getLedger(m); !ledger)

        auto const ledgerHash{ledger->info().hash};
        ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
        ledgerData.set_ledgerseq(ledger->info().seq);
        ledgerData.set_type(itype);
        if (m->has_requestcookie())
            ledgerData.set_requestcookie(m->requestcookie());

            case protocol::liBASE:
                sendLedgerBase(ledger, ledgerData);

            case protocol::liTX_NODE:
                map = &ledger->txMap();
                JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
                                         << to_string(map->getHash());

            case protocol::liAS_NODE:
                map = &ledger->stateMap();
                JLOG(p_journal_.trace())
                    << "processLedgerRequest: Account state map hash "
                    << to_string(map->getHash());

                JLOG(p_journal_.error())
                    << "processLedgerRequest: Invalid ledger info type";

        JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";

    if (m->nodeids_size() > 0)

        auto const queryDepth{
            m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};

        for (int i = 0; i < m->nodeids_size() &&
             ledgerData.nodes_size() < Tuning::softMaxReplyNodes;

            data.reserve(Tuning::softMaxReplyNodes);

                if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))

                    JLOG(p_journal_.trace())
                        << "processLedgerRequest: getNodeFat got "
                        << data.size() << " nodes";

                    for (auto const& d : data)

                        if (ledgerData.nodes_size() >=
                            Tuning::hardMaxReplyNodes)

                        protocol::TMLedgerNode* node{ledgerData.add_nodes()};
                        node->set_nodeid(d.first.getRawString());
                        node->set_nodedata(d.second.data(), d.second.size());

                    JLOG(p_journal_.warn())
                        << "processLedgerRequest: getNodeFat returns false";

                    case protocol::liBASE:
                        info = "Ledger base";

                    case protocol::liTX_NODE:

                    case protocol::liAS_NODE:

                    case protocol::liTS_CANDIDATE:
                        info = "TS candidate";

                if (!m->has_ledgerhash())
                    info += ", no hash specified";

                JLOG(p_journal_.error())
                    << "processLedgerRequest: getNodeFat with nodeId "
                    << *shaMapNodeId << " and ledger info type " << info
                    << " throws exception: " << e.what();

    JLOG(p_journal_.info())
        << "processLedgerRequest: Got request for " << m->nodeids_size()
        << " nodes at depth " << queryDepth << ", return "
        << ledgerData.nodes_size() << " nodes";

    if (ledgerData.nodes_size() == 0)

    send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
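// processLedgerRequest(): picks the SHAMap matching the requested ledger info
// type (base data, transaction map, state map, or TS candidate set), walks it
// with getNodeFat() up to the reply-node and query-depth limits, and sends
// the collected nodes back as mtLEDGER_DATA.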
PeerImp::getScore(bool haveItem) const

    static const int spRandomMax = 9999;

    static const int spHaveItem = 10000;

    static const int spLatency = 30;

    static const int spNoLatency = 8000;

        score += spHaveItem;

            score -= latency->count() * spLatency;

            score -= spNoLatency;

PeerImp::isHighLatency() const

    return latency_ >= peerHighLatency;

PeerImp::reduceRelayReady()

    if (!reduceRelayReady_)

            reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
            reduce_relay::WAIT_ON_BOOTUP;
    return vpReduceRelayEnabled_ && reduceRelayReady_;
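// getScore() favours peers that already have the requested item and penalises
// measured latency; reduceRelayReady() keeps vp reduce-relay squelching
// disabled until reduce_relay::WAIT_ON_BOOTUP minutes of uptime have passed.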
    using namespace std::chrono_literals;

    totalBytes_ += bytes;
    accumBytes_ += bytes;
    auto const timeElapsed = clock_type::now() - intervalStart_;
    auto const timeElapsedInSecs =
        std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);

    if (timeElapsedInSecs >= 1s)

        auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
        rollingAvg_.push_back(avgBytes);

        auto const totalBytes =

        rollingAvgBytes_ = totalBytes / rollingAvg_.size();

        intervalStart_ = clock_type::now();
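// Metrics::add_message(): byte counts are accumulated per second and folded
// into a rolling average (rollingAvg_), which average_bytes() and
// total_bytes() expose for the peer's JSON metrics.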
PeerImp::Metrics::average_bytes() const

    return rollingAvgBytes_;

PeerImp::Metrics::total_bytes() const