20#include <xrpld/app/consensus/RCLValidations.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionMaster.h>
25#include <xrpld/app/misc/HashRouter.h>
26#include <xrpld/app/misc/LoadFeeTrack.h>
27#include <xrpld/app/misc/NetworkOPs.h>
28#include <xrpld/app/misc/Transaction.h>
29#include <xrpld/app/misc/ValidatorList.h>
30#include <xrpld/app/tx/apply.h>
31#include <xrpld/overlay/Cluster.h>
32#include <xrpld/overlay/detail/PeerImp.h>
33#include <xrpld/overlay/detail/Tuning.h>
34#include <xrpld/perflog/PerfLog.h>
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/base64.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/basics/safe_cast.h>
40#include <xrpl/protocol/TxFlags.h>
41#include <xrpl/protocol/digest.h>
43#include <boost/algorithm/string/predicate.hpp>
44#include <boost/beast/core/ostream.hpp>
52using namespace std::chrono_literals;
80 , sink_(app_.journal(
"Peer"), makePrefix(id))
81 , p_sink_(app_.journal(
"Protocol"), makePrefix(id))
84 , stream_ptr_(
std::move(stream_ptr))
85 , socket_(stream_ptr_->next_layer().socket())
86 , stream_(*stream_ptr_)
87 , strand_(socket_.get_executor())
89 , remote_address_(slot->remote_endpoint())
95 , publicKey_(publicKey)
98 , squelch_(app_.journal(
"Squelch"))
100 , fee_{Resource::feeTrivialPeer,
""}
102 , request_(
std::move(request))
104 , compressionEnabled_(
109 app_.config().COMPRESSION)
115 app_.config().TX_REDUCE_RELAY_ENABLE))
119 app_.config().LEDGER_REPLAY))
120 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
124 <<
" vp reduce-relay base squelch enabled "
135 bool const inCluster{
cluster()};
158 if (!
strand_.running_in_this_thread())
161 auto parseLedgerHash =
175 if (
auto const iter =
headers_.find(
"Closed-Ledger");
178 closed = parseLedgerHash(iter->value());
181 fail(
"Malformed handshake data (1)");
184 if (
auto const iter =
headers_.find(
"Previous-Ledger");
187 previous = parseLedgerHash(iter->value());
190 fail(
"Malformed handshake data (2)");
193 if (previous && !closed)
194 fail(
"Malformed handshake data (3)");
216 if (!
strand_.running_in_this_thread())
242 if (!
strand_.running_in_this_thread())
249 auto validator = m->getValidatorKey();
250 if (validator && !
squelch_.expireSquelch(*validator))
260 safe_cast<TrafficCount::category>(m->getCategory()),
282 <<
" sendq: " << sendq_size;
290 boost::asio::async_write(
299 std::placeholders::_1,
300 std::placeholders::_2)));
306 if (!
strand_.running_in_this_thread())
312 protocol::TMHaveTransactions ht;
314 ht.add_hashes(hash.data(), hash.size());
318 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
325 if (!
strand_.running_in_this_thread())
342 if (!
strand_.running_in_this_thread())
347 auto removed =
txQueue_.erase(hash);
359 fail(
"charge: Resources");
368 auto const iter =
headers_.find(
"Crawl");
371 return boost::iequals(iter->value(),
"public");
397 ret[jss::inbound] =
true;
401 ret[jss::cluster] =
true;
411 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
428 std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
433 if ((minSeq != 0) || (maxSeq != 0))
434 ret[jss::complete_ledgers] =
440 ret[jss::track] =
"diverged";
444 ret[jss::track] =
"unknown";
453 protocol::TMStatusChange last_status;
460 if (closedLedgerHash != beast::zero)
461 ret[jss::ledger] =
to_string(closedLedgerHash);
463 if (last_status.has_newstatus())
465 switch (last_status.newstatus())
467 case protocol::nsCONNECTING:
468 ret[jss::status] =
"connecting";
471 case protocol::nsCONNECTED:
472 ret[jss::status] =
"connected";
475 case protocol::nsMONITORING:
476 ret[jss::status] =
"monitoring";
479 case protocol::nsVALIDATING:
480 ret[jss::status] =
"validating";
483 case protocol::nsSHUTTING:
484 ret[jss::status] =
"shutting";
489 <<
"Unknown status: " << last_status.newstatus();
494 ret[jss::metrics][jss::total_bytes_recv] =
496 ret[jss::metrics][jss::total_bytes_sent] =
498 ret[jss::metrics][jss::avg_bps_recv] =
500 ret[jss::metrics][jss::avg_bps_sent] =
579 strand_.running_in_this_thread(),
580 "ripple::PeerImp::close : strand in this thread");
602 if (!
strand_.running_in_this_thread())
613 <<
" failed: " << reason;
622 strand_.running_in_this_thread(),
623 "ripple::PeerImp::fail : strand in this thread");
637 strand_.running_in_this_thread(),
638 "ripple::PeerImp::gracefulClose : strand in this thread");
640 socket_.is_open(),
"ripple::PeerImp::gracefulClose : socket is open");
643 "ripple::PeerImp::gracefulClose : socket is not closing");
648 stream_.async_shutdown(bind_executor(
658 timer_.expires_from_now(peerTimerInterval, ec);
665 timer_.async_wait(bind_executor(
695 if (ec == boost::asio::error::operation_aborted)
707 fail(
"Large send queue");
713 clock_type::duration duration;
734 fail(
"Ping Timeout");
741 protocol::TMPing message;
742 message.set_type(protocol::TMPing::ptPING);
745 send(std::make_shared<Message>(message, protocol::mtPING));
757 JLOG(
journal_.
error()) <<
"onShutdown: expected error condition";
760 if (ec != boost::asio::error::eof)
761 return fail(
"onShutdown", ec);
771 "ripple::PeerImp::doAccept : empty read buffer");
780 return fail(
"makeSharedValue: Unexpected failure");
801 auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
814 boost::asio::async_write(
816 write_buffer->data(),
817 boost::asio::transfer_all(),
824 if (ec == boost::asio::error::operation_aborted)
827 return fail(
"onWriteResponse", ec);
828 if (write_buffer->size() == bytes_transferred)
830 return fail(
"Failed to write header");
894 if (ec == boost::asio::error::operation_aborted)
896 if (ec == boost::asio::error::eof)
902 return fail(
"onReadMessage", ec);
905 if (bytes_transferred > 0)
906 stream <<
"onReadMessage: " << bytes_transferred <<
" bytes";
908 stream <<
"onReadMessage";
911 metrics_.recv.add_message(bytes_transferred);
921 using namespace std::chrono_literals;
926 "invokeProtocolMessage",
931 return fail(
"onReadMessage", ec);
936 if (bytes_consumed == 0)
949 std::placeholders::_1,
950 std::placeholders::_2)));
958 if (ec == boost::asio::error::operation_aborted)
961 return fail(
"onWriteMessage", ec);
964 if (bytes_transferred > 0)
965 stream <<
"onWriteMessage: " << bytes_transferred <<
" bytes";
967 stream <<
"onWriteMessage";
970 metrics_.sent.add_message(bytes_transferred);
974 "ripple::PeerImp::onWriteMessage : non-empty send buffer");
979 return boost::asio::async_write(
988 std::placeholders::_1,
989 std::placeholders::_2)));
994 return stream_.async_shutdown(bind_executor(
999 std::placeholders::_1)));
1028 *m,
static_cast<protocol::MessageType
>(type),
true);
1038 if ((type == MessageType::mtTRANSACTION ||
1039 type == MessageType::mtHAVE_TRANSACTIONS ||
1040 type == MessageType::mtTRANSACTIONS ||
1052 static_cast<MessageType
>(type),
static_cast<std::uint64_t>(size));
1054 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" "
1055 << uncompressed_size <<
" " << isCompressed;
1070 auto const s = m->list_size();
1090 if (m->type() == protocol::TMPing::ptPING)
1094 m->set_type(protocol::TMPing::ptPONG);
1095 send(std::make_shared<Message>(*m, protocol::mtPING));
1099 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1109 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1134 for (
int i = 0; i < m->clusternodes().size(); ++i)
1136 protocol::TMClusterNode
const& node = m->clusternodes(i);
1139 if (node.has_nodename())
1140 name = node.nodename();
1142 auto const publicKey =
1149 auto const reportTime =
1153 *publicKey,
name, node.nodeload(), reportTime);
1157 int loadSources = m->loadsources().size();
1158 if (loadSources != 0)
1161 gossip.
items.reserve(loadSources);
1162 for (
int i = 0; i < m->loadsources().size(); ++i)
1164 protocol::TMLoadSource
const& node = m->loadsources(i);
1169 gossip.
items.push_back(item);
1182 if (status.getReportTime() >= thresh)
1183 fees.push_back(status.getLoadFee());
1188 auto const index = fees.size() / 2;
1190 clusterFee = fees[index];
1206 if (m->endpoints_v2().size() >= 1024)
1213 endpoints.
reserve(m->endpoints_v2().size());
1216 for (
auto const& tm : m->endpoints_v2())
1223 << tm.endpoint() <<
"}";
1249 if (!endpoints.
empty())
1266 eraseTxQueue !=
batch,
1267 (
"ripple::PeerImp::handleTransaction : valid inputs"));
1276 <<
"Need network ledger";
1284 auto stx = std::make_shared<STTx const>(sit);
1285 uint256 txID = stx->getTransactionID();
1292 JLOG(
p_journal_.
warn()) <<
"Ignoring Network relayed Tx containing "
1293 "tfInnerBatchTxn (handleTransaction).";
1325 bool checkSignature =
true;
1328 if (!m->has_deferred() || !m->deferred())
1332 flags |= SF_TRUSTED;
1341 checkSignature =
false;
1348 <<
"No new transactions until synchronized";
1361 "recvTransaction->checkTransaction",
1367 if (
auto peer = weak.lock())
1368 peer->checkTransaction(
1369 flags, checkSignature, stx,
batch);
1376 <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1377 <<
". Exception: " << ex.
what();
1388 auto const itype{m->itype()};
1391 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1392 return badData(
"Invalid ledger info type");
1397 return std::nullopt;
1400 if (itype == protocol::liTS_CANDIDATE)
1402 if (!m->has_ledgerhash())
1403 return badData(
"Invalid TX candidate set, missing TX set hash");
1406 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1407 !(ltype && *ltype == protocol::ltCLOSED))
1409 return badData(
"Invalid request");
1413 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1414 return badData(
"Invalid ledger type");
1418 return badData(
"Invalid ledger hash");
1421 if (m->has_ledgerseq())
1423 auto const ledgerSeq{m->ledgerseq()};
1426 using namespace std::chrono_literals;
1436 if (itype != protocol::liBASE)
1438 if (m->nodeids_size() <= 0)
1439 return badData(
"Invalid ledger node IDs");
1441 for (
auto const& nodeId : m->nodeids())
1444 return badData(
"Invalid SHAMap node ID");
1449 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1450 return badData(
"Invalid query type");
1453 if (m->has_querydepth())
1456 itype == protocol::liBASE)
1458 return badData(
"Invalid query depth");
1465 if (
auto peer = weak.
lock())
1466 peer->processLedgerRequest(m);
1486 if (
auto peer = weak.
lock())
1489 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1490 if (reply.has_error())
1492 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1494 Resource::feeMalformedRequest,
1495 "proof_path_request");
1498 Resource::feeRequestNoReply,
"proof_path_request");
1502 peer->send(std::make_shared<Message>(
1503 reply, protocol::mtPROOF_PATH_RESPONSE));
1512 if (!ledgerReplayEnabled_)
1515 Resource::feeMalformedRequest,
"proof_path_response disabled");
1519 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1521 fee_.update(Resource::feeInvalidData,
"proof_path_response");
1528 JLOG(p_journal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1529 if (!ledgerReplayEnabled_)
1532 Resource::feeMalformedRequest,
"replay_delta_request disabled");
1536 fee_.fee = Resource::feeModerateBurdenPeer;
1538 app_.getJobQueue().addJob(
1540 if (
auto peer = weak.
lock())
1543 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1544 if (reply.has_error())
1546 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1548 Resource::feeMalformedRequest,
1549 "replay_delta_request");
1552 Resource::feeRequestNoReply,
1553 "replay_delta_request");
1557 peer->send(std::make_shared<Message>(
1558 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1567 if (!ledgerReplayEnabled_)
1570 Resource::feeMalformedRequest,
"replay_delta_response disabled");
1574 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1576 fee_.update(Resource::feeInvalidData,
"replay_delta_response");
1584 fee_.update(Resource::feeInvalidData, msg);
1585 JLOG(p_journal_.warn()) <<
"TMLedgerData: " << msg;
1590 return badData(
"Invalid ledger hash");
1594 auto const ledgerSeq{m->ledgerseq()};
1595 if (m->type() == protocol::liTS_CANDIDATE)
1606 using namespace std::chrono_literals;
1607 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1608 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1617 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1618 return badData(
"Invalid ledger info type");
1621 if (m->has_error() &&
1622 (m->error() < protocol::reNO_LEDGER ||
1623 m->error() > protocol::reBAD_REQUEST))
1625 return badData(
"Invalid reply error");
1629 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1632 "Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1636 if (m->has_requestcookie())
1638 if (
auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1640 m->clear_requestcookie();
1641 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1645 JLOG(p_journal_.info()) <<
"Unable to route TX/ledger data reply";
1650 uint256 const ledgerHash{m->ledgerhash()};
1653 if (m->type() == protocol::liTS_CANDIDATE)
1656 app_.getJobQueue().addJob(
1657 jtTXN_DATA,
"recvPeerData", [weak, ledgerHash, m]() {
1658 if (
auto peer = weak.lock())
1660 peer->app_.getInboundTransactions().gotData(
1661 ledgerHash, peer, m);
1668 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1674 protocol::TMProposeSet&
set = *m;
1680 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1683 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1685 Resource::feeInvalidSignature,
1686 " signature can't be longer than 72 bytes");
1693 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1694 fee_.update(Resource::feeMalformedRequest,
"bad hashes");
1702 auto const isTrusted = app_.validators().trusted(publicKey);
1710 overlay_.reportInboundTraffic(
1711 TrafficCount::category::proposal_untrusted,
1712 Message::messageSize(*m));
1714 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1718 uint256 const proposeHash{
set.currenttxhash()};
1719 uint256 const prevLedger{
set.previousledger()};
1731 if (
auto [added, relayed] =
1732 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1737 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
1738 overlay_.updateSlotAndSquelch(
1739 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1742 overlay_.reportInboundTraffic(
1743 TrafficCount::category::proposal_duplicate,
1744 Message::messageSize(*m));
1746 JLOG(p_journal_.trace()) <<
"Proposal: duplicate";
1753 if (tracking_.load() == Tracking::diverged)
1755 JLOG(p_journal_.debug())
1756 <<
"Proposal: Dropping untrusted (peer divergence)";
1760 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1762 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1767 JLOG(p_journal_.trace())
1768 <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1779 app_.timeKeeper().closeTime(),
1780 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1783 app_.getJobQueue().addJob(
1785 "recvPropose->checkPropose",
1787 if (
auto peer = weak.lock())
1788 peer->checkPropose(isTrusted, m,
proposal);
1795 JLOG(p_journal_.trace()) <<
"Status: Change";
1797 if (!m->has_networktime())
1798 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1802 if (!last_status_.has_newstatus() || m->has_newstatus())
1807 protocol::NodeStatus status = last_status_.newstatus();
1809 m->set_newstatus(status);
1813 if (m->newevent() == protocol::neLOST_SYNC)
1815 bool outOfSync{
false};
1820 if (!closedLedgerHash_.isZero())
1823 closedLedgerHash_.zero();
1825 previousLedgerHash_.zero();
1829 JLOG(p_journal_.debug()) <<
"Status: Out of sync";
1836 bool const peerChangedLedgers{
1843 if (peerChangedLedgers)
1845 closedLedgerHash_ = m->ledgerhash();
1846 closedLedgerHash = closedLedgerHash_;
1847 addLedger(closedLedgerHash, sl);
1851 closedLedgerHash_.zero();
1854 if (m->has_ledgerhashprevious() &&
1857 previousLedgerHash_ = m->ledgerhashprevious();
1858 addLedger(previousLedgerHash_, sl);
1862 previousLedgerHash_.zero();
1865 if (peerChangedLedgers)
1867 JLOG(p_journal_.debug()) <<
"LCL is " << closedLedgerHash;
1871 JLOG(p_journal_.debug()) <<
"Status: No ledger";
1875 if (m->has_firstseq() && m->has_lastseq())
1879 minLedger_ = m->firstseq();
1880 maxLedger_ = m->lastseq();
1882 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1883 minLedger_ = maxLedger_ = 0;
1886 if (m->has_ledgerseq() &&
1887 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1890 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1893 app_.getOPs().pubPeerStatus([=,
this]() ->
Json::Value {
1896 if (m->has_newstatus())
1898 switch (m->newstatus())
1900 case protocol::nsCONNECTING:
1901 j[jss::status] =
"CONNECTING";
1903 case protocol::nsCONNECTED:
1904 j[jss::status] =
"CONNECTED";
1906 case protocol::nsMONITORING:
1907 j[jss::status] =
"MONITORING";
1909 case protocol::nsVALIDATING:
1910 j[jss::status] =
"VALIDATING";
1912 case protocol::nsSHUTTING:
1913 j[jss::status] =
"SHUTTING";
1918 if (m->has_newevent())
1920 switch (m->newevent())
1922 case protocol::neCLOSING_LEDGER:
1923 j[jss::action] =
"CLOSING_LEDGER";
1925 case protocol::neACCEPTED_LEDGER:
1926 j[jss::action] =
"ACCEPTED_LEDGER";
1928 case protocol::neSWITCHED_LEDGER:
1929 j[jss::action] =
"SWITCHED_LEDGER";
1931 case protocol::neLOST_SYNC:
1932 j[jss::action] =
"LOST_SYNC";
1937 if (m->has_ledgerseq())
1939 j[jss::ledger_index] = m->ledgerseq();
1942 if (m->has_ledgerhash())
1944 uint256 closedLedgerHash{};
1946 std::lock_guard sl(recentLock_);
1947 closedLedgerHash = closedLedgerHash_;
1949 j[jss::ledger_hash] = to_string(closedLedgerHash);
1952 if (m->has_networktime())
1954 j[jss::date] = Json::UInt(m->networktime());
1957 if (m->has_firstseq() && m->has_lastseq())
1959 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
1960 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
1976 serverSeq = maxLedger_;
1982 checkTracking(serverSeq, validationSeq);
1991 if (diff < Tuning::convergedLedgerLimit)
1994 tracking_ = Tracking::converged;
1997 if ((diff > Tuning::divergedLedgerLimit) &&
1998 (tracking_.load() != Tracking::diverged))
2003 tracking_ = Tracking::diverged;
2004 trackingTime_ = clock_type::now();
2013 fee_.update(Resource::feeMalformedRequest,
"bad hash");
2017 uint256 const hash{m->hash()};
2019 if (m->status() == protocol::tsHAVE)
2023 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2024 recentTxSets_.end())
2026 fee_.update(Resource::feeUselessData,
"duplicate (tsHAVE)");
2030 recentTxSets_.push_back(hash);
2035PeerImp::onValidatorListMessage(
2045 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType
2046 <<
" from peer " << remote_address_;
2048 fee_.update(Resource::feeHeavyBurdenPeer,
"no blobs");
2054 JLOG(p_journal_.debug())
2055 <<
"Received " << messageType <<
" from " << remote_address_.to_string()
2056 <<
" (" << id_ <<
")";
2058 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2060 JLOG(p_journal_.debug())
2061 << messageType <<
": received duplicate " << messageType;
2065 fee_.update(Resource::feeUselessData,
"duplicate");
2069 auto const applyResult = app_.validators().applyListsAndBroadcast(
2073 remote_address_.to_string(),
2076 app_.getHashRouter(),
2079 JLOG(p_journal_.debug())
2080 <<
"Processed " << messageType <<
" version " << version <<
" from "
2081 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2082 :
"unknown or invalid publisher")
2083 <<
" from " << remote_address_.to_string() <<
" (" << id_
2084 <<
") with best result " << to_string(applyResult.bestDisposition());
2087 switch (applyResult.bestDisposition())
2090 case ListDisposition::accepted:
2092 case ListDisposition::expired:
2094 case ListDisposition::pending: {
2098 applyResult.publisherKey,
2099 "ripple::PeerImp::onValidatorListMessage : publisher key is "
2101 auto const& pubKey = *applyResult.publisherKey;
2103 if (
auto const iter = publisherListSequences_.find(pubKey);
2104 iter != publisherListSequences_.end())
2107 iter->second < applyResult.sequence,
2108 "ripple::PeerImp::onValidatorListMessage : lower sequence");
2111 publisherListSequences_[pubKey] = applyResult.sequence;
2114 case ListDisposition::same_sequence:
2115 case ListDisposition::known_sequence:
2120 applyResult.sequence && applyResult.publisherKey,
2121 "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
2122 "and set publisher key");
2124 publisherListSequences_[*applyResult.publisherKey] <=
2125 applyResult.sequence,
2126 "ripple::PeerImp::onValidatorListMessage : maximum sequence");
2131 case ListDisposition::stale:
2132 case ListDisposition::untrusted:
2133 case ListDisposition::invalid:
2134 case ListDisposition::unsupported_version:
2138 "ripple::PeerImp::onValidatorListMessage : invalid best list "
2143 switch (applyResult.worstDisposition())
2145 case ListDisposition::accepted:
2146 case ListDisposition::expired:
2147 case ListDisposition::pending:
2150 case ListDisposition::same_sequence:
2151 case ListDisposition::known_sequence:
2156 Resource::feeUselessData,
2157 " duplicate (same_sequence or known_sequence)");
2159 case ListDisposition::stale:
2162 fee_.update(Resource::feeInvalidData,
"expired");
2164 case ListDisposition::untrusted:
2168 fee_.update(Resource::feeUselessData,
"untrusted");
2170 case ListDisposition::invalid:
2173 Resource::feeInvalidSignature,
"invalid list disposition");
2175 case ListDisposition::unsupported_version:
2178 fee_.update(Resource::feeInvalidData,
"version");
2182 "ripple::PeerImp::onValidatorListMessage : invalid worst list "
2187 for (
auto const& [disp, count] : applyResult.dispositions)
2192 case ListDisposition::accepted:
2193 JLOG(p_journal_.debug())
2194 <<
"Applied " << count <<
" new " << messageType
2195 <<
"(s) from peer " << remote_address_;
2198 case ListDisposition::expired:
2199 JLOG(p_journal_.debug())
2200 <<
"Applied " << count <<
" expired " << messageType
2201 <<
"(s) from peer " << remote_address_;
2204 case ListDisposition::pending:
2205 JLOG(p_journal_.debug())
2206 <<
"Processed " << count <<
" future " << messageType
2207 <<
"(s) from peer " << remote_address_;
2209 case ListDisposition::same_sequence:
2210 JLOG(p_journal_.warn())
2211 <<
"Ignored " << count <<
" " << messageType
2212 <<
"(s) with current sequence from peer "
2215 case ListDisposition::known_sequence:
2216 JLOG(p_journal_.warn())
2217 <<
"Ignored " << count <<
" " << messageType
2218 <<
"(s) with future sequence from peer " << remote_address_;
2220 case ListDisposition::stale:
2221 JLOG(p_journal_.warn())
2222 <<
"Ignored " << count <<
"stale " << messageType
2223 <<
"(s) from peer " << remote_address_;
2225 case ListDisposition::untrusted:
2226 JLOG(p_journal_.warn())
2227 <<
"Ignored " << count <<
" untrusted " << messageType
2228 <<
"(s) from peer " << remote_address_;
2230 case ListDisposition::unsupported_version:
2231 JLOG(p_journal_.warn())
2232 <<
"Ignored " << count <<
"unsupported version "
2233 << messageType <<
"(s) from peer " << remote_address_;
2235 case ListDisposition::invalid:
2236 JLOG(p_journal_.warn())
2237 <<
"Ignored " << count <<
"invalid " << messageType
2238 <<
"(s) from peer " << remote_address_;
2242 "ripple::PeerImp::onValidatorListMessage : invalid list "
2253 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2255 JLOG(p_journal_.debug())
2256 <<
"ValidatorList: received validator list from peer using "
2257 <<
"protocol version " << to_string(protocol_)
2258 <<
" which shouldn't support this feature.";
2259 fee_.update(Resource::feeUselessData,
"unsupported peer");
2262 onValidatorListMessage(
2266 ValidatorList::parseBlobs(*m));
2270 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what()
2271 <<
" from peer " << remote_address_;
2272 using namespace std::string_literals;
2273 fee_.update(Resource::feeInvalidData, e.
what());
2283 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2285 JLOG(p_journal_.debug())
2286 <<
"ValidatorListCollection: received validator list from peer "
2287 <<
"using protocol version " << to_string(protocol_)
2288 <<
" which shouldn't support this feature.";
2289 fee_.update(Resource::feeUselessData,
"unsupported peer");
2292 else if (m->version() < 2)
2294 JLOG(p_journal_.debug())
2295 <<
"ValidatorListCollection: received invalid validator list "
2297 << m->version() <<
" from peer using protocol version "
2298 << to_string(protocol_);
2299 fee_.update(Resource::feeInvalidData,
"wrong version");
2302 onValidatorListMessage(
2303 "ValidatorListCollection",
2306 ValidatorList::parseBlobs(*m));
2310 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, "
2311 << e.
what() <<
" from peer " << remote_address_;
2312 using namespace std::string_literals;
2313 fee_.update(Resource::feeInvalidData, e.
what());
2320 if (m->validation().size() < 50)
2322 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2323 fee_.update(Resource::feeMalformedRequest,
"too small");
2329 auto const closeTime = app_.timeKeeper().closeTime();
2334 val = std::make_shared<STValidation>(
2338 app_.validatorManifests().getMasterKey(pk));
2341 val->setSeen(closeTime);
2345 app_.getValidations().parms(),
2346 app_.timeKeeper().closeTime(),
2348 val->getSeenTime()))
2350 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2351 fee_.update(Resource::feeUselessData,
"not current");
2358 auto const isTrusted =
2359 app_.validators().trusted(val->getSignerPublic());
2367 overlay_.reportInboundTraffic(
2368 TrafficCount::category::validation_untrusted,
2369 Message::messageSize(*m));
2371 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2377 auto [added, relayed] =
2378 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2385 if (relayed && (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2386 overlay_.updateSlotAndSquelch(
2387 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2390 overlay_.reportInboundTraffic(
2391 TrafficCount::category::validation_duplicate,
2392 Message::messageSize(*m));
2394 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2398 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2400 JLOG(p_journal_.debug())
2401 <<
"Dropping untrusted validation from diverged peer";
2403 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2407 isTrusted ?
"Trusted validation" :
"Untrusted validation";
2412 to_string(val->getNodeID());
2419 app_.getJobQueue().addJob(
2422 [weak, val, m, key]() {
2423 if (
auto peer = weak.
lock())
2424 peer->checkValidation(val, key, m);
2429 JLOG(p_journal_.debug())
2430 <<
"Dropping untrusted validation for load";
2435 JLOG(p_journal_.warn())
2436 <<
"Exception processing validation: " << e.
what();
2437 using namespace std::string_literals;
2438 fee_.update(Resource::feeMalformedRequest, e.
what());
2445 protocol::TMGetObjectByHash& packet = *m;
2447 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type()
2448 <<
" " << packet.objects_size();
2453 if (send_queue_.size() >= Tuning::dropSendQueue)
2455 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2459 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2465 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2467 if (!txReduceRelayEnabled())
2469 JLOG(p_journal_.error())
2470 <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2471 fee_.update(Resource::feeMalformedRequest,
"disabled");
2476 app_.getJobQueue().addJob(
2478 if (
auto peer = weak.
lock())
2479 peer->doTransactions(m);
2484 protocol::TMGetObjectByHash reply;
2486 reply.set_query(
false);
2488 if (packet.has_seq())
2489 reply.set_seq(packet.seq());
2491 reply.set_type(packet.type());
2493 if (packet.has_ledgerhash())
2497 fee_.update(Resource::feeMalformedRequest,
"ledger hash");
2501 reply.set_ledgerhash(packet.ledgerhash());
2505 Resource::feeModerateBurdenPeer,
2506 " received a get object by hash request");
2509 for (
int i = 0; i < packet.objects_size(); ++i)
2511 auto const& obj = packet.objects(i);
2514 uint256 const hash{obj.hash()};
2517 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2518 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2521 protocol::TMIndexedObject& newObj = *reply.add_objects();
2522 newObj.set_hash(hash.begin(), hash.size());
2524 &nodeObject->getData().front(),
2525 nodeObject->getData().size());
2527 if (obj.has_nodeid())
2528 newObj.set_index(obj.nodeid());
2529 if (obj.has_ledgerseq())
2530 newObj.set_ledgerseq(obj.ledgerseq());
2537 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2538 << packet.objects_size();
2539 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2546 bool progress =
false;
2548 for (
int i = 0; i < packet.objects_size(); ++i)
2550 protocol::TMIndexedObject
const& obj = packet.objects(i);
2554 if (obj.has_ledgerseq())
2556 if (obj.ledgerseq() != pLSeq)
2558 if (pLDo && (pLSeq != 0))
2560 JLOG(p_journal_.debug())
2561 <<
"GetObj: Full fetch pack for " << pLSeq;
2563 pLSeq = obj.ledgerseq();
2564 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2568 JLOG(p_journal_.debug())
2569 <<
"GetObj: Late fetch pack for " << pLSeq;
2578 uint256 const hash{obj.hash()};
2580 app_.getLedgerMaster().addFetchPack(
2582 std::make_shared<Blob>(
2583 obj.data().begin(), obj.data().end()));
2588 if (pLDo && (pLSeq != 0))
2590 JLOG(p_journal_.debug())
2591 <<
"GetObj: Partial fetch pack for " << pLSeq;
2593 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2594 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2601 if (!txReduceRelayEnabled())
2603 JLOG(p_journal_.error())
2604 <<
"TMHaveTransactions: tx reduce-relay is disabled";
2605 fee_.update(Resource::feeMalformedRequest,
"disabled");
2610 app_.getJobQueue().addJob(
2612 if (
auto peer = weak.
lock())
2613 peer->handleHaveTransactions(m);
2618PeerImp::handleHaveTransactions(
2621 protocol::TMGetObjectByHash tmBH;
2622 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2623 tmBH.set_query(
true);
2625 JLOG(p_journal_.trace())
2626 <<
"received TMHaveTransactions " << m->hashes_size();
2632 JLOG(p_journal_.error())
2633 <<
"TMHaveTransactions with invalid hash size";
2634 fee_.update(Resource::feeMalformedRequest,
"hash size");
2640 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2642 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2646 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2648 auto obj = tmBH.add_objects();
2649 obj->set_hash(hash.
data(), hash.
size());
2656 removeTxQueue(hash);
2660 JLOG(p_journal_.trace())
2661 <<
"transaction request object is " << tmBH.objects_size();
2663 if (tmBH.objects_size() > 0)
2664 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2670 if (!txReduceRelayEnabled())
2672 JLOG(p_journal_.error())
2673 <<
"TMTransactions: tx reduce-relay is disabled";
2674 fee_.update(Resource::feeMalformedRequest,
"disabled");
2678 JLOG(p_journal_.trace())
2679 <<
"received TMTransactions " << m->transactions_size();
2681 overlay_.addTxMetrics(m->transactions_size());
2686 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2694 using on_message_fn =
2696 if (!strand_.running_in_this_thread())
2700 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2702 if (!m->has_validatorpubkey())
2704 fee_.update(Resource::feeInvalidData,
"squelch no pubkey");
2707 auto validator = m->validatorpubkey();
2711 fee_.update(Resource::feeInvalidData,
"squelch bad pubkey");
2717 if (key == app_.getValidationPublicKey())
2719 JLOG(p_journal_.debug())
2720 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2725 m->has_squelchduration() ? m->squelchduration() : 0;
2727 squelch_.removeSquelch(key);
2729 fee_.update(Resource::feeInvalidData,
"squelch duration");
2731 JLOG(p_journal_.debug())
2732 <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2744 (void)lockedRecentLock;
2746 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2747 recentLedgers_.end())
2750 recentLedgers_.push_back(hash);
2759 if (app_.getFeeTrack().isLoadedLocal() ||
2760 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2761 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2763 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2769 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2770 fee_.update(Resource::feeMalformedRequest,
"hash size");
2774 fee_.fee = Resource::feeHeavyBurdenPeer;
2776 uint256 const hash{packet->ledgerhash()};
2779 auto elapsed = UptimeClock::now();
2780 auto const pap = &app_;
2781 app_.getJobQueue().addJob(
2782 jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2783 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2788PeerImp::doTransactions(
2791 protocol::TMTransactions reply;
2793 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
2794 << packet->objects_size();
2796 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2798 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
2799 fee_.update(Resource::feeMalformedRequest,
"too big");
2805 auto const& obj = packet->objects(i);
2809 fee_.update(Resource::feeMalformedRequest,
"hash size");
2815 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2819 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found "
2821 fee_.update(Resource::feeMalformedRequest,
"tx not found");
2826 auto tx = reply.add_transactions();
2827 auto sttx = txn->getSTransaction();
2829 tx->set_rawtransaction(s.
data(), s.
size());
2831 txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT
2833 tx->set_receivetimestamp(
2834 app_.timeKeeper().now().time_since_epoch().count());
2835 tx->set_deferred(txn->getSubmitResult().queued);
2838 if (reply.transactions_size() > 0)
2839 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2843PeerImp::checkTransaction(
2845 bool checkSignature,
2857 JLOG(p_journal_.warn()) <<
"Ignoring Network relayed Tx containing "
2858 "tfInnerBatchTxn (checkSignature).";
2859 charge(Resource::feeModerateBurdenPeer,
"inner batch txn");
2865 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2866 (stx->getFieldU32(sfLastLedgerSequence) <
2867 app_.getLedgerMaster().getValidLedgerIndex()))
2869 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2870 charge(Resource::feeUselessData,
"expired tx");
2879 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2881 tx->getStatus() ==
NEW,
2882 "ripple::PeerImp::checkTransaction Transaction created "
2884 if (tx->getStatus() ==
NEW)
2886 JLOG(p_journal_.debug())
2887 <<
"Processing " << (
batch ?
"batch" :
"unsolicited")
2888 <<
" pseudo-transaction tx " << tx->getID();
2890 app_.getMasterTransaction().canonicalize(&tx);
2893 app_.getHashRouter().shouldRelay(tx->getID());
2896 JLOG(p_journal_.debug())
2897 <<
"Passing skipped pseudo pseudo-transaction tx "
2899 app_.overlay().relay(tx->getID(), {}, *toSkip);
2903 JLOG(p_journal_.debug())
2904 <<
"Charging for pseudo-transaction tx " << tx->getID();
2905 charge(Resource::feeUselessData,
"pseudo tx");
2916 app_.getHashRouter(),
2918 app_.getLedgerMaster().getValidatedRules(),
2920 valid != Validity::Valid)
2922 if (!validReason.empty())
2924 JLOG(p_journal_.trace())
2925 <<
"Exception checking transaction: " << validReason;
2929 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2931 Resource::feeInvalidSignature,
2932 "check transaction signature failure");
2939 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2943 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2945 if (tx->getStatus() ==
INVALID)
2947 if (!reason.
empty())
2949 JLOG(p_journal_.trace())
2950 <<
"Exception checking transaction: " << reason;
2952 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2953 charge(Resource::feeInvalidSignature,
"tx (impossible)");
2957 bool const trusted(flags & SF_TRUSTED);
2958 app_.getOPs().processTransaction(
2959 tx, trusted,
false, NetworkOPs::FailHard::no);
2963 JLOG(p_journal_.warn())
2964 <<
"Exception in " << __func__ <<
": " << ex.
what();
2965 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2966 using namespace std::string_literals;
2967 charge(Resource::feeInvalidData,
"tx "s + ex.
what());
2973PeerImp::checkPropose(
2978 JLOG(p_journal_.trace())
2979 <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
2981 XRPL_ASSERT(packet,
"ripple::PeerImp::checkPropose : non-null packet");
2986 JLOG(p_journal_.warn()) << desc;
2987 charge(Resource::feeInvalidSignature, desc);
2994 relay = app_.getOPs().processTrustedProposal(peerPos);
2996 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3004 auto haveMessage = app_.overlay().relay(
3006 if (!haveMessage.empty())
3007 overlay_.updateSlotAndSquelch(
3010 std::move(haveMessage),
3011 protocol::mtPROPOSE_LEDGER);
3016PeerImp::checkValidation(
3021 if (!val->isValid())
3023 std::string desc{
"Validation forwarded by peer is invalid"};
3024 JLOG(p_journal_.debug()) << desc;
3025 charge(Resource::feeInvalidSignature, desc);
3040 overlay_.relay(*packet, key, val->getSignerPublic());
3041 if (!haveMessage.empty())
3043 overlay_.updateSlotAndSquelch(
3045 val->getSignerPublic(),
3046 std::move(haveMessage),
3047 protocol::mtVALIDATION);
3053 JLOG(p_journal_.trace())
3054 <<
"Exception processing validation: " << ex.
what();
3055 using namespace std::string_literals;
3056 charge(Resource::feeMalformedRequest,
"validation "s + ex.
what());
3070 if (p->hasTxSet(rootHash) && p.get() != skip)
3072 auto score = p->getScore(true);
3073 if (!ret || (score > retScore))
3098 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3100 auto score = p->getScore(true);
3101 if (!ret || (score > retScore))
3113PeerImp::sendLedgerBase(
3115 protocol::TMLedgerData& ledgerData)
3117 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
3120 addRaw(ledger->info(), s);
3123 auto const& stateMap{ledger->stateMap()};
3124 if (stateMap.getHash() != beast::zero)
3129 stateMap.serializeRoot(
root);
3130 ledgerData.add_nodes()->set_nodedata(
3131 root.getDataPtr(),
root.getLength());
3133 if (ledger->info().txHash != beast::zero)
3135 auto const& txMap{ledger->txMap()};
3136 if (txMap.getHash() != beast::zero)
3140 txMap.serializeRoot(
root);
3141 ledgerData.add_nodes()->set_nodedata(
3142 root.getDataPtr(),
root.getLength());
3148 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3155 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
3159 if (m->has_ledgerhash())
3162 uint256 const ledgerHash{m->ledgerhash()};
3163 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3166 JLOG(p_journal_.trace())
3167 <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3169 if (m->has_querytype() && !m->has_requestcookie())
3175 m->has_ledgerseq() ? m->ledgerseq() : 0,
3178 m->set_requestcookie(
id());
3180 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3181 JLOG(p_journal_.debug())
3182 <<
"getLedger: Request relayed to peer";
3186 JLOG(p_journal_.trace())
3187 <<
"getLedger: Failed to find peer to relay request";
3191 else if (m->has_ledgerseq())
3194 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3196 JLOG(p_journal_.debug())
3197 <<
"getLedger: Early ledger sequence request";
3201 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3204 JLOG(p_journal_.debug())
3205 <<
"getLedger: Don't have ledger with sequence "
3210 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3212 ledger = app_.getLedgerMaster().getClosedLedger();
3218 auto const ledgerSeq{ledger->info().seq};
3219 if (m->has_ledgerseq())
3221 if (ledgerSeq != m->ledgerseq())
3224 if (!m->has_requestcookie())
3226 Resource::feeMalformedRequest,
"get_ledger ledgerSeq");
3229 JLOG(p_journal_.warn())
3230 <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3233 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3236 JLOG(p_journal_.debug())
3237 <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3242 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3251 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3253 uint256 const txSetHash{m->ledgerhash()};
3255 app_.getInboundTransactions().getSet(txSetHash,
false)};
3258 if (m->has_querytype() && !m->has_requestcookie())
3263 m->set_requestcookie(
id());
3265 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3266 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3270 JLOG(p_journal_.debug())
3271 <<
"getTxSet: Failed to find relay peer";
3276 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3287 if (!m->has_requestcookie())
3289 Resource::feeModerateBurdenPeer,
"received a get ledger request");
3293 SHAMap const* map{
nullptr};
3294 protocol::TMLedgerData ledgerData;
3295 bool fatLeaves{
true};
3296 auto const itype{m->itype()};
3298 if (itype == protocol::liTS_CANDIDATE)
3300 if (sharedMap = getTxSet(m); !sharedMap)
3302 map = sharedMap.
get();
3305 ledgerData.set_ledgerseq(0);
3306 ledgerData.set_ledgerhash(m->ledgerhash());
3307 ledgerData.set_type(protocol::liTS_CANDIDATE);
3308 if (m->has_requestcookie())
3309 ledgerData.set_requestcookie(m->requestcookie());
3316 if (send_queue_.size() >= Tuning::dropSendQueue)
3318 JLOG(p_journal_.debug())
3319 <<
"processLedgerRequest: Large send queue";
3322 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3324 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3328 if (ledger = getLedger(m); !ledger)
3332 auto const ledgerHash{ledger->info().hash};
3333 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3334 ledgerData.set_ledgerseq(ledger->info().seq);
3335 ledgerData.set_type(itype);
3336 if (m->has_requestcookie())
3337 ledgerData.set_requestcookie(m->requestcookie());
3341 case protocol::liBASE:
3342 sendLedgerBase(ledger, ledgerData);
3345 case protocol::liTX_NODE:
3346 map = &ledger->txMap();
3347 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash "
3348 << to_string(map->getHash());
3351 case protocol::liAS_NODE:
3352 map = &ledger->stateMap();
3353 JLOG(p_journal_.trace())
3354 <<
"processLedgerRequest: Account state map hash "
3355 << to_string(map->getHash());
3360 JLOG(p_journal_.error())
3361 <<
"processLedgerRequest: Invalid ledger info type";
3368 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3373 if (m->nodeids_size() > 0)
3375 auto const queryDepth{
3376 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3380 for (
int i = 0; i < m->nodeids_size() &&
3381 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3387 data.reserve(Tuning::softMaxReplyNodes);
3391 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3393 JLOG(p_journal_.trace())
3394 <<
"processLedgerRequest: getNodeFat got "
3395 << data.size() <<
" nodes";
3397 for (
auto const& d : data)
3399 if (ledgerData.nodes_size() >=
3400 Tuning::hardMaxReplyNodes)
3402 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3403 node->set_nodeid(d.first.getRawString());
3404 node->set_nodedata(d.second.data(), d.second.size());
3409 JLOG(p_journal_.warn())
3410 <<
"processLedgerRequest: getNodeFat returns false";
3418 case protocol::liBASE:
3420 info =
"Ledger base";
3423 case protocol::liTX_NODE:
3427 case protocol::liAS_NODE:
3431 case protocol::liTS_CANDIDATE:
3432 info =
"TS candidate";
3440 if (!m->has_ledgerhash())
3441 info +=
", no hash specified";
3443 JLOG(p_journal_.error())
3444 <<
"processLedgerRequest: getNodeFat with nodeId "
3445 << *shaMapNodeId <<
" and ledger info type " << info
3446 <<
" throws exception: " << e.
what();
3450 JLOG(p_journal_.info())
3451 <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3452 <<
" nodes at depth " << queryDepth <<
", return "
3453 << ledgerData.nodes_size() <<
" nodes";
3456 if (ledgerData.nodes_size() == 0)
3459 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3463PeerImp::getScore(
bool haveItem)
const
3467 static int const spRandomMax = 9999;
3471 static int const spHaveItem = 10000;
3476 static int const spLatency = 30;
3479 static int const spNoLatency = 8000;
3484 score += spHaveItem;
3493 score -= latency->count() * spLatency;
3495 score -= spNoLatency;
3501PeerImp::isHighLatency()
const
3504 return latency_ >= peerHighLatency;
3510 using namespace std::chrono_literals;
3513 totalBytes_ += bytes;
3514 accumBytes_ += bytes;
3515 auto const timeElapsed = clock_type::now() - intervalStart_;
3516 auto const timeElapsedInSecs =
3517 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3519 if (timeElapsedInSecs >= 1s)
3521 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3522 rollingAvg_.push_back(avgBytes);
3524 auto const totalBytes =
3526 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3528 intervalStart_ = clock_type::now();
3534PeerImp::Metrics::average_bytes()
const
3537 return rollingAvgBytes_;
3541PeerImp::Metrics::total_bytes()
const
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Stream trace() const
Severity stream access functions.
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual TimeKeeper & timeKeeper()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual Cluster & cluster()=0
virtual HashRouter & getHashRouter()=0
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
std::size_t size() const
The number of nodes in the cluster list.
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
bool TX_REDUCE_RELAY_METRICS
std::chrono::seconds MAX_DIVERGED_TIME
std::chrono::seconds MAX_UNKNOWN_TIME
bool shouldProcess(uint256 const &key, PeerShortID peer, int &flags, std::chrono::seconds tx_interval)
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
int getJobCount(JobType t) const
Jobs waiting at this priority.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
LedgerIndex getValidLedgerIndex()
std::chrono::seconds getValidatedLedgerAge()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
virtual bool isNeedNetworkLedger()=0
PeerFinder::Manager & peerFinder()
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void onPeerDeactivate(Peer::id_t id)
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
Resource::Manager & resourceManager()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void incPeerDisconnectCharges() override
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
std::queue< std::shared_ptr< Message > > send_queue_
std::unique_ptr< LoadEvent > load_event_
boost::beast::http::fields const & headers_
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
clock_type::duration uptime() const
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
protocol::TMStatusChange last_status_
boost::shared_mutex nameMutex_
boost::circular_buffer< uint256 > recentTxSets_
std::unique_ptr< stream_type > stream_ptr_
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Compressed compressionEnabled_
uint256 closedLedgerHash_
std::string domain() const
std::optional< std::uint32_t > lastPingSeq_
void onTimer(boost::system::error_code const &ec)
beast::Journal const journal_
struct ripple::PeerImp::@21 metrics_
beast::Journal const p_journal_
PeerImp(PeerImp const &)=delete
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
bool hasTxSet(uint256 const &hash) const override
clock_type::time_point lastPingTime_
void onMessageUnknown(std::uint16_t type)
std::shared_ptr< PeerFinder::Slot > const slot_
boost::circular_buffer< uint256 > recentLedgers_
std::optional< std::chrono::milliseconds > latency_
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
beast::IP::Endpoint const remote_address_
Json::Value json() override
PublicKey const publicKey_
hash_set< uint256 > txQueue_
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
bool txReduceRelayEnabled_
clock_type::time_point trackingTime_
ProtocolVersion protocol_
reduce_relay::Squelch< UptimeClock > squelch_
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
uint256 previousLedgerHash_
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
void send(std::shared_ptr< Message > const &m) override
static std::string makePrefix(id_t id)
boost::system::error_code error_code
void onReadMessage(error_code ec, std::size_t bytes_transferred)
bool ledgerReplayEnabled_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
bool crawl() const
Returns true if this connection will publicly share its IP address.
void sendTxQueue() override
Send aggregated transactions' hashes.
bool txReduceRelayEnabled() const override
bool supportsFeature(ProtocolFeature f) const override
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
http_request_type request_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
bool cluster() const override
Returns true if this connection is a member of the cluster.
void onShutdown(error_code ec)
boost::asio::strand< boost::asio::executor > strand_
void cycleStatus() override
boost::beast::multi_buffer read_buffer_
Resource::Consumer usage_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
void fail(std::string const &reason)
std::atomic< Tracking > tracking_
Represents a peer connection in the overlay.
A peer's signed, proposed position for use in RCLConsensus.
bool checkSign() const
Verify the signing hash of the proposal.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
An endpoint that consumes resources.
int balance()
Returns the credit balance representing consumption.
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
std::size_t size() const noexcept
void const * data() const noexcept
void const * getDataPtr() const
An immutable linear range of bytes.
time_point now() const override
Returns the current time, using the server's clock.
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
static constexpr std::size_t size()
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
T emplace_back(T... args)
@ objectValue
object value (collection of name/value pairs).
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeInvalidData
Charge const feeUselessData
Charge const feeTrivialPeer
Charge const feeModerateBurdenPeer
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ maxQueryDepth
The maximum number of levels to search.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ sendQueueLogFreq
How often to log send queue size.
TER valid(PreclaimContext const &ctx, AccountID const &src)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
static constexpr char FEATURE_COMPR[]
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
@ ValidatorListPropagation
@ ValidatorList2Propagation
std::string base64_decode(std::string_view data)
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
static bool stringIsUint256Sized(std::string const &pBuffStr)
static constexpr char FEATURE_LEDGER_REPLAY[]
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
std::string strHex(FwdIt begin, FwdIt end)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Stopwatch & stopwatch()
Returns an instance of a wall clock.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
static constexpr char FEATURE_TXRR[]
std::string to_string(base_uint< Bits, Tag > const &a)
std::optional< Rules > const & getCurrentTransactionRules()
Number root(Number f, unsigned d)
@ proposal
proposal for signing
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
static constexpr char FEATURE_VPRR[]
constexpr std::uint32_t tfInnerBatchTxn
T shared_from_this(T... args)
beast::IP::Address public_ip
std::optional< std::uint32_t > networkID
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items