//------------------------------------------------------------------------------ /* This file is part of rippled: https://github.com/ripple/rippled Copyright (c) 2012-2017 Ripple Labs Inc Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ //============================================================================== #ifndef RIPPLE_TEST_CSF_PEER_H_INCLUDED #define RIPPLE_TEST_CSF_PEER_H_INCLUDED #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace ripple { namespace test { namespace csf { namespace bc = boost::container; /** A single peer in the simulation. This is the main work-horse of the consensus simulation framework and is where many other components are integrated. The peer - Implements the Callbacks required by Consensus - Manages trust & network connections with other peers - Issues events back to the simulation based on its actions for analysis by Collectors - Exposes most internal state for forcibly simulating arbitrary scenarios */ struct Peer { /** Basic wrapper of a proposed position taken by a peer. For real consensus, this would add additional data for serialization and signing. For simulation, nothing extra is needed. */ class Position { public: Position(Proposal const& p) : proposal_(p) { } Proposal const& proposal() const { return proposal_; } Json::Value getJson() const { return proposal_.getJson(); } private: Proposal proposal_; }; /** Simulated delays in internal peer processing. */ struct ProcessingDelays { //! Delay in consensus calling doAccept to accepting and issuing //! validation //! TODO: This should be a function of the number of transactions std::chrono::milliseconds ledgerAccept{0}; //! Delay in processing validations from remote peers std::chrono::milliseconds recvValidation{0}; // Return the receive delay for message type M, default is no delay // Received delay is the time from receiving the message to actually // handling it. template SimDuration onReceive(M const&) const { return SimDuration{}; } SimDuration onReceive(Validation const&) const { return recvValidation; } }; /** Generic Validations adaptor that simply ignores recently stale * validations */ class ValAdaptor { Peer& p_; public: struct Mutex { void lock() { } void unlock() { } }; using Validation = csf::Validation; using Ledger = csf::Ledger; ValAdaptor(Peer& p) : p_{p} { } NetClock::time_point now() const { return p_.now(); } std::optional acquire(Ledger::ID const& lId) { if (Ledger const* ledger = p_.acquireLedger(lId)) return *ledger; return std::nullopt; } }; //! Type definitions for generic consensus using Ledger_t = Ledger; using NodeID_t = PeerID; using NodeKey_t = PeerKey; using TxSet_t = TxSet; using PeerPosition_t = Position; using Result = ConsensusResult; using NodeKey = Validation::NodeKey; //! Logging support that prefixes messages with the peer ID beast::WrappedSink sink; beast::Journal j; //! Generic consensus Consensus consensus; //! Our unique ID PeerID id; //! Current signing key PeerKey key; //! The oracle that manages unique ledgers LedgerOracle& oracle; //! Scheduler of events Scheduler& scheduler; //! Handle to network for sending messages BasicNetwork& net; //! Handle to Trust graph of network TrustGraph& trustGraph; //! openTxs that haven't been closed in a ledger yet TxSetType openTxs; //! The last ledger closed by this node Ledger lastClosedLedger; //! Ledgers this node has closed or loaded from the network hash_map ledgers; //! Validations from trusted nodes Validations validations; //! The most recent ledger that has been fully validated by the network from //! the perspective of this Peer Ledger fullyValidatedLedger; //------------------------------------------------------------------------- // Store most network messages; these could be purged if memory use ever // becomes problematic //! Map from Ledger::ID to vector of Positions with that ledger //! as the prior ledger bc::flat_map> peerPositions; //! TxSet associated with a TxSet::ID bc::flat_map txSets; // Ledgers/TxSets we are acquiring and when that request times out bc::flat_map acquiringLedgers; bc::flat_map acquiringTxSets; //! The number of ledgers this peer has completed int completedLedgers = 0; //! The number of ledgers this peer should complete before stopping to run int targetLedgers = std::numeric_limits::max(); //! Skew of time relative to the common scheduler clock std::chrono::seconds clockSkew{0}; //! Simulated delays to use for internal processing ProcessingDelays delays; //! Whether to simulate running as validator or a tracking node bool runAsValidator = true; // TODO: Consider removing these two, they are only a convenience for tests // Number of proposers in the prior round std::size_t prevProposers = 0; // Duration of prior round std::chrono::milliseconds prevRoundTime; // Quorum of validations needed for a ledger to be fully validated // TODO: Use the logic in ValidatorList to set this dynamically std::size_t quorum = 0; hash_set trustedKeys; // Simulation parameters ConsensusParms consensusParms; //! The collectors to report events to CollectorRefs& collectors; /** Constructor @param i Unique PeerID @param s Simulation Scheduler @param o Simulation Oracle @param n Simulation network @param tg Simulation trust graph @param c Simulation collectors @param jIn Simulation journal */ Peer( PeerID i, Scheduler& s, LedgerOracle& o, BasicNetwork& n, TrustGraph& tg, CollectorRefs& c, beast::Journal jIn) : sink(jIn, "Peer " + to_string(i) + ": ") , j(sink) , consensus(s.clock(), *this, j) , id{i} , key{id, 0} , oracle{o} , scheduler{s} , net{n} , trustGraph(tg) , lastClosedLedger{Ledger::MakeGenesis{}} , validations{ValidationParms{}, s.clock(), *this} , fullyValidatedLedger{Ledger::MakeGenesis{}} , collectors{c} { // All peers start from the default constructed genesis ledger ledgers[lastClosedLedger.id()] = lastClosedLedger; // nodes always trust themselves . . SHOULD THEY? trustGraph.trust(this, this); } /** Schedule the provided callback in `when` duration, but if `when` is 0, call immediately */ template void schedule(std::chrono::nanoseconds when, T&& what) { using namespace std::chrono_literals; if (when == 0ns) what(); else scheduler.in(when, std::forward(what)); } // Issue a new event to the collectors template void issue(E const& event) { // Use the scheduler time and not the peer's (skewed) local time collectors.on(id, scheduler.now(), event); } //-------------------------------------------------------------------------- // Trust and Network members // Methods for modifying and querying the network and trust graphs from // the perspective of this Peer //< Extend trust to a peer void trust(Peer& o) { trustGraph.trust(this, &o); } //< Revoke trust from a peer void untrust(Peer& o) { trustGraph.untrust(this, &o); } //< Check whether we trust a peer bool trusts(Peer& o) { return trustGraph.trusts(this, &o); } //< Check whether we trust a peer based on its ID bool trusts(PeerID const& oId) { for (auto const p : trustGraph.trustedPeers(this)) if (p->id == oId) return true; return false; } /** Create network connection Creates a new outbound connection to another Peer if none exists @param o The peer with the inbound connection @param dur The fixed delay for messages between the two Peers @return Whether the connection was created. */ bool connect(Peer& o, SimDuration dur) { return net.connect(this, &o, dur); } /** Remove a network connection Removes a connection between peers if one exists @param o The peer we disconnect from @return Whether the connection was removed */ bool disconnect(Peer& o) { return net.disconnect(this, &o); } //-------------------------------------------------------------------------- // Generic Consensus members // Attempt to acquire the Ledger associated with the given ID Ledger const* acquireLedger(Ledger::ID const& ledgerID) { if (auto it = ledgers.find(ledgerID); it != ledgers.end()) { return &(it->second); } // No peers if (net.links(this).empty()) return nullptr; // Don't retry if we already are acquiring it and haven't timed out auto aIt = acquiringLedgers.find(ledgerID); if (aIt != acquiringLedgers.end()) { if (scheduler.now() < aIt->second) return nullptr; } using namespace std::chrono_literals; SimDuration minDuration{10s}; for (auto const link : net.links(this)) { minDuration = std::min(minDuration, link.data.delay); // Send a messsage to neighbors to find the ledger net.send( this, link.target, [to = link.target, from = this, ledgerID]() { if (auto it = to->ledgers.find(ledgerID); it != to->ledgers.end()) { // if the ledger is found, send it back to the original // requesting peer where it is added to the available // ledgers to->net.send(to, from, [from, ledger = it->second]() { from->acquiringLedgers.erase(ledger.id()); from->ledgers.emplace(ledger.id(), ledger); }); } }); } acquiringLedgers[ledgerID] = scheduler.now() + 2 * minDuration; return nullptr; } // Attempt to acquire the TxSet associated with the given ID TxSet const* acquireTxSet(TxSet::ID const& setId) { if (auto it = txSets.find(setId); it != txSets.end()) { return &(it->second); } // No peers if (net.links(this).empty()) return nullptr; // Don't retry if we already are acquiring it and haven't timed out auto aIt = acquiringTxSets.find(setId); if (aIt != acquiringTxSets.end()) { if (scheduler.now() < aIt->second) return nullptr; } using namespace std::chrono_literals; SimDuration minDuration{10s}; for (auto const link : net.links(this)) { minDuration = std::min(minDuration, link.data.delay); // Send a message to neighbors to find the tx set net.send( this, link.target, [to = link.target, from = this, setId]() { if (auto it = to->txSets.find(setId); it != to->txSets.end()) { // If the txSet is found, send it back to the original // requesting peer, where it is handled like a TxSet // that was broadcast over the network to->net.send(to, from, [from, txSet = it->second]() { from->acquiringTxSets.erase(txSet.id()); from->handle(txSet); }); } }); } acquiringTxSets[setId] = scheduler.now() + 2 * minDuration; return nullptr; } bool hasOpenTransactions() const { return !openTxs.empty(); } std::size_t proposersValidated(Ledger::ID const& prevLedger) { return validations.numTrustedForLedger(prevLedger); } std::size_t proposersFinished(Ledger const& prevLedger, Ledger::ID const& prevLedgerID) { return validations.getNodesAfter(prevLedger, prevLedgerID); } Result onClose( Ledger const& prevLedger, NetClock::time_point closeTime, ConsensusMode mode) { issue(CloseLedger{prevLedger, openTxs}); return Result( TxSet{openTxs}, Proposal( prevLedger.id(), Proposal::seqJoin, TxSet::calcID(openTxs), closeTime, now(), id)); } void onForceAccept( Result const& result, Ledger const& prevLedger, NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, Json::Value&& consensusJson) { onAccept( result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(consensusJson)); } void onAccept( Result const& result, Ledger const& prevLedger, NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, Json::Value&& consensusJson) { schedule(delays.ledgerAccept, [=]() { const bool proposing = mode == ConsensusMode::proposing; const bool consensusFail = result.state == ConsensusState::MovedOn; TxSet const acceptedTxs = injectTxs(prevLedger, result.txns); Ledger const newLedger = oracle.accept( prevLedger, acceptedTxs.txs(), closeResolution, result.position.closeTime()); ledgers[newLedger.id()] = newLedger; issue(AcceptLedger{newLedger, lastClosedLedger}); prevProposers = result.proposers; prevRoundTime = result.roundTime.read(); lastClosedLedger = newLedger; auto const it = std::remove_if( openTxs.begin(), openTxs.end(), [&](Tx const& tx) { return acceptedTxs.exists(tx.id()); }); openTxs.erase(it, openTxs.end()); // Only send validation if the new ledger is compatible with our // fully validated ledger bool const isCompatible = newLedger.isAncestor(fullyValidatedLedger); // Can only send one validated ledger per seq if (runAsValidator && isCompatible && !consensusFail && validations.canValidateSeq(newLedger.seq())) { bool isFull = proposing; Validation v{ newLedger.id(), newLedger.seq(), now(), now(), key, id, isFull}; // share the new validation; it is trusted by the receiver share(v); // we trust ourselves addTrustedValidation(v); } checkFullyValidated(newLedger); // kick off the next round... // in the actual implementation, this passes back through // network ops ++completedLedgers; // startRound sets the LCL state, so we need to call it once after // the last requested round completes if (completedLedgers <= targetLedgers) { startRound(); } }); } // Earliest allowed sequence number when checking for ledgers with more // validations than our current ledger Ledger::Seq earliestAllowedSeq() const { return fullyValidatedLedger.seq(); } Ledger::ID getPrevLedger( Ledger::ID const& ledgerID, Ledger const& ledger, ConsensusMode mode) { // only do if we are past the genesis ledger if (ledger.seq() == Ledger::Seq{0}) return ledgerID; Ledger::ID const netLgr = validations.getPreferred(ledger, earliestAllowedSeq()); if (netLgr != ledgerID) { JLOG(j.trace()) << Json::Compact(validations.getJsonTrie()); issue(WrongPrevLedger{ledgerID, netLgr}); } return netLgr; } void propose(Proposal const& pos) { share(pos); } ConsensusParms const& parms() const { return consensusParms; } // Not interested in tracking consensus mode changes for now void onModeChange(ConsensusMode, ConsensusMode) { } // Share a message by broadcasting to all connected peers template void share(M const& m) { issue(Share{m}); send(BroadcastMesg{m, router.nextSeq++, this->id}, this->id); } // Unwrap the Position and share the raw proposal void share(Position const& p) { share(p.proposal()); } //-------------------------------------------------------------------------- // Validation members /** Add a trusted validation and return true if it is worth forwarding */ bool addTrustedValidation(Validation v) { v.setTrusted(); v.setSeen(now()); ValStatus const res = validations.add(v.nodeID(), v); if (res == ValStatus::stale) return false; // Acquire will try to get from network if not already local if (Ledger const* lgr = acquireLedger(v.ledgerID())) checkFullyValidated(*lgr); return true; } /** Check if a new ledger can be deemed fully validated */ void checkFullyValidated(Ledger const& ledger) { // Only consider ledgers newer than our last fully validated ledger if (ledger.seq() <= fullyValidatedLedger.seq()) return; std::size_t const count = validations.numTrustedForLedger(ledger.id()); std::size_t const numTrustedPeers = trustGraph.graph().outDegree(this); quorum = static_cast(std::ceil(numTrustedPeers * 0.8)); if (count >= quorum && ledger.isAncestor(fullyValidatedLedger)) { issue(FullyValidateLedger{ledger, fullyValidatedLedger}); fullyValidatedLedger = ledger; } } //------------------------------------------------------------------------- // Peer messaging members // Basic Sequence number router // A message that will be flooded across the network is tagged with a // sequence number by the origin node in a BroadcastMesg. Receivers will // ignore a message as stale if they've already processed a newer sequence // number, or will process and potentially relay the message along. // // The various bool handle(MessageType) members do the actual processing // and should return true if the message should continue to be sent to // peers. // // WARN: This assumes messages are received and processed in the order they // are sent, so that a peer receives a message with seq 1 from node 0 // before seq 2 from node 0, etc. // TODO: Break this out into a class and identify type interface to allow // alternate routing strategies template struct BroadcastMesg { M mesg; std::size_t seq; PeerID origin; }; struct Router { std::size_t nextSeq = 1; bc::flat_map lastObservedSeq; }; Router router; // Send a broadcast message to all peers template void send(BroadcastMesg const& bm, PeerID from) { for (auto const link : net.links(this)) { if (link.target->id != from && link.target->id != bm.origin) { // cheat and don't bother sending if we know it has already been // used on the other end if (link.target->router.lastObservedSeq[bm.origin] < bm.seq) { issue(Relay{link.target->id, bm.mesg}); net.send( this, link.target, [to = link.target, bm, id = this->id] { to->receive(bm, id); }); } } } } // Receive a shared message, process it and consider continuing to relay it template void receive(BroadcastMesg const& bm, PeerID from) { issue(Receive{from, bm.mesg}); if (router.lastObservedSeq[bm.origin] < bm.seq) { router.lastObservedSeq[bm.origin] = bm.seq; schedule(delays.onReceive(bm.mesg), [this, bm, from] { if (handle(bm.mesg)) send(bm, from); }); } } // Type specific receive handlers, return true if the message should // continue to be broadcast to peers bool handle(Proposal const& p) { // Only relay untrusted proposals on the same ledger if (!trusts(p.nodeID())) return p.prevLedger() == lastClosedLedger.id(); // TODO: This always suppresses relay of peer positions already seen // Should it allow forwarding if for a recent ledger ? auto& dest = peerPositions[p.prevLedger()]; if (std::find(dest.begin(), dest.end(), p) != dest.end()) return false; dest.push_back(p); // Rely on consensus to decide whether to relay return consensus.peerProposal(now(), Position{p}); } bool handle(TxSet const& txs) { bool const inserted = txSets.insert(std::make_pair(txs.id(), txs)).second; if (inserted) consensus.gotTxSet(now(), txs); // relay only if new return inserted; } bool handle(Tx const& tx) { // Ignore and suppress relay of transactions already in last ledger TxSetType const& lastClosedTxs = lastClosedLedger.txs(); if (lastClosedTxs.find(tx) != lastClosedTxs.end()) return false; // only relay if it was new to our open ledger return openTxs.insert(tx).second; } bool handle(Validation const& v) { // TODO: This is not relaying untrusted validations if (!trusts(v.nodeID())) return false; // Will only relay if current return addTrustedValidation(v); } bool haveValidated() const { return fullyValidatedLedger.seq() > Ledger::Seq{0}; } Ledger::Seq getValidLedgerIndex() const { return earliestAllowedSeq(); } std::pair> getQuorumKeys() { hash_set keys; for (auto const p : trustGraph.trustedPeers(this)) keys.insert(p->key); return {quorum, keys}; } std::size_t laggards(Ledger::Seq const seq, hash_set& trusted) { return validations.laggards(seq, trusted); } bool validator() const { return runAsValidator; } void updateOperatingMode(std::size_t const positions) const { } //-------------------------------------------------------------------------- // A locally submitted transaction void submit(Tx const& tx) { issue(SubmitTx{tx}); if (handle(tx)) share(tx); } //-------------------------------------------------------------------------- // Simulation "driver" members //! Heartbeat timer call void timerEntry() { consensus.timerEntry(now()); // only reschedule if not completed if (completedLedgers < targetLedgers) scheduler.in(parms().ledgerGRANULARITY, [this]() { timerEntry(); }); } // Called to begin the next round void startRound() { // Between rounds, we take the majority ledger // In the future, consider taking peer dominant ledger if no validations // yet Ledger::ID bestLCL = validations.getPreferred(lastClosedLedger, earliestAllowedSeq()); if (bestLCL == Ledger::ID{0}) bestLCL = lastClosedLedger.id(); issue(StartRound{bestLCL, lastClosedLedger}); // Not yet modeling dynamic UNL. hash_set nowUntrusted; consensus.startRound( now(), bestLCL, lastClosedLedger, nowUntrusted, runAsValidator); } // Start the consensus process assuming it is not yet running // This runs forever unless targetLedgers is specified void start() { // TODO: Expire validations less frequently? validations.expire(); scheduler.in(parms().ledgerGRANULARITY, [&]() { timerEntry(); }); startRound(); } NetClock::time_point now() const { // We don't care about the actual epochs, but do want the // generated NetClock time to be well past its epoch to ensure // any subtractions of two NetClock::time_point in the consensus // code are positive. (e.g. proposeFRESHNESS) using namespace std::chrono; using namespace std::chrono_literals; return NetClock::time_point(duration_cast( scheduler.now().time_since_epoch() + 86400s + clockSkew)); } Ledger::ID prevLedgerID() const { return consensus.prevLedgerID(); } //------------------------------------------------------------------------- // Injects a specific transaction when generating the ledger following // the provided sequence. This allows simulating a byzantine failure in // which a node generates the wrong ledger, even when consensus worked // properly. // TODO: Make this more robust hash_map txInjections; /** Inject non-consensus Tx Injects a transactionsinto the ledger following prevLedger's sequence number. @param prevLedger The ledger we are building the new ledger on top of @param src The Consensus TxSet @return Consensus TxSet with inject transactions added if prevLedger.seq matches a previously registered Tx. */ TxSet injectTxs(Ledger prevLedger, TxSet const& src) { auto const it = txInjections.find(prevLedger.seq()); if (it == txInjections.end()) return src; TxSetType res{src.txs()}; res.insert(it->second); return TxSet{res}; } }; } // namespace csf } // namespace test } // namespace ripple #endif