// Files
// rippled/src/test/csf/Peer.h
// 2025-11-12 08:23:45 -05:00
//
// 988 lines
// 28 KiB
// C++

#ifndef XRPL_TEST_CSF_PEER_H_INCLUDED
#define XRPL_TEST_CSF_PEER_H_INCLUDED
#include <test/csf/CollectorRef.h>
#include <test/csf/Scheduler.h>
#include <test/csf/TrustGraph.h>
#include <test/csf/Tx.h>
#include <test/csf/Validation.h>
#include <test/csf/events.h>
#include <test/csf/ledgers.h>
#include <xrpld/consensus/Consensus.h>
#include <xrpld/consensus/Validations.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/protocol/PublicKey.h>
#include <boost/container/flat_map.hpp>
#include <boost/container/flat_set.hpp>
#include <algorithm>
namespace ripple {
namespace test {
namespace csf {
namespace bc = boost::container;
/** A single peer in the simulation.
This is the main work-horse of the consensus simulation framework and is
where many other components are integrated. The peer
- Implements the Callbacks required by Consensus
- Manages trust & network connections with other peers
- Issues events back to the simulation based on its actions for analysis
by Collectors
- Exposes most internal state for forcibly simulating arbitrary scenarios
*/
struct Peer
{
/** Thin wrapper around the Proposal a peer has taken as its position.

    Real consensus would attach extra data for serialization and signing
    here; the simulation needs nothing beyond the proposal itself.
*/
class Position
{
    //! The wrapped proposal (stored by value)
    Proposal proposal_;

public:
    //! Wrap a copy of the given proposal
    Position(Proposal const& p) : proposal_(p)
    {
    }

    //! Access the wrapped proposal
    Proposal const&
    proposal() const
    {
        return proposal_;
    }

    //! JSON form of the wrapped proposal
    Json::Value
    getJson() const
    {
        Json::Value json = proposal_.getJson();
        return json;
    }

    //! Text rendering; always the empty string in the simulation
    std::string
    render() const
    {
        return std::string{};
    }
};
/** Artificial latencies applied to a peer's internal processing.
 */
struct ProcessingDelays
{
    //! Time between consensus calling doAccept and the peer actually
    //! accepting the ledger and issuing its validation
    //! TODO: This should be a function of the number of transactions
    std::chrono::milliseconds ledgerAccept = std::chrono::milliseconds{0};

    //! Time spent before handling a validation received from a remote peer
    std::chrono::milliseconds recvValidation = std::chrono::milliseconds{0};

    // The receive delay is the time between a message arriving and the
    // peer actually handling it. Message types without a dedicated overload
    // get no delay.
    template <class M>
    SimDuration
    onReceive(M const&) const
    {
        SimDuration const none{};
        return none;
    }

