Compare commits

..

3 Commits

Author SHA1 Message Date
RichardAH
8cec97d57c Merge branch 'dev' into fix-manifest-cache-invalidation 2025-12-17 09:45:51 +10:00
Niq Dudfield
f818174b8d Merge branch 'dev' into fix-manifest-cache-invalidation 2025-12-16 14:33:00 +07:00
Nicholas Dudfield
684c2ac108 fix: increment manifest sequence for client code cache invalidation
OverlayImpl::getManifestsMessage() caches manifest messages and only rebuilds
them when ManifestCache::sequence() changes. When accepting a new manifest,
the function returned early without incrementing seq_, causing the cache to
never invalidate. This meant peers were exchanging stale lists.

Now seq_++ is called for both new manifests and updates, ensuring the overlay
layer detects changes and sends complete validator lists to connecting peers.

Closes #629
2025-11-21 10:22:25 +07:00
6 changed files with 27 additions and 232 deletions

View File

@@ -479,23 +479,10 @@ private:
to reach consensus. Update our position only on the timer, and in this
phase.
If we have consensus, move to the shuffle phase.
*/
void
phaseEstablish();
/** Handle shuffle phase.
In the shuffle phase, UNLReport nodes exchange entropy to build
a consensus entropy that is then used as an RNG source for Hooks.
The entropy is injected as a ttSHUFFLE psuedo into the final ledger
If we have consensus, move to the accepted phase.
*/
void
phaseShuffle();
phaseEstablish();
/** Evaluate whether pausing increases likelihood of validation.
*
@@ -601,10 +588,6 @@ private:
// Peer proposed positions for the current round
hash_map<NodeID_t, PeerPosition_t> currPeerPositions_;
// our and our peers' entropy as per TMShuffle, used in phaseShuffle
std::optional<uint256> ourEntropy_;
hash_map<NodeID_t, std::pair<uint256, uint256>> currPeerEntropy_;
// Recently received peer positions, available when transitioning between
// ledgers or rounds
hash_map<NodeID_t, std::deque<PeerPosition_t>> recentPeerPositions_;
@@ -849,10 +832,6 @@ Consensus<Adaptor>::timerEntry(NetClock::time_point const& now)
{
phaseEstablish();
}
else if (phase_ == ConsensusPhase::shuffle)
{
phaseShuffle();
}
}
template <class Adaptor>
@@ -1312,12 +1291,8 @@ Consensus<Adaptor>::phaseEstablish()
adaptor_.updateOperatingMode(currPeerPositions_.size());
prevProposers_ = currPeerPositions_.size();
prevRoundTime_ = result_->roundTime.read();
// RHTODO: guard with amendment
phase_ = ConsensusPhase::shuffle;
JLOG(j_.debug()) << "transitioned to ConsensusPhase::shuffle";
/*
phase_ = ConsensusPhase::accepted;
JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
adaptor_.onAccept(
*result_,
previousLedger_,
@@ -1325,60 +1300,6 @@ Consensus<Adaptor>::phaseEstablish()
rawCloseTimes_,
mode_.get(),
getJson(true));
*/
}
template <class Adaptor>
void
Consensus<Adaptor>::phaseShuffle()
{
// can only establish consensus if we already took a stance
assert(result_);
using namespace std::chrono;
ConsensusParms const& parms = adaptor_.parms();
result_->roundTime.tick(clock_.now());
result_->proposers = currPeerPositions_.size();
convergePercent_ = result_->roundTime.read() * 100 /
std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
// Give everyone a chance to take an initial position
if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
return;
updateOurPositions();
// Nothing to do if too many laggards or we don't have consensus.
if (shouldPause() || !haveConsensus())
return;
if (!haveCloseTimeConsensus_)
{
JLOG(j_.info()) << "We have TX consensus but not CT consensus";
return;
}
JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size()
<< " participants)";
adaptor_.updateOperatingMode(currPeerPositions_.size());
prevProposers_ = currPeerPositions_.size();
prevRoundTime_ = result_->roundTime.read();
// RHTODO: guard with amendment
phase_ = ConsensusPhase::shuffle;
JLOG(j_.debug()) << "transitioned to ConsensusPhase::shuffle";
/*
adaptor_.onAccept(
*result_,
previousLedger_,
closeResolution_,
rawCloseTimes_,
mode_.get(),
getJson(true));
*/
}
template <class Adaptor>

View File

