20#include <xrpld/app/consensus/RCLValidations.h>
21#include <xrpld/app/ledger/InboundLedgers.h>
22#include <xrpld/app/ledger/InboundTransactions.h>
23#include <xrpld/app/ledger/LedgerMaster.h>
24#include <xrpld/app/ledger/TransactionMaster.h>
25#include <xrpld/app/misc/HashRouter.h>
26#include <xrpld/app/misc/LoadFeeTrack.h>
27#include <xrpld/app/misc/NetworkOPs.h>
28#include <xrpld/app/misc/Transaction.h>
29#include <xrpld/app/misc/ValidatorList.h>
30#include <xrpld/app/tx/apply.h>
31#include <xrpld/overlay/Cluster.h>
32#include <xrpld/overlay/detail/PeerImp.h>
33#include <xrpld/overlay/detail/Tuning.h>
34#include <xrpld/perflog/PerfLog.h>
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/base64.h>
38#include <xrpl/basics/random.h>
39#include <xrpl/basics/safe_cast.h>
40#include <xrpl/protocol/TxFlags.h>
41#include <xrpl/protocol/digest.h>
43#include <boost/algorithm/string/predicate.hpp>
44#include <boost/beast/core/ostream.hpp>
52using namespace std::chrono_literals;
80 , sink_(app_.journal(
"Peer"), makePrefix(id))
81 , p_sink_(app_.journal(
"Protocol"), makePrefix(id))
84 , stream_ptr_(
std::move(stream_ptr))
85 , socket_(stream_ptr_->next_layer().socket())
86 , stream_(*stream_ptr_)
87 , strand_(socket_.get_executor())
89 , remote_address_(slot->remote_endpoint())
95 , publicKey_(publicKey)
98 , squelch_(app_.journal(
"Squelch"))
100 , fee_{Resource::feeTrivialPeer,
""}
102 , request_(
std::move(request))
104 , compressionEnabled_(
109 app_.config().COMPRESSION)
115 app_.config().TX_REDUCE_RELAY_ENABLE))
116 , vpReduceRelayEnabled_(app_.config().VP_REDUCE_RELAY_ENABLE)
120 app_.config().LEDGER_REPLAY))
121 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
125 <<
" vp reduce-relay enabled "
127 <<
" tx reduce-relay enabled "
134 bool const inCluster{
cluster()};
157 if (!
strand_.running_in_this_thread())
160 auto parseLedgerHash =
174 if (
auto const iter =
headers_.find(
"Closed-Ledger");
177 closed = parseLedgerHash(iter->value());
180 fail(
"Malformed handshake data (1)");
183 if (
auto const iter =
headers_.find(
"Previous-Ledger");
186 previous = parseLedgerHash(iter->value());
189 fail(
"Malformed handshake data (2)");
192 if (previous && !closed)
193 fail(
"Malformed handshake data (3)");
215 if (!
strand_.running_in_this_thread())
241 if (!
strand_.running_in_this_thread())
259 safe_cast<TrafficCount::category>(m->getCategory()),
281 <<
" sendq: " << sendq_size;
289 boost::asio::async_write(
298 std::placeholders::_1,
299 std::placeholders::_2)));
305 if (!
strand_.running_in_this_thread())
311 protocol::TMHaveTransactions ht;
313 ht.add_hashes(hash.data(), hash.size());
317 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
324 if (!
strand_.running_in_this_thread())
341 if (!
strand_.running_in_this_thread())
346 auto removed =
txQueue_.erase(hash);
358 fail(
"charge: Resources");
367 auto const iter =
headers_.find(
"Crawl");
370 return boost::iequals(iter->value(),
"public");
396 ret[jss::inbound] =
true;
400 ret[jss::cluster] =
true;
410 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
427 std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
432 if ((minSeq != 0) || (maxSeq != 0))
433 ret[jss::complete_ledgers] =
439 ret[jss::track] =
"diverged";
443 ret[jss::track] =
"unknown";
452 protocol::TMStatusChange last_status;
459 if (closedLedgerHash != beast::zero)
460 ret[jss::ledger] =
to_string(closedLedgerHash);
462 if (last_status.has_newstatus())
464 switch (last_status.newstatus())
466 case protocol::nsCONNECTING:
467 ret[jss::status] =
"connecting";
470 case protocol::nsCONNECTED:
471 ret[jss::status] =
"connected";
474 case protocol::nsMONITORING:
475 ret[jss::status] =
"monitoring";
478 case protocol::nsVALIDATING:
479 ret[jss::status] =
"validating";
482 case protocol::nsSHUTTING:
483 ret[jss::status] =
"shutting";
488 <<
"Unknown status: " << last_status.newstatus();
493 ret[jss::metrics][jss::total_bytes_recv] =
495 ret[jss::metrics][jss::total_bytes_sent] =
497 ret[jss::metrics][jss::avg_bps_recv] =
499 ret[jss::metrics][jss::avg_bps_sent] =
578 strand_.running_in_this_thread(),
579 "ripple::PeerImp::close : strand in this thread");
601 if (!
strand_.running_in_this_thread())
612 <<
" failed: " << reason;
621 strand_.running_in_this_thread(),
622 "ripple::PeerImp::fail : strand in this thread");
636 strand_.running_in_this_thread(),
637 "ripple::PeerImp::gracefulClose : strand in this thread");
639 socket_.is_open(),
"ripple::PeerImp::gracefulClose : socket is open");
642 "ripple::PeerImp::gracefulClose : socket is not closing");
647 stream_.async_shutdown(bind_executor(
657 timer_.expires_from_now(peerTimerInterval, ec);
664 timer_.async_wait(bind_executor(
694 if (ec == boost::asio::error::operation_aborted)
706 fail(
"Large send queue");
712 clock_type::duration duration;
733 fail(
"Ping Timeout");
740 protocol::TMPing message;
741 message.set_type(protocol::TMPing::ptPING);
744 send(std::make_shared<Message>(message, protocol::mtPING));
756 JLOG(
journal_.
error()) <<
"onShutdown: expected error condition";
759 if (ec != boost::asio::error::eof)
760 return fail(
"onShutdown", ec);
770 "ripple::PeerImp::doAccept : empty read buffer");
779 return fail(
"makeSharedValue: Unexpected failure");
800 auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
813 boost::asio::async_write(
815 write_buffer->data(),
816 boost::asio::transfer_all(),
823 if (ec == boost::asio::error::operation_aborted)
826 return fail(
"onWriteResponse", ec);
827 if (write_buffer->size() == bytes_transferred)
829 return fail(
"Failed to write header");
893 if (ec == boost::asio::error::operation_aborted)
895 if (ec == boost::asio::error::eof)
901 return fail(
"onReadMessage", ec);
904 if (bytes_transferred > 0)
905 stream <<
"onReadMessage: " << bytes_transferred <<
" bytes";
907 stream <<
"onReadMessage";
910 metrics_.recv.add_message(bytes_transferred);
920 using namespace std::chrono_literals;
925 "invokeProtocolMessage",
930 return fail(
"onReadMessage", ec);
935 if (bytes_consumed == 0)
948 std::placeholders::_1,
949 std::placeholders::_2)));
957 if (ec == boost::asio::error::operation_aborted)
960 return fail(
"onWriteMessage", ec);
963 if (bytes_transferred > 0)
964 stream <<
"onWriteMessage: " << bytes_transferred <<
" bytes";
966 stream <<
"onWriteMessage";
969 metrics_.sent.add_message(bytes_transferred);
973 "ripple::PeerImp::onWriteMessage : non-empty send buffer");
978 return boost::asio::async_write(
987 std::placeholders::_1,
988 std::placeholders::_2)));
993 return stream_.async_shutdown(bind_executor(
998 std::placeholders::_1)));
1027 *m,
static_cast<protocol::MessageType
>(type),
true);
1037 if ((type == MessageType::mtTRANSACTION ||
1038 type == MessageType::mtHAVE_TRANSACTIONS ||
1039 type == MessageType::mtTRANSACTIONS ||
1051 static_cast<MessageType
>(type),
static_cast<std::uint64_t>(size));
1053 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" "
1054 << uncompressed_size <<
" " << isCompressed;
1069 auto const s = m->list_size();
1089 if (m->type() == protocol::TMPing::ptPING)
1093 m->set_type(protocol::TMPing::ptPONG);
1094 send(std::make_shared<Message>(*m, protocol::mtPING));
1098 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1108 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1133 for (
int i = 0; i < m->clusternodes().size(); ++i)
1135 protocol::TMClusterNode
const& node = m->clusternodes(i);
1138 if (node.has_nodename())
1139 name = node.nodename();
1141 auto const publicKey =
1148 auto const reportTime =
1152 *publicKey,
name, node.nodeload(), reportTime);
1156 int loadSources = m->loadsources().size();
1157 if (loadSources != 0)
1160 gossip.
items.reserve(loadSources);
1161 for (
int i = 0; i < m->loadsources().size(); ++i)
1163 protocol::TMLoadSource
const& node = m->loadsources(i);
1168 gossip.
items.push_back(item);
1181 if (status.getReportTime() >= thresh)
1182 fees.push_back(status.getLoadFee());
1187 auto const index = fees.size() / 2;
1189 clusterFee = fees[index];
1205 if (m->endpoints_v2().size() >= 1024)
1212 endpoints.
reserve(m->endpoints_v2().size());
1215 for (
auto const& tm : m->endpoints_v2())
1222 << tm.endpoint() <<
"}";
1248 if (!endpoints.
empty())
1265 eraseTxQueue !=
batch,
1266 (
"ripple::PeerImp::handleTransaction : valid inputs"));
1275 <<
"Need network ledger";
1283 auto stx = std::make_shared<STTx const>(sit);
1284 uint256 txID = stx->getTransactionID();
1291 JLOG(
p_journal_.
warn()) <<
"Ignoring Network relayed Tx containing "
1292 "tfInnerBatchTxn (handleTransaction).";
1324 bool checkSignature =
true;
1327 if (!m->has_deferred() || !m->deferred())
1331 flags |= SF_TRUSTED;
1340 checkSignature =
false;
1347 <<
"No new transactions until synchronized";
1360 "recvTransaction->checkTransaction",
1366 if (
auto peer = weak.lock())
1367 peer->checkTransaction(
1375 <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1376 <<
". Exception: " << ex.
what();
1387 auto const itype{m->itype()};
1390 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1391 return badData(
"Invalid ledger info type");
1396 return std::nullopt;
1399 if (itype == protocol::liTS_CANDIDATE)
1401 if (!m->has_ledgerhash())
1402 return badData(
"Invalid TX candidate set, missing TX set hash");
1405 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1406 !(ltype && *ltype == protocol::ltCLOSED))
1408 return badData(
"Invalid request");
1412 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1413 return badData(
"Invalid ledger type");
1417 return badData(
"Invalid ledger hash");
1420 if (m->has_ledgerseq())
1422 auto const ledgerSeq{m->ledgerseq()};
1425 using namespace std::chrono_literals;
1435 if (itype != protocol::liBASE)
1437 if (m->nodeids_size() <= 0)
1438 return badData(
"Invalid ledger node IDs");
1440 for (
auto const& nodeId : m->nodeids())
1443 return badData(
"Invalid SHAMap node ID");
1448 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1449 return badData(
"Invalid query type");
1452 if (m->has_querydepth())
1455 itype == protocol::liBASE)
1457 return badData(
"Invalid query depth");
1464 if (
auto peer = weak.
lock())
1465 peer->processLedgerRequest(m);
1485 if (
auto peer = weak.
lock())
1488 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1489 if (reply.has_error())
1491 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1493 Resource::feeMalformedRequest,
1494 "proof_path_request");
1497 Resource::feeRequestNoReply,
"proof_path_request");
1501 peer->send(std::make_shared<Message>(
1502 reply, protocol::mtPROOF_PATH_RESPONSE));
1511 if (!ledgerReplayEnabled_)
1514 Resource::feeMalformedRequest,
"proof_path_response disabled");
1518 if (!ledgerReplayMsgHandler_.processProofPathResponse(m))
1520 fee_.update(Resource::feeInvalidData,
"proof_path_response");
1527 JLOG(p_journal_.trace()) <<
"onMessage, TMReplayDeltaRequest";
1528 if (!ledgerReplayEnabled_)
1531 Resource::feeMalformedRequest,
"replay_delta_request disabled");
1535 fee_.fee = Resource::feeModerateBurdenPeer;
1537 app_.getJobQueue().addJob(
1539 if (
auto peer = weak.
lock())
1542 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1543 if (reply.has_error())
1545 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1547 Resource::feeMalformedRequest,
1548 "replay_delta_request");
1551 Resource::feeRequestNoReply,
1552 "replay_delta_request");
1556 peer->send(std::make_shared<Message>(
1557 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1566 if (!ledgerReplayEnabled_)
1569 Resource::feeMalformedRequest,
"replay_delta_response disabled");
1573 if (!ledgerReplayMsgHandler_.processReplayDeltaResponse(m))
1575 fee_.update(Resource::feeInvalidData,
"replay_delta_response");
1583 fee_.update(Resource::feeInvalidData, msg);
1584 JLOG(p_journal_.warn()) <<
"TMLedgerData: " << msg;
1589 return badData(
"Invalid ledger hash");
1593 auto const ledgerSeq{m->ledgerseq()};
1594 if (m->type() == protocol::liTS_CANDIDATE)
1605 using namespace std::chrono_literals;
1606 if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1607 ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1616 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1617 return badData(
"Invalid ledger info type");
1620 if (m->has_error() &&
1621 (m->error() < protocol::reNO_LEDGER ||
1622 m->error() > protocol::reBAD_REQUEST))
1624 return badData(
"Invalid reply error");
1628 if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1631 "Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1635 if (m->has_requestcookie())
1637 if (
auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1639 m->clear_requestcookie();
1640 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1644 JLOG(p_journal_.info()) <<
"Unable to route TX/ledger data reply";
1649 uint256 const ledgerHash{m->ledgerhash()};
1652 if (m->type() == protocol::liTS_CANDIDATE)
1655 app_.getJobQueue().addJob(
1656 jtTXN_DATA,
"recvPeerData", [weak, ledgerHash, m]() {
1657 if (
auto peer = weak.lock())
1659 peer->app_.getInboundTransactions().gotData(
1660 ledgerHash, peer, m);
1667 app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
1673 protocol::TMProposeSet&
set = *m;
1679 if ((std::clamp<std::size_t>(
sig.size(), 64, 72) !=
sig.size()) ||
1682 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1684 Resource::feeInvalidSignature,
1685 " signature can't be longer than 72 bytes");
1692 JLOG(p_journal_.warn()) <<
"Proposal: malformed";
1693 fee_.update(Resource::feeMalformedRequest,
"bad hashes");
1701 auto const isTrusted = app_.validators().trusted(publicKey);
1709 overlay_.reportInboundTraffic(
1710 TrafficCount::category::proposal_untrusted,
1711 Message::messageSize(*m));
1713 if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1717 uint256 const proposeHash{
set.currenttxhash()};
1718 uint256 const prevLedger{
set.previousledger()};
1730 if (
auto [added, relayed] =
1731 app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
1736 if (reduceRelayReady() && relayed &&
1737 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
1738 overlay_.updateSlotAndSquelch(
1739 suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1742 overlay_.reportInboundTraffic(
1743 TrafficCount::category::proposal_duplicate,
1744 Message::messageSize(*m));
1746 JLOG(p_journal_.trace()) <<
"Proposal: duplicate";
1753 if (tracking_.load() == Tracking::diverged)
1755 JLOG(p_journal_.debug())
1756 <<
"Proposal: Dropping untrusted (peer divergence)";
1760 if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1762 JLOG(p_journal_.debug()) <<
"Proposal: Dropping untrusted (load)";
1767 JLOG(p_journal_.trace())
1768 <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
1779 app_.timeKeeper().closeTime(),
1780 calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
1783 app_.getJobQueue().addJob(
1785 "recvPropose->checkPropose",
1787 if (
auto peer = weak.lock())
1788 peer->checkPropose(isTrusted, m,
proposal);
1795 JLOG(p_journal_.trace()) <<
"Status: Change";
1797 if (!m->has_networktime())
1798 m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
1802 if (!last_status_.has_newstatus() || m->has_newstatus())
1807 protocol::NodeStatus status = last_status_.newstatus();
1809 m->set_newstatus(status);
1813 if (m->newevent() == protocol::neLOST_SYNC)
1815 bool outOfSync{
false};
1820 if (!closedLedgerHash_.isZero())
1823 closedLedgerHash_.zero();
1825 previousLedgerHash_.zero();
1829 JLOG(p_journal_.debug()) <<
"Status: Out of sync";
1836 bool const peerChangedLedgers{
1843 if (peerChangedLedgers)
1845 closedLedgerHash_ = m->ledgerhash();
1846 closedLedgerHash = closedLedgerHash_;
1847 addLedger(closedLedgerHash, sl);
1851 closedLedgerHash_.zero();
1854 if (m->has_ledgerhashprevious() &&
1857 previousLedgerHash_ = m->ledgerhashprevious();
1858 addLedger(previousLedgerHash_, sl);
1862 previousLedgerHash_.zero();
1865 if (peerChangedLedgers)
1867 JLOG(p_journal_.debug()) <<
"LCL is " << closedLedgerHash;
1871 JLOG(p_journal_.debug()) <<
"Status: No ledger";
1875 if (m->has_firstseq() && m->has_lastseq())
1879 minLedger_ = m->firstseq();
1880 maxLedger_ = m->lastseq();
1882 if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
1883 minLedger_ = maxLedger_ = 0;
1886 if (m->has_ledgerseq() &&
1887 app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
1890 m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
1893 app_.getOPs().pubPeerStatus([=,
this]() ->
Json::Value {
1896 if (m->has_newstatus())
1898 switch (m->newstatus())
1900 case protocol::nsCONNECTING:
1901 j[jss::status] =
"CONNECTING";
1903 case protocol::nsCONNECTED:
1904 j[jss::status] =
"CONNECTED";
1906 case protocol::nsMONITORING:
1907 j[jss::status] =
"MONITORING";
1909 case protocol::nsVALIDATING:
1910 j[jss::status] =
"VALIDATING";
1912 case protocol::nsSHUTTING:
1913 j[jss::status] =
"SHUTTING";
1918 if (m->has_newevent())
1920 switch (m->newevent())
1922 case protocol::neCLOSING_LEDGER:
1923 j[jss::action] =
"CLOSING_LEDGER";
1925 case protocol::neACCEPTED_LEDGER:
1926 j[jss::action] =
"ACCEPTED_LEDGER";
1928 case protocol::neSWITCHED_LEDGER:
1929 j[jss::action] =
"SWITCHED_LEDGER";
1931 case protocol::neLOST_SYNC:
1932 j[jss::action] =
"LOST_SYNC";
1937 if (m->has_ledgerseq())
1939 j[jss::ledger_index] = m->ledgerseq();
1942 if (m->has_ledgerhash())
1944 uint256 closedLedgerHash{};
1946 std::lock_guard sl(recentLock_);
1947 closedLedgerHash = closedLedgerHash_;
1949 j[jss::ledger_hash] = to_string(closedLedgerHash);
1952 if (m->has_networktime())
1954 j[jss::date] = Json::UInt(m->networktime());
1957 if (m->has_firstseq() && m->has_lastseq())
1959 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
1960 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
1976 serverSeq = maxLedger_;
1982 checkTracking(serverSeq, validationSeq);
1991 if (diff < Tuning::convergedLedgerLimit)
1994 tracking_ = Tracking::converged;
1997 if ((diff > Tuning::divergedLedgerLimit) &&
1998 (tracking_.load() != Tracking::diverged))
2003 tracking_ = Tracking::diverged;
2004 trackingTime_ = clock_type::now();
2013 fee_.update(Resource::feeMalformedRequest,
"bad hash");
2017 uint256 const hash{m->hash()};
2019 if (m->status() == protocol::tsHAVE)
2023 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2024 recentTxSets_.end())
2026 fee_.update(Resource::feeUselessData,
"duplicate (tsHAVE)");
2030 recentTxSets_.push_back(hash);
2035PeerImp::onValidatorListMessage(
2045 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType
2046 <<
" from peer " << remote_address_;
2048 fee_.update(Resource::feeHeavyBurdenPeer,
"no blobs");
2054 JLOG(p_journal_.debug())
2055 <<
"Received " << messageType <<
" from " << remote_address_.to_string()
2056 <<
" (" << id_ <<
")";
2058 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2060 JLOG(p_journal_.debug())
2061 << messageType <<
": received duplicate " << messageType;
2065 fee_.update(Resource::feeUselessData,
"duplicate");
2069 auto const applyResult = app_.validators().applyListsAndBroadcast(
2073 remote_address_.to_string(),
2076 app_.getHashRouter(),
2079 JLOG(p_journal_.debug())
2080 <<
"Processed " << messageType <<
" version " << version <<
" from "
2081 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2082 :
"unknown or invalid publisher")
2083 <<
" from " << remote_address_.to_string() <<
" (" << id_
2084 <<
") with best result " << to_string(applyResult.bestDisposition());
2087 switch (applyResult.bestDisposition())
2090 case ListDisposition::accepted:
2092 case ListDisposition::expired:
2094 case ListDisposition::pending: {
2098 applyResult.publisherKey,
2099 "ripple::PeerImp::onValidatorListMessage : publisher key is "
2101 auto const& pubKey = *applyResult.publisherKey;
2103 if (
auto const iter = publisherListSequences_.find(pubKey);
2104 iter != publisherListSequences_.end())
2107 iter->second < applyResult.sequence,
2108 "ripple::PeerImp::onValidatorListMessage : lower sequence");
2111 publisherListSequences_[pubKey] = applyResult.sequence;
2114 case ListDisposition::same_sequence:
2115 case ListDisposition::known_sequence:
2120 applyResult.sequence && applyResult.publisherKey,
2121 "ripple::PeerImp::onValidatorListMessage : nonzero sequence "
2122 "and set publisher key");
2124 publisherListSequences_[*applyResult.publisherKey] <=
2125 applyResult.sequence,
2126 "ripple::PeerImp::onValidatorListMessage : maximum sequence");
2131 case ListDisposition::stale:
2132 case ListDisposition::untrusted:
2133 case ListDisposition::invalid:
2134 case ListDisposition::unsupported_version:
2138 "ripple::PeerImp::onValidatorListMessage : invalid best list "
2143 switch (applyResult.worstDisposition())
2145 case ListDisposition::accepted:
2146 case ListDisposition::expired:
2147 case ListDisposition::pending:
2150 case ListDisposition::same_sequence:
2151 case ListDisposition::known_sequence:
2156 Resource::feeUselessData,
2157 " duplicate (same_sequence or known_sequence)");
2159 case ListDisposition::stale:
2162 fee_.update(Resource::feeInvalidData,
"expired");
2164 case ListDisposition::untrusted:
2168 fee_.update(Resource::feeUselessData,
"untrusted");
2170 case ListDisposition::invalid:
2173 Resource::feeInvalidSignature,
"invalid list disposition");
2175 case ListDisposition::unsupported_version:
2178 fee_.update(Resource::feeInvalidData,
"version");
2182 "ripple::PeerImp::onValidatorListMessage : invalid worst list "
2187 for (
auto const& [disp, count] : applyResult.dispositions)
2192 case ListDisposition::accepted:
2193 JLOG(p_journal_.debug())
2194 <<
"Applied " << count <<
" new " << messageType
2195 <<
"(s) from peer " << remote_address_;
2198 case ListDisposition::expired:
2199 JLOG(p_journal_.debug())
2200 <<
"Applied " << count <<
" expired " << messageType
2201 <<
"(s) from peer " << remote_address_;
2204 case ListDisposition::pending:
2205 JLOG(p_journal_.debug())
2206 <<
"Processed " << count <<
" future " << messageType
2207 <<
"(s) from peer " << remote_address_;
2209 case ListDisposition::same_sequence:
2210 JLOG(p_journal_.warn())
2211 <<
"Ignored " << count <<
" " << messageType
2212 <<
"(s) with current sequence from peer "
2215 case ListDisposition::known_sequence:
2216 JLOG(p_journal_.warn())
2217 <<
"Ignored " << count <<
" " << messageType
2218 <<
"(s) with future sequence from peer " << remote_address_;
2220 case ListDisposition::stale:
2221 JLOG(p_journal_.warn())
2222 <<
"Ignored " << count <<
"stale " << messageType
2223 <<
"(s) from peer " << remote_address_;
2225 case ListDisposition::untrusted:
2226 JLOG(p_journal_.warn())
2227 <<
"Ignored " << count <<
" untrusted " << messageType
2228 <<
"(s) from peer " << remote_address_;
2230 case ListDisposition::unsupported_version:
2231 JLOG(p_journal_.warn())
2232 <<
"Ignored " << count <<
"unsupported version "
2233 << messageType <<
"(s) from peer " << remote_address_;
2235 case ListDisposition::invalid:
2236 JLOG(p_journal_.warn())
2237 <<
"Ignored " << count <<
"invalid " << messageType
2238 <<
"(s) from peer " << remote_address_;
2242 "ripple::PeerImp::onValidatorListMessage : invalid list "
2253 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2255 JLOG(p_journal_.debug())
2256 <<
"ValidatorList: received validator list from peer using "
2257 <<
"protocol version " << to_string(protocol_)
2258 <<
" which shouldn't support this feature.";
2259 fee_.update(Resource::feeUselessData,
"unsupported peer");
2262 onValidatorListMessage(
2266 ValidatorList::parseBlobs(*m));
2270 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what()
2271 <<
" from peer " << remote_address_;
2272 using namespace std::string_literals;
2273 fee_.update(Resource::feeInvalidData, e.
what());
2283 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2285 JLOG(p_journal_.debug())
2286 <<
"ValidatorListCollection: received validator list from peer "
2287 <<
"using protocol version " << to_string(protocol_)
2288 <<
" which shouldn't support this feature.";
2289 fee_.update(Resource::feeUselessData,
"unsupported peer");
2292 else if (m->version() < 2)
2294 JLOG(p_journal_.debug())
2295 <<
"ValidatorListCollection: received invalid validator list "
2297 << m->version() <<
" from peer using protocol version "
2298 << to_string(protocol_);
2299 fee_.update(Resource::feeInvalidData,
"wrong version");
2302 onValidatorListMessage(
2303 "ValidatorListCollection",
2306 ValidatorList::parseBlobs(*m));
2310 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, "
2311 << e.
what() <<
" from peer " << remote_address_;
2312 using namespace std::string_literals;
2313 fee_.update(Resource::feeInvalidData, e.
what());
2320 if (m->validation().size() < 50)
2322 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2323 fee_.update(Resource::feeMalformedRequest,
"too small");
2329 auto const closeTime = app_.timeKeeper().closeTime();
2334 val = std::make_shared<STValidation>(
2338 app_.validatorManifests().getMasterKey(pk));
2341 val->setSeen(closeTime);
2345 app_.getValidations().parms(),
2346 app_.timeKeeper().closeTime(),
2348 val->getSeenTime()))
2350 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2351 fee_.update(Resource::feeUselessData,
"not current");
2358 auto const isTrusted =
2359 app_.validators().trusted(val->getSignerPublic());
2367 overlay_.reportInboundTraffic(
2368 TrafficCount::category::validation_untrusted,
2369 Message::messageSize(*m));
2371 if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2377 auto [added, relayed] =
2378 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2386 if (reduceRelayReady() && relayed &&
2387 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2388 overlay_.updateSlotAndSquelch(
2389 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2392 overlay_.reportInboundTraffic(
2393 TrafficCount::category::validation_duplicate,
2394 Message::messageSize(*m));
2396 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2400 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2402 JLOG(p_journal_.debug())
2403 <<
"Dropping untrusted validation from diverged peer";
2405 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2409 isTrusted ?
"Trusted validation" :
"Untrusted validation";
2414 to_string(val->getNodeID());
2421 app_.getJobQueue().addJob(
2424 [weak, val, m, key]() {
2425 if (
auto peer = weak.
lock())
2426 peer->checkValidation(val, key, m);
2431 JLOG(p_journal_.debug())
2432 <<
"Dropping untrusted validation for load";
2437 JLOG(p_journal_.warn())
2438 <<
"Exception processing validation: " << e.
what();
2439 using namespace std::string_literals;
2440 fee_.update(Resource::feeMalformedRequest, e.
what());
2447 protocol::TMGetObjectByHash& packet = *m;
2449 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type()
2450 <<
" " << packet.objects_size();
2455 if (send_queue_.size() >= Tuning::dropSendQueue)
2457 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2461 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2467 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2469 if (!txReduceRelayEnabled())
2471 JLOG(p_journal_.error())
2472 <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2473 fee_.update(Resource::feeMalformedRequest,
"disabled");
2478 app_.getJobQueue().addJob(
2480 if (
auto peer = weak.
lock())
2481 peer->doTransactions(m);
2486 protocol::TMGetObjectByHash reply;
2488 reply.set_query(
false);
2490 if (packet.has_seq())
2491 reply.set_seq(packet.seq());
2493 reply.set_type(packet.type());
2495 if (packet.has_ledgerhash())
2499 fee_.update(Resource::feeMalformedRequest,
"ledger hash");
2503 reply.set_ledgerhash(packet.ledgerhash());
2507 Resource::feeModerateBurdenPeer,
2508 " received a get object by hash request");
2511 for (
int i = 0; i < packet.objects_size(); ++i)
2513 auto const& obj = packet.objects(i);
2516 uint256 const hash{obj.hash()};
2520 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash,
seq)};
2523 protocol::TMIndexedObject& newObj = *reply.add_objects();
2524 newObj.set_hash(hash.begin(), hash.size());
2526 &nodeObject->getData().front(),
2527 nodeObject->getData().size());
2529 if (obj.has_nodeid())
2530 newObj.set_index(obj.nodeid());
2531 if (obj.has_ledgerseq())
2532 newObj.set_ledgerseq(obj.ledgerseq());
2539 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2540 << packet.objects_size();
2541 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2548 bool progress =
false;
2550 for (
int i = 0; i < packet.objects_size(); ++i)
2552 protocol::TMIndexedObject
const& obj = packet.objects(i);
2556 if (obj.has_ledgerseq())
2558 if (obj.ledgerseq() != pLSeq)
2560 if (pLDo && (pLSeq != 0))
2562 JLOG(p_journal_.debug())
2563 <<
"GetObj: Full fetch pack for " << pLSeq;
2565 pLSeq = obj.ledgerseq();
2566 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2570 JLOG(p_journal_.debug())
2571 <<
"GetObj: Late fetch pack for " << pLSeq;
2580 uint256 const hash{obj.hash()};
2582 app_.getLedgerMaster().addFetchPack(
2584 std::make_shared<Blob>(
2585 obj.data().begin(), obj.data().end()));
2590 if (pLDo && (pLSeq != 0))
2592 JLOG(p_journal_.debug())
2593 <<
"GetObj: Partial fetch pack for " << pLSeq;
2595 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2596 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2603 if (!txReduceRelayEnabled())
2605 JLOG(p_journal_.error())
2606 <<
"TMHaveTransactions: tx reduce-relay is disabled";
2607 fee_.update(Resource::feeMalformedRequest,
"disabled");
2612 app_.getJobQueue().addJob(
2614 if (
auto peer = weak.
lock())
2615 peer->handleHaveTransactions(m);
2620PeerImp::handleHaveTransactions(
2623 protocol::TMGetObjectByHash tmBH;
2624 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2625 tmBH.set_query(
true);
2627 JLOG(p_journal_.trace())
2628 <<
"received TMHaveTransactions " << m->hashes_size();
2634 JLOG(p_journal_.error())
2635 <<
"TMHaveTransactions with invalid hash size";
2636 fee_.update(Resource::feeMalformedRequest,
"hash size");
2642 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2644 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2648 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2650 auto obj = tmBH.add_objects();
2651 obj->set_hash(hash.
data(), hash.
size());
2658 removeTxQueue(hash);
2662 JLOG(p_journal_.trace())
2663 <<
"transaction request object is " << tmBH.objects_size();
2665 if (tmBH.objects_size() > 0)
2666 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2672 if (!txReduceRelayEnabled())
2674 JLOG(p_journal_.error())
2675 <<
"TMTransactions: tx reduce-relay is disabled";
2676 fee_.update(Resource::feeMalformedRequest,
"disabled");
2680 JLOG(p_journal_.trace())
2681 <<
"received TMTransactions " << m->transactions_size();
2683 overlay_.addTxMetrics(m->transactions_size());
2688 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2696 using on_message_fn =
2698 if (!strand_.running_in_this_thread())
2702 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2704 if (!m->has_validatorpubkey())
2706 fee_.update(Resource::feeInvalidData,
"squelch no pubkey");
2713 fee_.update(Resource::feeInvalidData,
"squelch bad pubkey");
2719 if (key == app_.getValidationPublicKey())
2721 JLOG(p_journal_.debug())
2722 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2727 m->has_squelchduration() ? m->squelchduration() : 0;
2729 squelch_.removeSquelch(key);
2731 fee_.update(Resource::feeInvalidData,
"squelch duration");
2733 JLOG(p_journal_.debug())
2734 <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2746 (void)lockedRecentLock;
2748 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2749 recentLedgers_.end())
2752 recentLedgers_.push_back(hash);
2761 if (app_.getFeeTrack().isLoadedLocal() ||
2762 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2763 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2765 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2771 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2772 fee_.update(Resource::feeMalformedRequest,
"hash size");
2776 fee_.fee = Resource::feeHeavyBurdenPeer;
2778 uint256 const hash{packet->ledgerhash()};
2781 auto elapsed = UptimeClock::now();
2782 auto const pap = &app_;
2783 app_.getJobQueue().addJob(
2784 jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2785 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2790PeerImp::doTransactions(
2793 protocol::TMTransactions reply;
2795 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
2796 << packet->objects_size();
2798 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
2800 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
2801 fee_.update(Resource::feeMalformedRequest,
"too big");
2807 auto const& obj = packet->objects(i);
2811 fee_.update(Resource::feeMalformedRequest,
"hash size");
2817 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2821 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found "
2823 fee_.update(Resource::feeMalformedRequest,
"tx not found");
2828 auto tx = reply.add_transactions();
2829 auto sttx = txn->getSTransaction();
2831 tx->set_rawtransaction(s.
data(), s.
size());
2833 txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT
2835 tx->set_receivetimestamp(
2836 app_.timeKeeper().now().time_since_epoch().count());
2837 tx->set_deferred(txn->getSubmitResult().queued);
2840 if (reply.transactions_size() > 0)
2841 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
2845PeerImp::checkTransaction(
2847 bool checkSignature,
2859 JLOG(p_journal_.warn()) <<
"Ignoring Network relayed Tx containing "
2860 "tfInnerBatchTxn (checkSignature).";
2861 charge(Resource::feeModerateBurdenPeer,
"inner batch txn");
2867 if (stx->isFieldPresent(sfLastLedgerSequence) &&
2868 (stx->getFieldU32(sfLastLedgerSequence) <
2869 app_.getLedgerMaster().getValidLedgerIndex()))
2871 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2872 charge(Resource::feeUselessData,
"expired tx");
2881 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2883 tx->getStatus() ==
NEW,
2884 "ripple::PeerImp::checkTransaction Transaction created "
2886 if (tx->getStatus() ==
NEW)
2888 JLOG(p_journal_.debug())
2889 <<
"Processing " << (
batch ?
"batch" :
"unsolicited")
2890 <<
" pseudo-transaction tx " << tx->getID();
2892 app_.getMasterTransaction().canonicalize(&tx);
2895 app_.getHashRouter().shouldRelay(tx->getID());
2898 JLOG(p_journal_.debug())
2899 <<
"Passing skipped pseudo pseudo-transaction tx "
2901 app_.overlay().relay(tx->getID(), {}, *toSkip);
2905 JLOG(p_journal_.debug())
2906 <<
"Charging for pseudo-transaction tx " << tx->getID();
2907 charge(Resource::feeUselessData,
"pseudo tx");
2918 app_.getHashRouter(),
2920 app_.getLedgerMaster().getValidatedRules(),
2922 valid != Validity::Valid)
2924 if (!validReason.empty())
2926 JLOG(p_journal_.trace())
2927 <<
"Exception checking transaction: " << validReason;
2931 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2933 Resource::feeInvalidSignature,
2934 "check transaction signature failure");
2941 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
2945 auto tx = std::make_shared<Transaction>(stx, reason, app_);
2947 if (tx->getStatus() ==
INVALID)
2949 if (!reason.
empty())
2951 JLOG(p_journal_.trace())
2952 <<
"Exception checking transaction: " << reason;
2954 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2955 charge(Resource::feeInvalidSignature,
"tx (impossible)");
2959 bool const trusted(
flags & SF_TRUSTED);
2960 app_.getOPs().processTransaction(
2961 tx, trusted,
false, NetworkOPs::FailHard::no);
2965 JLOG(p_journal_.warn())
2966 <<
"Exception in " << __func__ <<
": " << ex.
what();
2967 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
2968 using namespace std::string_literals;
2969 charge(Resource::feeInvalidData,
"tx "s + ex.
what());
2975PeerImp::checkPropose(
2980 JLOG(p_journal_.trace())
2981 <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
2983 XRPL_ASSERT(packet,
"ripple::PeerImp::checkPropose : non-null packet");
2988 JLOG(p_journal_.warn()) << desc;
2989 charge(Resource::feeInvalidSignature, desc);
2996 relay = app_.getOPs().processTrustedProposal(peerPos);
2998 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3006 auto haveMessage = app_.overlay().relay(
3008 if (reduceRelayReady() && !haveMessage.empty())
3009 overlay_.updateSlotAndSquelch(
3012 std::move(haveMessage),
3013 protocol::mtPROPOSE_LEDGER);
3018PeerImp::checkValidation(
3023 if (!val->isValid())
3025 std::string desc{
"Validation forwarded by peer is invalid"};
3026 JLOG(p_journal_.debug()) << desc;
3027 charge(Resource::feeInvalidSignature, desc);
3042 overlay_.relay(*packet, key, val->getSignerPublic());
3043 if (reduceRelayReady() && !haveMessage.empty())
3045 overlay_.updateSlotAndSquelch(
3047 val->getSignerPublic(),
3048 std::move(haveMessage),
3049 protocol::mtVALIDATION);
3055 JLOG(p_journal_.trace())
3056 <<
"Exception processing validation: " << ex.
what();
3057 using namespace std::string_literals;
3058 charge(Resource::feeMalformedRequest,
"validation "s + ex.
what());
3072 if (p->hasTxSet(rootHash) && p.get() != skip)
3074 auto score = p->getScore(true);
3075 if (!ret || (score > retScore))
3100 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3102 auto score = p->getScore(true);
3103 if (!ret || (score > retScore))
3115PeerImp::sendLedgerBase(
3117 protocol::TMLedgerData& ledgerData)
3119 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
3122 addRaw(ledger->info(), s);
3125 auto const& stateMap{ledger->stateMap()};
3126 if (stateMap.getHash() != beast::zero)
3131 stateMap.serializeRoot(
root);
3132 ledgerData.add_nodes()->set_nodedata(
3133 root.getDataPtr(),
root.getLength());
3135 if (ledger->info().txHash != beast::zero)
3137 auto const& txMap{ledger->txMap()};
3138 if (txMap.getHash() != beast::zero)
3142 txMap.serializeRoot(
root);
3143 ledgerData.add_nodes()->set_nodedata(
3144 root.getDataPtr(),
root.getLength());
3150 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3157 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
3161 if (m->has_ledgerhash())
3164 uint256 const ledgerHash{m->ledgerhash()};
3165 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3168 JLOG(p_journal_.trace())
3169 <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3171 if (m->has_querytype() && !m->has_requestcookie())
3177 m->has_ledgerseq() ? m->ledgerseq() : 0,
3180 m->set_requestcookie(
id());
3182 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3183 JLOG(p_journal_.debug())
3184 <<
"getLedger: Request relayed to peer";
3188 JLOG(p_journal_.trace())
3189 <<
"getLedger: Failed to find peer to relay request";
3193 else if (m->has_ledgerseq())
3196 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3198 JLOG(p_journal_.debug())
3199 <<
"getLedger: Early ledger sequence request";
3203 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3206 JLOG(p_journal_.debug())
3207 <<
"getLedger: Don't have ledger with sequence "
3212 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3214 ledger = app_.getLedgerMaster().getClosedLedger();
3220 auto const ledgerSeq{ledger->info().seq};
3221 if (m->has_ledgerseq())
3223 if (ledgerSeq != m->ledgerseq())
3226 if (!m->has_requestcookie())
3228 Resource::feeMalformedRequest,
"get_ledger ledgerSeq");
3231 JLOG(p_journal_.warn())
3232 <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3235 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3238 JLOG(p_journal_.debug())
3239 <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3244 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3253 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3255 uint256 const txSetHash{m->ledgerhash()};
3257 app_.getInboundTransactions().getSet(txSetHash,
false)};
3260 if (m->has_querytype() && !m->has_requestcookie())
3265 m->set_requestcookie(
id());
3267 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3268 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3272 JLOG(p_journal_.debug())
3273 <<
"getTxSet: Failed to find relay peer";
3278 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3289 if (!m->has_requestcookie())
3291 Resource::feeModerateBurdenPeer,
"received a get ledger request");
3295 SHAMap const* map{
nullptr};
3296 protocol::TMLedgerData ledgerData;
3297 bool fatLeaves{
true};
3298 auto const itype{m->itype()};
3300 if (itype == protocol::liTS_CANDIDATE)
3302 if (sharedMap = getTxSet(m); !sharedMap)
3304 map = sharedMap.
get();
3307 ledgerData.set_ledgerseq(0);
3308 ledgerData.set_ledgerhash(m->ledgerhash());
3309 ledgerData.set_type(protocol::liTS_CANDIDATE);
3310 if (m->has_requestcookie())
3311 ledgerData.set_requestcookie(m->requestcookie());
3318 if (send_queue_.size() >= Tuning::dropSendQueue)
3320 JLOG(p_journal_.debug())
3321 <<
"processLedgerRequest: Large send queue";
3324 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3326 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3330 if (ledger = getLedger(m); !ledger)
3334 auto const ledgerHash{ledger->info().hash};
3335 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3336 ledgerData.set_ledgerseq(ledger->info().seq);
3337 ledgerData.set_type(itype);
3338 if (m->has_requestcookie())
3339 ledgerData.set_requestcookie(m->requestcookie());
3343 case protocol::liBASE:
3344 sendLedgerBase(ledger, ledgerData);
3347 case protocol::liTX_NODE:
3348 map = &ledger->txMap();
3349 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash "
3350 << to_string(map->getHash());
3353 case protocol::liAS_NODE:
3354 map = &ledger->stateMap();
3355 JLOG(p_journal_.trace())
3356 <<
"processLedgerRequest: Account state map hash "
3357 << to_string(map->getHash());
3362 JLOG(p_journal_.error())
3363 <<
"processLedgerRequest: Invalid ledger info type";
3370 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3375 if (m->nodeids_size() > 0)
3377 auto const queryDepth{
3378 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3382 for (
int i = 0; i < m->nodeids_size() &&
3383 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3389 data.reserve(Tuning::softMaxReplyNodes);
3393 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3395 JLOG(p_journal_.trace())
3396 <<
"processLedgerRequest: getNodeFat got "
3397 << data.size() <<
" nodes";
3399 for (
auto const& d : data)
3401 if (ledgerData.nodes_size() >=
3402 Tuning::hardMaxReplyNodes)
3404 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3405 node->set_nodeid(d.first.getRawString());
3406 node->set_nodedata(d.second.data(), d.second.size());
3411 JLOG(p_journal_.warn())
3412 <<
"processLedgerRequest: getNodeFat returns false";
3420 case protocol::liBASE:
3422 info =
"Ledger base";
3425 case protocol::liTX_NODE:
3429 case protocol::liAS_NODE:
3433 case protocol::liTS_CANDIDATE:
3434 info =
"TS candidate";
3442 if (!m->has_ledgerhash())
3443 info +=
", no hash specified";
3445 JLOG(p_journal_.error())
3446 <<
"processLedgerRequest: getNodeFat with nodeId "
3447 << *shaMapNodeId <<
" and ledger info type " << info
3448 <<
" throws exception: " << e.
what();
3452 JLOG(p_journal_.info())
3453 <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3454 <<
" nodes at depth " << queryDepth <<
", return "
3455 << ledgerData.nodes_size() <<
" nodes";
3458 if (ledgerData.nodes_size() == 0)
3461 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3465PeerImp::getScore(
bool haveItem)
const
3469 static int const spRandomMax = 9999;
3473 static int const spHaveItem = 10000;
3478 static int const spLatency = 30;
3481 static int const spNoLatency = 8000;
3486 score += spHaveItem;
3495 score -= latency->count() * spLatency;
3497 score -= spNoLatency;
3503PeerImp::isHighLatency()
const
3506 return latency_ >= peerHighLatency;
3510PeerImp::reduceRelayReady()
3512 if (!reduceRelayReady_)
3514 reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
3515 reduce_relay::WAIT_ON_BOOTUP;
3516 return vpReduceRelayEnabled_ && reduceRelayReady_;
3522 using namespace std::chrono_literals;
3525 totalBytes_ += bytes;
3526 accumBytes_ += bytes;
3527 auto const timeElapsed = clock_type::now() - intervalStart_;
3528 auto const timeElapsedInSecs =
3529 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3531 if (timeElapsedInSecs >= 1s)
3533 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3534 rollingAvg_.push_back(avgBytes);
3536 auto const totalBytes =
3538 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3540 intervalStart_ = clock_type::now();
3546PeerImp::Metrics::average_bytes()
const
3549 return rollingAvgBytes_;
3553PeerImp::Metrics::total_bytes()
const
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
static Endpoint from_string(std::string const &s)
std::string to_string() const
Returns a string representing the endpoint.
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Stream trace() const
Severity stream access functions.
virtual Config & config()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual TimeKeeper & timeKeeper()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual Cluster & cluster()=0
virtual HashRouter & getHashRouter()=0
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
std::size_t size() const
The number of nodes in the cluster list.
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
bool TX_REDUCE_RELAY_METRICS
std::chrono::seconds MAX_DIVERGED_TIME
std::chrono::seconds MAX_UNKNOWN_TIME
bool shouldProcess(uint256 const &key, PeerShortID peer, int &flags, std::chrono::seconds tx_interval)
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
int getJobCount(JobType t) const
Jobs waiting at this priority.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
LedgerIndex getValidLedgerIndex()
std::chrono::seconds getValidatedLedgerAge()
void setClusterFee(std::uint32_t fee)
static std::size_t messageSize(::google::protobuf::Message const &message)
virtual bool isNeedNetworkLedger()=0
PeerFinder::Manager & peerFinder()
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully. This is called after the peer handshake has been comple...
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
void onPeerDeactivate(Peer::id_t id)
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
void reportOutboundTraffic(TrafficCount::category cat, int bytes)
void for_each(UnaryFunc &&f) const
Resource::Manager & resourceManager()
void reportInboundTraffic(TrafficCount::category cat, int bytes)
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Setup const & setup() const
std::shared_ptr< Message > getManifestsMessage()
void incPeerDisconnectCharges() override
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
std::queue< std::shared_ptr< Message > > send_queue_
bool vpReduceRelayEnabled_
std::unique_ptr< LoadEvent > load_event_
boost::beast::http::fields const & headers_
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
clock_type::duration uptime() const
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
protocol::TMStatusChange last_status_
boost::shared_mutex nameMutex_
boost::circular_buffer< uint256 > recentTxSets_
std::unique_ptr< stream_type > stream_ptr_
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Tracking
Whether the peer's view of the ledger converges or diverges from ours.
Compressed compressionEnabled_
uint256 closedLedgerHash_
std::string domain() const
std::optional< std::uint32_t > lastPingSeq_
void onTimer(boost::system::error_code const &ec)
beast::Journal const journal_
struct ripple::PeerImp::@21 metrics_
beast::Journal const p_journal_
PeerImp(PeerImp const &)=delete
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
bool hasTxSet(uint256 const &hash) const override
clock_type::time_point lastPingTime_
void onMessageUnknown(std::uint16_t type)
std::shared_ptr< PeerFinder::Slot > const slot_
boost::circular_buffer< uint256 > recentLedgers_
std::optional< std::chrono::milliseconds > latency_
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue, bool batch)
Called from onMessage(TMTransaction(s)).
beast::IP::Endpoint const remote_address_
Json::Value json() override
PublicKey const publicKey_
hash_set< uint256 > txQueue_
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
bool txReduceRelayEnabled_
clock_type::time_point trackingTime_
ProtocolVersion protocol_
reduce_relay::Squelch< UptimeClock > squelch_
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
uint256 previousLedgerHash_
void charge(Resource::Charge const &fee, std::string const &context) override
Adjust this peer's load balance based on the type of load imposed.
void send(std::shared_ptr< Message > const &m) override
static std::string makePrefix(id_t id)
boost::system::error_code error_code
void onReadMessage(error_code ec, std::size_t bytes_transferred)
bool ledgerReplayEnabled_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
bool crawl() const
Returns true if this connection will publicly share its IP address.
void sendTxQueue() override
Send aggregated transactions' hashes.
bool txReduceRelayEnabled() const override
bool supportsFeature(ProtocolFeature f) const override
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
http_request_type request_
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
bool cluster() const override
Returns true if this connection is a member of the cluster.
void onShutdown(error_code ec)
boost::asio::strand< boost::asio::executor > strand_
void cycleStatus() override
boost::beast::multi_buffer read_buffer_
Resource::Consumer usage_
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
void fail(std::string const &reason)
std::atomic< Tracking > tracking_
Represents a peer connection in the overlay.
A peer's signed, proposed position for use in RCLConsensus.
bool checkSign() const
Verify the signing hash of the proposal.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
An endpoint that consumes resources.
int balance()
Returns the credit balance representing consumption.
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
std::size_t size() const noexcept
void const * data() const noexcept
void const * getDataPtr() const
An immutable linear range of bytes.
time_point now() const override
Returns the current time, using the server's clock.
static category categorize(::google::protobuf::Message const &message, protocol::MessageType type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
static constexpr std::size_t size()
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Set the regular signature on a JTx.
T emplace_back(T... args)
@ objectValue
object value (collection of name/value pairs).
Charge const feeMalformedRequest
Schedule of fees charged for imposing load on the server.
Charge const feeInvalidData
Charge const feeUselessData
Charge const feeTrivialPeer
Charge const feeModerateBurdenPeer
std::size_t constexpr readBufferBytes
Size of buffer used to read from the socket.
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
@ maxQueryDepth
The maximum number of levels to search.
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
@ sendQueueLogFreq
How often to log send queue size.
TER valid(PreclaimContext const &ctx, AccountID const &src)
auto measureDurationAndLog(Func &&func, std::string const &actionDescription, std::chrono::duration< Rep, Period > maxDelay, beast::Journal const &journal)
static constexpr std::size_t MAX_TX_QUEUE_SIZE
std::unique_ptr< Config > validator(std::unique_ptr< Config >, std::string const &)
adjust configuration with params needed to be a validator
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
static constexpr char FEATURE_COMPR[]
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
@ ValidatorListPropagation
@ ValidatorList2Propagation
std::string base64_decode(std::string_view data)
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
static bool stringIsUint256Sized(std::string const &pBuffStr)
static constexpr char FEATURE_LEDGER_REPLAY[]
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
std::string strHex(FwdIt begin, FwdIt end)
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Stopwatch & stopwatch()
Returns an instance of a wall clock.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
NodeID calcNodeID(PublicKey const &)
Calculate the 160-bit node ID from a node public key.
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
static constexpr char FEATURE_TXRR[]
std::string to_string(base_uint< Bits, Tag > const &a)
std::optional< Rules > const & getCurrentTransactionRules()
Number root(Number f, unsigned d)
@ proposal
proposal for signing
void addRaw(LedgerHeader const &, Serializer &, bool includeHash=false)
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
constexpr std::uint32_t tfInnerBatchTxn
T shared_from_this(T... args)
beast::IP::Address public_ip
std::optional< std::uint32_t > networkID
bool peerPrivate
true if we want our IP address kept private.
void update(Resource::Charge f, std::string const &add)
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items
Set the sequence number on a JTx.