20#include <xrpld/app/consensus/RCLValidations.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionMaster.h>
25#include <xrpld/app/misc/HashRouter.h>
26#include <xrpld/app/misc/LoadFeeTrack.h>
27#include <xrpld/app/misc/NetworkOPs.h>
28#include <xrpld/app/misc/Transaction.h>
29#include <xrpld/app/misc/ValidatorList.h>
30#include <xrpld/app/tx/apply.h>
31#include <xrpld/overlay/Cluster.h>
32#include <xrpld/overlay/detail/PeerImp.h>
33#include <xrpld/overlay/detail/Tuning.h>
34#include <xrpld/perflog/PerfLog.h>
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/base64.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/basics/safe_cast.h>
40#include <xrpl/protocol/digest.h>
42#include <boost/algorithm/string/predicate.hpp>
43#include <boost/beast/core/ostream.hpp>
51using namespace std::chrono_literals;
79 , sink_(app_.journal(
"Peer"), makePrefix(id))
80 , p_sink_(app_.journal(
"Protocol"), makePrefix(id))
83 , stream_ptr_(
std::move(stream_ptr))
84 , socket_(stream_ptr_->next_layer().socket())
85 , stream_(*stream_ptr_)
86 , strand_(socket_.get_executor())
88 , remote_address_(slot->remote_endpoint())
94 , publicKey_(publicKey)
97 , squelch_(app_.journal(
"Squelch"))
99 , fee_{Resource::feeTrivialPeer,
""}
101 , request_(
std::move(request))
103 , compressionEnabled_(
108 app_.config().COMPRESSION)
114 app_.config().TX_REDUCE_RELAY_ENABLE))
115 , vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
119 app_.config().LEDGER_REPLAY))
120 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
124 <<
" vp reduce-relay enabled "
126 <<
" tx reduce-relay enabled "
133 bool const inCluster{
cluster()};
156 if (!
strand_.running_in_this_thread())
159 auto parseLedgerHash =
173 if (
auto const iter =
headers_.find(
"Closed-Ledger");
176 closed = parseLedgerHash(iter->value());
179 fail(
"Malformed handshake data (1)");
182 if (
auto const iter =
headers_.find(
"Previous-Ledger");
185 previous = parseLedgerHash(iter->value());
188 fail(
"Malformed handshake data (2)");
191 if (previous && !closed)
192 fail(
"Malformed handshake data (3)");
214 if (!
strand_.running_in_this_thread())
240 if (!
strand_.running_in_this_thread())
258 safe_cast<TrafficCount::category>(m->getCategory()),
280 <<
" sendq: " << sendq_size;
288 boost::asio::async_write(
297 std::placeholders::_1,
298 std::placeholders::_2)));
304 if (!
strand_.running_in_this_thread())
310 protocol::TMHaveTransactions ht;
312 ht.add_hashes(hash.data(), hash.size());
316 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
323 if (!
strand_.running_in_this_thread())
340 if (!
strand_.running_in_this_thread())
345 auto removed =
txQueue_.erase(hash);
357 fail(
"charge: Resources");
366 auto const iter =
headers_.find(
"Crawl");
369 return boost::iequals(iter->value(),
"public");
395 ret[jss::inbound] =
true;
399 ret[jss::cluster] =
true;
409 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
426 std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
431 if ((minSeq != 0) || (maxSeq != 0))
432 ret[jss::complete_ledgers] =
438 ret[jss::track] =
"diverged";
442 ret[jss::track] =
"unknown";
451 protocol::TMStatusChange last_status;
458 if (closedLedgerHash != beast::zero)
459 ret[jss::ledger] =
to_string(closedLedgerHash);
461 if (last_status.has_newstatus())
463 switch (last_status.newstatus())
465 case protocol::nsCONNECTING:
466 ret[jss::status] =
"connecting";
469 case protocol::nsCONNECTED:
470 ret[jss::status] =
"connected";
473 case protocol::nsMONITORING:
474 ret[jss::status] =
"monitoring";
477 case protocol::nsVALIDATING:
478 ret[jss::status] =
"validating";
481 case protocol::nsSHUTTING:
482 ret[jss::status] =
"shutting";
487 <<
"Unknown status: " << last_status.newstatus();
492 ret[jss::metrics][jss::total_bytes_recv] =
494 ret[jss::metrics][jss::total_bytes_sent] =
496 ret[jss::metrics][jss::avg_bps_recv] =
498 ret[jss::metrics][jss::avg_bps_sent] =
577 strand_.running_in_this_thread(),
578 "ripple::PeerImp::close : strand in this thread");
600 if (!
strand_.running_in_this_thread())
611 <<
" failed: " << reason;
620 strand_.running_in_this_thread(),
621 "ripple::PeerImp::fail : strand in this thread");
635 strand_.running_in_this_thread(),
636 "ripple::PeerImp::gracefulClose : strand in this thread");
638 socket_.is_open(),
"ripple::PeerImp::gracefulClose : socket is open");
641 "ripple::PeerImp::gracefulClose : socket is not closing");
646 stream_.async_shutdown(bind_executor(
656 timer_.expires_from_now(peerTimerInterval, ec);
663 timer_.async_wait(bind_executor(
693 if (ec == boost::asio::error::operation_aborted)
705 fail(
"Large send queue");
711 clock_type::duration duration;
732 fail(
"Ping Timeout");
739 protocol::TMPing message;
740 message.set_type(protocol::TMPing::ptPING);
743 send(std::make_shared<Message>(message, protocol::mtPING));
755 JLOG(
journal_.
error()) <<
"onShutdown: expected error condition";
758 if (ec != boost::asio::error::eof)
759 return fail(
"onShutdown", ec);
769 "ripple::PeerImp::doAccept : empty read buffer");
778 return fail(
"makeSharedValue: Unexpected failure");
799 auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
812 boost::asio::async_write(
814 write_buffer->data(),
815 boost::asio::transfer_all(),
822 if (ec == boost::asio::error::operation_aborted)
825 return fail(
"onWriteResponse", ec);
826 if (write_buffer->size() == bytes_transferred)
828 return fail(
"Failed to write header");
892 if (ec == boost::asio::error::operation_aborted)
894 if (ec == boost::asio::error::eof)
900 return fail(
"onReadMessage", ec);
903 if (bytes_transferred > 0)
904 stream <<
"onReadMessage: " << bytes_transferred <<
" bytes";
906 stream <<
"onReadMessage";
909 metrics_.recv.add_message(bytes_transferred);
919 using namespace std::chrono_literals;
924 "invokeProtocolMessage",
929 return fail(
"onReadMessage", ec);
934 if (bytes_consumed == 0)
947 std::placeholders::_1,
948 std::placeholders::_2)));
956 if (ec == boost::asio::error::operation_aborted)
959 return fail(
"onWriteMessage", ec);
962 if (bytes_transferred > 0)
963 stream <<
"onWriteMessage: " << bytes_transferred <<
" bytes";
965 stream <<
"onWriteMessage";
968 metrics_.sent.add_message(bytes_transferred);
972 "ripple::PeerImp::onWriteMessage : non-empty send buffer");
977 return boost::asio::async_write(
986 std::placeholders::_1,
987 std::placeholders::_2)));
992 return stream_.async_shutdown(bind_executor(
997 std::placeholders::_1)));
1026 *m,
static_cast<protocol::MessageType
>(type),
true);
1036 if ((type == MessageType::mtTRANSACTION ||
1037 type == MessageType::mtHAVE_TRANSACTIONS ||
1038 type == MessageType::mtTRANSACTIONS ||
1050 static_cast<MessageType
>(type),
static_cast<std::uint64_t>(size));
1052 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" "
1053 << uncompressed_size <<
" " << isCompressed;
1068 auto const s = m->list_size();
1088 if (m->type() == protocol::TMPing::ptPING)
1092 m->set_type(protocol::TMPing::ptPONG);
1093 send(std::make_shared<Message>(*m, protocol::mtPING));
1097 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1107 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1132 for (
int i = 0; i < m->clusternodes().size(); ++i)
1134 protocol::TMClusterNode
const& node = m->clusternodes(i);
1137 if (node.has_nodename())
1138 name = node.nodename();
1140 auto const publicKey =
1147 auto const reportTime =
1151 *publicKey,
name, node.nodeload(), reportTime);
1155 int loadSources = m->loadsources().size();
1156 if (loadSources != 0)
1159 gossip.
items.reserve(loadSources);
1160 for (
int i = 0; i < m->loadsources().size(); ++i)
1162 protocol::TMLoadSource
const& node = m->loadsources(i);
1167 gossip.
items.push_back(item);
1180 if (status.getReportTime() >= thresh)
1181 fees.push_back(status.getLoadFee());
1186 auto const index = fees.size() / 2;
1188 clusterFee = fees[index];
1204 if (m->endpoints_v2().size() >= 1024)
1211 endpoints.
reserve(m->endpoints_v2().size());
1214 for (
auto const& tm : m->endpoints_v2())
1221 << tm.endpoint() <<
"}";
1247 if (!endpoints.
empty())
1264 eraseTxQueue != batch,
1265 (
"ripple::PeerImp::handleTransaction : valid inputs"));
1274 <<
"Need network ledger";
1282 auto stx = std::make_shared<STTx const>(sit);
1283 uint256 txID = stx->getTransactionID();
1311 bool checkSignature =
true;
1314 if (!m->has_deferred() || !m->deferred())
1318 flags |= SF_TRUSTED;
1327 checkSignature =
false;
1334 <<
"No new transactions until synchronized";
1347 "recvTransaction->checkTransaction",
1353 if (
auto peer = weak.lock())
1354 peer->checkTransaction(
1355 flags, checkSignature, stx, batch);
1362 <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1363 <<
". Exception: " << ex.
what();
1374 auto const itype{m->itype()};
1377 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1378 return badData(
"Invalid ledger info type");
1383 return std::nullopt;
1386 if (itype == protocol::liTS_CANDIDATE)
1388 if (!m->has_ledgerhash())
1389 return badData(
"Invalid TX candidate set, missing TX set hash");
1392 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1393 !(ltype && *ltype == protocol::ltCLOSED))
1395 return badData(
"Invalid request");
1399 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1400 return badData(
"Invalid ledger type");
1404 return badData(
"Invalid ledger hash");
1407 if (m->has_ledgerseq())
1409 auto const ledgerSeq{m->ledgerseq()};
1412 using namespace std::chrono_literals;
1422 if (itype != protocol::liBASE)
1424 if (m->nodeids_size() <= 0)
1425 return badData(
"Invalid ledger node IDs");
1427 for (
auto const& nodeId : m->nodeids())
1430 return badData(
"Invalid SHAMap node ID");
1435 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1436 return badData(
"Invalid query type");
1439 if (m->has_querydepth())
1442 itype == protocol::liBASE)
1444 return badData(
"Invalid query depth");
1451 if (
auto peer = weak.
lock())
1452 peer->processLedgerRequest(m);
1472 if (
auto peer = weak.
lock())
1475 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1476 if (reply.has_error())
1478 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1480 Resource::feeMalformedRequest,
1481 "proof_path_request");
1484 Resource::feeRequestNoReply,
"proof_path_request");
1488 peer->send(std::make_shared<Message>(
1489 reply, protocol::mtPROOF_PATH_RESPONSE));
1498 if (!ledgerReplayEnabled_)
1501 Resource::feeMalformedRequest,
"proof_path_response disabled");
1505 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1507 fee_.update(Resource::feeInvalidData,
"proof_path_response");
1514 JLOG(p_journal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1515 if (!ledgerReplayEnabled_)
1518 Resource::feeMalformedRequest,
"replay_delta_request disabled");
1522 fee_.fee = Resource::feeModerateBurdenPeer;
1524 app_.getJobQueue().addJob(
1526 if (
auto peer = weak.
lock())
1529 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1530 if (reply.has_error())
1532 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1534 Resource::feeMalformedRequest,
1535 "replay_delta_request");
1538 Resource::feeRequestNoReply,
1539 "replay_delta_request");
1543 peer->send(std::make_shared<Message>(
1544 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1553 if (!ledgerReplayEnabled_)
1556 Resource::feeMalformedRequest,
"replay_delta_response disabled");
1560 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1562 fee_.update(Resource::feeInvalidData,
"replay_delta_response");
1570 fee_.update(Resource::feeInvalidData, msg);
1571 JLOG(p_journal_.warn()) <<
"TMLedgerData: " << msg;
1576 return badData(
"Invalid ledger hash");
1580 auto const ledgerSeq{m->ledgerseq()};
1581 if (m->type() == protocol::liTS_CANDIDATE)
1592 using namespace std::chrono_literals;
1593 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1594 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1603 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1604 return badData(
"Invalid ledger info type");
1607 if (m->has_error() &&
1608 (m->error() < protocol::reNO_LEDGER ||
1609 m->error() > protocol::reBAD_REQUEST))
1611 return badData(
"Invalid reply error");
1615 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1618 "Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1622 if (m->has_requestcookie())
1624 if (
auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1626 m->clear_requestcookie();
1627 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1631 JLOG(p_journal_.info()) <<
"Unable to route TX/ledger data reply";
1636 uint256 const ledgerHash{m->ledgerhash()};
1639 if (m->type() == protocol::liTS_CANDIDATE)
1642 app_.getJobQueue().addJob(
1643 jtTXN_DATA,
"recvPeerData", [weak, ledgerHash, m]() {
1644 if (
auto peer = weak.lock())
1646 peer->app_.getInboundTransactions().gotData(
1647 ledgerHash, peer, m);
1654 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1660 protocol::TMProposeSet&
set = *m;
1666 if ((std::clamp<std::size_t>(
sig.size(), 64, 72) !=
sig.size()) ||
1669 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1671 Resource::feeInvalidSignature,
1672 " signature can't be longer than 72 bytes");
1679 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1680 fee_.update(Resource::feeMalformedRequest,
"bad hashes");
1688 auto const isTrusted = app_.validators().trusted(publicKey);
1696 overlay_.reportInboundTraffic(
1697 TrafficCount::category::proposal_untrusted,
1698 Message::messageSize(*m));
1700 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1704 uint256 const proposeHash{
set.currenttxhash()};
1705 uint256 const prevLedger{
set.previousledger()};
1717 if (
auto [added, relayed] =
1718 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1723 if (reduceRelayReady() && relayed &&
1724 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
1725 overlay_.updateSlotAndSquelch(
1726 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1729 overlay_.reportInboundTraffic(
1730 TrafficCount::category::proposal_duplicate,
1731 Message::messageSize(*m));
1733 JLOG(p_journal_.trace()) <<
"Proposal: duplicate";
1740 if (tracking_.load() == Tracking::diverged)
1742 JLOG(p_journal_.debug())
1743 <<
"Proposal: Dropping untrusted (peer divergence)";
1747 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1749 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1754 JLOG(p_journal_.trace())
1755 <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1766 app_.timeKeeper().closeTime(),
1767 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1770 app_.getJobQueue().addJob(
1772 "recvPropose->checkPropose",
1774 if (
auto peer = weak.lock())
1775 peer->checkPropose(isTrusted, m,
proposal);
1782 JLOG(p_journal_.trace()) <<
"Status: Change";
1784 if (!m->has_networktime())
1785 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1789 if (!last_status_.has_newstatus() || m->has_newstatus())
1794 protocol::NodeStatus status = last_status_.newstatus();
1796 m->set_newstatus(status);
1800 if (m->newevent() == protocol::neLOST_SYNC)
1802 bool outOfSync{
false};
1807 if (!closedLedgerHash_.isZero())
1810 closedLedgerHash_.zero();
1812 previousLedgerHash_.zero();
1816 JLOG(p_journal_.debug()) <<
"Status: Out of sync";
1823 bool const peerChangedLedgers{
1830 if (peerChangedLedgers)
1832 closedLedgerHash_ = m->ledgerhash();
1833 closedLedgerHash = closedLedgerHash_;
1834 addLedger(closedLedgerHash, sl);
1838 closedLedgerHash_.zero();
1841 if (m->has_ledgerhashprevious() &&
1844 previousLedgerHash_ = m->ledgerhashprevious();
1845 addLedger(previousLedgerHash_, sl);
1849 previousLedgerHash_.zero();
1852 if (peerChangedLedgers)
1854 JLOG(p_journal_.debug()) <<
"LCL is " << closedLedgerHash;
1858 JLOG(p_journal_.debug()) <<
"Status: No ledger";
1862 if (m->has_firstseq() && m->has_lastseq())
1866 minLedger_ = m->firstseq();
1867 maxLedger_ = m->lastseq();
1869 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1870 minLedger_ = maxLedger_ = 0;
1873 if (m->has_ledgerseq() &&
1874 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1877 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1880 app_.getOPs().pubPeerStatus([=,
this]() ->
Json::Value {
1883 if (m->has_newstatus())
1885 switch (m->newstatus())
1887 case protocol::nsCONNECTING:
1888 j[jss::status] =
"CONNECTING";
1890 case protocol::nsCONNECTED:
1891 j[jss::status] =
"CONNECTED";
1893 case protocol::nsMONITORING:
1894 j[jss::status] =
"MONITORING";
1896 case protocol::nsVALIDATING:
1897 j[jss::status] =
"VALIDATING";
1899 case protocol::nsSHUTTING:
1900 j[jss::status] =
"SHUTTING";
1905 if (m->has_newevent())
1907 switch (m->newevent())
1909 case protocol::neCLOSING_LEDGER:
1910 j[jss::action] =
"CLOSING_LEDGER";
1912 case protocol::neACCEPTED_LEDGER:
1913 j[jss::action] =
"ACCEPTED_LEDGER";
1915 case protocol::neSWITCHED_LEDGER:
1916 j[jss::action] =
"SWITCHED_LEDGER";
1918 case protocol::neLOST_SYNC:
1919 j[jss::action] =
"LOST_SYNC";
1924 if (m->has_ledgerseq())
1926 j[jss::ledger_index] = m->ledgerseq();
1929 if (m->has_ledgerhash())
1931 uint256 closedLedgerHash{};
1933 std::lock_guard sl(recentLock_);
1934 closedLedgerHash = closedLedgerHash_;
1936 j[jss::ledger_hash] = to_string(closedLedgerHash);
1939 if (m->has_networktime())
1941 j[jss::date] = Json::UInt(m->networktime());
1944 if (m->has_firstseq() && m->has_lastseq())
1946 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
1947 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
1963 serverSeq = maxLedger_;
1969 checkTracking(serverSeq, validationSeq);
1978 if (diff < Tuning::convergedLedgerLimit)
1981 tracking_ = Tracking::converged;
1984 if ((diff > Tuning::divergedLedgerLimit) &&
1985 (tracking_.load() != Tracking::diverged))
1990 tracking_ = Tracking::diverged;
1991 trackingTime_ = clock_type::now();
2000 fee_.update(Resource::feeMalformedRequest,
"bad hash");
2004 uint256 const hash{m->hash()};
2006 if (m->status() == protocol::tsHAVE)
2010 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2011 recentTxSets_.end())
2013 fee_.update(Resource::feeUselessData,
"duplicate (tsHAVE)");
2017 recentTxSets_.push_back(hash);
2022PeerImp::onValidatorListMessage(
2032 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType
2033 <<
" from peer " << remote_address_;
2035 fee_.update(Resource::feeHeavyBurdenPeer,
"no blobs");
2041 JLOG(p_journal_.debug())
2042 <<
"Received " << messageType <<
" from " << remote_address_.to_string()
2043 <<
" (" << id_ <<
")";
2045 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2047 JLOG(p_journal_.debug())
2048 << messageType <<
": received duplicate " << messageType;
2052 fee_.update(Resource::feeUselessData,
"duplicate");
2056 auto const applyResult = app_.validators().applyListsAndBroadcast(
2060 remote_address_.to_string(),
2063 app_.getHashRouter(),
2066 JLOG(p_journal_.debug())
2067 <<
"Processed " << messageType <<
" version " << version <<
" from "
2068 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2069 :
"unknown or invalid publisher")
2070 <<
" from " << remote_address_.to_string() <<
" (" << id_
2071 <<
") with best result " << to_string(applyResult.bestDisposition());
2074 switch (applyResult.bestDisposition())
2077 case ListDisposition::accepted:
2079 case ListDisposition::expired:
2081 case ListDisposition::pending: {
2085 applyResult.publisherKey,
2086 "ripple::PeerImp::onValidatorListMessage : publisher key is "
2088 auto const& pubKey = *applyResult.publisherKey;
2090 if (
auto const iter = publisherListSequences_.find(pubKey);
2091 iter != publisherListSequences_.end())
2094 iter->second < applyResult.sequence,
2095 "ripple::PeerImp::onValidatorListMessage : lower sequence");
2098 publisherListSequences_[pubKey] = applyResult.sequence;
2101 case ListDisposition::same_sequence:
2102 case ListDisposition::known_sequence:
2107 applyResult.sequence && applyResult.publisherKey,
2108 "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
2109 "and set publisher key");
2111 publisherListSequences_[*applyResult.publisherKey] <=
2112 applyResult.sequence,
2113 "ripple::PeerImp::onValidatorListMessage : maximum sequence");
2118 case ListDisposition::stale:
2119 case ListDisposition::untrusted:
2120 case ListDisposition::invalid:
2121 case ListDisposition::unsupported_version:
2125 "ripple::PeerImp::onValidatorListMessage : invalid best list "
2130 switch (applyResult.worstDisposition())
2132 case ListDisposition::accepted:
2133 case ListDisposition::expired:
2134 case ListDisposition::pending:
2137 case ListDisposition::same_sequence:
2138 case ListDisposition::known_sequence:
2143 Resource::feeUselessData,
2144 " duplicate (same_sequence or known_sequence)");
2146 case ListDisposition::stale:
2149 fee_.update(Resource::feeInvalidData,
"expired");
2151 case ListDisposition::untrusted:
2155 fee_.update(Resource::feeUselessData,
"untrusted");
2157 case ListDisposition::invalid:
2160 Resource::feeInvalidSignature,
"invalid list disposition");
2162 case ListDisposition::unsupported_version:
2165 fee_.update(Resource::feeInvalidData,
"version");
2169 "ripple::PeerImp::onValidatorListMessage : invalid worst list "
2174 for (
auto const& [disp, count] : applyResult.dispositions)
2179 case ListDisposition::accepted:
2180 JLOG(p_journal_.debug())
2181 <<
"Applied " << count <<
" new " << messageType
2182 <<
"(s) from peer " << remote_address_;
2185 case ListDisposition::expired:
2186 JLOG(p_journal_.debug())
2187 <<
"Applied " << count <<
" expired " << messageType
2188 <<
"(s) from peer " << remote_address_;
2191 case ListDisposition::pending:
2192 JLOG(p_journal_.debug())
2193 <<
"Processed " << count <<
" future " << messageType
2194 <<
"(s) from peer " << remote_address_;
2196 case ListDisposition::same_sequence:
2197 JLOG(p_journal_.warn())
2198 <<
"Ignored " << count <<
" " << messageType
2199 <<
"(s) with current sequence from peer "
2202 case ListDisposition::known_sequence:
2203 JLOG(p_journal_.warn())
2204 <<
"Ignored " << count <<
" " << messageType
2205 <<
"(s) with future sequence from peer " << remote_address_;
2207 case ListDisposition::stale:
2208 JLOG(p_journal_.warn())
2209 <<
"Ignored " << count <<
"stale " << messageType
2210 <<
"(s) from peer " << remote_address_;
2212 case ListDisposition::untrusted:
2213 JLOG(p_journal_.warn())
2214 <<
"Ignored " << count <<
" untrusted " << messageType
2215 <<
"(s) from peer " << remote_address_;
2217 case ListDisposition::unsupported_version:
2218 JLOG(p_journal_.warn())
2219 <<
"Ignored " << count <<
"unsupported version "
2220 << messageType <<
"(s) from peer " << remote_address_;
2222 case ListDisposition::invalid:
2223 JLOG(p_journal_.warn())
2224 <<
"Ignored " << count <<
"invalid " << messageType
2225 <<
"(s) from peer " << remote_address_;
2229 "ripple::PeerImp::onValidatorListMessage : invalid list "
2240 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2242 JLOG(p_journal_.debug())
2243 <<
"ValidatorList: received validator list from peer using "
2244 <<
"protocol version " << to_string(protocol_)
2245 <<
" which shouldn't support this feature.";
2246 fee_.update(Resource::feeUselessData,
"unsupported peer");
2249 onValidatorListMessage(
2253 ValidatorList::parseBlobs(*m));
2257 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what()
2258 <<
" from peer " << remote_address_;
2259 using namespace std::string_literals;
2260 fee_.update(Resource::feeInvalidData, e.
what());
2270 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2272 JLOG(p_journal_.debug())
2273 <<
"ValidatorListCollection: received validator list from peer "
2274 <<
"using protocol version " << to_string(protocol_)
2275 <<
" which shouldn't support this feature.";
2276 fee_.update(Resource::feeUselessData,
"unsupported peer");
2279 else if (m->version() < 2)
2281 JLOG(p_journal_.debug())
2282 <<
"ValidatorListCollection: received invalid validator list "
2284 << m->version() <<
" from peer using protocol version "
2285 << to_string(protocol_);
2286 fee_.update(Resource::feeInvalidData,
"wrong version");
2289 onValidatorListMessage(
2290 "ValidatorListCollection",
2293 ValidatorList::parseBlobs(*m));
2297 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, "
2298 << e.
what() <<
" from peer " << remote_address_;
2299 using namespace std::string_literals;
2300 fee_.update(Resource::feeInvalidData, e.
what());
2307 if (m->validation().size() < 50)
2309 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2310 fee_.update(Resource::feeMalformedRequest,
"too small");
2316 auto const closeTime = app_.timeKeeper().closeTime();
2321 val = std::make_shared<STValidation>(
2325 app_.validatorManifests().getMasterKey(pk));
2328 val->setSeen(closeTime);
2332 app_.getValidations().parms(),
2333 app_.timeKeeper().closeTime(),
2335 val->getSeenTime()))
2337 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2338 fee_.update(Resource::feeUselessData,
"not current");
2345 auto const isTrusted =
2346 app_.validators().trusted(val->getSignerPublic());
2354 overlay_.reportInboundTraffic(
2355 TrafficCount::category::validation_untrusted,
2356 Message::messageSize(*m));
2358 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2364 auto [added, relayed] =
2365 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2373 if (reduceRelayReady() && relayed &&
2374 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2375 overlay_.updateSlotAndSquelch(
2376 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2379 overlay_.reportInboundTraffic(
2380 TrafficCount::category::validation_duplicate,
2381 Message::messageSize(*m));
2383 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2387 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2389 JLOG(p_journal_.debug())
2390 <<
"Dropping untrusted validation from diverged peer";
2392 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2396 isTrusted ?
"Trusted validation" :
"Untrusted validation";
2401 to_string(val->getNodeID());
2408 app_.getJobQueue().addJob(
2411 [weak, val, m, key]() {
2412 if (
auto peer = weak.
lock())
2413 peer->checkValidation(val, key, m);
2418 JLOG(p_journal_.debug())
2419 <<
"Dropping untrusted validation for load";
2424 JLOG(p_journal_.warn())
2425 <<
"Exception processing validation: " << e.
what();
2426 using namespace std::string_literals;
2427 fee_.update(Resource::feeMalformedRequest, e.
what());
2434 protocol::TMGetObjectByHash& packet = *m;
2436 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type()
2437 <<
" " << packet.objects_size();
2442 if (send_queue_.size() >= Tuning::dropSendQueue)
2444 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2448 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2454 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2456 if (!txReduceRelayEnabled())
2458 JLOG(p_journal_.error())
2459 <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2460 fee_.update(Resource::feeMalformedRequest,
"disabled");
2465 app_.getJobQueue().addJob(
2467 if (
auto peer = weak.
lock())
2468 peer->doTransactions(m);
2473 protocol::TMGetObjectByHash reply;
2475 reply.set_query(
false);
2477 if (packet.has_seq())
2478 reply.set_seq(packet.seq());
2480 reply.set_type(packet.type());
2482 if (packet.has_ledgerhash())
2486 fee_.update(Resource::feeMalformedRequest,
"ledger hash");
2490 reply.set_ledgerhash(packet.ledgerhash());
2494 Resource::feeModerateBurdenPeer,
2495 " received a get object by hash request");
2498 for (
int i = 0; i < packet.objects_size(); ++i)
2500 auto const& obj = packet.objects(i);
2503 uint256 const hash{obj.hash()};
2507 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash,
seq)};
2510 protocol::TMIndexedObject& newObj = *reply.add_objects();
2511 newObj.set_hash(hash.begin(), hash.size());
2513 &nodeObject->getData().front(),
2514 nodeObject->getData().size());
2516 if (obj.has_nodeid())
2517 newObj.set_index(obj.nodeid());
2518 if (obj.has_ledgerseq())
2519 newObj.set_ledgerseq(obj.ledgerseq());
2526 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2527 << packet.objects_size();
2528 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2535 bool progress =
false;
2537 for (
int i = 0; i < packet.objects_size(); ++i)
2539 protocol::TMIndexedObject
const& obj = packet.objects(i);
2543 if (obj.has_ledgerseq())
2545 if (obj.ledgerseq() != pLSeq)
2547 if (pLDo && (pLSeq != 0))
2549 JLOG(p_journal_.debug())
2550 <<
"GetObj: Full fetch pack for " << pLSeq;
2552 pLSeq = obj.ledgerseq();
2553 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2557 JLOG(p_journal_.debug())
2558 <<
"GetObj: Late fetch pack for " << pLSeq;
2567 uint256 const hash{obj.hash()};
2569 app_.getLedgerMaster().addFetchPack(
2571 std::make_shared<Blob>(
2572 obj.data().begin(), obj.data().end()));
2577 if (pLDo && (pLSeq != 0))
2579 JLOG(p_journal_.debug())
2580 <<
"GetObj: Partial fetch pack for " << pLSeq;
2582 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2583 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2590 if (!txReduceRelayEnabled())
2592 JLOG(p_journal_.error())
2593 <<
"TMHaveTransactions: tx reduce-relay is disabled";
2594 fee_.update(Resource::feeMalformedRequest,
"disabled");
2599 app_.getJobQueue().addJob(
2601 if (
auto peer = weak.
lock())
2602 peer->handleHaveTransactions(m);
2607PeerImp::handleHaveTransactions(
2610 protocol::TMGetObjectByHash tmBH;
2611 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2612 tmBH.set_query(
true);
2614 JLOG(p_journal_.trace())
2615 <<
"received TMHaveTransactions " << m->hashes_size();
2621 JLOG(p_journal_.error())
2622 <<
"TMHaveTransactions with invalid hash size";
2623 fee_.update(Resource::feeMalformedRequest,
"hash size");
2629 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2631 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2635 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2637 auto obj = tmBH.add_objects();
2638 obj->set_hash(hash.
data(), hash.
size());
2645 removeTxQueue(hash);
2649 JLOG(p_journal_.trace())
2650 <<
"transaction request object is " << tmBH.objects_size();
2652 if (tmBH.objects_size() > 0)
2653 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2659 if (!txReduceRelayEnabled())
2661 JLOG(p_journal_.error())
2662 <<
"TMTransactions: tx reduce-relay is disabled";
2663 fee_.update(Resource::feeMalformedRequest,
"disabled");
2667 JLOG(p_journal_.trace())
2668 <<
"received TMTransactions " << m->transactions_size();
2670 overlay_.addTxMetrics(m->transactions_size());
2675 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2683 using on_message_fn =
2685 if (!strand_.running_in_this_thread())
2689 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2691 if (!m->has_validatorpubkey())
2693 fee_.update(Resource::feeInvalidData,
"squelch no pubkey");
2700 fee_.update(Resource::feeInvalidData,
"squelch bad pubkey");
2706 if (key == app_.getValidationPublicKey())
2708 JLOG(p_journal_.debug())
2709 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2714 m->has_squelchduration() ? m->squelchduration() : 0;
2716 squelch_.removeSquelch(key);
2718 fee_.update(Resource::feeInvalidData,
"squelch duration");
2720 JLOG(p_journal_.debug())
2721 <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2733 (void)lockedRecentLock;
2735 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2736 recentLedgers_.end())
2739 recentLedgers_.push_back(hash);
2748 if (app_.getFeeTrack().isLoadedLocal() ||
2749 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2750 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2752 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2758 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2759 fee_.update(Resource::feeMalformedRequest,
"hash size");
2763 fee_.fee = Resource::feeHeavyBurdenPeer;
2765 uint256 const hash{packet->ledgerhash()};
2768 auto elapsed = UptimeClock::now();
2769 auto const pap = &app_;
2770 app_.getJobQueue().addJob(
2771 jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2772 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2777PeerImp::doTransactions(
2780 protocol::TMTransactions reply;
2782 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
2783 << packet->objects_size();
2785 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2787 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
2788 fee_.update(Resource::feeMalformedRequest,
"too big");
2794 auto const& obj = packet->objects(i);
2798 fee_.update(Resource::feeMalformedRequest,
"hash size");
2804 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2808 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found "
2810 fee_.update(Resource::feeMalformedRequest,
"tx not found");
2815 auto tx = reply.add_transactions();
2816 auto sttx = txn->getSTransaction();
2818 tx->set_rawtransaction(s.
data(), s.
size());
2820 txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT
2822 tx->set_receivetimestamp(
2823 app_.timeKeeper().now().time_since_epoch().count());
2824 tx->set_deferred(txn->getSubmitResult().queued);
2827 if (reply.transactions_size() > 0)
2828 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2832PeerImp::checkTransaction(
2834 bool checkSignature,
2842 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2843 (stx->getFieldU32(sfLastLedgerSequence) <
2844 app_.getLedgerMaster().getValidLedgerIndex()))
2846 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2847 charge(Resource::feeUselessData,
"expired tx");
2856 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2858 tx->getStatus() ==
NEW,
2859 "ripple::PeerImp::checkTransaction Transaction created "
2861 if (tx->getStatus() ==
NEW)
2863 JLOG(p_journal_.debug())
2864 <<
"Processing " << (batch ?
"batch" :
"unsolicited")
2865 <<
" pseudo-transaction tx " << tx->getID();
2867 app_.getMasterTransaction().canonicalize(&tx);
2870 app_.getHashRouter().shouldRelay(tx->getID());
2873 JLOG(p_journal_.debug())
2874 <<
"Passing skipped pseudo pseudo-transaction tx "
2876 app_.overlay().relay(tx->getID(), {}, *toSkip);
2880 JLOG(p_journal_.debug())
2881 <<
"Charging for pseudo-transaction tx " << tx->getID();
2882 charge(Resource::feeUselessData,
"pseudo tx");
2893 app_.getHashRouter(),
2895 app_.getLedgerMaster().getValidatedRules(),
2897 valid != Validity::Valid)
2899 if (!validReason.empty())
2901 JLOG(p_journal_.trace())
2902 <<
"Exception checking transaction: " << validReason;
2906 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2908 Resource::feeInvalidSignature,
2909 "check transaction signature failure");
2916 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2920 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2922 if (tx->getStatus() ==
INVALID)
2924 if (!reason.
empty())
2926 JLOG(p_journal_.trace())
2927 <<
"Exception checking transaction: " << reason;
2929 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2930 charge(Resource::feeInvalidSignature,
"tx (impossible)");
2934 bool const trusted(
flags & SF_TRUSTED);
2935 app_.getOPs().processTransaction(
2936 tx, trusted,
false, NetworkOPs::FailHard::no);
2940 JLOG(p_journal_.warn())
2941 <<
"Exception in " << __func__ <<
": " << ex.
what();
2942 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2943 using namespace std::string_literals;
2944 charge(Resource::feeInvalidData,
"tx "s + ex.
what());
2950PeerImp::checkPropose(
2955 JLOG(p_journal_.trace())
2956 <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
2958 XRPL_ASSERT(packet,
"ripple::PeerImp::checkPropose : non-null packet");
2963 JLOG(p_journal_.warn()) << desc;
2964 charge(Resource::feeInvalidSignature, desc);
2971 relay = app_.getOPs().processTrustedProposal(peerPos);
2973 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
2981 auto haveMessage = app_.overlay().relay(
2983 if (reduceRelayReady() && !haveMessage.empty())
2984 overlay_.updateSlotAndSquelch(
2987 std::move(haveMessage),
2988 protocol::mtPROPOSE_LEDGER);
2993PeerImp::checkValidation(
2998 if (!val->isValid())
3000 std::string desc{
"Validation forwarded by peer is invalid"};
3001 JLOG(p_journal_.debug()) << desc;
3002 charge(Resource::feeInvalidSignature, desc);
3017 overlay_.relay(*packet, key, val->getSignerPublic());
3018 if (reduceRelayReady() && !haveMessage.empty())
3020 overlay_.updateSlotAndSquelch(
3022 val->getSignerPublic(),
3023 std::move(haveMessage),
3024 protocol::mtVALIDATION);
3030 JLOG(p_journal_.trace())
3031 <<
"Exception processing validation: " << ex.
what();
3032 using namespace std::string_literals;
3033 charge(Resource::feeMalformedRequest,
"validation "s + ex.
what());
3047 if (p->hasTxSet(rootHash) && p.get() != skip)
3049 auto score = p->getScore(true);
3050 if (!ret || (score > retScore))
3075 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3077 auto score = p->getScore(true);
3078 if (!ret || (score > retScore))
3090PeerImp::sendLedgerBase(
3092 protocol::TMLedgerData& ledgerData)
3094 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
3097 addRaw(ledger->info(), s);
3100 auto const& stateMap{ledger->stateMap()};
3101 if (stateMap.getHash() != beast::zero)
3106 stateMap.serializeRoot(
root);
3107 ledgerData.add_nodes()->set_nodedata(
3108 root.getDataPtr(),
root.getLength());
3110 if (ledger->info().txHash != beast::zero)
3112 auto const& txMap{ledger->txMap()};
3113 if (txMap.getHash() != beast::zero)
3117 txMap.serializeRoot(
root);
3118 ledgerData.add_nodes()->set_nodedata(
3119 root.getDataPtr(),
root.getLength());
3125 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3132 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
3136 if (m->has_ledgerhash())
3139 uint256 const ledgerHash{m->ledgerhash()};
3140 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3143 JLOG(p_journal_.trace())
3144 <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3146 if (m->has_querytype() && !m->has_requestcookie())
3152 m->has_ledgerseq() ? m->ledgerseq() : 0,
3155 m->set_requestcookie(
id());
3157 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3158 JLOG(p_journal_.debug())
3159 <<
"getLedger: Request relayed to peer";
3163 JLOG(p_journal_.trace())
3164 <<
"getLedger: Failed to find peer to relay request";
3168 else if (m->has_ledgerseq())
3171 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3173 JLOG(p_journal_.debug())
3174 <<
"getLedger: Early ledger sequence request";
3178 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3181 JLOG(p_journal_.debug())
3182 <<
"getLedger: Don't have ledger with sequence "
3187 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3189 ledger = app_.getLedgerMaster().getClosedLedger();
3195 auto const ledgerSeq{ledger->info().seq};
3196 if (m->has_ledgerseq())
3198 if (ledgerSeq != m->ledgerseq())
3201 if (!m->has_requestcookie())
3203 Resource::feeMalformedRequest,
"get_ledger ledgerSeq");
3206 JLOG(p_journal_.warn())
3207 <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3210 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3213 JLOG(p_journal_.debug())
3214 <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3219 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3228 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3230 uint256 const txSetHash{m->ledgerhash()};
3232 app_.getInboundTransactions().getSet(txSetHash,
false)};
3235 if (m->has_querytype() && !m->has_requestcookie())
3240 m->set_requestcookie(
id());
3242 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3243 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3247 JLOG(p_journal_.debug())
3248 <<
"getTxSet: Failed to find relay peer";
3253 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3264 if (!m->has_requestcookie())
3266 Resource::feeModerateBurdenPeer,
"received a get ledger request");
3270 SHAMap const* map{
nullptr};
3271 protocol::TMLedgerData ledgerData;
3272 bool fatLeaves{
true};
3273 auto const itype{m->itype()};
3275 if (itype == protocol::liTS_CANDIDATE)
3277 if (sharedMap = getTxSet(m); !sharedMap)
3279 map = sharedMap.
get();
3282 ledgerData.set_ledgerseq(0);
3283 ledgerData.set_ledgerhash(m->ledgerhash());
3284 ledgerData.set_type(protocol::liTS_CANDIDATE);
3285 if (m->has_requestcookie())
3286 ledgerData.set_requestcookie(m->requestcookie());
3293 if (send_queue_.size() >= Tuning::dropSendQueue)
3295 JLOG(p_journal_.debug())
3296 <<
"processLedgerRequest: Large send queue";
3299 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3301 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3305 if (ledger = getLedger(m); !ledger)
3309 auto const ledgerHash{ledger->info().hash};
3310 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3311 ledgerData.set_ledgerseq(ledger->info().seq);
3312 ledgerData.set_type(itype);
3313 if (m->has_requestcookie())
3314 ledgerData.set_requestcookie(m->requestcookie());
3318 case protocol::liBASE:
3319 sendLedgerBase(ledger, ledgerData);
3322 case protocol::liTX_NODE:
3323 map = &ledger->txMap();
3324 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash "
3325 << to_string(map->getHash());
3328 case protocol::liAS_NODE:
3329 map = &ledger->stateMap();
3330 JLOG(p_journal_.trace())
3331 <<
"processLedgerRequest: Account state map hash "
3332 << to_string(map->getHash());
3337 JLOG(p_journal_.error())
3338 <<
"processLedgerRequest: Invalid ledger info type";
3345 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3350 if (m->nodeids_size() > 0)
3352 auto const queryDepth{
3353 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3357 for (
int i = 0; i < m->nodeids_size() &&
3358 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3364 data.reserve(Tuning::softMaxReplyNodes);
3368 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3370 JLOG(p_journal_.trace())
3371 <<
"processLedgerRequest: getNodeFat got "
3372 << data.size() <<
" nodes";
3374 for (
auto const& d : data)
3376 if (ledgerData.nodes_size() >=
3377 Tuning::hardMaxReplyNodes)
3379 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3380 node->set_nodeid(d.first.getRawString());
3381 node->set_nodedata(d.second.data(), d.second.size());
3386 JLOG(p_journal_.warn())
3387 <<
"processLedgerRequest: getNodeFat returns false";
3395 case protocol::liBASE:
3397 info =
"Ledger base";
3400 case protocol::liTX_NODE:
3404 case protocol::liAS_NODE:
3408 case protocol::liTS_CANDIDATE:
3409 info =
"TS candidate";
3417 if (!m->has_ledgerhash())
3418 info +=
", no hash specified";
3420 JLOG(p_journal_.error())
3421 <<
"processLedgerRequest: getNodeFat with nodeId "
3422 << *shaMapNodeId <<
" and ledger info type " << info
3423 <<
" throws exception: " << e.
what();
3427 JLOG(p_journal_.info())
3428 <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3429 <<
" nodes at depth " << queryDepth <<
", return "
3430 << ledgerData.nodes_size() <<
" nodes";
3433 if (ledgerData.nodes_size() == 0)
3436 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3440PeerImp::getScore(
bool haveItem)
const
3444 static int const spRandomMax = 9999;
3448 static int const spHaveItem = 10000;
3453 static int const spLatency = 30;
3456 static int const spNoLatency = 8000;
3461 score += spHaveItem;
3470 score -= latency->count() * spLatency;
3472 score -= spNoLatency;
3478PeerImp::isHighLatency()
const
3481 return latency_ >= peerHighLatency;
3485PeerImp::reduceRelayReady()
3487 if (!reduceRelayReady_)
3489 reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
3490 reduce_relay::WAIT_ON_BOOTUP;
3491 return vpReduceRelayEnabled_ && reduceRelayReady_;
3497 using namespace std::chrono_literals;
3500 totalBytes_ += bytes;
3501 accumBytes_ += bytes;
3502 auto const timeElapsed = clock_type::now() - intervalStart_;
3503 auto const timeElapsedInSecs =
3504 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3506 if (timeElapsedInSecs >= 1s)
3508 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3509 rollingAvg_.push_back(avgBytes);
3511 auto const totalBytes =
3513 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3515 intervalStart_ = clock_type::now();
3521PeerImp::Metrics::average_bytes()
const
3524 return rollingAvgBytes_;
3528PeerImp::Metrics::total_bytes()
const
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Stream trace() const
Severity stream access functions.
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual TimeKeeper & timeKeeper()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual Cluster & cluster()=0
virtual HashRouter & getHashRouter()=0
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
std::size_t size() const
The number of nodes in the cluster list.
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
bool TX_REDUCE_RELAY_METRICS
std::chrono::seconds MAX_DIVERGED_TIME
std::chrono::seconds MAX_UNKNOWN_TIME
bool shouldProcess(uint256 const &key, PeerShortID peer, int &flags, std::chrono::seconds tx_interval)
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
int getJobCount(JobType t) const
Jobs waiting at this priority.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
LedgerIndex getValidLedgerIndex()
std::chrono::seconds getValidatedLedgerAge()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
virtual bool isNeedNetworkLedger()=0
PeerFinder::Manager & peerFinder()
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void onPeerDeactivate(Peer::id_t id)
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
Resource::Manager & resourceManager()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void incPeerDisconnectCharges() override
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
std::queue< std::shared_ptr< Message > > send_queue_
bool vpReduceRelayEnabled_
std::unique_ptr< LoadEvent > load_event_
boost::beast::http::fields const & headers_
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
clock_type::duration uptime() const
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
protocol::TMStatusChange last_status_
boost::shared_mutex nameMutex_
boost::circular_buffer< uint256 > recentTxSets_
std::unique_ptr< stream_type > stream_ptr_
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Compressed compressionEnabled_
uint256 closedLedgerHash_
std::string domain() const
std::optional< std::uint32_t > lastPingSeq_
void onTimer(boost::system::error_code const &ec)
beast::Journal const journal_
struct ripple::PeerImp::@21 metrics_
beast::Journal const p_journal_
PeerImp(PeerImp const &)=delete
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
bool hasTxSet(uint256 const &hash) const override
clock_type::time_point lastPingTime_
void onMessageUnknown(std::uint16_t type)
std::shared_ptr< PeerFinder::Slot > const slot_
boost::circular_buffer< uint256 > recentLedgers_
std::optional< std::chrono::milliseconds > latency_
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
beast::IP::Endpoint const remote_address_
Json::Value json() override
PublicKey const publicKey_
hash_set< uint256 > txQueue_
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
bool txReduceRelayEnabled_
clock_type::time_point trackingTime_
ProtocolVersion protocol_
reduce_relay::Squelch< UptimeClock > squelch_
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
uint256 previousLedgerHash_
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
void send(std::shared_ptr< Message > const &m) override
static std::string makePrefix(id_t id)
boost::system::error_code error_code
void onReadMessage(error_code ec, std::size_t bytes_transferred)
bool ledgerReplayEnabled_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
bool crawl() const
Returns true if this connection will publicly share its IP address.
void sendTxQueue() override
Send aggregated transactions' hashes.
bool txReduceRelayEnabled() const override
bool supportsFeature(ProtocolFeature f) const override
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
http_request_type request_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
bool cluster() const override
Returns true if this connection is a member of the cluster.
void onShutdown(error_code ec)
boost::asio::strand< boost::asio::executor > strand_
void cycleStatus() override
boost::beast::multi_buffer read_buffer_
Resource::Consumer usage_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
void fail(std::string const &reason)
std::atomic< Tracking > tracking_
Represents a peer connection in the overlay.
A peer's signed, proposed position for use in RCLConsensus.
bool checkSign() const
Verify the signing hash of the proposal.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
An endpoint that consumes resources.
int balance()
Returns the credit balance representing consumption.
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
std::size_t size() const noexcept
void const * data() const noexcept
void const * getDataPtr() const
An immutable linear range of bytes.
time_point now() const override
Returns the current time, using the server's clock.
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
static constexpr std::size_t size()
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Set the regular signature on a JTx.
T emplace_back(T... args)
@ objectValue
object value (collection of name/value pairs).
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeInvalidData
Charge const feeUselessData
Charge const feeTrivialPeer
Charge const feeModerateBurdenPeer
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ maxQueryDepth
The maximum number of levels to search.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ sendQueueLogFreq
How often to log send queue size.
TER valid(PreclaimContext const &ctx, AccountID const &src)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
static constexpr std::size_t MAX_TX_QUEUE_SIZE
std::unique_ptr< Config > validator(std::unique_ptr< Config >, std::string const &)
adjust configuration with params needed to be a validator
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
static constexpr char FEATURE_COMPR[]
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
@ ValidatorListPropagation
@ ValidatorList2Propagation
std::string base64_decode(std::string_view data)
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
static bool stringIsUint256Sized(std::string const &pBuffStr)
static constexpr char FEATURE_LEDGER_REPLAY[]
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
std::string strHex(FwdIt begin, FwdIt end)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Stopwatch & stopwatch()
Returns an instance of a wall clock.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
static constexpr char FEATURE_TXRR[]
std::string to_string(base_uint< Bits, Tag > const &a)
Number root(Number f, unsigned d)
@ proposal
proposal for signing
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
T shared_from_this(T... args)
beast::IP::Address public_ip
std::optional< std::uint32_t > networkID
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items
Set the sequence number on a JTx.