@@ -87,15 +87,15 @@ to_string(ConsensusMode m)
/** Phases of consensus for a single ledger round.
@code
"close" "shuffle" "accept"
open ------- > establish -------> shuffle ---------> accepted
^ | |
|---------------| |
^ "startRound" |
|-----------------------------------------------------|
"close" "accept"
open ------- > establish ---------> accepted
^ | |
|---------------| |
^ "startRound" |
|------------------------------------|
@endcode
The typical transition goes from open to establish to shuffle to accepted and
The typical transition goes from open to establish to accepted and
then a call to startRound begins the process anew. However, if a wrong prior
ledger is detected and recovered during the establish or accept phase,
consensus will internally go back to open (see Consensus::handleWrongLedger).
@@ -107,9 +107,6 @@ enum class ConsensusPhase {
//! Establishing consensus by exchanging proposals with our peers
establish,
//! Negotitate featureRNG entropy
shuffle,
//! We have accepted a new last closed ledger and are waiting on a call
//! to startRound to begin the next consensus round. No changes
//! to consensus phase occur while in this phase.
@@ -125,8 +122,6 @@ to_string(ConsensusPhase p)
return "open";
case ConsensusPhase::establish:
return "establish";
case ConsensusPhase::shuffle:
return "shuffle";
case ConsensusPhase::accepted:
return "accepted";
default:

View File

@@ -1094,7 +1094,6 @@ trustTransferLockedBalance(
}
return tesSUCCESS;
}
} // namespace ripple
#endif

View File

@@ -484,61 +484,44 @@ OverlayImpl::start()
m_peerFinder->setConfig(config);
m_peerFinder->start();
auto addIps = [this](std::vector<std::string> ips, bool fixed) {
auto addIps = [&](std::vector<std::string> bootstrapIps) -> void {
beast::Journal const& j = app_.journal("Overlay");
for (auto& ip : ips)
for (auto& ip : bootstrapIps)
{
std::size_t pos = ip.find('#');
if (pos != std::string::npos)
ip.erase(pos);
JLOG(j.trace())
<< "Found " << (fixed ? "fixed" : "bootstrap") << " IP: " << ip;
JLOG(j.trace()) << "Found boostrap IP: " << ip;
}
m_resolver.resolve(
ips,
[this, fixed](
std::string const& name,
bootstrapIps,
[&](std::string const& name,
std::vector<beast::IP::Endpoint> const& addresses) {
std::vector<std::string> ips;
ips.reserve(addresses.size());
beast::Journal const& j = app_.journal("Overlay");
std::string const base("config: ");
std::vector<beast::IP::Endpoint> eps;
eps.reserve(addresses.size());
for (auto const& addr : addresses)
{
auto ep = addr.port() == 0 ? addr.at_port(DEFAULT_PEER_PORT)
: addr;
JLOG(j.trace())
<< "Parsed " << (fixed ? "fixed" : "bootstrap")
<< " IP: " << ep;
eps.push_back(ep);
std::string addrStr = addr.port() == 0
? to_string(addr.at_port(DEFAULT_PEER_PORT))
: to_string(addr);
JLOG(j.trace()) << "Parsed boostrap IP: " << addrStr;
ips.push_back(addrStr);
}
if (eps.empty())
return;
if (fixed)
{
m_peerFinder->addFixedPeer(base + name, eps);
}
else
{
std::vector<std::string> strs;
strs.reserve(eps.size());
for (auto const& ep : eps)
strs.push_back(to_string(ep));
m_peerFinder->addFallbackStrings(base + name, strs);
}
std::string const base("config: ");
if (!ips.empty())
m_peerFinder->addFallbackStrings(base + name, ips);
});
};
if (!app_.config().IPS.empty())
addIps(app_.config().IPS, false);
addIps(app_.config().IPS);
if (!app_.config().IPS_FIXED.empty())
addIps(app_.config().IPS_FIXED, true);
addIps(app_.config().IPS_FIXED);
auto const timer = std::make_shared<Timer>(*this);
std::lock_guard lock(mutex_);

View File

@@ -1918,100 +1918,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMShuffle> const& m)
{
protocol::TMShuffle& shuf = *m;
auto const sig = makeSlice(shf.signature());
// Preliminary check for the validity of the signature: A DER encoded
// signature can't be longer than 72 bytes.
if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
(publicKeyType(makeSlice(shf.nodepubkey())) != KeyType::secp256k1))
{
JLOG(p_journal_.warn()) << "Shuffle: malformed";
fee_ = Resource::feeInvalidSignature;
return;
}
if (!stringIsUint256Sized(shf.nodeentropy()) ||
!stringIsUint256Sized(shf.consensusentropy()) ||
!stringIsUint256Sized(shf.previousledger()))
{
JLOG(p_journal_.warn()) << "Shuffle: malformed";
fee_ = Resource::feeInvalidRequest;
return;
}
PublicKey const publicKey{makeSlice(shf.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
if (!isTrusted)
return;
uint256 const prevLedger{shf.previousledger()};
uint32_t const shuffleSeq{shf.shuffleseq()};
uint256 const nodeEntropy{shf.nodeentropy()};
uint256 const consensusEntropy{shf.consensusentropy()};
uint256 const suppression = sha512Half(std::string("TMShuffle", sig));
if (auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
// receives within IDLED seconds since the message has been relayed.
if (reduceRelayReady() && relayed &&
(stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtSHUFFLE);
JLOG(p_journal_.trace()) << "Shuffle: duplicate";
return;
}
if (!isTrusted)
{
if (tracking_.load() == Tracking::diverged)
{
JLOG(p_journal_.debug())
<< "Proposal: Dropping untrusted (peer divergence)";
return;
}
if (!cluster() && app_.getFeeTrack().isLoadedLocal())
{
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
return;
}
}
JLOG(p_journal_.trace())
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
auto proposal = RCLCxPeerPos(
publicKey,
sig,
suppression,
RCLCxPeerPos::Proposal{
prevLedger,
set.proposeseq(),
proposeHash,
closeTime,
app_.timeKeeper().closeTime(),
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"recvPropose->checkPropose",
[weak, isTrusted, m, proposal]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
});
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
{

View File

@@ -450,12 +450,3 @@ message TMHaveTransactions
repeated bytes hashes = 1;
}
message TMShuffle
{
required bytes nodeEntropy = 1;
required bytes consensusEntropy = 2;
required uint32 shuffleSeq = 3;
required bytes nodePubKey = 4;
required bytes previousledger = 5;
required bytes signature = 6; // signature of above fields
}