20 #include <ripple/app/consensus/RCLValidations.h>
21 #include <ripple/app/ledger/InboundLedgers.h>
22 #include <ripple/app/ledger/InboundTransactions.h>
23 #include <ripple/app/ledger/LedgerMaster.h>
24 #include <ripple/app/ledger/TransactionMaster.h>
25 #include <ripple/app/misc/HashRouter.h>
26 #include <ripple/app/misc/LoadFeeTrack.h>
27 #include <ripple/app/misc/NetworkOPs.h>
28 #include <ripple/app/misc/Transaction.h>
29 #include <ripple/app/misc/ValidatorList.h>
30 #include <ripple/app/tx/apply.h>
31 #include <ripple/basics/SubmitSync.h>
32 #include <ripple/basics/UptimeClock.h>
33 #include <ripple/basics/base64.h>
34 #include <ripple/basics/random.h>
35 #include <ripple/basics/safe_cast.h>
36 #include <ripple/beast/core/LexicalCast.h>
37 #include <ripple/beast/core/SemanticVersion.h>
38 #include <ripple/nodestore/DatabaseShard.h>
39 #include <ripple/overlay/Cluster.h>
40 #include <ripple/overlay/impl/PeerImp.h>
41 #include <ripple/overlay/impl/Tuning.h>
42 #include <ripple/overlay/predicates.h>
43 #include <ripple/protocol/Protocol.h>
44 #include <ripple/protocol/digest.h>
46 #include <boost/algorithm/string/predicate.hpp>
47 #include <boost/beast/core/ostream.hpp>
56 using namespace std::chrono_literals;
73 case protocol::TMCloseReason::crCHARGE_RESOURCES:
74 return "Charge: Resources";
75 case protocol::TMCloseReason::crMALFORMED_HANDSHAKE1:
76 return "Malformed handshake data (1)";
77 case protocol::TMCloseReason::crMALFORMED_HANDSHAKE2:
78 return "Malformed handshake data (2)";
79 case protocol::TMCloseReason::crMALFORMED_HANDSHAKE3:
80 return "Malformed handshake data (3)";
81 case protocol::TMCloseReason::crLARGE_SENDQUEUE:
82 return "Large send queue";
83 case protocol::TMCloseReason::crNOT_USEFUL:
85 case protocol::TMCloseReason::crPING_TIMEOUT:
86 return "Ping timeout";
88 return "Unknown reason";
105 , sink_(app_.journal(
"Peer"), makePrefix(id))
106 , p_sink_(app_.journal(
"Protocol"), makePrefix(id))
108 , p_journal_(p_sink_)
109 , stream_ptr_(
std::move(stream_ptr))
110 , socket_(stream_ptr_->next_layer().socket())
111 , stream_(*stream_ptr_)
112 , strand_(socket_.get_executor())
114 , remote_address_(slot->remote_endpoint())
118 , tracking_(Tracking::unknown)
119 , trackingTime_(clock_type::now())
120 , publicKey_(publicKey)
121 , lastPingTime_(clock_type::now())
122 , creationTime_(clock_type::now())
123 , squelch_(app_.journal(
"Squelch"))
127 , request_(std::move(request))
129 , compressionEnabled_(
134 app_.config().COMPRESSION)
140 app_.config().TX_REDUCE_RELAY_ENABLE))
144 app_.config().VP_REDUCE_RELAY_ENABLE))
148 app_.config().LEDGER_REPLAY))
149 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
151 JLOG(journal_.info()) <<
"compression enabled "
152 << (compressionEnabled_ == Compressed::On)
153 <<
" vp reduce-relay enabled "
154 << vpReduceRelayEnabled_
155 <<
" tx reduce-relay enabled "
156 << txReduceRelayEnabled_ <<
" on " << remote_address_
158 if (
auto member = app_.cluster().member(publicKey_))
161 JLOG(journal_.info()) <<
"Cluster name: " << *member;
167 const bool inCluster{
cluster()};
190 if (!
strand_.running_in_this_thread())
193 auto parseLedgerHash =
207 if (
auto const iter =
headers_.find(
"Closed-Ledger");
210 closed = parseLedgerHash(iter->value());
213 fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE1);
216 if (
auto const iter =
headers_.find(
"Previous-Ledger");
219 previous = parseLedgerHash(iter->value());
222 fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE2);
225 if (previous && !closed)
226 fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE3);
245 if (!
strand_.running_in_this_thread())
271 if (!
strand_.running_in_this_thread())
278 auto validator = m->getValidatorKey();
279 if (validator && !
squelch_.expireSquelch(*validator))
283 safe_cast<TrafficCount::category>(m->getCategory()),
301 <<
" sendq: " << sendq_size;
309 boost::asio::async_write(
318 std::placeholders::_1,
319 std::placeholders::_2)));
325 if (!
strand_.running_in_this_thread())
331 protocol::TMHaveTransactions ht;
333 ht.add_hashes(hash.data(), hash.size());
337 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
344 if (!
strand_.running_in_this_thread())
361 if (!
strand_.running_in_this_thread())
366 auto removed =
txQueue_.erase(hash);
378 fail(protocol::TMCloseReason::crCHARGE_RESOURCES);
387 auto const iter =
headers_.find(
"Crawl");
390 return boost::iequals(iter->value(),
"public");
416 ret[jss::inbound] =
true;
420 ret[jss::cluster] =
true;
428 ret[jss::server_domain] =
domain();
430 if (
auto const nid =
headers_[
"Network-ID"]; !nid.empty())
436 ret[jss::version] = version;
447 std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
452 if ((minSeq != 0) || (maxSeq != 0))
453 ret[jss::complete_ledgers] =
459 ret[jss::track] =
"diverged";
463 ret[jss::track] =
"unknown";
472 protocol::TMStatusChange last_status;
479 if (closedLedgerHash != beast::zero)
480 ret[jss::ledger] =
to_string(closedLedgerHash);
482 if (last_status.has_newstatus())
484 switch (last_status.newstatus())
486 case protocol::nsCONNECTING:
487 ret[jss::status] =
"connecting";
490 case protocol::nsCONNECTED:
491 ret[jss::status] =
"connected";
494 case protocol::nsMONITORING:
495 ret[jss::status] =
"monitoring";
498 case protocol::nsVALIDATING:
499 ret[jss::status] =
"validating";
502 case protocol::nsSHUTTING:
503 ret[jss::status] =
"shutting";
508 <<
"Unknown status: " << last_status.newstatus();
513 ret[jss::metrics][jss::total_bytes_recv] =
515 ret[jss::metrics][jss::total_bytes_sent] =
517 ret[jss::metrics][jss::avg_bps_recv] =
519 ret[jss::metrics][jss::avg_bps_sent] =
564 return boost::icl::contains(it->second.finalized(), shardIndex);
610 assert(
strand_.running_in_this_thread());
632 if (!
strand_.running_in_this_thread())
655 protocol::TMGracefulClose tmGC;
656 tmGC.set_reason(reason);
657 send(std::make_shared<Message>(tmGC, protocol::mtGRACEFUL_CLOSE));
663 assert(
strand_.running_in_this_thread());
683 assert(
strand_.running_in_this_thread());
690 stream_.async_shutdown(bind_executor(
700 timer_.expires_from_now(peerTimerInterval, ec);
707 timer_.async_wait(bind_executor(
737 if (ec == boost::asio::error::operation_aborted)
749 fail(protocol::TMCloseReason::crLARGE_SENDQUEUE);
755 clock_type::duration duration;
768 fail(protocol::TMCloseReason::crLARGE_SENDQUEUE);
776 fail(protocol::TMCloseReason::crPING_TIMEOUT);
783 protocol::TMPing message;
784 message.set_type(protocol::TMPing::ptPING);
787 send(std::make_shared<Message>(message, protocol::mtPING));
799 JLOG(
journal_.
error()) <<
"onShutdown: expected error condition";
802 if (ec != boost::asio::error::eof)
803 return fail(
"onShutdown", ec);
837 if (supportedProtocol)
840 <<
"doProtocolStart(): outbound sending mtSTART_PROTOCOL to "
842 protocol::TMStartProtocol tmPS;
843 tmPS.set_starttime(std::chrono::duration_cast<std::chrono::seconds>(
846 send(std::make_shared<Message>(tmPS, protocol::mtSTART_PROTOCOL));
850 JLOG(
journal_.
debug()) <<
"doProtocolStart(): outbound connected "
851 "to an older protocol on "
860 protocol::TMGetPeerShardInfoV2 tmGPS;
862 send(std::make_shared<Message>(
863 tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
866 else if (!supportedProtocol)
869 <<
"doProtocolStart(): inbound handling of an older protocol on "
884 if (ec == boost::asio::error::operation_aborted)
886 if (ec == boost::asio::error::eof)
892 return fail(
"onReadMessage", ec);
895 if (bytes_transferred > 0)
896 stream <<
"onReadMessage: " << bytes_transferred <<
" bytes";
898 stream <<
"onReadMessage";
901 metrics_.recv.add_message(bytes_transferred);
913 return fail(
"onReadMessage", ec);
918 if (bytes_consumed == 0)
931 std::placeholders::_1,
932 std::placeholders::_2)));
940 if (ec == boost::asio::error::operation_aborted)
947 return fail(
"onWriteMessage", ec);
950 if (bytes_transferred > 0)
951 stream <<
"onWriteMessage: " << bytes_transferred <<
" bytes";
953 stream <<
"onWriteMessage";
956 metrics_.sent.add_message(bytes_transferred);
959 if (
send_queue_.front()->getType() == protocol::mtGRACEFUL_CLOSE)
968 return boost::asio::async_write(
977 std::placeholders::_1,
978 std::placeholders::_2)));
983 return stream_.async_shutdown(bind_executor(
988 std::placeholders::_1)));
1018 if ((type == MessageType::mtTRANSACTION ||
1019 type == MessageType::mtHAVE_TRANSACTIONS ||
1020 type == MessageType::mtTRANSACTIONS ||
1022 category == TrafficCount::category::get_transactions ||
1024 category == TrafficCount::category::ld_tsc_get ||
1025 category == TrafficCount::category::ld_tsc_share ||
1027 category == TrafficCount::category::gl_tsc_share ||
1028 category == TrafficCount::category::gl_tsc_get) &&
1032 static_cast<MessageType
>(type),
static_cast<std::uint64_t>(size));
1034 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" "
1035 << uncompressed_size <<
" " << isCompressed;
1050 auto const s = m->list_size();
1070 if (m->type() == protocol::TMPing::ptPING)
1074 m->set_type(protocol::TMPing::ptPONG);
1075 send(std::make_shared<Message>(*m, protocol::mtPING));
1079 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1089 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1114 for (
int i = 0; i < m->clusternodes().size(); ++i)
1116 protocol::TMClusterNode
const& node = m->clusternodes(i);
1119 if (node.has_nodename())
1120 name = node.nodename();
1122 auto const publicKey =
1129 auto const reportTime =
1133 *publicKey,
name, node.nodeload(), reportTime);
1137 int loadSources = m->loadsources().size();
1138 if (loadSources != 0)
1141 gossip.
items.reserve(loadSources);
1142 for (
int i = 0; i < m->loadsources().size(); ++i)
1144 protocol::TMLoadSource
const& node = m->loadsources(i);
1149 gossip.
items.push_back(item);
1162 if (status.getReportTime() >= thresh)
1163 fees.push_back(status.getLoadFee());
1168 auto const index = fees.size() / 2;
1170 clusterFee = fees[index];
1198 return badData(
"Invalid relays");
1207 auto const peerChainSz{m->peerchain_size()};
1208 if (peerChainSz > 0)
1211 return badData(
"Invalid peer chain size");
1214 return badData(
"Invalid relays and peer chain size");
1216 for (
int i = 0; i < peerChainSz; ++i)
1218 auto const slice{
makeSlice(m->peerchain(i).publickey())};
1222 return badData(
"Invalid peer public key");
1225 if (!pubKeyChain.
emplace(slice).second)
1226 return badData(
"Invalid peer public key");
1233 auto reply{shardStore->getShardInfo()->makeMessage(
app_)};
1234 if (peerChainSz > 0)
1235 *(reply.mutable_peerchain()) = m->peerchain();
1236 send(std::make_shared<Message>(reply, protocol::mtPEER_SHARD_INFO_V2));
1239 if (m->relays() == 0)
1243 if (peerChainSz == 0)
1250 m->set_relays(m->relays() - 1);
1252 std::make_shared<Message>(*m, protocol::mtGET_PEER_SHARD_INFO_V2),
1254 return pubKeyChain.
find(peer->getNodePublic()) != pubKeyChain.
end();
1266 if (curLedgerSeq >= db.earliestLedgerSeq())
1267 return db.seqToShardIndex(curLedgerSeq);
1268 return std::nullopt;
1283 auto const timestamp{
1286 if (timestamp > (now + 5s))
1287 return badData(
"Invalid timestamp");
1290 using namespace std::chrono_literals;
1291 if (timestamp < (now - 5min))
1292 return badData(
"Stale timestamp");
1294 s.
add32(m->timestamp());
1299 auto const numIncomplete{m->incomplete_size()};
1300 if (numIncomplete > 0)
1302 if (latestShardIndex && numIncomplete > *latestShardIndex)
1303 return badData(
"Invalid number of incomplete shards");
1306 for (
int i = 0; i < numIncomplete; ++i)
1308 auto const& incomplete{m->incomplete(i)};
1309 auto const shardIndex{incomplete.shardindex()};
1312 if (shardIndex < earliestShardIndex ||
1313 (latestShardIndex && shardIndex > latestShardIndex))
1315 return badData(
"Invalid incomplete shard index");
1317 s.
add32(shardIndex);
1320 auto const state{
static_cast<ShardState>(incomplete.state())};
1332 return badData(
"Invalid incomplete shard state");
1334 s.
add32(incomplete.state());
1338 if (incomplete.has_progress())
1340 progress = incomplete.progress();
1341 if (progress < 1 || progress > 100)
1342 return badData(
"Invalid incomplete shard progress");
1347 if (!
shardInfo.update(shardIndex, state, progress))
1348 return badData(
"Invalid duplicate incomplete shards");
1353 if (m->has_finalized())
1355 auto const& str{m->finalized()};
1357 return badData(
"Invalid finalized shards");
1359 if (!
shardInfo.setFinalizedFromString(str))
1360 return badData(
"Invalid finalized shard indexes");
1363 auto const numFinalized{boost::icl::length(
finalized)};
1364 if (numFinalized == 0 ||
1365 boost::icl::first(
finalized) < earliestShardIndex ||
1366 (latestShardIndex &&
1367 boost::icl::last(
finalized) > latestShardIndex))
1369 return badData(
"Invalid finalized shard indexes");
1372 if (latestShardIndex &&
1373 (numFinalized + numIncomplete) > *latestShardIndex)
1375 return badData(
"Invalid number of finalized and incomplete shards");
1378 s.
addRaw(str.data(), str.size());
1384 return badData(
"Invalid public key");
1389 return badData(
"Invalid public key");
1393 return badData(
"Invalid signature");
1396 auto const peerChainSz{m->peerchain_size()};
1397 if (peerChainSz > 0)
1401 return badData(
"Invalid peer chain size");
1409 for (
int i = 0; i < peerChainSz; ++i)
1412 slice =
makeSlice(m->peerchain(i).publickey());
1414 return badData(
"Invalid peer public key");
1417 if (!pubKeyChain.
emplace(slice).second)
1418 return badData(
"Invalid peer public key");
1423 makeSlice(m->peerchain(peerChainSz - 1).publickey()));
1426 m->mutable_peerchain()->RemoveLast();
1428 std::make_shared<Message>(*m, protocol::mtPEER_SHARD_INFO_V2));
1430 <<
"Relayed TMPeerShardInfoV2 from peer IP "
1432 << peer->getRemoteAddress().to_string();
1442 <<
"Consumed TMPeerShardInfoV2 originating from public key "
1445 << (
shardInfo.incomplete().empty() ?
"empty"
1455 else if (
shardInfo.msgTimestamp() > it->second.msgTimestamp())
1460 if (peerChainSz == 0)
1474 if (m->endpoints_v2().size() >= 1024)
1481 endpoints.
reserve(m->endpoints_v2().size());
1483 for (
auto const& tm : m->endpoints_v2())
1490 << tm.endpoint() <<
"}";
1507 if (!endpoints.
empty())
1530 <<
"Need network ledger";
1538 auto stx = std::make_shared<STTx const>(sit);
1539 uint256 txID = stx->getTransactionID();
1563 bool checkSignature =
true;
1566 if (!m->has_deferred() || !m->deferred())
1570 flags |= SF_TRUSTED;
1577 checkSignature =
false;
1584 <<
"No new transactions until synchronized";
1597 "recvTransaction->checkTransaction",
1602 if (
auto peer = weak.lock())
1603 peer->checkTransaction(flags, checkSignature, stx);
1610 <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1611 <<
". Exception: " << ex.
what();
1622 auto const itype{m->itype()};
1625 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1626 return badData(
"Invalid ledger info type");
1631 return std::nullopt;
1634 if (itype == protocol::liTS_CANDIDATE)
1636 if (!m->has_ledgerhash())
1637 return badData(
"Invalid TX candidate set, missing TX set hash");
1640 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1641 !(ltype && *ltype == protocol::ltCLOSED))
1643 return badData(
"Invalid request");
1647 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1648 return badData(
"Invalid ledger type");
1652 return badData(
"Invalid ledger hash");
1655 if (m->has_ledgerseq())
1657 auto const ledgerSeq{m->ledgerseq()};
1667 using namespace std::chrono_literals;
1677 if (itype != protocol::liBASE)
1679 if (m->nodeids_size() <= 0)
1680 return badData(
"Invalid ledger node IDs");
1682 for (
auto const& nodeId : m->nodeids())
1685 return badData(
"Invalid SHAMap node ID");
1690 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1691 return badData(
"Invalid query type");
1694 if (m->has_querydepth())
1697 itype == protocol::liBASE)
1699 return badData(
"Invalid query depth");
1706 if (
auto peer = weak.
lock())
1707 peer->processLedgerRequest(m);
1725 if (
auto peer = weak.
lock())
1728 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1729 if (reply.has_error())
1731 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1732 peer->charge(Resource::feeInvalidRequest);
1734 peer->charge(Resource::feeRequestNoReply);
1738 peer->send(std::make_shared<Message>(
1739 reply, protocol::mtPROOF_PATH_RESPONSE));
1774 if (
auto peer = weak.
lock())
1777 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1778 if (reply.has_error())
1780 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1781 peer->charge(Resource::feeInvalidRequest);
1783 peer->charge(Resource::feeRequestNoReply);
1787 peer->send(std::make_shared<Message>(
1788 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1819 return badData(
"Invalid ledger hash");
1823 auto const ledgerSeq{m->ledgerseq()};
1824 if (m->type() == protocol::liTS_CANDIDATE)
1843 using namespace std::chrono_literals;
1854 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1855 return badData(
"Invalid ledger info type");
1858 if (m->has_error() &&
1859 (m->error() < protocol::reNO_LEDGER ||
1860 m->error() > protocol::reBAD_REQUEST))
1862 return badData(
"Invalid reply error");
1869 "Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1873 if (m->has_requestcookie())
1877 m->clear_requestcookie();
1878 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1882 JLOG(
p_journal_.
info()) <<
"Unable to route TX/ledger data reply";
1887 uint256 const ledgerHash{m->ledgerhash()};
1890 if (m->type() == protocol::liTS_CANDIDATE)
1894 jtTXN_DATA,
"recvPeerData", [weak, ledgerHash, m]() {
1895 if (
auto peer = weak.lock())
1897 peer->app_.getInboundTransactions().gotData(
1898 ledgerHash, peer, m);
1911 protocol::TMProposeSet&
set = *m;
1917 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1945 uint256 const proposeHash{
set.currenttxhash()};
1946 uint256 const prevLedger{
set.previousledger()};
1958 if (
auto [added, relayed] =
1967 suppression, publicKey,
id_, protocol::mtPROPOSE_LEDGER);
1977 <<
"Proposal: Dropping untrusted (peer divergence)";
1989 <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
2006 "recvPropose->checkPropose",
2008 if (
auto peer = weak.lock())
2009 peer->checkPropose(isTrusted, m,
proposal);
2018 if (!m->has_networktime())
2023 if (!
last_status_.has_newstatus() || m->has_newstatus())
2028 protocol::NodeStatus status =
last_status_.newstatus();
2030 m->set_newstatus(status);
2034 if (m->newevent() == protocol::neLOST_SYNC)
2036 bool outOfSync{
false};
2057 bool const peerChangedLedgers{
2064 if (peerChangedLedgers)
2075 if (m->has_ledgerhashprevious() &&
2086 if (peerChangedLedgers)
2096 if (m->has_firstseq() && m->has_lastseq())
2107 if (m->has_ledgerseq() &&
2117 if (m->has_newstatus())
2119 switch (m->newstatus())
2121 case protocol::nsCONNECTING:
2122 j[jss::status] =
"CONNECTING";
2124 case protocol::nsCONNECTED:
2125 j[jss::status] =
"CONNECTED";
2127 case protocol::nsMONITORING:
2128 j[jss::status] =
"MONITORING";
2130 case protocol::nsVALIDATING:
2131 j[jss::status] =
"VALIDATING";
2133 case protocol::nsSHUTTING:
2134 j[jss::status] =
"SHUTTING";
2139 if (m->has_newevent())
2141 switch (m->newevent())
2143 case protocol::neCLOSING_LEDGER:
2144 j[jss::action] =
"CLOSING_LEDGER";
2146 case protocol::neACCEPTED_LEDGER:
2147 j[jss::action] =
"ACCEPTED_LEDGER";
2149 case protocol::neSWITCHED_LEDGER:
2150 j[jss::action] =
"SWITCHED_LEDGER";
2152 case protocol::neLOST_SYNC:
2153 j[jss::action] =
"LOST_SYNC";
2158 if (m->has_ledgerseq())
2160 j[jss::ledger_index] = m->ledgerseq();
2163 if (m->has_ledgerhash())
2165 uint256 closedLedgerHash{};
2167 std::lock_guard sl(recentLock_);
2168 closedLedgerHash = closedLedgerHash_;
2170 j[jss::ledger_hash] =
to_string(closedLedgerHash);
2173 if (m->has_networktime())
2175 j[jss::date] = Json::UInt(m->networktime());
2178 if (m->has_firstseq() && m->has_lastseq())
2180 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
2181 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
2197 serverSeq = maxLedger_;
2203 checkTracking(serverSeq, validationSeq);
2212 if (diff < Tuning::convergedLedgerLimit)
2215 tracking_ = Tracking::converged;
2218 if ((diff > Tuning::divergedLedgerLimit) &&
2219 (tracking_.load() != Tracking::diverged))
2224 tracking_ = Tracking::diverged;
2225 trackingTime_ = clock_type::now();
2234 fee_ = Resource::feeInvalidRequest;
2238 uint256 const hash{m->hash()};
2240 if (m->status() == protocol::tsHAVE)
2244 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2245 recentTxSets_.end())
2247 fee_ = Resource::feeUnwantedData;
2251 recentTxSets_.push_back(hash);
2256 PeerImp::onValidatorListMessage(
2266 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType
2267 <<
" from peer " << remote_address_;
2269 fee_ = Resource::feeHighBurdenPeer;
2275 JLOG(p_journal_.debug())
2276 <<
"Received " << messageType <<
" from " << remote_address_.to_string()
2277 <<
" (" << id_ <<
")";
2279 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2281 JLOG(p_journal_.debug())
2282 << messageType <<
": received duplicate " << messageType;
2286 fee_ = Resource::feeUnwantedData;
2290 auto const applyResult = app_.validators().applyListsAndBroadcast(
2294 remote_address_.to_string(),
2297 app_.getHashRouter(),
2300 JLOG(p_journal_.debug())
2301 <<
"Processed " << messageType <<
" version " << version <<
" from "
2302 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2303 :
"unknown or invalid publisher")
2304 <<
" from " << remote_address_.to_string() <<
" (" << id_
2305 <<
") with best result " << to_string(applyResult.bestDisposition());
2308 switch (applyResult.bestDisposition())
2311 case ListDisposition::accepted:
2313 case ListDisposition::expired:
2315 case ListDisposition::pending: {
2318 assert(applyResult.publisherKey);
2319 auto const& pubKey = *applyResult.publisherKey;
2321 if (
auto const iter = publisherListSequences_.find(pubKey);
2322 iter != publisherListSequences_.end())
2324 assert(iter->second < applyResult.sequence);
2327 publisherListSequences_[pubKey] = applyResult.sequence;
2330 case ListDisposition::same_sequence:
2331 case ListDisposition::known_sequence:
2335 assert(applyResult.sequence && applyResult.publisherKey);
2337 publisherListSequences_[*applyResult.publisherKey] <=
2338 applyResult.sequence);
2343 case ListDisposition::stale:
2344 case ListDisposition::untrusted:
2345 case ListDisposition::invalid:
2346 case ListDisposition::unsupported_version:
2353 switch (applyResult.worstDisposition())
2355 case ListDisposition::accepted:
2356 case ListDisposition::expired:
2357 case ListDisposition::pending:
2360 case ListDisposition::same_sequence:
2361 case ListDisposition::known_sequence:
2365 fee_ = Resource::feeUnwantedData;
2367 case ListDisposition::stale:
2370 fee_ = Resource::feeBadData;
2372 case ListDisposition::untrusted:
2376 fee_ = Resource::feeUnwantedData;
2378 case ListDisposition::invalid:
2380 fee_ = Resource::feeInvalidSignature;
2382 case ListDisposition::unsupported_version:
2385 fee_ = Resource::feeBadData;
2392 for (
auto const& [disp, count] : applyResult.dispositions)
2397 case ListDisposition::accepted:
2398 JLOG(p_journal_.debug())
2399 <<
"Applied " << count <<
" new " << messageType
2400 <<
"(s) from peer " << remote_address_;
2403 case ListDisposition::expired:
2404 JLOG(p_journal_.debug())
2405 <<
"Applied " << count <<
" expired " << messageType
2406 <<
"(s) from peer " << remote_address_;
2409 case ListDisposition::pending:
2410 JLOG(p_journal_.debug())
2411 <<
"Processed " << count <<
" future " << messageType
2412 <<
"(s) from peer " << remote_address_;
2414 case ListDisposition::same_sequence:
2415 JLOG(p_journal_.warn())
2416 <<
"Ignored " << count <<
" " << messageType
2417 <<
"(s) with current sequence from peer "
2420 case ListDisposition::known_sequence:
2421 JLOG(p_journal_.warn())
2422 <<
"Ignored " << count <<
" " << messageType
2423 <<
"(s) with future sequence from peer " << remote_address_;
2425 case ListDisposition::stale:
// Warn with the number of stale lists received from this peer. The word
// needs a leading space; otherwise it is glued to the streamed count
// (e.g. "Ignored 3stale ...").
2426 JLOG(p_journal_.warn())
2427 <<
"Ignored " << count <<
" stale " << messageType
2428 <<
"(s) from peer " << remote_address_;
2430 case ListDisposition::untrusted:
2431 JLOG(p_journal_.warn())
2432 <<
"Ignored " << count <<
" untrusted " << messageType
2433 <<
"(s) from peer " << remote_address_;
2435 case ListDisposition::unsupported_version:
// Warn with the number of lists that used an unsupported version. The
// phrase needs a leading space; otherwise it is glued to the streamed
// count (e.g. "Ignored 2unsupported version ...").
2436 JLOG(p_journal_.warn())
2437 <<
"Ignored " << count <<
" unsupported version "
2438 << messageType <<
"(s) from peer " << remote_address_;
2440 case ListDisposition::invalid:
// Warn with the number of invalid lists received from this peer. The word
// needs a leading space; otherwise it is glued to the streamed count
// (e.g. "Ignored 1invalid ...").
2441 JLOG(p_journal_.warn())
2442 <<
"Ignored " << count <<
" invalid " << messageType
2443 <<
"(s) from peer " << remote_address_;
2456 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2458 JLOG(p_journal_.debug())
2459 <<
"ValidatorList: received validator list from peer using "
2460 <<
"protocol version " << to_string(protocol_)
2461 <<
" which shouldn't support this feature.";
2462 fee_ = Resource::feeUnwantedData;
2465 onValidatorListMessage(
2469 ValidatorList::parseBlobs(*m));
2473 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what()
2474 <<
" from peer " << remote_address_;
2475 fee_ = Resource::feeBadData;
2485 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2487 JLOG(p_journal_.debug())
2488 <<
"ValidatorListCollection: received validator list from peer "
2489 <<
"using protocol version " << to_string(protocol_)
2490 <<
" which shouldn't support this feature.";
2491 fee_ = Resource::feeUnwantedData;
2494 else if (m->version() < 2)
2496 JLOG(p_journal_.debug())
2497 <<
"ValidatorListCollection: received invalid validator list "
2499 << m->version() <<
" from peer using protocol version "
2500 << to_string(protocol_);
2501 fee_ = Resource::feeBadData;
2504 onValidatorListMessage(
2505 "ValidatorListCollection",
2508 ValidatorList::parseBlobs(*m));
2512 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, "
2513 << e.
what() <<
" from peer " << remote_address_;
2514 fee_ = Resource::feeBadData;
2521 if (m->validation().size() < 50)
2523 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2524 fee_ = Resource::feeInvalidRequest;
2530 auto const closeTime = app_.timeKeeper().closeTime();
2535 val = std::make_shared<STValidation>(
2539 app_.validatorManifests().getMasterKey(pk));
2542 val->setSeen(closeTime);
2546 app_.getValidations().parms(),
2547 app_.timeKeeper().closeTime(),
2549 val->getSeenTime()))
2551 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2552 fee_ = Resource::feeUnwantedData;
2559 auto const isTrusted =
2560 app_.validators().trusted(val->getSignerPublic());
2565 if (!isTrusted && app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2570 if (
auto [added, relayed] =
2571 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2578 if (reduceRelayReady() && relayed &&
2579 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2580 overlay_.updateSlotAndSquelch(
2581 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2582 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2586 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2588 JLOG(p_journal_.debug())
2589 <<
"Dropping untrusted validation from diverged peer";
2591 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2595 isTrusted ?
"Trusted validation" :
"Untrusted validation";
2600 to_string(val->getNodeID());
2607 app_.getJobQueue().addJob(
2610 [weak, val, m, key]() {
2611 if (
auto peer = weak.
lock())
2612 peer->checkValidation(val, key, m);
2617 JLOG(p_journal_.debug())
2618 <<
"Dropping untrusted validation for load";
2623 JLOG(p_journal_.warn())
2624 <<
"Exception processing validation: " << e.
what();
2625 fee_ = Resource::feeInvalidRequest;
2632 protocol::TMGetObjectByHash& packet = *m;
2634 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type()
2635 <<
" " << packet.objects_size();
2640 if (send_queue_.size() >= Tuning::dropSendQueue)
2642 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2646 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2652 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2654 if (!txReduceRelayEnabled())
2656 JLOG(p_journal_.error())
2657 <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2658 fee_ = Resource::feeInvalidRequest;
2663 app_.getJobQueue().addJob(
2665 if (
auto peer = weak.
lock())
2666 peer->doTransactions(m);
2671 fee_ = Resource::feeMediumBurdenPeer;
2673 protocol::TMGetObjectByHash reply;
2675 reply.set_query(
false);
2677 if (packet.has_seq())
2678 reply.set_seq(packet.seq());
2680 reply.set_type(packet.type());
2682 if (packet.has_ledgerhash())
2686 fee_ = Resource::feeInvalidRequest;
2690 reply.set_ledgerhash(packet.ledgerhash());
2694 for (
int i = 0; i < packet.objects_size(); ++i)
2696 auto const& obj = packet.objects(i);
2699 uint256 const hash{obj.hash()};
2702 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2703 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2706 if (
auto shardStore = app_.getShardStore())
2708 if (seq >= shardStore->earliestLedgerSeq())
2709 nodeObject = shardStore->fetchNodeObject(hash, seq);
2714 protocol::TMIndexedObject& newObj = *reply.add_objects();
2715 newObj.set_hash(hash.begin(), hash.size());
2717 &nodeObject->getData().front(),
2718 nodeObject->getData().size());
2720 if (obj.has_nodeid())
2721 newObj.set_index(obj.nodeid());
2722 if (obj.has_ledgerseq())
2723 newObj.set_ledgerseq(obj.ledgerseq());
2730 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2731 << packet.objects_size();
2732 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2739 bool progress =
false;
2741 for (
int i = 0; i < packet.objects_size(); ++i)
2743 const protocol::TMIndexedObject& obj = packet.objects(i);
2747 if (obj.has_ledgerseq())
2749 if (obj.ledgerseq() != pLSeq)
2751 if (pLDo && (pLSeq != 0))
2753 JLOG(p_journal_.debug())
2754 <<
"GetObj: Full fetch pack for " << pLSeq;
2756 pLSeq = obj.ledgerseq();
2757 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2761 JLOG(p_journal_.debug())
2762 <<
"GetObj: Late fetch pack for " << pLSeq;
2771 uint256 const hash{obj.hash()};
2773 app_.getLedgerMaster().addFetchPack(
2775 std::make_shared<Blob>(
2776 obj.data().begin(), obj.data().end()));
2781 if (pLDo && (pLSeq != 0))
2783 JLOG(p_journal_.debug())
2784 <<
"GetObj: Partial fetch pack for " << pLSeq;
2786 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2787 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2794 if (!txReduceRelayEnabled())
2796 JLOG(p_journal_.error())
2797 <<
"TMHaveTransactions: tx reduce-relay is disabled";
2798 fee_ = Resource::feeInvalidRequest;
2803 app_.getJobQueue().addJob(
2805 if (
auto peer = weak.
lock())
2806 peer->handleHaveTransactions(m);
2811 PeerImp::handleHaveTransactions(
2814 protocol::TMGetObjectByHash tmBH;
2815 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2816 tmBH.set_query(
true);
2818 JLOG(p_journal_.trace())
2819 <<
"received TMHaveTransactions " << m->hashes_size();
2825 JLOG(p_journal_.error())
2826 <<
"TMHaveTransactions with invalid hash size";
2827 fee_ = Resource::feeInvalidRequest;
2833 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
// Trace whether the cache lookup above returned a transaction. A named
// cast replaces the C-style cast; the logged boolean is unchanged.
2835 JLOG(p_journal_.trace()) <<
"checking transaction " << static_cast<bool>(txn);
2839 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2841 auto obj = tmBH.add_objects();
2842 obj->set_hash(hash.
data(), hash.
size());
2849 removeTxQueue(hash);
2853 JLOG(p_journal_.trace())
2854 <<
"transaction request object is " << tmBH.objects_size();
2856 if (tmBH.objects_size() > 0)
2857 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2863 if (!txReduceRelayEnabled())
2865 JLOG(p_journal_.error())
2866 <<
"TMTransactions: tx reduce-relay is disabled";
2867 fee_ = Resource::feeInvalidRequest;
2871 JLOG(p_journal_.trace())
2872 <<
"received TMTransactions " << m->transactions_size();
2874 overlay_.addTxMetrics(m->transactions_size());
2879 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2886 using on_message_fn =
2888 if (!strand_.running_in_this_thread())
2892 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2894 if (!m->has_validatorpubkey())
2896 charge(Resource::feeBadData);
2899 auto validator = m->validatorpubkey();
2903 charge(Resource::feeBadData);
2909 if (!app_.validators().listed(key))
2911 charge(Resource::feeBadData);
2912 JLOG(p_journal_.debug())
2913 <<
"onMessage: TMSquelch discarding non-validator squelch "
2919 if (key == app_.getValidationPublicKey())
2921 JLOG(p_journal_.debug())
2922 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2927 m->has_squelchduration() ? m->squelchduration() : 0;
2929 squelch_.removeSquelch(key);
2931 charge(Resource::feeBadData);
2933 JLOG(p_journal_.debug())
2934 <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
// Start-of-protocol actions for this peer. The visible steps are: log the
// remote address, walk the available validator lists when the peer supports
// ValidatorListPropagation, fetch the overlay's manifests message, and send
// a TMGetPeerShardInfoV2 request.
2938 PeerImp::onStartProtocol()
2940 JLOG(journal_.debug()) <<
"onStartProtocol(): " << remote_address_;
// Validator lists are only walked for peers that advertise support for
// ValidatorListPropagation.
2942 if (supportsFeature(ProtocolFeature::ValidatorListPropagation))
2944 app_.validators().for_each_available(
2951 ValidatorList::sendValidatorList(
2959 app_.getHashRouter(),
// Record this peer's id against the hash in the hash router.
// NOTE(review): presumably this stops the same list being relayed back to
// this peer -- confirm against HashRouter.
2963 app_.getHashRouter().addSuppressionPeer(hash, id_);
// NOTE(review): the body of this branch is elided; presumably the manifests
// message is sent when one is returned -- confirm.
2967 if (
auto m = overlay_.getManifestsMessage())
// Request the peer's shard info with the relays field set to 0.
2971 protocol::TMGetPeerShardInfoV2 tmGPS;
2972 tmGPS.set_relays(0);
2973 send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
// Fragment of the TMStartProtocol handler (name taken from the log text):
// debug-logs the remote address of the peer.
2979 JLOG(journal_.debug()) <<
"onMessage(TMStartProtocol): " << remote_address_;
// Fragment of the handler that logs a graceful close received from the
// remote peer. The function header is outside this excerpt.
2986 using on_message_fn =
// When not already running on this peer's strand, the visible arguments
// rebind PeerImp::onMessage with shared_from_this() and the same message.
// NOTE(review): the call that receives this binding is elided -- confirm.
2988 if (!strand_.running_in_this_thread())
2992 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2994 JLOG(journal_.info()) <<
"got graceful close from: " << remote_address_
// Fragment that records a ledger hash in recentLedgers_. The function
// header is outside this excerpt.
// The void cast only marks lockedRecentLock as used; it has no other effect.
3009 (void)lockedRecentLock;
// Linear search for the hash in recentLedgers_.
// NOTE(review): the body of this branch is elided; presumably it returns
// early so the push_back below only adds hashes not yet present -- confirm.
3011 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
3012 recentLedgers_.end())
3015 recentLedgers_.push_back(hash);
// Fragment of fetch-pack request handling. The function header is outside
// this excerpt.
// The request is logged as "too busy" when any of these holds: local load
// is high, the validated ledger is older than 40s, or more than 10 jtPACK
// jobs are already counted in the job queue.
3024 if (app_.getFeeTrack().isLoadedLocal() ||
3025 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
3026 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
3028 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
// NOTE(review): the size check guarding this warning is elided.
3034 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
3035 fee_ = Resource::feeInvalidRequest;
// Requests that get this far set the fee to feeHighBurdenPeer.
3039 fee_ = Resource::feeHighBurdenPeer;
3041 uint256 const hash{packet->ledgerhash()};
// Despite its name, elapsed holds the current UptimeClock time point; it is
// handed to makeFetchPack unchanged.
3044 auto elapsed = UptimeClock::now();
// The job lambda captures a pointer to app_ plus weak, packet, hash and
// elapsed; it does not capture this.
3045 auto const pap = &app_;
3046 app_.getJobQueue().addJob(
3047 jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
3048 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
// Answers a TMGetObjectByHash request for transactions. Each requested hash
// is looked up in the master transaction cache, the hits are collected into
// a TMTransactions reply, and the reply is sent if it is non-empty.
// The visible failure paths set fee_ to Resource::feeInvalidRequest.
3053 PeerImp::doTransactions(
3056 protocol::TMTransactions reply;
3058 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
3059 << packet->objects_size();
// Requests naming more than reduce_relay::MAX_TX_QUEUE_SIZE objects are
// logged as invalid.
3061 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
3063 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
3064 fee_ = Resource::feeInvalidRequest;
3070 auto const& obj = packet->objects(i);
// NOTE(review): the per-object validation guarding this assignment is
// elided in this excerpt.
3074 fee_ = Resource::feeInvalidRequest;
// Only the transaction cache is consulted for the requested hash.
3080 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
3084 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found "
3086 fee_ = Resource::feeInvalidRequest;
// Append one reply entry for the cached transaction and fill in its raw
// bytes, receive timestamp and deferred flag.
3091 auto tx = reply.add_transactions();
3092 auto sttx = txn->getSTransaction();
3094 tx->set_rawtransaction(s.
data(), s.
size());
3096 txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT
// The receive timestamp is the time keeper's current time, stored as a raw
// tick count since its epoch.
3098 tx->set_receivetimestamp(
3099 app_.timeKeeper().now().time_since_epoch().count());
3100 tx->set_deferred(txn->getSubmitResult().queued);
// Nothing is sent when no requested transaction was added to the reply.
3103 if (reply.transactions_size() > 0)
3104 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
// Checks a transaction received from this peer and, if it passes, submits
// it through NetworkOPs::processTransaction.
// Every visible failure path flags the transaction id SF_BAD in the hash
// router and charges the peer. The fee charged depends on the failure.
3108 PeerImp::checkTransaction(
3110 bool checkSignature,
3119 app_.getLedgerMaster().getValidLedgerIndex()))
// NOTE(review): the start of the condition guarding this path is elided; it
// involves the current valid ledger index. Charged as unwanted data.
3121 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3122 charge(Resource::feeUnwantedData);
3130 app_.getHashRouter(),
3132 app_.getLedgerMaster().getValidatedRules(),
3134 valid != Validity::Valid)
// Any result other than Validity::Valid is traced (when a reason string is
// present) and charged as an invalid signature.
3136 if (!validReason.empty())
3138 JLOG(p_journal_.trace())
3139 <<
"Exception checking transaction: " << validReason;
3143 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3144 charge(Resource::feeInvalidSignature);
3151 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
// Build the Transaction wrapper; reason is passed to the constructor and is
// read back below when the resulting status is INVALID.
3155 auto tx = std::make_shared<Transaction>(stx, reason, app_);
3157 if (tx->getStatus() ==
INVALID)
3159 if (!reason.
empty())
3161 JLOG(p_journal_.trace())
3162 <<
"Exception checking transaction: " << reason;
3164 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3165 charge(Resource::feeInvalidSignature);
// trusted is true when the SF_TRUSTED bit is set in flags.
3169 bool const trusted(flags & SF_TRUSTED);
// Submit asynchronously, without fail-hard semantics.
3170 app_.getOPs().processTransaction(
3173 RPC::SubmitSync::async,
3175 NetworkOPs::FailHard::no);
// An exception is logged at warn level, then the transaction is flagged bad
// and the peer is charged feeBadData.
3179 JLOG(p_journal_.warn())
3180 <<
"Exception in " << __func__ <<
": " << ex.
what();
3181 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3182 charge(Resource::feeBadData);
// Checks a consensus proposal received from this peer, decides whether it
// should be relayed, and feeds the relay result into reduce-relay slot
// tracking.
3188 PeerImp::checkPropose(
3193 JLOG(p_journal_.trace())
3194 <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
// NOTE(review): the signature check guarding this path is elided. The
// failure is charged as an invalid signature.
3200 JLOG(p_journal_.warn()) <<
"Proposal fails sig check";
3201 charge(Resource::feeInvalidSignature);
// relay has two sources: the result of processTrustedProposal, or the
// configuration. In the second case it is true when
// RELAY_UNTRUSTED_PROPOSALS equals 1 or when cluster() is true.
// NOTE(review): the branch choosing between them is elided; presumably it
// tests isTrusted -- confirm.
3208 relay = app_.getOPs().processTrustedProposal(peerPos);
3210 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3218 auto haveMessage = app_.overlay().relay(
// Slot/squelch state is only updated once reduce-relay is ready and relay()
// returned a non-empty result.
3220 if (reduceRelayReady() && !haveMessage.empty())
3221 overlay_.updateSlotAndSquelch(
3224 std::move(haveMessage),
3225 protocol::mtPROPOSE_LEDGER);
// Checks a validation forwarded by this peer, relays it through the
// overlay, and feeds the relay result into reduce-relay slot tracking.
3230 PeerImp::checkValidation(
// A validation that fails isValid() is logged and charged as an invalid
// signature.
3235 if (!val->isValid())
3237 JLOG(p_journal_.debug()) <<
"Validation forwarded by peer is invalid";
3238 charge(Resource::feeInvalidSignature);
3253 overlay_.relay(*packet, key, val->getSignerPublic());
// Slot/squelch state is only updated once reduce-relay is ready and the
// relay result (haveMessage) is non-empty.
3254 if (reduceRelayReady() && !haveMessage.empty())
3256 overlay_.updateSlotAndSquelch(
3258 val->getSignerPublic(),
3259 std::move(haveMessage),
3260 protocol::mtVALIDATION);
// An exception while processing is traced and charged as an invalid
// request.
3266 JLOG(p_journal_.trace())
3267 <<
"Exception processing validation: " << ex.
what();
3268 charge(Resource::feeInvalidRequest);
// Fragment of a best-peer search. The enclosing function and loop are
// outside this excerpt.
// A peer is a candidate when it reports having the tx set for rootHash and
// is not the peer named by skip.
3282 if (p->hasTxSet(rootHash) && p.get() != skip)
// The candidate is scored with haveItem == true. It is selected when no peer
// has been picked yet or its score beats the best one so far.
3284 auto score = p->getScore(true);
3285 if (!ret || (score > retScore))
// Fragment of a best-peer search. The enclosing function and loop are
// outside this excerpt.
// A peer is a candidate when it reports having the requested ledger and is
// not the peer named by skip.
3310 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
// The candidate is scored with haveItem == true. It is selected when no peer
// has been picked yet or its score beats the best one so far.
3312 auto score = p->getScore(true);
3313 if (!ret || (score > retScore))
// Fills ledgerData with the ledger's base data and wraps it in an
// mtLEDGER_DATA message. The visible contents are the serialized ledger
// info plus the root node of the state map and of the transaction map, each
// added only when its hash is non-zero.
3325 PeerImp::sendLedgerBase(
3327 protocol::TMLedgerData& ledgerData)
3329 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
// Serialize the ledger info into s.
3332 addRaw(ledger->info(), s);
// The state map root is added only when the map's hash is non-zero.
3335 auto const& stateMap{ledger->stateMap()};
3336 if (stateMap.getHash() != beast::zero)
3341 stateMap.serializeRoot(
root);
3342 ledgerData.add_nodes()->set_nodedata(
3343 root.getDataPtr(),
root.getLength());
// The transaction map root needs two non-zero hashes: the ledger's txHash
// and the map's own hash.
3345 if (ledger->info().txHash != beast::zero)
3347 auto const& txMap{ledger->txMap()};
3348 if (txMap.getHash() != beast::zero)
3352 txMap.serializeRoot(
root);
3353 ledgerData.add_nodes()->set_nodedata(
3354 root.getDataPtr(),
root.getLength());
3360 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
// Fragment of getLedger: resolves the ledger named by a TMGetLedger request.
// The function header is outside this excerpt.
// Lookup goes by hash when the request has one, else by sequence, else the
// closed ledger when ltype is ltCLOSED.
3367 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
// Lookup by hash through the ledger master.
3371 if (m->has_ledgerhash())
3374 uint256 const ledgerHash{m->ledgerhash()};
3375 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
// The shard store is consulted when the request also carries a sequence, a
// shard store exists, and the sequence is not below the store's earliest
// ledger sequence.
3378 if (m->has_ledgerseq())
3381 if (
auto shards = app_.getShardStore())
3383 if (m->ledgerseq() >= shards->earliestLedgerSeq())
3386 shards->fetchLedger(ledgerHash, m->ledgerseq());
3393 JLOG(p_journal_.trace())
3394 <<
"getLedger: Don't have ledger with hash " << ledgerHash;
// Relay is only considered for requests that have a query type and no
// request cookie. The relayed copy is stamped with this peer's id() as its
// request cookie before being sent on.
3396 if (m->has_querytype() && !m->has_requestcookie())
3402 m->has_ledgerseq() ? m->ledgerseq() : 0,
3405 m->set_requestcookie(
id());
3406 peer->send(std::make_shared<Message>(
3407 *m, protocol::mtGET_LEDGER));
3408 JLOG(p_journal_.debug())
3409 <<
"getLedger: Request relayed to peer";
3413 JLOG(p_journal_.trace())
3414 <<
"getLedger: Failed to find peer to relay request";
// Lookup by sequence; sequences below getEarliestFetch() are logged as
// early requests.
3419 else if (m->has_ledgerseq())
3422 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3424 JLOG(p_journal_.debug())
3425 <<
"getLedger: Early ledger sequence request";
3429 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3432 JLOG(p_journal_.debug())
3433 <<
"getLedger: Don't have ledger with sequence "
// Closed-ledger request.
3438 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3440 ledger = app_.getLedgerMaster().getClosedLedger();
// Sanity-check the sequence of the ledger that was found.
3446 auto const ledgerSeq{ledger->info().seq};
3447 if (m->has_ledgerseq())
// On a mismatch with the requested sequence, the peer is charged only when
// the request carries no request cookie.
3449 if (ledgerSeq != m->ledgerseq())
3452 if (!m->has_requestcookie())
3453 charge(Resource::feeInvalidRequest);
3456 JLOG(p_journal_.warn())
3457 <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3460 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3463 JLOG(p_journal_.debug())
3464 <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3469 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
// Fragment of getTxSet: resolves the transaction set named by a TMGetLedger
// request. The function header is outside this excerpt.
3478 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
// The request's ledgerhash field is used as the transaction set hash.
3480 uint256 const txSetHash{m->ledgerhash()};
3482 app_.getInboundTransactions().getSet(txSetHash,
false)};
// Relay is only considered for requests that have a query type and no
// request cookie. The relayed copy is stamped with this peer's id() as its
// request cookie.
3485 if (m->has_querytype() && !m->has_requestcookie())
3490 m->set_requestcookie(
id());
3492 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3493 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3497 JLOG(p_journal_.debug())
3498 <<
"getTxSet: Failed to find relay peer";
3503 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
// Fragment of processLedgerRequest (name taken from the log text; the
// function header is outside this excerpt). It builds a TMLedgerData reply
// for a TMGetLedger request and sends it as an mtLEDGER_DATA message.
// Requests without a request cookie are charged feeMediumBurdenPeer.
3514 if (!m->has_requestcookie())
3515 charge(Resource::feeMediumBurdenPeer);
3519 SHAMap const* map{
nullptr};
3520 protocol::TMLedgerData ledgerData;
3521 bool fatLeaves{
true};
3522 auto const itype{m->itype()};
// Transaction-set candidates are resolved through getTxSet(m). The reply is
// stamped with sequence 0, the requested hash and type liTS_CANDIDATE, and
// it echoes the request cookie when one is present.
3524 if (itype == protocol::liTS_CANDIDATE)
3526 if (sharedMap = getTxSet(m); !sharedMap)
3528 map = sharedMap.
get();
3531 ledgerData.set_ledgerseq(0);
3532 ledgerData.set_ledgerhash(m->ledgerhash());
3533 ledgerData.set_type(protocol::liTS_CANDIDATE);
3534 if (m->has_requestcookie())
3535 ledgerData.set_requestcookie(m->requestcookie());
// Logged when the send queue has reached Tuning::dropSendQueue entries.
3542 if (send_queue_.size() >= Tuning::dropSendQueue)
3544 JLOG(p_journal_.debug())
3545 <<
"processLedgerRequest: Large send queue";
// Logged as too busy when local load is high and cluster() is false.
3548 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3550 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
// Resolve the ledger through getLedger(m) and copy its hash, sequence and
// the requested info type into the reply.
3554 if (ledger = getLedger(m); !ledger)
3558 auto const ledgerHash{ledger->info().hash};
3559 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3560 ledgerData.set_ledgerseq(ledger->info().seq);
3561 ledgerData.set_type(itype);
3562 if (m->has_requestcookie())
3563 ledgerData.set_requestcookie(m->requestcookie());
// liBASE is answered through sendLedgerBase. liTX_NODE and liAS_NODE select
// the ledger's transaction map and state map respectively.
3567 case protocol::liBASE:
3568 sendLedgerBase(ledger, ledgerData);
3571 case protocol::liTX_NODE:
3572 map = &ledger->txMap();
3573 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash "
3574 << to_string(map->getHash());
3577 case protocol::liAS_NODE:
3578 map = &ledger->stateMap();
3579 JLOG(p_journal_.trace())
3580 <<
"processLedgerRequest: Account state map hash "
3581 << to_string(map->getHash());
3586 JLOG(p_journal_.error())
3587 <<
"processLedgerRequest: Invalid ledger info type";
3594 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3599 if (m->nodeids_size() > 0)
// Query depth comes from the request when it specifies one. Otherwise it is
// 2 for high-latency peers and 1 for all others.
3601 auto const queryDepth{
3602 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
// Walk the requested node ids, stopping early once the reply already holds
// Tuning::softMaxReplyNodes nodes.
3606 for (
int i = 0; i < m->nodeids_size() &&
3607 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3613 data.reserve(Tuning::softMaxReplyNodes);
3617 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3619 JLOG(p_journal_.trace())
3620 <<
"processLedgerRequest: getNodeFat got "
3621 << data.size() <<
" nodes";
// Copy every (node id, node data) pair collected in data into the reply.
3623 for (
auto const& d : data)
3625 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3626 node->set_nodeid(d.first.getRawString());
3627 node->set_nodedata(d.second.data(), d.second.size());
3632 JLOG(p_journal_.warn())
3633 <<
"processLedgerRequest: getNodeFat returns false";
// Build a readable description of the request type; it is used in the
// exception log that follows.
3641 case protocol::liBASE:
3643 info =
"Ledger base";
3646 case protocol::liTX_NODE:
3650 case protocol::liAS_NODE:
3654 case protocol::liTS_CANDIDATE:
3655 info =
"TS candidate";
3663 if (!m->has_ledgerhash())
3664 info +=
", no hash specified";
3666 JLOG(p_journal_.error())
3667 <<
"processLedgerRequest: getNodeFat with nodeId "
3668 << *shaMapNodeId <<
" and ledger info type " << info
3669 <<
" throws exception: " << e.
what();
// Summarize how many nodes were requested and how many are being returned,
// then send the reply.
3673 JLOG(p_journal_.info())
3674 <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3675 <<
" nodes at depth " << queryDepth <<
", return "
3676 << ledgerData.nodes_size() <<
" nodes";
3679 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
// Computes a score for this peer from the weights below. haveItem is the
// only parameter.
// NOTE(review): the lines that start the score and test haveItem / latency
// are elided in this excerpt, so the hedged notes below are inferred from
// the names.
3683 PeerImp::getScore(
bool haveItem)
const
// Scoring weights. spHaveItem (10000) is larger than spRandomMax (9999).
// NOTE(review): presumably spRandomMax bounds a random component, so the
// have-item bonus always outweighs it -- confirm.
3687 static const int spRandomMax = 9999;
3691 static const int spHaveItem = 10000;
3696 static const int spLatency = 30;
3699 static const int spNoLatency = 8000;
// NOTE(review): presumably applied only when haveItem is true -- confirm.
3704 score += spHaveItem;
// Latency penalty: spLatency per unit of latency->count().
// NOTE(review): the flat spNoLatency penalty presumably applies when no
// latency value is available -- confirm.
3713 score -= latency->count() * spLatency;
3715 score -= spNoLatency;
// Returns true when latency_ is at or above the peerHighLatency threshold.
3721 PeerImp::isHighLatency()
const
3724 return latency_ >= peerHighLatency;
// Returns true only when both vpReduceRelayEnabled_ and reduceRelayReady_
// are true.
3728 PeerImp::reduceRelayReady()
// While reduceRelayReady_ is still false, the uptime in minutes is compared
// against reduce_relay::WAIT_ON_BOOTUP.
// NOTE(review): the assignment target of this comparison is on an elided
// line; presumably it is reduceRelayReady_ -- confirm.
3730 if (!reduceRelayReady_)
3732 reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
3733 reduce_relay::WAIT_ON_BOOTUP;
3734 return vpReduceRelayEnabled_ && reduceRelayReady_;
// Fragment of the Metrics byte-accounting routine. The function header is
// outside this excerpt.
// It adds bytes to the lifetime and per-interval totals and, once at least
// one whole second has elapsed, records a per-second average and resets the
// interval start.
3740 using namespace std::chrono_literals;
3743 totalBytes_ += bytes;
3744 accumBytes_ += bytes;
// duration_cast to seconds truncates, so timeElapsedInSecs is a whole
// number of seconds.
3745 auto const timeElapsed = clock_type::now() - intervalStart_;
3746 auto const timeElapsedInSecs =
3747 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
// The >= 1s guard also keeps the divisor in the average below non-zero.
3749 if (timeElapsedInSecs >= 1s)
3751 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3752 rollingAvg_.push_back(avgBytes);
// NOTE(review): the initializer of totalBytes is elided; presumably it sums
// rollingAvg_ -- confirm.
3754 auto const totalBytes =
3756 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
// Start a new measurement interval.
3758 intervalStart_ = clock_type::now();
// Returns the stored rolling average, rollingAvgBytes_.
3764 PeerImp::Metrics::average_bytes()
const
3767 return rollingAvgBytes_;
3771 PeerImp::Metrics::total_bytes()
const