    // Validations are delayed by recvValidation
    SimDuration
    onReceive(Validation const&) const
    {
        SimDuration const delay = recvValidation;
        return delay;
    }
};
// Empty placeholder type with no members. Nothing in the visible part of
// this header refers to it; NOTE(review): presumably kept for code outside
// this file - confirm before removing.
class TestConsensusLogger
{
};
/** Generic Validations adaptor that simply ignores recently stale
* validations
*/
class ValAdaptor
{
Peer& p_;
public:
struct Mutex
{
void
lock()
{
}
void
unlock()
{
}
};
using Validation = csf::Validation;
using Ledger = csf::Ledger;
ValAdaptor(Peer& p) : p_{p}
{
}
NetClock::time_point
now() const
{
return p_.now();
}
std::optional<Ledger>
acquire(Ledger::ID const& lId)
{
if (Ledger const* ledger = p_.acquireLedger(lId))
return *ledger;
return std::nullopt;
}
};
//! Type definitions for generic consensus
// Each alias maps a generic name onto the simulation's own csf type.
// NOTE(review): presumably Consensus<Peer> looks these names up on Peer -
// confirm against Consensus.h.
using Ledger_t = Ledger;
using NodeID_t = PeerID;
using NodeKey_t = PeerKey;
using TxSet_t = TxSet;
using PeerPosition_t = Position;
// Returned by onClose() and received by onAccept()/onForceAccept()
using Result = ConsensusResult<Peer>;
using NodeKey = Validation::NodeKey;
// NOTE: the declaration order below is also the initialization order used
// by the constructor's initializer list; keep the two in sync.
//! Logging support that prefixes messages with the peer ID
// sink wraps the journal passed to the constructor; j writes through sink
beast::WrappedSink sink;
beast::Journal j;
//! Generic consensus
// Constructed with the scheduler clock, this peer and j
Consensus<Peer> consensus;
//! Our unique ID
PeerID id;
//! Current signing key
// Initialized as {id, 0} in the constructor
PeerKey key;
//! The oracle that manages unique ledgers
LedgerOracle& oracle;
//! Scheduler of events
Scheduler& scheduler;
//! Handle to network for sending messages
BasicNetwork<Peer*>& net;
//! Handle to Trust graph of network
TrustGraph<Peer*>& trustGraph;
//! openTxs that haven't been closed in a ledger yet
TxSetType openTxs;
//! The last ledger closed by this node
// Starts as the genesis ledger
Ledger lastClosedLedger;
//! Ledgers this node has closed or loaded from the network
hash_map<Ledger::ID, Ledger> ledgers;
//! Validations from trusted nodes
// Uses ValAdaptor, which forwards time and ledger lookups to this peer
Validations<ValAdaptor> validations;
//! The most recent ledger that has been fully validated by the network from
//! the perspective of this Peer
// Starts as the genesis ledger; advanced only by checkFullyValidated()
Ledger fullyValidatedLedger;
//-------------------------------------------------------------------------
// Store most network messages; these could be purged if memory use ever
// becomes problematic
//! Map from Ledger::ID to vector of Positions with that ledger
//! as the prior ledger
bc::flat_map<Ledger::ID, std::vector<Proposal>> peerPositions;
//! TxSet associated with a TxSet::ID
bc::flat_map<TxSet::ID, TxSet> txSets;
// Ledgers/TxSets we are acquiring and when that request times out
// The stored time is the scheduler time before which acquireLedger() /
// acquireTxSet() will not send a new request for the same ID
bc::flat_map<Ledger::ID, SimTime> acquiringLedgers;
bc::flat_map<TxSet::ID, SimTime> acquiringTxSets;
//! The number of ledgers this peer has completed
// Incremented at the end of each onAccept() callback
int completedLedgers = 0;
//! The number of ledgers this peer should complete before stopping to run
// The default is the largest int, i.e. effectively no limit
int targetLedgers = std::numeric_limits<int>::max();
//! Skew of time relative to the common scheduler clock
// Added to the scheduler time in now()
std::chrono::seconds clockSkew{0};
//! Simulated delays to use for internal processing
ProcessingDelays delays;
//! Whether to simulate running as validator or a tracking node
// Gates sending validations in onAccept() and is passed to
// consensus.startRound()
bool runAsValidator = true;
// TODO: Consider removing these two, they are only a convenience for tests
// Number of proposers in the prior round
std::size_t prevProposers = 0;
// Duration of prior round. Explicitly zero until the first round completes:
// a default-initialized std::chrono duration holds an indeterminate value,
// so reading it before the first onAccept() would otherwise be undefined.
std::chrono::milliseconds prevRoundTime{0};
// Quorum of validations needed for a ledger to be fully validated
// (recomputed in checkFullyValidated() from the number of trusted peers)
// TODO: Use the logic in ValidatorList to set this dynamically
std::size_t quorum = 0;
// NOTE(review): not read or written anywhere in the visible part of this
// header - presumably filled in by tests; confirm.
hash_set<NodeKey_t> trustedKeys;
// Simulation parameters (returned by parms())
ConsensusParms consensusParms;
//! The collectors to report events to
CollectorRefs& collectors;
/** Constructor
Builds a peer whose last closed and fully validated ledgers are both the
genesis ledger, registers that ledger as locally known, and makes the peer
trust itself in the trust graph.
@param i Unique PeerID
@param s Simulation Scheduler
@param o Simulation Oracle
@param n Simulation network
@param tg Simulation trust graph
@param c Simulation collectors
@param jIn Simulation journal
*/
Peer(
PeerID i,
Scheduler& s,
LedgerOracle& o,
BasicNetwork<Peer*>& n,
TrustGraph<Peer*>& tg,
CollectorRefs& c,
beast::Journal jIn)
// Members are initialized in declaration order. `consensus` and
// `validations` receive *this while the peer is still under construction;
// NOTE(review): presumably they only store the reference - confirm.
: sink(jIn, "Peer " + to_string(i) + ": ")
, j(sink)
, consensus(s.clock(), *this, j)
, id{i}
, key{id, 0}
, oracle{o}
, scheduler{s}
, net{n}
, trustGraph(tg)
, lastClosedLedger{Ledger::MakeGenesis{}}
, validations{ValidationParms{}, s.clock(), *this}
, fullyValidatedLedger{Ledger::MakeGenesis{}}
, collectors{c}
{
// All peers start from the default constructed genesis ledger
ledgers[lastClosedLedger.id()] = lastClosedLedger;
// nodes always trust themselves . . SHOULD THEY?
trustGraph.trust(this, this);
}
/** Run `what` once `when` has elapsed.

    A zero delay invokes the callback inline instead of handing it to the
    scheduler.
*/
template <class T>
void
schedule(std::chrono::nanoseconds when, T&& what)
{
    if (when != std::chrono::nanoseconds::zero())
    {
        scheduler.in(when, std::forward<T>(what));
        return;
    }
    what();
}
// Report an event from this peer to the collectors
template <class E>
void
issue(E const& event)
{
    // Stamp the event with the scheduler time rather than the peer's own
    // (skewed) local time
    auto const when = scheduler.now();
    collectors.on(id, when, event);
}
//--------------------------------------------------------------------------
// Trust and Network members
// Methods for modifying and querying the network and trust graphs from
// the perspective of this Peer
//! Extend trust from this peer to another peer
void
trust(Peer& o)
{
    Peer* const trusted = &o;
    trustGraph.trust(this, trusted);
}
//! Withdraw this peer's trust in another peer
void
untrust(Peer& o)
{
    Peer* const revoked = &o;
    trustGraph.untrust(this, revoked);
}
//! Ask the trust graph whether this peer trusts the given peer
bool
trusts(Peer& o)
{
    bool const trusted = trustGraph.trusts(this, &o);
    return trusted;
}
//! Check whether any peer we trust carries the given ID
bool
trusts(PeerID const& oId)
{
    bool found = false;
    for (auto const candidate : trustGraph.trustedPeers(this))
    {
        if (candidate->id == oId)
        {
            found = true;
            break;
        }
    }
    return found;
}
/** Open a network connection

    Adds an outbound connection from this peer to another one unless such
    a connection already exists.

    @param o The peer on the inbound side of the connection
    @param dur The fixed delay for messages between the two peers
    @return Whether the connection was created.
*/
bool
connect(Peer& o, SimDuration dur)
{
    bool const created = net.connect(this, &o, dur);
    return created;
}
/** Close a network connection

    Drops the connection between this peer and another one, if there is
    one.

    @param o The peer we disconnect from
    @return Whether the connection was removed
*/
bool
disconnect(Peer& o)
{
    bool const removed = net.disconnect(this, &o);
    return removed;
}
//--------------------------------------------------------------------------
// Generic Consensus members
// Try to obtain the Ledger with the given ID.
// Returns a pointer to the local copy when we already hold it. Otherwise
// returns nullptr and, unless an earlier request is still pending, asks
// every linked peer to send the ledger back.
Ledger const*
acquireLedger(Ledger::ID const& ledgerID)
{
    // Already held locally
    auto const known = ledgers.find(ledgerID);
    if (known != ledgers.end())
        return &(known->second);

    // Nobody to ask
    if (net.links(this).empty())
        return nullptr;

    // An earlier request that has not timed out yet is not repeated
    auto const pending = acquiringLedgers.find(ledgerID);
    if (pending != acquiringLedgers.end() &&
        scheduler.now() < pending->second)
    {
        return nullptr;
    }

    using namespace std::chrono_literals;
    SimDuration shortestDelay{10s};
    for (auto const link : net.links(this))
    {
        if (link.data.delay < shortestDelay)
            shortestDelay = link.data.delay;

        // Ask this neighbor whether it has the ledger
        auto const neighbor = link.target;
        net.send(this, neighbor, [to = neighbor, from = this, ledgerID]() {
            auto const held = to->ledgers.find(ledgerID);
            if (held == to->ledgers.end())
                return;

            // The neighbor has it: reply to the requesting peer, which
            // then adds the ledger to those it has available
            to->net.send(to, from, [from, ledger = held->second]() {
                from->acquiringLedgers.erase(ledger.id());
                from->ledgers.emplace(ledger.id(), ledger);
            });
        });
    }

    // The request times out after twice the shortest link delay (capped by
    // the 10s starting value)
    acquiringLedgers[ledgerID] = scheduler.now() + 2 * shortestDelay;
    return nullptr;
}
// Try to obtain the TxSet with the given ID.
// Returns a pointer to the local copy when we already hold it. Otherwise
// returns nullptr and, unless an earlier request is still pending, asks
// every linked peer to send the set back.
TxSet const*
acquireTxSet(TxSet::ID const& setId)
{
    // Already held locally
    auto const known = txSets.find(setId);
    if (known != txSets.end())
        return &(known->second);

    // Nobody to ask
    if (net.links(this).empty())
        return nullptr;

    // An earlier request that has not timed out yet is not repeated
    auto const pending = acquiringTxSets.find(setId);
    if (pending != acquiringTxSets.end() &&
        scheduler.now() < pending->second)
    {
        return nullptr;
    }

    using namespace std::chrono_literals;
    SimDuration shortestDelay{10s};
    for (auto const link : net.links(this))
    {
        if (link.data.delay < shortestDelay)
            shortestDelay = link.data.delay;

        // Ask this neighbor whether it has the tx set
        auto const neighbor = link.target;
        net.send(this, neighbor, [to = neighbor, from = this, setId]() {
            auto const held = to->txSets.find(setId);
            if (held == to->txSets.end())
                return;

            // The neighbor has it: reply to the requesting peer, which
            // handles it like a TxSet that was broadcast over the network
            to->net.send(to, from, [from, txSet = held->second]() {
                from->acquiringTxSets.erase(txSet.id());
                from->handle(txSet);
            });
        });
    }

    // The request times out after twice the shortest link delay (capped by
    // the 10s starting value)
    acquiringTxSets[setId] = scheduler.now() + 2 * shortestDelay;
    return nullptr;
}
// Whether any transaction is waiting in the open set
bool
hasOpenTransactions() const
{
    bool const noneOpen = openTxs.empty();
    return !noneOpen;
}
// Number of trusted validations held for the given ledger
std::size_t
proposersValidated(Ledger::ID const& prevLedger)
{
    std::size_t const trustedCount =
        validations.numTrustedForLedger(prevLedger);
    return trustedCount;
}
// Forwards to Validations::getNodesAfter for the given ledger and ID
std::size_t
proposersFinished(Ledger const& prevLedger, Ledger::ID const& prevLedgerID)
{
    std::size_t const movedOn =
        validations.getNodesAfter(prevLedger, prevLedgerID);
    return movedOn;
}
// Close the open ledger: report the close and return our initial position,
// which proposes exactly the currently open transactions. `mode` is not
// used.
Result
onClose(
    Ledger const& prevLedger,
    NetClock::time_point closeTime,
    ConsensusMode mode)
{
    issue(CloseLedger{prevLedger, openTxs});

    auto const proposedSetID = TxSet::calcID(openTxs);
    return Result(
        TxSet{openTxs},
        Proposal(
            prevLedger.id(),
            Proposal::seqJoin,
            proposedSetID,
            closeTime,
            now(),
            id));
}
// Forced acceptance is handled exactly like a regular acceptance; the
// extra `validating` argument onAccept expects comes from validating().
void
onForceAccept(
    Result const& result,
    Ledger const& prevLedger,
    NetClock::duration const& closeResolution,
    ConsensusCloseTimes const& rawCloseTimes,
    ConsensusMode const& mode,
    Json::Value&& consensusJson)
{
    bool const amValidating = validating();
    onAccept(
        result,
        prevLedger,
        closeResolution,
        rawCloseTimes,
        mode,
        std::move(consensusJson),
        amValidating);
}
// Accept the outcome of a consensus round: build the new ledger, possibly
// validate it, check for full validation and start the next round.
// All of the work happens in a callback handed to schedule() with
// delays.ledgerAccept, so it runs immediately when that delay is zero and
// through the scheduler otherwise.
// `rawCloseTimes`, `consensusJson` and `validating` are not used by the
// body.
void
onAccept(
Result const& result,
Ledger const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson,
bool const validating)
{
// [=] copies the referenced arguments that the callback uses, because the
// callback may run after this function has returned
schedule(delays.ledgerAccept, [=, this]() {
bool const proposing = mode == ConsensusMode::proposing;
bool const consensusFail = result.state == ConsensusState::MovedOn;
// Add any transaction registered in txInjections for this sequence
TxSet const acceptedTxs = injectTxs(prevLedger, result.txns);
Ledger const newLedger = oracle.accept(
prevLedger,
acceptedTxs.txs(),
closeResolution,
result.position.closeTime());
ledgers[newLedger.id()] = newLedger;
// Reported before lastClosedLedger is replaced, so the event carries the
// previous last closed ledger
issue(AcceptLedger{newLedger, lastClosedLedger});
prevProposers = result.proposers;
prevRoundTime = result.roundTime.read();
lastClosedLedger = newLedger;
// Drop the transactions that made it into the new ledger from the open set
auto const it = std::remove_if(
openTxs.begin(), openTxs.end(), [&](Tx const& tx) {
return acceptedTxs.exists(tx.id());
});
openTxs.erase(it, openTxs.end());
// Only send validation if the new ledger is compatible with our
// fully validated ledger
bool const isCompatible =
newLedger.isAncestor(fullyValidatedLedger);
// Can only send one validated ledger per seq
if (runAsValidator && isCompatible && !consensusFail &&
validations.canValidateSeq(newLedger.seq()))
{
// The validation is marked full only when we were proposing this round
bool isFull = proposing;
Validation v{
newLedger.id(),
newLedger.seq(),
now(),
now(),
key,
id,
isFull};
// share the new validation; it is trusted by the receiver
share(v);
// we trust ourselves
addTrustedValidation(v);
}
checkFullyValidated(newLedger);
// kick off the next round...
// in the actual implementation, this passes back through
// network ops
++completedLedgers;
// startRound sets the LCL state, so we need to call it once after
// the last requested round completes
// (hence <= rather than <)
if (completedLedgers <= targetLedgers)
{
startRound();
}
});
}
// Lowest sequence number considered when looking for ledgers that have
// more validations than our current ledger: the sequence of our fully
// validated ledger
Ledger::Seq
earliestAllowedSeq() const
{
    auto const floorSeq = fullyValidatedLedger.seq();
    return floorSeq;
}
// Pick the ledger the next round should build on. For the genesis ledger
// the given ID is returned as is; otherwise the ledger preferred by the
// stored validations is returned, and a WrongPrevLedger event is issued
// when it differs from `ledgerID`. `mode` is not used.
Ledger::ID
getPrevLedger(
    Ledger::ID const& ledgerID,
    Ledger const& ledger,
    ConsensusMode mode)
{
    // Nothing to switch to until we are past the genesis ledger
    if (ledger.seq() == Ledger::Seq{0})
        return ledgerID;

    Ledger::ID const preferred =
        validations.getPreferred(ledger, earliestAllowedSeq());
    if (preferred == ledgerID)
        return preferred;

    JLOG(j.trace()) << Json::Compact(validations.getJsonTrie());
    issue(WrongPrevLedger{ledgerID, preferred});
    return preferred;
}
// Publish our own proposal by sharing it with connected peers
void
propose(Proposal const& pos)
{
    share<Proposal>(pos);
}
// The consensus parameters this peer runs with
ConsensusParms const&
parms() const
{
    return this->consensusParms;
}
// Consensus mode changes are deliberately ignored for now; the two modes
// passed in are not inspected.
void
onModeChange(ConsensusMode /*unused*/, ConsensusMode /*unused*/)
{
}
// Share a message by broadcasting to all connected peers
template <class M>
void
share(M const& m)
{
issue(Share<M>{m});
send(BroadcastMesg<M>{m, router.nextSeq++, this->id}, this->id);
}
// A Position is shared as the raw proposal it wraps
void
share(Position const& p)
{
    Proposal const& raw = p.proposal();
    share(raw);
}
//--------------------------------------------------------------------------
// Validation members
/** Record a validation as trusted.

    The validation is marked trusted and seen at our current time before
    it is stored.

    @return false when the store reports it as stale, true otherwise
            (i.e. when it is worth forwarding)
*/
bool
addTrustedValidation(Validation v)
{
    v.setTrusted();
    v.setSeen(now());

    if (validations.add(v.nodeID(), v) == ValStatus::stale)
        return false;

    // acquireLedger asks the network when the ledger is not held locally;
    // full validation is only checked when it is already available
    Ledger const* const lgr = acquireLedger(v.ledgerID());
    if (lgr != nullptr)
        checkFullyValidated(*lgr);
    return true;
}
/** Promote a ledger to fully validated when it qualifies.

    The ledger must be newer than the current fully validated ledger, have
    at least `quorum` trusted validations and pass the isAncestor check
    against the current fully validated ledger. `quorum` is recomputed here
    as 80% (rounded up) of the number of peers we trust.
*/
void
checkFullyValidated(Ledger const& ledger)
{
    // Ledgers at or before the last fully validated one are ignored
    if (ledger.seq() <= fullyValidatedLedger.seq())
        return;

    std::size_t const count = validations.numTrustedForLedger(ledger.id());
    std::size_t const numTrustedPeers = trustGraph.graph().outDegree(this);
    quorum = static_cast<std::size_t>(std::ceil(numTrustedPeers * 0.8));

    if (count < quorum)
        return;
    if (!ledger.isAncestor(fullyValidatedLedger))
        return;

    issue(FullyValidateLedger{ledger, fullyValidatedLedger});
    fullyValidatedLedger = ledger;
}
//-------------------------------------------------------------------------
// Peer messaging members
// Basic Sequence number router
// A message that will be flooded across the network is tagged with a
// sequence number by the origin node in a BroadcastMesg. Receivers will
// ignore a message as stale if they've already processed a newer sequence
// number, or will process and potentially relay the message along.
//
// The various bool handle(MessageType) members do the actual processing
// and should return true if the message should continue to be sent to
// peers.
//
// WARN: This assumes messages are received and processed in the order they
// are sent, so that a peer receives a message with seq 1 from node 0
// before seq 2 from node 0, etc.
// TODO: Break this out into a class and identify type interface to allow
// alternate routing strategies
// Envelope for a message flooded across the network. share() builds one
// with brace initialization, so the member order below matters.
template <class M>
struct BroadcastMesg
{
// The payload being broadcast
M mesg;
// Sequence number assigned by the originating peer
std::size_t seq;
// ID of the peer that originated the message
PeerID origin;
};
// Per-peer routing state for the sequence number scheme
struct Router
{
// Sequence number for the next message this peer originates; share()
// post-increments it
std::size_t nextSeq = 1;
// Latest sequence number processed so far, keyed by originating peer.
// send() and receive() look entries up with operator[], which creates a
// zero entry for an origin not seen before.
bc::flat_map<PeerID, std::size_t> lastObservedSeq;
};
// This peer's routing state
Router router;
// Forward a broadcast message to every linked peer except the one it came
// from and the one that originated it
template <class M>
void
send(BroadcastMesg<M> const& bm, PeerID from)
{
    for (auto const link : net.links(this))
    {
        auto const peer = link.target;

        // Never send back to the sender or to the originator
        if (peer->id == from || peer->id == bm.origin)
            continue;

        // cheat and don't bother sending if we know it has already been
        // used on the other end
        if (!(peer->router.lastObservedSeq[bm.origin] < bm.seq))
            continue;

        issue(Relay<M>{peer->id, bm.mesg});
        net.send(this, peer, [to = peer, bm, id = this->id] {
            to->receive(bm, id);
        });
    }
}
// Take in a shared message. Messages that are not newer than the last
// sequence number seen from the same origin are only reported; newer ones
// are handled after the configured receive delay and relayed further when
// the handler asks for it.
template <class M>
void
receive(BroadcastMesg<M> const& bm, PeerID from)
{
    issue(Receive<M>{from, bm.mesg});

    std::size_t& lastSeen = router.lastObservedSeq[bm.origin];
    if (!(lastSeen < bm.seq))
        return;

    lastSeen = bm.seq;
    schedule(delays.onReceive(bm.mesg), [this, bm, from] {
        if (handle(bm.mesg))
            send(bm, from);
    });
}
// Type specific receive handlers; each returns true if the message should
// continue to be broadcast to peers.
//
// Proposal handler
bool
handle(Proposal const& p)
{
    bool const trusted = trusts(p.nodeID());
    if (!trusted)
    {
        // Untrusted proposals are relayed only when they build on the
        // same ledger as ours
        return p.prevLedger() == lastClosedLedger.id();
    }

    // TODO: This always suppresses relay of peer positions already seen
    // Should it allow forwarding if for a recent ledger ?
    std::vector<Proposal>& seen = peerPositions[p.prevLedger()];
    bool const duplicate =
        std::find(seen.begin(), seen.end(), p) != seen.end();
    if (duplicate)
        return false;
    seen.push_back(p);

    // Consensus decides whether the proposal is relayed
    return consensus.peerProposal(now(), Position{p});
}
// TxSet handler: store the set and, if we did not have it before, pass it
// to consensus. Only a new set is relayed.
bool
handle(TxSet const& txs)
{
    auto const outcome = txSets.insert(std::make_pair(txs.id(), txs));
    if (!outcome.second)
        return false;

    consensus.gotTxSet(now(), txs);
    return true;
}
// Tx handler: add the transaction to our open set
bool
handle(Tx const& tx)
{
    // A transaction that is already in the last closed ledger is ignored
    // and not relayed
    TxSetType const& closedTxs = lastClosedLedger.txs();
    bool const alreadyClosed = closedTxs.find(tx) != closedTxs.end();
    if (alreadyClosed)
        return false;

    // Relay only when it was new to our open ledger
    auto const outcome = openTxs.insert(tx);
    return outcome.second;
}
// Validation handler
bool
handle(Validation const& v)
{
    // A trusted validation is relayed only if it is current
    if (trusts(v.nodeID()))
        return addTrustedValidation(v);

    // TODO: This is not relaying untrusted validations
    return false;
}
bool
haveValidated() const
{
return fullyValidatedLedger.seq() > Ledger::Seq{0};
}
// Same value as earliestAllowedSeq()
Ledger::Seq
getValidLedgerIndex() const
{
    Ledger::Seq const index = earliestAllowedSeq();
    return index;
}
// The current quorum together with the keys of all peers we trust
std::pair<std::size_t, hash_set<NodeKey_t>>
getQuorumKeys()
{
    std::pair<std::size_t, hash_set<NodeKey_t>> result;
    result.first = quorum;
    for (auto const trusted : trustGraph.trustedPeers(this))
        result.second.insert(trusted->key);
    return result;
}
// Forwards to Validations::laggards with the given sequence and key set
std::size_t
laggards(Ledger::Seq const seq, hash_set<NodeKey_t>& trusted)
{
    std::size_t const behind = validations.laggards(seq, trusted);
    return behind;
}
bool
validator() const
{
return runAsValidator;
}
// Intentionally does nothing; `positions` is ignored
void
updateOperatingMode(std::size_t const positions) const
{
}
// Always false; the original author notes the value does not matter
bool
validating() const
{
    bool const result = false;
    return result;
}
//--------------------------------------------------------------------------
// Submit a transaction locally: report it, add it to our open set and
// share it with peers when it was new to us
void
submit(Tx const& tx)
{
    issue(SubmitTx{tx});

    bool const isNew = handle(tx);
    if (isNew)
        share(tx);
}
//--------------------------------------------------------------------------
// Simulation "driver" members
//! Heartbeat: drive consensus and re-arm the timer
void
timerEntry()
{
    consensus.timerEntry(now());

    // Stop rescheduling once the target number of ledgers is reached
    if (completedLedgers >= targetLedgers)
        return;

    scheduler.in(parms().ledgerGRANULARITY, [this]() { timerEntry(); });
}
// Called to begin the next round
void
startRound()
{
// Between rounds, we take the majority ledger
// In the future, consider taking peer dominant ledger if no validations
// yet
Ledger::ID bestLCL =
validations.getPreferred(lastClosedLedger, earliestAllowedSeq());
if (bestLCL == Ledger::ID{0})
bestLCL = lastClosedLedger.id();
issue(StartRound{bestLCL, lastClosedLedger});
// Not yet modeling dynamic UNL.
hash_set<PeerID> nowUntrusted;
consensus.startRound(
now(), bestLCL, lastClosedLedger, nowUntrusted, runAsValidator, {});
}
// Start the consensus process assuming it is not yet running
// This runs forever unless targetLedgers is specified
void
start()
{
// TODO: Expire validations less frequently?
validations.expire(j);
scheduler.in(parms().ledgerGRANULARITY, [&]() { timerEntry(); });
startRound();
}
// This peer's view of network time: scheduler time plus a fixed offset and
// the peer's clock skew
NetClock::time_point
now() const
{
    using namespace std::chrono;
    using namespace std::chrono_literals;

    // The actual epochs do not matter, but the generated NetClock time
    // should be well past its epoch so that subtracting two
    // NetClock::time_point values in the consensus code stays positive
    // (e.g. proposeFRESHNESS).
    auto const sinceEpoch = scheduler.now().time_since_epoch();
    auto const shifted = sinceEpoch + 86400s + clockSkew;
    return NetClock::time_point(duration_cast<NetClock::duration>(shifted));
}
// ID of the previous ledger as reported by consensus
Ledger::ID
prevLedgerID() const
{
    Ledger::ID const current = consensus.prevLedgerID();
    return current;
}
//-------------------------------------------------------------------------
// Injects a specific transaction when generating the ledger following
// the provided sequence. This allows simulating a byzantine failure in
// which a node generates the wrong ledger, even when consensus worked
// properly.
// The key is the sequence of the previous ledger (injectTxs() looks it up
// with prevLedger.seq()); the map holds at most one Tx per sequence.
// TODO: Make this more robust
hash_map<Ledger::Seq, Tx> txInjections;
/** Inject non-consensus Tx

    Injects a transaction into the ledger following prevLedger's sequence
    number, if one was registered in txInjections for that sequence.

    @param prevLedger The ledger we are building the new ledger on top of.
                      Taken by const reference: only its sequence number is
                      read, so no copy is needed.
    @param src The Consensus TxSet
    @return A copy of src when nothing is registered for prevLedger.seq();
            otherwise a TxSet with src's transactions plus the registered
            one.
*/
TxSet
injectTxs(Ledger const& prevLedger, TxSet const& src)
{
    auto const it = txInjections.find(prevLedger.seq());
    if (it == txInjections.end())
        return src;

    // Copy the consensus transactions and add the injected one
    TxSetType res{src.txs()};
    res.insert(it->second);
    return TxSet{res};
}
};
} // namespace csf
} // namespace test
} // namespace ripple
#endif