From f456355da236377fad0df968fe8c49773720bba3 Mon Sep 17 00:00:00 2001 From: David Schwartz Date: Wed, 13 Jul 2016 13:21:51 -0700 Subject: [PATCH] Begin consensus refactor (RIPD-1011): * New RCLCx* classes * Refactor consensus positions * Refactor proposed transaction sets * Refactor disputed transactions * Refactor position broadcast/replay --- Builds/VisualStudio2015/RippleD.vcxproj | 10 +- .../VisualStudio2015/RippleD.vcxproj.filters | 15 +- src/ripple/app/consensus/RCLCxPos.h | 143 ++++ src/ripple/app/consensus/RCLCxTraits.h | 55 ++ src/ripple/app/consensus/RCLCxTx.h | 156 ++++ src/ripple/app/consensus/README.md | 115 +++ src/ripple/app/ledger/Consensus.h | 5 +- src/ripple/app/ledger/LedgerConsensus.h | 28 +- src/ripple/app/ledger/LedgerProposal.cpp | 23 +- src/ripple/app/ledger/LedgerProposal.h | 26 +- src/ripple/app/ledger/impl/ConsensusImp.cpp | 14 +- src/ripple/app/ledger/impl/ConsensusImp.h | 8 +- src/ripple/app/ledger/impl/DisputedTx.cpp | 161 ---- src/ripple/app/ledger/impl/DisputedTx.h | 170 ++++- .../app/ledger/impl/LedgerConsensusImp.cpp | 713 +++++++++--------- .../app/ledger/impl/LedgerConsensusImp.h | 91 ++- src/ripple/app/main/Application.cpp | 3 +- src/ripple/app/misc/NetworkOPs.cpp | 17 +- src/ripple/overlay/impl/PeerImp.cpp | 6 +- src/ripple/unity/app_ledger.cpp | 1 - 20 files changed, 1128 insertions(+), 632 deletions(-) create mode 100644 src/ripple/app/consensus/RCLCxPos.h create mode 100644 src/ripple/app/consensus/RCLCxTraits.h create mode 100644 src/ripple/app/consensus/RCLCxTx.h create mode 100644 src/ripple/app/consensus/README.md delete mode 100644 src/ripple/app/ledger/impl/DisputedTx.cpp diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj index 78c9ae19ca..3be23550ae 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj +++ b/Builds/VisualStudio2015/RippleD.vcxproj @@ -783,6 +783,12 @@ + + + + + + True True @@ -821,10 +827,6 @@ - - True - True - diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters index dd3d32805e..b2eca3b59e 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters @@ -94,6 +94,9 @@ {83B96C00-A786-6597-826D-E12FA6187AA8} + + {0E8BC18A-9853-B13E-1A9D-C55FA29DA60F} + {CE126498-A44D-30A2-345B-0F672BCDF947} @@ -1299,6 +1302,15 @@ protobuf\vsprojects + + ripple\app\consensus + + + ripple\app\consensus + + + ripple\app\consensus + ripple\app\ledger @@ -1338,9 +1350,6 @@ ripple\app\ledger\impl - - ripple\app\ledger\impl - ripple\app\ledger\impl diff --git a/src/ripple/app/consensus/RCLCxPos.h b/src/ripple/app/consensus/RCLCxPos.h new file mode 100644 index 0000000000..fd0380b66e --- /dev/null +++ b/src/ripple/app/consensus/RCLCxPos.h @@ -0,0 +1,143 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2016 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_APP_CONSENSUS_RCLCXPOSITION_H_INCLUDED +#define RIPPLE_APP_CONSENSUS_RCLCXPOSITION_H_INCLUDED + +#include +#include +#include +#include + +namespace ripple { + +// A position taken during a consensus round +// As seen by the RCL consensus process +class RCLCxPos +{ + +public: + + static std::uint32_t constexpr seqInitial = 0; + static std::uint32_t constexpr seqLeave = 0xffffffff; + + RCLCxPos (LedgerProposal const& prop) : + proposal_ (prop) + { } + + std::uint32_t getSequence() const + { + return proposal_.getProposeSeq(); + } + + NetClock::time_point getCloseTime () const + { + return proposal_.getCloseTime(); + } + + NetClock::time_point getSeenTime() const + { + return proposal_.getSeenTime(); + } + + bool isStale (NetClock::time_point lastValid) const + { + return getSeenTime() < lastValid; + } + + NodeID const& getNodeID() const + { + return proposal_.getPeerID(); + } + + LedgerHash const& getPosition() const + { + return proposal_.getCurrentHash(); + } + + LedgerHash const& getPrevLedger() const + { + return proposal_.getPrevLedger(); + } + + bool changePosition ( + LedgerHash const& position, + NetClock::time_point closeTime, + NetClock::time_point now) + { + return proposal_.changePosition (position, closeTime, now); + } + + bool bowOut (NetClock::time_point now) + { + if (isBowOut ()) + return false; + + proposal_.bowOut (now); + return true; + } + + Json::Value getJson() const + { + return proposal_.getJson(); + } + + bool isInitial () const + { + return getSequence() == seqInitial; + } + + bool isBowOut() const + { + return getSequence() == seqLeave; + } + + // These three functions will be removed. New code + // should use getPosition, getSequence and getNodeID + LedgerHash const& getCurrentHash() const + { + return getPosition(); + } + NodeID const& getPeerID() const + { + return getNodeID(); + } + std::uint32_t getProposeSeq() const + { + return getSequence(); + } + + LedgerProposal const& peek() const + { + return proposal_; + } + + LedgerProposal& peek() + { + return proposal_; + } + +protected: + + LedgerProposal proposal_; + +}; + +} +#endif diff --git a/src/ripple/app/consensus/RCLCxTraits.h b/src/ripple/app/consensus/RCLCxTraits.h new file mode 100644 index 0000000000..35ecf94da1 --- /dev/null +++ b/src/ripple/app/consensus/RCLCxTraits.h @@ -0,0 +1,55 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2016 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_APP_CONSENSUS_RCLCXTRAITS_H_INCLUDED +#define RIPPLE_APP_CONSENSUS_RCLCXTRAITS_H_INCLUDED + +#include +#include + +#include +#include + +#include +#include + +namespace ripple { + +// Consensus traits class +// For adapting consensus to RCL + +class RCLCxTraits +{ +public: + + using Time_t = NetClock::time_point; + + using Pos_t = RCLCxPos; + using TxSet_t = RCLTxSet; + using Tx_t = RCLCxTx; + + using LgrID_t = LedgerHash; + using TxID_t = uint256; + using TxSetID_t = uint256; + using NodeID_t = NodeID; +}; + +} + +#endif diff --git a/src/ripple/app/consensus/RCLCxTx.h b/src/ripple/app/consensus/RCLCxTx.h new file mode 100644 index 0000000000..0d1d2278bf --- /dev/null +++ b/src/ripple/app/consensus/RCLCxTx.h @@ -0,0 +1,156 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2016 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_APP_CONSENSUS_RCLCXTX_H_INCLUDED +#define RIPPLE_APP_CONSENSUS_RCLCXTX_H_INCLUDED + +#include +#include +#include + +namespace ripple { + +// Transactions, as seen by the consensus code in the rippled app +class RCLCxTx +{ +public: + + RCLCxTx (SHAMapItem const& txn) : txn_ (txn) + { } + + uint256 const& getID() const + { + return txn_.key (); + } + + SHAMapItem const& txn() const + { + return txn_; + } + +protected: + + SHAMapItem const txn_; +}; + +class RCLTxSet; + +class MutableRCLTxSet +{ +public: + + MutableRCLTxSet (RCLTxSet const&); + + bool + addEntry (RCLCxTx const& p) + { + return map_->addItem ( + SHAMapItem {p.getID(), p.txn().peekData()}, + true, false); + } + + bool + removeEntry (uint256 const& entry) + { + return map_->delItem (entry); + } + + std::shared_ptr const& map() const + { + return map_; + } + +protected: + + std::shared_ptr map_; +}; + +// Sets of transactions +// as seen by the consensus code in the rippled app +class RCLTxSet +{ +public: + + using mutable_t = MutableRCLTxSet; + + RCLTxSet (std::shared_ptr map) : + map_ (std::move(map)) + { + assert (map_); + } + + RCLTxSet (MutableRCLTxSet const& set) : + map_ (set.map()->snapShot (false)) + { } + + bool hasEntry (uint256 const& entry) const + { + return map_->hasItem (entry); + } + + boost::optional + getEntry (uint256 const& entry) const + { + auto item = map_->peekItem (entry); + if (item) + return RCLCxTx(*item); + return boost::none; + } + + uint256 getID() const + { + return map_->getHash().as_uint256(); + } + + std::map + getDifferences (RCLTxSet const& j) const + { + SHAMap::Delta delta; + + // Bound the work we do in case of a malicious + // map from a trusted validator + map_->compare (*(j.map_), delta, 65536); + + std::map ret; + for (auto const& item : delta) + { + assert ( (item.second.first && ! item.second.second) || + (item.second.second && ! item.second.first) ); + + ret[item.first] = static_cast (item.second.first); + } + return ret; + } + + std::shared_ptr const& map() const + { + return map_; + } + +protected: + + std::shared_ptr map_; +}; + +inline MutableRCLTxSet::MutableRCLTxSet (RCLTxSet const& set) + : map_ (set.map()->snapShot (true)) +{ } + +} +#endif diff --git a/src/ripple/app/consensus/README.md b/src/ripple/app/consensus/README.md new file mode 100644 index 0000000000..f43143fa13 --- /dev/null +++ b/src/ripple/app/consensus/README.md @@ -0,0 +1,115 @@ +# Consensus Algorithm + +This directory holds the types and classes needed +to connect consensus to rippled. + +## Types + +All types must be copy constructible and assignable. + +* `LgrID_t` + Represents a ledger identifier. + Typically a 256-bit hash of the ledger header. + +* `TxID_t` + Represents a transaction identifier. + Typically a 256-bit hash of the transaction data. + +* `TxSetID_t` + Represents an identifier of a set of transactions. + Typically a 256-bit hash of the set's root tree node. + +* `NodeID_t` + Represents an identifier for a node that can take positions during + the consenus process. + +* `Time_t` + Encodes absolute times. Used for the close times of ledgers and the + expiration times of positions. + +* `Pos_t` + Represents a position on a consensus taken by a participant. + Typically it encodes the previous ledger identifier, the transaction + set identifier, the participant, and a sequence number. It also includes + either the time it was signed or the time it was first seen. It may also + include additional information such as the participant's public key or + signature + +* `Tx_t` + Represent a transaction. Has an identifier and also whatever information + is needed to add it to a set. + +* `TxSet_t` + Represents a set of transactions. It has an identifier and can report + which transactions it has and provide the actual transaction data. + If non-const, it can be modified. + +## `Pos_t` + +Represents a position taken by a validator during a consensus round. +Must provide: + +static std::uint32_t seqInitial; + +static std::uint32_t seqLeave; + +std::uint32_t getSequence() const; + +Time_t getCloseTime() const; + +Time_t getSeenTime() const; + +bool isStale (Time_t) const; + +NodeID_t getNodeID() const; + +TxSetID_t getPosition() const; + +LgrID_t getPrevLedger() const; + +bool isInitial() const; + +bool isBowOut() const; + +Json::Value getJson() const; + +bool changePosition (TxSetID_t const& position, Time_t closeTime, Time_t now); + +bool bowOut (Time_t now); + + +### `Tx_t` + +Represents a transaction. +Must provide: + +TxID_t getID() const; + + +### TxSet_t + +Represents a set of transactions. +Must provide: + +TxSet_t (TxSet_t::mutable_t const&); + +TxSetID_t getID() const; + +bool hasEntry (TxID_t const&) const; + +bool hasEntry (Tx_t const&) const; + +boost::optional const getEntry (TxID_t const&) const; + +std::map getDifferences(TxSet_t const&) const; + +## TxSet_t::mutable_t + +Represents a set of transactions that can be modified. +Must provide: + +TxSet_t::mutable_t (TxSet_t const &); + +bool addEntry (Tx_t const&); + +bool removeEntry (TxID_t const&); diff --git a/src/ripple/app/ledger/Consensus.h b/src/ripple/app/ledger/Consensus.h index 57b32a83a6..aa601aaa36 100644 --- a/src/ripple/app/ledger/Consensus.h +++ b/src/ripple/app/ledger/Consensus.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -64,7 +65,7 @@ public: /** Called to create a LedgerConsensus instance */ virtual - std::shared_ptr + std::shared_ptr> makeLedgerConsensus ( Application& app, InboundTransactions& inboundTransactions, @@ -75,7 +76,7 @@ public: virtual void startRound ( - LedgerConsensus& consensus, + LedgerConsensus& consensus, LedgerHash const &prevLCLHash, std::shared_ptr const& previousLedger, NetClock::time_point closeTime) = 0; diff --git a/src/ripple/app/ledger/LedgerConsensus.h b/src/ripple/app/ledger/LedgerConsensus.h index 4334b4cd56..d0cd199c5e 100644 --- a/src/ripple/app/ledger/LedgerConsensus.h +++ b/src/ripple/app/ledger/LedgerConsensus.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -34,30 +35,37 @@ namespace ripple { /** Manager for achieving consensus on the next ledger. - - This object is created when the consensus process starts, and - is destroyed when the process is complete. */ -class LedgerConsensus +template +class LedgerConsensus : public Traits { public: + + using typename Traits::Time_t; + using typename Traits::Pos_t; + using typename Traits::TxSet_t; + using typename Traits::Tx_t; + using typename Traits::LgrID_t; + using typename Traits::TxID_t; + using typename Traits::TxSetID_t; + using typename Traits::NodeID_t; + virtual ~LedgerConsensus() = default; virtual Json::Value getJson (bool full) = 0; - virtual uint256 getLCL () = 0; + virtual LgrID_t getLCL () = 0; - virtual void gotMap ( - std::shared_ptr const& map) = 0; + virtual void gotMap (TxSet_t const& map) = 0; virtual void timerEntry () = 0; - virtual bool peerPosition (LedgerProposal::ref) = 0; + virtual bool peerPosition (Pos_t const& position) = 0; virtual void startRound ( - LedgerHash const& prevLCLHash, + LgrID_t const& prevLCLHash, std::shared_ptr const& prevLedger, - NetClock::time_point closeTime, + Time_t closeTime, int previousProposers, std::chrono::milliseconds previousConvergeTime) = 0; diff --git a/src/ripple/app/ledger/LedgerProposal.cpp b/src/ripple/app/ledger/LedgerProposal.cpp index 07731913ec..767d022b26 100644 --- a/src/ripple/app/ledger/LedgerProposal.cpp +++ b/src/ripple/app/ledger/LedgerProposal.cpp @@ -33,8 +33,10 @@ LedgerProposal::LedgerProposal ( std::uint32_t seq, uint256 const& tx, NetClock::time_point closeTime, + NetClock::time_point now, PublicKey const& publicKey, NodeID const& nodeID, + Slice const& signature, uint256 const& suppression) : mPreviousLedger (pLgr) , mCurrentHash (tx) @@ -43,8 +45,11 @@ LedgerProposal::LedgerProposal ( , mProposeSeq (seq) , publicKey_ (publicKey) , mPeerID (nodeID) - , mTime (std::chrono::steady_clock::now ()) + , mTime (now) { + signature_.resize (signature.size()); + std::memcpy(signature_.data(), + signature.data(), signature.size()); } // Used to construct local proposals @@ -52,12 +57,13 @@ LedgerProposal::LedgerProposal ( LedgerProposal::LedgerProposal ( uint256 const& prevLgr, uint256 const& position, - NetClock::time_point closeTime) + NetClock::time_point closeTime, + NetClock::time_point now) : mPreviousLedger (prevLgr) , mCurrentHash (position) , mCloseTime (closeTime) , mProposeSeq (seqJoin) - , mTime (std::chrono::steady_clock::now ()) + , mTime (now) { } @@ -76,27 +82,28 @@ bool LedgerProposal::checkSign () const return verifyDigest ( publicKey_, getSigningHash(), - signature_, + makeSlice (signature_), false); } bool LedgerProposal::changePosition ( uint256 const& newPosition, - NetClock::time_point closeTime) + NetClock::time_point closeTime, + NetClock::time_point now) { if (mProposeSeq == seqLeave) return false; mCurrentHash = newPosition; mCloseTime = closeTime; - mTime = std::chrono::steady_clock::now (); + mTime = now; ++mProposeSeq; return true; } -void LedgerProposal::bowOut () +void LedgerProposal::bowOut (NetClock::time_point now) { - mTime = std::chrono::steady_clock::now (); + mTime = now; mProposeSeq = seqLeave; } diff --git a/src/ripple/app/ledger/LedgerProposal.h b/src/ripple/app/ledger/LedgerProposal.h index 44665d7705..0659583212 100644 --- a/src/ripple/app/ledger/LedgerProposal.h +++ b/src/ripple/app/ledger/LedgerProposal.h @@ -55,15 +55,18 @@ public: std::uint32_t proposeSeq, uint256 const& propose, NetClock::time_point closeTime, + NetClock::time_point now, PublicKey const& publicKey, NodeID const& nodeID, + Slice const& signature, uint256 const& suppress); // Our own proposal: LedgerProposal ( uint256 const& prevLedger, uint256 const& position, - NetClock::time_point closeTime); + NetClock::time_point closeTime, + NetClock::time_point now); uint256 getSigningHash () const; bool checkSign () const; @@ -96,17 +99,14 @@ public: { return mCloseTime; } - - void setSignature (Buffer&& sig) + NetClock::time_point getSeenTime () const { - signature_ = std::move(sig); + return mTime; } - - Slice getSignature () const + Blob const& getSignature () const { return signature_; } - bool isInitial () const { return mProposeSeq == seqJoin; @@ -116,14 +116,16 @@ public: return mProposeSeq == seqLeave; } - bool isStale (std::chrono::steady_clock::time_point cutoff) const + bool isStale (NetClock::time_point cutoff) const { return mTime <= cutoff; } bool changePosition ( - uint256 const& newPosition, NetClock::time_point newCloseTime); - void bowOut (); + uint256 const& newPosition, + NetClock::time_point newCloseTime, + NetClock::time_point now); + void bowOut (NetClock::time_point now); Json::Value getJson () const; private: @@ -145,9 +147,9 @@ private: PublicKey publicKey_; NodeID mPeerID; - Buffer signature_; + Blob signature_; - std::chrono::steady_clock::time_point mTime; + NetClock::time_point mTime; }; /** Calculate a unique identifier for a signed proposal. diff --git a/src/ripple/app/ledger/impl/ConsensusImp.cpp b/src/ripple/app/ledger/impl/ConsensusImp.cpp index 37aa931c6f..4b6194a370 100644 --- a/src/ripple/app/ledger/impl/ConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/ConsensusImp.cpp @@ -63,7 +63,7 @@ ConsensusImp::getLastCloseDuration () const return lastCloseConvergeTook_; } -std::shared_ptr +std::shared_ptr> ConsensusImp::makeLedgerConsensus ( Application& app, InboundTransactions& inboundTransactions, @@ -76,7 +76,7 @@ ConsensusImp::makeLedgerConsensus ( void ConsensusImp::startRound ( - LedgerConsensus& consensus, + LedgerConsensus& consensus, LedgerHash const &prevLCLHash, std::shared_ptr const& previousLedger, NetClock::time_point closeTime) @@ -154,11 +154,11 @@ ConsensusImp::storeProposal ( props.push_back (proposal); } -std::vector > +std::vector ConsensusImp::getStoredProposals (uint256 const& prevLedger) { - std::vector > ret; + std::vector ret; { std::lock_guard _(lock_); @@ -166,15 +166,13 @@ ConsensusImp::getStoredProposals (uint256 const& prevLedger) for (auto const& it : storedProposals_) for (auto const& prop : it.second) if (prop->getPrevLedger() == prevLedger) - ret.push_back (prop); + ret.emplace_back (*prop); } return ret; } -//============================================================================== - -std::unique_ptr +std::unique_ptr make_Consensus (Config const& config, Logs& logs) { return std::make_unique ( diff --git a/src/ripple/app/ledger/impl/ConsensusImp.h b/src/ripple/app/ledger/impl/ConsensusImp.h index b9fccd67a3..7bed788941 100644 --- a/src/ripple/app/ledger/impl/ConsensusImp.h +++ b/src/ripple/app/ledger/impl/ConsensusImp.h @@ -1,4 +1,4 @@ -//------------------------------------------------------------------------------ + /* This file is part of rippled: https://github.com/ripple/rippled Copyright (c) 2012, 2013 Ripple Labs Inc. @@ -52,7 +52,7 @@ public: std::chrono::milliseconds getLastCloseDuration () const override; - std::shared_ptr + std::shared_ptr> makeLedgerConsensus ( Application& app, InboundTransactions& inboundTransactions, @@ -61,7 +61,7 @@ public: void startRound ( - LedgerConsensus& ledgerConsensus, + LedgerConsensus& ledgerConsensus, LedgerHash const& prevLCLHash, std::shared_ptr const& previousLedger, NetClock::time_point closeTime) override; @@ -94,7 +94,7 @@ public: NetClock::time_point getLastCloseTime () const; - std::vector > + std::vector getStoredProposals (uint256 const& previousLedger); private: diff --git a/src/ripple/app/ledger/impl/DisputedTx.cpp b/src/ripple/app/ledger/impl/DisputedTx.cpp deleted file mode 100644 index 8d9061a05c..0000000000 --- a/src/ripple/app/ledger/impl/DisputedTx.cpp +++ /dev/null @@ -1,161 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include -#include -#include -#include -#include - -namespace ripple { - -// Track a peer's yes/no vote on a particular disputed transaction -void DisputedTx::setVote (NodeID const& peer, bool votesYes) -{ - auto res = mVotes.insert (std::make_pair (peer, votesYes)); - - // new vote - if (res.second) - { - if (votesYes) - { - JLOG (j_.debug()) - << "Peer " << peer << " votes YES on " << mTransactionID; - ++mYays; - } - else - { - JLOG (j_.debug()) - << "Peer " << peer << " votes NO on " << mTransactionID; - ++mNays; - } - } - // changes vote to yes - else if (votesYes && !res.first->second) - { - JLOG (j_.debug()) - << "Peer " << peer << " now votes YES on " << mTransactionID; - --mNays; - ++mYays; - res.first->second = true; - } - // changes vote to no - else if (!votesYes && res.first->second) - { - JLOG (j_.debug()) - << "Peer " << peer << " now votes NO on " << mTransactionID; - ++mNays; - --mYays; - res.first->second = false; - } -} - -// Remove a peer's vote on this disputed transasction -void DisputedTx::unVote (NodeID const& peer) -{ - auto it = mVotes.find (peer); - - if (it != mVotes.end ()) - { - if (it->second) - --mYays; - else - --mNays; - - mVotes.erase (it); - } -} - -bool DisputedTx::updateVote (int percentTime, bool proposing) -{ - // VFALCO TODO Give the return value a descriptive local variable name - // and don't return from the middle. - - if (mOurVote && (mNays == 0)) - return false; - - if (!mOurVote && (mYays == 0)) - return false; - - bool newPosition; - int weight; - - if (proposing) // give ourselves full weight - { - // This is basically the percentage of nodes voting 'yes' (including us) - weight = (mYays * 100 + (mOurVote ? 100 : 0)) / (mNays + mYays + 1); - - // VFALCO TODO Rename these macros and turn them into language - // constructs. consolidate them into a class that collects - // all these related values. - // - // To prevent avalanche stalls, we increase the needed weight slightly - // over time. - if (percentTime < AV_MID_CONSENSUS_TIME) - newPosition = weight > AV_INIT_CONSENSUS_PCT; - else if (percentTime < AV_LATE_CONSENSUS_TIME) - newPosition = weight > AV_MID_CONSENSUS_PCT; - else if (percentTime < AV_STUCK_CONSENSUS_TIME) - newPosition = weight > AV_LATE_CONSENSUS_PCT; - else - newPosition = weight > AV_STUCK_CONSENSUS_PCT; - } - else // don't let us outweigh a proposing node, just recognize consensus - { - weight = -1; - newPosition = mYays > mNays; - } - - if (newPosition == mOurVote) - { - JLOG (j_.info()) - << "No change (" << (mOurVote ? "YES" : "NO") << ") : weight " - << weight << ", percent " << percentTime; - JLOG (j_.debug()) << getJson (); - return false; - } - - mOurVote = newPosition; - JLOG (j_.debug()) - << "We now vote " << (mOurVote ? "YES" : "NO") - << " on " << mTransactionID; - JLOG (j_.debug()) << getJson (); - return true; -} - -Json::Value DisputedTx::getJson () -{ - Json::Value ret (Json::objectValue); - - ret["yays"] = mYays; - ret["nays"] = mNays; - ret["our_vote"] = mOurVote; - - if (!mVotes.empty ()) - { - Json::Value votesj (Json::objectValue); - for (auto& vote : mVotes) - votesj[to_string (vote.first)] = vote.second; - ret["votes"] = votesj; - } - - return ret; -} - -} // ripple diff --git a/src/ripple/app/ledger/impl/DisputedTx.h b/src/ripple/app/ledger/impl/DisputedTx.h index 7590fbee2a..c4aed8e40e 100644 --- a/src/ripple/app/ledger/impl/DisputedTx.h +++ b/src/ripple/app/ledger/impl/DisputedTx.h @@ -36,22 +36,28 @@ namespace ripple { Undisputed transactions have no corresponding @ref DisputedTx object. */ + +template class DisputedTx { public: - // VFALCO `Blob` is a poor choice of parameter - DisputedTx (uint256 const& txID, - Blob const& tx, bool ourVote, beast::Journal j) - : mTransactionID (txID) + + using Tx_t = typename Traits::Tx_t; + using TxID_t = typename Traits::TxID_t; + using NodeID_t = typename Traits::NodeID_t; + + DisputedTx (Tx_t const& tx, + bool ourVote, beast::Journal j) + : mTransactionID (tx.getID()) , mYays (0) , mNays (0) , mOurVote (ourVote) - , transaction (tx.data(), tx.size()) + , transaction (tx) , j_ (j) { } - uint256 const& getTransactionID () const + TxID_t const& getID () const { return mTransactionID; } @@ -61,9 +67,7 @@ public: return mOurVote; } - // VFALCO TODO make this const - // VFALCO TODO Don't return a Serializer (doh) - Serializer& peekTransaction () + Tx_t const& tx () const { return transaction; } @@ -73,26 +77,158 @@ public: mOurVote = o; } - // VFALCO NOTE its not really a peer, its the 160 bit hash of the - // validator's public key. - void setVote (NodeID const& peer, bool votesYes); - void unVote (NodeID const& peer); + void setVote (NodeID_t const& peer, bool votesYes); + void unVote (NodeID_t const& peer); bool updateVote (int percentTime, bool proposing); Json::Value getJson (); private: - uint256 mTransactionID; + TxID_t mTransactionID; int mYays; int mNays; bool mOurVote; - // VFALCO Why is this being stored as a Serializer? - Serializer transaction; + Tx_t transaction; - hash_map mVotes; + hash_map mVotes; beast::Journal j_; }; +// Track a peer's yes/no vote on a particular disputed transaction +template +void DisputedTx::setVote (NodeID_t const& peer, bool votesYes) +{ + auto res = mVotes.insert (std::make_pair (peer, votesYes)); + + // new vote + if (res.second) + { + if (votesYes) + { + JLOG (j_.debug()) + << "Peer " << peer << " votes YES on " << mTransactionID; + ++mYays; + } + else + { + JLOG (j_.debug()) + << "Peer " << peer << " votes NO on " << mTransactionID; + ++mNays; + } + } + // changes vote to yes + else if (votesYes && !res.first->second) + { + JLOG (j_.debug()) + << "Peer " << peer << " now votes YES on " << mTransactionID; + --mNays; + ++mYays; + res.first->second = true; + } + // changes vote to no + else if (!votesYes && res.first->second) + { + JLOG (j_.debug()) + << "Peer " << peer << " now votes NO on " << mTransactionID; + ++mNays; + --mYays; + res.first->second = false; + } +} + +// Remove a peer's vote on this disputed transasction +template +void DisputedTx::unVote (NodeID_t const& peer) +{ + auto it = mVotes.find (peer); + + if (it != mVotes.end ()) + { + if (it->second) + --mYays; + else + --mNays; + + mVotes.erase (it); + } +} + +template +bool DisputedTx::updateVote (int percentTime, bool proposing) +{ + if (mOurVote && (mNays == 0)) + return false; + + if (!mOurVote && (mYays == 0)) + return false; + + bool newPosition; + int weight; + + if (proposing) // give ourselves full weight + { + // This is basically the percentage of nodes voting 'yes' (including us) + weight = (mYays * 100 + (mOurVote ? 100 : 0)) / (mNays + mYays + 1); + + // VFALCO TODO Rename these macros and turn them into language + // constructs. consolidate them into a class that collects + // all these related values. + // + // To prevent avalanche stalls, we increase the needed weight slightly + // over time. + if (percentTime < AV_MID_CONSENSUS_TIME) + newPosition = weight > AV_INIT_CONSENSUS_PCT; + else if (percentTime < AV_LATE_CONSENSUS_TIME) + newPosition = weight > AV_MID_CONSENSUS_PCT; + else if (percentTime < AV_STUCK_CONSENSUS_TIME) + newPosition = weight > AV_LATE_CONSENSUS_PCT; + else + newPosition = weight > AV_STUCK_CONSENSUS_PCT; + } + else + { + // don't let us outweigh a proposing node, just recognize consensus + weight = -1; + newPosition = mYays > mNays; + } + + if (newPosition == mOurVote) + { + JLOG (j_.info()) + << "No change (" << (mOurVote ? "YES" : "NO") << ") : weight " + << weight << ", percent " << percentTime; + JLOG (j_.debug()) << getJson (); + return false; + } + + mOurVote = newPosition; + JLOG (j_.debug()) + << "We now vote " << (mOurVote ? "YES" : "NO") + << " on " << mTransactionID; + JLOG (j_.debug()) << getJson (); + return true; +} + +template +Json::Value DisputedTx::getJson () +{ + Json::Value ret (Json::objectValue); + + ret["yays"] = mYays; + ret["nays"] = mNays; + ret["our_vote"] = mOurVote; + + if (!mVotes.empty ()) + { + Json::Value votesj (Json::objectValue); + for (auto& vote : mVotes) + votesj[to_string (vote.first)] = vote.second; + ret["votes"] = std::move (votesj); + } + + return ret; +} + } // ripple #endif diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp index a89f257642..90c92dac3f 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp @@ -18,13 +18,13 @@ //============================================================================== #include +#include #include #include #include #include #include #include -#include #include #include #include @@ -45,15 +45,18 @@ #include #include #include +#include #include #include #include #include #include + namespace ripple { -LedgerConsensusImp::LedgerConsensusImp ( +template +LedgerConsensusImp::LedgerConsensusImp ( Application& app, ConsensusImp& consensus, InboundTransactions& inboundTransactions, @@ -66,6 +69,7 @@ LedgerConsensusImp::LedgerConsensusImp ( , localTX_ (localtx) , ledgerMaster_ (ledgerMaster) , feeVote_ (feeVote) + , ourID_ (calcNodeID (app.nodeIdentity().first)) , state_ (State::open) , closeTime_ {} , valPublic_ (app_.config().VALIDATION_PUB) @@ -83,7 +87,8 @@ LedgerConsensusImp::LedgerConsensusImp ( JLOG (j_.debug()) << "Creating consensus object"; } -Json::Value LedgerConsensusImp::getJson (bool full) +template +Json::Value LedgerConsensusImp::getJson (bool full) { Json::Value ret (Json::objectValue); std::lock_guard _(lock_); @@ -134,7 +139,7 @@ Json::Value LedgerConsensusImp::getJson (bool full) ret["current_ms"] = static_cast(roundTime_.count()); ret["close_percent"] = closePercent_; ret["close_resolution"] = closeResolution_.count(); - ret["have_timeconsensus_"] = haveCloseTimeConsensus_; + ret["have_time_consensus"] = haveCloseTimeConsensus_; ret["previous_proposers"] = previousProposers_; ret["previous_mseconds"] = static_cast(previousRoundTime_.count()); @@ -145,21 +150,17 @@ Json::Value LedgerConsensusImp::getJson (bool full) for (auto& pp : peerPositions_) { - ppj[to_string (pp.first)] = pp.second->getJson (); + ppj[to_string (pp.first)] = pp.second.getJson (); } ret["peer_positions"] = std::move(ppj); } if (! acquired_.empty ()) { - // acquired - Json::Value acq (Json::objectValue); + Json::Value acq (Json::arrayValue); for (auto& at : acquired_) { - if (at.second) - acq[to_string (at.first)] = "acquired"; - else - acq[to_string (at.first)] = "failed"; + acq.append (to_string (at.first)); } ret["acquired"] = std::move(acq); } @@ -169,7 +170,7 @@ Json::Value LedgerConsensusImp::getJson (bool full) Json::Value dsj (Json::objectValue); for (auto& dt : disputes_) { - dsj[to_string (dt.first)] = dt.second->getJson (); + dsj[to_string (dt.first)] = dt.second.getJson (); } ret["disputes"] = std::move(dsj); } @@ -198,13 +199,23 @@ Json::Value LedgerConsensusImp::getJson (bool full) return ret; } -uint256 LedgerConsensusImp::getLCL () +template +auto +LedgerConsensusImp::getLCL () -> LgrID_t { std::lock_guard _(lock_); return prevLedgerHash_; } +template +void LedgerConsensusImp::shareSet (TxSet_t const& set) +{ + // Temporary until Consensus refactor is complete + inboundTransactions_.giveSet (set.getID(), + set.map(), false); +} + // Called when: // 1) We take our initial position // 2) We take a new position @@ -213,76 +224,33 @@ uint256 LedgerConsensusImp::getLCL () // We store it, notify peers that we have it, // and update our tracking if any validators currently // propose it -void LedgerConsensusImp::mapCompleteInternal ( - std::shared_ptr const& map, +template +void +LedgerConsensusImp::mapCompleteInternal ( + TxSet_t const& map, bool acquired) { - auto hash = map->getHash ().as_uint256(); + auto const hash = map.getID (); - { - auto it = acquired_.find (hash); - if (it != acquired_.end ()) - { - // We have already acquired (or proven invalid) this transaction set - if (map && ! it->second) - { - JLOG (j_.warn()) << "Map " << hash << " proven invalid then acquired"; - assert (false); - } - else if (it->second && ! map) - { - JLOG (j_.warn()) << "Map " << hash << " acquired then proven invalid"; - assert (false); - return; - } - else - { - // nothing to do - return; - } - } - } + if (acquired_.find (hash) != acquired_.end()) + return; if (acquired) { JLOG (j_.trace()) << "We have acquired txs " << hash; } - if (!map) // If the map was invalid - { - JLOG (j_.warn()) - << "Tried to acquire invalid transaction map: " - << hash; - acquired_[hash] = map; - return; - } - - assert (hash == map->getHash ().as_uint256()); - // We now have a map that we did not have before if (! acquired) { - // Put the map where others can get it - inboundTransactions_.giveSet (hash, map, false); + // If we generated this locally, + // put the map where others can get it + // If we acquired it, it's already shared + shareSet (map); } - if (ourPosition_ && (! ourPosition_->isBowOut ()) - && (hash != ourPosition_->getCurrentHash ())) - { - // this will create disputed transactions - auto it = acquired_.find (ourPosition_->getCurrentHash ()); - - if (it == acquired_.end()) - LogicError ("We cannot find our own position!"); - - assert ((it->first == ourPosition_->getCurrentHash ()) - && it->second); - compares_.insert(hash); - // Our position is not the same as the acquired position - createDisputes (it->second, map); - } - else if (! ourPosition_) + if (! ourPosition_) { JLOG (j_.debug()) << "Not creating disputes: no position yet."; @@ -292,21 +260,25 @@ void LedgerConsensusImp::mapCompleteInternal ( JLOG (j_.warn()) << "Not creating disputes: not participating."; } - else + else if (hash == ourPosition_->getCurrentHash ()) { JLOG (j_.debug()) << "Not creating disputes: identical position."; } - - acquired_[hash] = map; + else + { + // Our position is not the same as the acquired position + // create disputed txs if needed + createDisputes (*ourSet_, map); + compares_.insert(hash); + } // Adjust tracking for each peer that takes this position std::vector peers; - auto const mapHash = map->getHash ().as_uint256(); for (auto& it : peerPositions_) { - if (it.second->getCurrentHash () == mapHash) - peers.push_back (it.second->getPeerID ()); + if (it.second.getCurrentHash () == hash) + peers.push_back (it.second.getNodeID ()); } if (!peers.empty ()) @@ -319,10 +291,13 @@ void LedgerConsensusImp::mapCompleteInternal ( << "By the time we got the map " << hash << " no peers were proposing it"; } + + acquired_.emplace (hash, map); } -void LedgerConsensusImp::gotMap ( - std::shared_ptr const& map) +template +void LedgerConsensusImp::gotMap ( + TxSet_t const& map) { std::lock_guard _(lock_); @@ -340,7 +315,8 @@ void LedgerConsensusImp::gotMap ( } } -void LedgerConsensusImp::checkLCL () +template +void LedgerConsensusImp::checkLCL () { uint256 netLgr = prevLedgerHash_; int netLgrCount = 0; @@ -423,7 +399,8 @@ void LedgerConsensusImp::checkLCL () } // Handle a change in the LCL during a consensus round -void LedgerConsensusImp::handleLCL (uint256 const& lclHash) +template +void LedgerConsensusImp::handleLCL (LgrID_t const& lclHash) { assert (lclHash != prevLedgerHash_ || previousLedger_->info().hash != lclHash); @@ -436,8 +413,7 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash) if (haveCorrectLCL_ && proposing_ && ourPosition_) { JLOG (j_.info()) << "Bowing out of consensus"; - ourPosition_->bowOut (); - propose (); + leaveConsensus(); } // Stop proposing because we are out of sync @@ -494,7 +470,8 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash) proposing_ = false; } -void LedgerConsensusImp::timerEntry () +template +void LedgerConsensusImp::timerEntry () { std::lock_guard _(lock_); @@ -548,7 +525,8 @@ void LedgerConsensusImp::timerEntry () } } -void LedgerConsensusImp::statePreClose () +template +void LedgerConsensusImp::statePreClose () { // it is shortly before ledger close time bool anyTransactions = ! app_.openLedger().empty(); @@ -590,7 +568,8 @@ void LedgerConsensusImp::statePreClose () } } -void LedgerConsensusImp::stateEstablish () +template +void LedgerConsensusImp::stateEstablish () { // Give everyone a chance to take an initial position if (roundTime_ < LEDGER_MIN_CONSENSUS) @@ -615,7 +594,8 @@ void LedgerConsensusImp::stateEstablish () beginAccept (false); } -bool LedgerConsensusImp::haveConsensus () +template +bool LedgerConsensusImp::haveConsensus () { // CHECKME: should possibly count unacquired TX sets as disagreeing int agree = 0, disagree = 0; @@ -624,30 +604,29 @@ bool LedgerConsensusImp::haveConsensus () // Count number of agreements/disagreements with our position for (auto& it : peerPositions_) { - if (!it.second->isBowOut ()) + if (it.second.isBowOut ()) + continue; + + if (it.second.getCurrentHash () == ourPosition) { - if (it.second->getCurrentHash () == ourPosition) - { - ++agree; - } - else - { - JLOG (j_.debug()) << to_string (it.first) - << " has " << to_string (it.second->getCurrentHash ()); - ++disagree; - if (compares_.count(it.second->getCurrentHash()) == 0) - { // Make sure we have generated disputes - uint256 hash = it.second->getCurrentHash(); - JLOG (j_.debug()) - << "We have not compared to " << hash; - auto it1 = acquired_.find (hash); - auto it2 = acquired_.find(ourPosition_->getCurrentHash ()); - if ((it1 != acquired_.end()) && (it2 != acquired_.end()) - && (it1->second) && (it2->second)) - { - compares_.insert(hash); - createDisputes(it2->second, it1->second); - } + ++agree; + } + else + { + JLOG (j_.debug()) << to_string (it.first) + << " has " << to_string (it.second.getCurrentHash ()); + ++disagree; + if (compares_.count(it.second.getCurrentHash()) == 0) + { // Make sure we have generated disputes + uint256 hash = it.second.getCurrentHash(); + JLOG (j_.debug()) + << "We have not compared to " << hash; + auto it1 = acquired_.find (hash); + auto it2 = acquired_.find(ourPosition_->getCurrentHash ()); + if ((it1 != acquired_.end()) && (it2 != acquired_.end())) + { + compares_.insert(hash); + createDisputes(it2->second, it1->second); } } } @@ -671,36 +650,26 @@ bool LedgerConsensusImp::haveConsensus () // without us. consensusFail_ = (ret == ConsensusState::MovedOn); + if (consensusFail_) + { + JLOG (j_.error()) << "Unable to reach consensus"; + JLOG (j_.error()) << getJson(true); + } + return true; } -std::shared_ptr LedgerConsensusImp::getTransactionTree ( - uint256 const& hash) +template +bool LedgerConsensusImp::peerPosition (Pos_t const& newPosition) { + auto const peerID = newPosition.getNodeID (); + std::lock_guard _(lock_); - auto it = acquired_.find (hash); - if (it != acquired_.end() && it->second) - return it->second; - - auto set = inboundTransactions_.getSet (hash, true); - - if (set) - acquired_[hash] = set; - - return set; -} - -bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) -{ - std::lock_guard _(lock_); - - auto const peerID = newPosition->getPeerID (); - - if (newPosition->getPrevLedger() != prevLedgerHash_) + if (newPosition.getPrevLedger() != prevLedgerHash_) { JLOG (j_.debug()) << "Got proposal for " - << newPosition->getPrevLedger () + << newPosition.getPrevLedger () << " but we are on " << prevLedgerHash_; return false; } @@ -712,62 +681,83 @@ bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) return false; } - LedgerProposal::pointer& currentPosition = peerPositions_[peerID]; - - if (currentPosition) { - assert (peerID == currentPosition->getPeerID ()); + // update current position + auto currentPosition = peerPositions_.find(peerID); - if (newPosition->getProposeSeq () - <= currentPosition->getProposeSeq ()) + if (currentPosition != peerPositions_.end()) { - return false; + if (newPosition.getProposeSeq () + <= currentPosition->second.getProposeSeq ()) + { + return false; + } } + + if (newPosition.isBowOut ()) + { + JLOG (j_.info()) + << "Peer bows out: " << to_string (peerID); + + for (auto& it : disputes_) + it.second.unVote (peerID); + if (currentPosition != peerPositions_.end()) + peerPositions_.erase (peerID); + deadNodes_.insert (peerID); + + return true; + } + + if (currentPosition != peerPositions_.end()) + currentPosition->second = newPosition; + else + peerPositions_.emplace (peerID, newPosition); } - if (newPosition->isBowOut ()) - { - JLOG (j_.info()) - << "Peer bows out: " << to_string (peerID); - for (auto& it : disputes_) - it.second->unVote (peerID); - peerPositions_.erase (peerID); - deadNodes_.insert (peerID); - return true; - } - - if (newPosition->isInitial ()) + if (newPosition.isInitial ()) { // Record the close time estimate JLOG (j_.trace()) << "Peer reports close time as " - << newPosition->getCloseTime().time_since_epoch().count(); - ++closeTimes_[newPosition->getCloseTime()]; + << newPosition.getCloseTime().time_since_epoch().count(); + ++closeTimes_[newPosition.getCloseTime()]; } JLOG (j_.trace()) << "Processing peer proposal " - << newPosition->getProposeSeq () << "/" - << newPosition->getCurrentHash (); - currentPosition = newPosition; + << newPosition.getProposeSeq () << "/" + << newPosition.getCurrentHash (); - std::shared_ptr set - = getTransactionTree (newPosition->getCurrentHash ()); + { + auto ait = acquired_.find (newPosition.getCurrentHash()); + if (ait == acquired_.end()) + { + if (auto setPtr = inboundTransactions_.getSet ( + newPosition.getCurrentHash(), true)) + { + ait = acquired_.emplace (newPosition.getCurrentHash(), + std::move(setPtr)).first; + } + } - if (set) - { - for (auto& it : disputes_) - it.second->setVote (peerID, set->hasItem (it.first)); - } - else - { - JLOG (j_.debug()) - << "Don't have tx set for peer"; + + if (ait != acquired_.end()) + { + for (auto& it : disputes_) + it.second.setVote (peerID, + ait->second.hasEntry (it.first)); + } + else + { + JLOG (j_.debug()) + << "Don't have tx set for peer"; + } } return true; } -void LedgerConsensusImp::simulate ( +template +void LedgerConsensusImp::simulate ( boost::optional consensusDelay) { std::lock_guard _(lock_); @@ -779,7 +769,8 @@ void LedgerConsensusImp::simulate ( JLOG (j_.info()) << "Simulation complete"; } -void LedgerConsensusImp::accept (std::shared_ptr set) +template +void LedgerConsensusImp::accept (TxSet_t const& set) { auto closeTime = ourPosition_->getCloseTime(); bool closeTimeCorrect; @@ -813,12 +804,12 @@ void LedgerConsensusImp::accept (std::shared_ptr set) << "Report: Prev = " << prevLedgerHash_ << ":" << previousLedger_->info().seq; JLOG (j_.debug()) - << "Report: TxSt = " << set->getHash () + << "Report: TxSt = " << set.getID () << ", close " << closeTime.time_since_epoch().count() << (closeTimeCorrect ? "" : "X"); // Put transactions into a deterministic, but unpredictable, order - CanonicalTXSet retriableTxs (set->getHash ().as_uint256()); + CanonicalTXSet retriableTxs (set.getID()); std::shared_ptr sharedLCL; { @@ -853,7 +844,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) else { // Normal case, we are not replaying a ledger close - retriableTxs = applyTransactions (app_, *set, accum, + retriableTxs = applyTransactions (app_, set, accum, [&buildLCL](uint256 const& txID) { return ! buildLCL->txExists(txID); @@ -970,7 +961,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) bool anyDisputes = false; for (auto& it : disputes_) { - if (!it.second->getOurVote ()) + if (!it.second.getOurVote ()) { // we voted NO try @@ -978,7 +969,9 @@ void LedgerConsensusImp::accept (std::shared_ptr set) JLOG (j_.debug()) << "Test applying disputed transaction that did" << " not get in"; - SerialIter sit (it.second->peekTransaction().slice()); + + RCLCxTx cTxn {it.second.tx()}; + SerialIter sit (cTxn.txn().slice()); auto txn = std::make_shared(sit); @@ -1071,48 +1064,42 @@ void LedgerConsensusImp::accept (std::shared_ptr set) endConsensus (correct); } -void LedgerConsensusImp::createDisputes ( - std::shared_ptr const& m1, - std::shared_ptr const& m2) +template +void LedgerConsensusImp::createDisputes ( + TxSet_t const& m1, + TxSet_t const& m2) { - if (m1->getHash() == m2->getHash()) + if (m1.getID() == m2.getID()) return; JLOG (j_.debug()) << "createDisputes " - << m1->getHash() << " to " << m2->getHash(); - SHAMap::Delta differences; - m1->compare (*m2, differences, 16384); + << m1.getID() << " to " << m2.getID(); + auto differences = m1.getDifferences (m2); int dc = 0; // for each difference between the transactions - for (auto& pos : differences) + for (auto& id : differences) { ++dc; // create disputed transactions (from the ledger that has them) - if (pos.second.first) - { - // transaction is only in first map - assert (!pos.second.second); - addDisputedTransaction (pos.first - , pos.second.first->peekData ()); - } - else if (pos.second.second) - { - // transaction is only in second map - assert (!pos.second.first); - addDisputedTransaction (pos.first - , pos.second.second->peekData ()); - } - else // No other disagreement over a transaction should be possible - assert (false); + assert ( + (id.second && m1.getEntry(id.first) && !m2.getEntry(id.first)) || + (!id.second && !m1.getEntry(id.first) && m2.getEntry(id.first)) + ); + if (id.second) + addDisputedTransaction (*m1.getEntry (id.first)); + else + addDisputedTransaction (*m2.getEntry (id.first)); } JLOG (j_.debug()) << dc << " differences found"; } -void LedgerConsensusImp::addDisputedTransaction ( - uint256 const& txID, - Blob const& tx) +template +void LedgerConsensusImp::addDisputedTransaction ( + Tx_t const& tx) { + auto txID = tx.getID(); + if (disputes_.find (txID) != disputes_.end ()) return; @@ -1122,35 +1109,28 @@ void LedgerConsensusImp::addDisputedTransaction ( bool ourVote = false; // Update our vote on the disputed transaction - if (ourPosition_) - { - auto mit (acquired_.find (ourPosition_->getCurrentHash ())); + if (ourSet_) + ourVote = ourSet_->hasEntry (txID); - if (mit != acquired_.end ()) - ourVote = mit->second->hasItem (txID); - else - assert (false); // We don't have our own position? - } - - auto txn = std::make_shared (txID, tx, ourVote, j_); - disputes_[txID] = txn; + Dispute_t txn {tx, ourVote, j_}; // Update all of the peer's votes on the disputed transaction for (auto& pit : peerPositions_) { - auto cit (acquired_.find (pit.second->getCurrentHash ())); + auto cit (acquired_.find (pit.second.getCurrentHash ())); - if ((cit != acquired_.end ()) && cit->second) - { - txn->setVote (pit.first, cit->second->hasItem (txID)); - } + if (cit != acquired_.end ()) + txn.setVote (pit.first, + cit->second.hasEntry (txID)); } // If we didn't relay this transaction recently, relay it to all peers if (app_.getHashRouter ().shouldRelay (txID)) { + auto const slice = tx.txn().slice(); + protocol::TMTransaction msg; - msg.set_rawtransaction (& (tx.front ()), tx.size ()); + msg.set_rawtransaction (slice.data(), slice.size()); msg.set_status (protocol::tsNEW); msg.set_receivetimestamp ( app_.timeKeeper().now().time_since_epoch().count()); @@ -1158,33 +1138,35 @@ void LedgerConsensusImp::addDisputedTransaction ( std::make_shared ( msg, protocol::mtTRANSACTION))); } + + disputes_.emplace (txID, std::move (txn)); } -void LedgerConsensusImp::adjustCount (std::shared_ptr const& map, - const std::vector& peers) +template +void LedgerConsensusImp::adjustCount (TxSet_t const& map, + std::vector const& peers) { for (auto& it : disputes_) { - bool setHas = map->hasItem (it.second->getTransactionID ()); + bool setHas = map.hasEntry (it.first); for (auto const& pit : peers) - it.second->setVote (pit, setHas); + it.second.setVote (pit, setHas); } } -void LedgerConsensusImp::leaveConsensus () +template +void LedgerConsensusImp::leaveConsensus () { - if (proposing_) + if (ourPosition_ && ! ourPosition_->isBowOut ()) { - if (ourPosition_ && ! ourPosition_->isBowOut ()) - { - ourPosition_->bowOut(); - propose(); - } - proposing_ = false; + ourPosition_->bowOut(app_.timeKeeper().closeTime()); + propose(); } + proposing_ = false; } -void LedgerConsensusImp::propose () +template +void LedgerConsensusImp::propose () { JLOG (j_.trace()) << "We propose: " << (ourPosition_->isBowOut () @@ -1201,19 +1183,22 @@ void LedgerConsensusImp::propose () prop.set_nodepubkey (valPublic_.data(), valPublic_.size()); - ourPosition_->setSignature ( - signDigest ( - valPublic_, - valSecret_, - ourPosition_->getSigningHash())); + auto signingHash = sha512Half( + HashPrefix::proposal, + std::uint32_t(ourPosition_->getSequence()), + ourPosition_->getCloseTime().time_since_epoch().count(), + ourPosition_->getPrevLedger(), ourPosition_->getCurrentHash()); + + auto sig = signDigest ( + valPublic_, valSecret_, signingHash); - auto sig = ourPosition_->getSignature(); prop.set_signature (sig.data(), sig.size()); app_.overlay().send(prop); } -void LedgerConsensusImp::statusChange ( +template +void LedgerConsensusImp::statusChange ( protocol::NodeEvent event, ReadView const& ledger) { protocol::TMStatusChange s; @@ -1251,10 +1236,10 @@ void LedgerConsensusImp::statusChange ( JLOG (j_.trace()) << "send status change to peer"; } -// For the consensus refactor, takeInitialPosition has been split -// into two pieces. This piece, makeInitialPosition does the -// non-consensus parts -std::shared_ptr LedgerConsensusImp::makeInitialPosition () +template +auto +LedgerConsensusImp::makeInitialPosition () -> + std::pair { // Tell the ledger master not to acquire the ledger we're probably building ledgerMaster_.setBuildingLedger (previousLedger_->info().seq + 1); @@ -1274,6 +1259,7 @@ std::shared_ptr LedgerConsensusImp::makeInitialPosition () SHAMapItem (tx.first->getTransactionID(), std::move (s)), true, false); } + // Add pseudo-transactions to the set if ((app_.config().standalone() || (proposing_ && haveCorrectLCL_)) && ((previousLedger_->info().seq % 256) == 0)) { @@ -1302,45 +1288,52 @@ std::shared_ptr LedgerConsensusImp::makeInitialPosition () } } - // Set should be immutable snapshot - return initialSet->snapShot (false); + // Now we need an immutable snapshot + initialSet = initialSet->snapShot(false); + auto setHash = initialSet->getHash().as_uint256(); + + return std::make_pair ( + std::move (initialSet), + LedgerProposal { + initialLedger->info().parentHash, + setHash, + closeTime_, + app_.timeKeeper().closeTime()}); } -void LedgerConsensusImp::takeInitialPosition() +template +void LedgerConsensusImp::takeInitialPosition() { - auto initialSet = makeInitialPosition(); + auto pair = makeInitialPosition(); + auto const& initialSet = pair.first; + auto const& initialPos = pair.second; + assert (initialSet.getID() == initialPos.getCurrentHash()); - mapCompleteInternal (initialSet, false); - - ourPosition_ = std::make_shared ( - previousLedger_->info().hash, - initialSet->getHash().as_uint256(), - closeTime_); + ourPosition_ = initialPos; + ourSet_ = initialSet; for (auto& it : disputes_) { - it.second->setOurVote (initialSet->hasItem (it.first)); + it.second.setOurVote (initialSet.hasEntry (it.first)); } - // if any peers have taken a contrary position, process disputes - hash_set found; - + // When we take our initial position, + // we need to create any disputes required by our position + // and any peers who have already taken positions + compares_.emplace (initialSet.getID()); for (auto& it : peerPositions_) { - uint256 const& set = it.second->getCurrentHash (); - - if (found.insert (set).second) + auto hash = it.second.getCurrentHash(); + auto iit (acquired_.find (hash)); + if (iit != acquired_.end ()) { - auto iit (acquired_.find (set)); - - if (iit != acquired_.end ()) - { - compares_.insert(iit->second->getHash().as_uint256()); + if (compares_.emplace (hash).second) createDisputes (initialSet, iit->second); - } } } + mapCompleteInternal (initialSet, false); + if (proposing_) propose (); } @@ -1366,8 +1359,9 @@ participantsNeeded (int participants, int percent) return (result == 0) ? 1 : result; } +template NetClock::time_point -LedgerConsensusImp::effectiveCloseTime(NetClock::time_point closeTime) +LedgerConsensusImp::effectiveCloseTime(NetClock::time_point closeTime) { if (closeTime == NetClock::time_point{}) return closeTime; @@ -1377,71 +1371,70 @@ LedgerConsensusImp::effectiveCloseTime(NetClock::time_point closeTime) (previousLedger_->info().closeTime + 1s)); } -void LedgerConsensusImp::updateOurPositions () +template +void LedgerConsensusImp::updateOurPositions () { // Compute a cutoff time - auto peerCutoff - = std::chrono::steady_clock::now (); - auto ourCutoff - = peerCutoff - PROPOSE_INTERVAL; + auto peerCutoff = app_.timeKeeper().closeTime(); + auto ourCutoff = peerCutoff - PROPOSE_INTERVAL; peerCutoff -= PROPOSE_FRESHNESS; - bool changes = false; - std::shared_ptr ourPosition; - // std::vector addedTx, removedTx; - // Verify freshness of peer positions and compute close times std::map closeTimes; { auto it = peerPositions_.begin (); while (it != peerPositions_.end ()) { - if (it->second->isStale (peerCutoff)) + if (it->second.isStale (peerCutoff)) { // peer's proposal is stale, so remove it - auto const& peerID = it->second->getPeerID (); + auto const& peerID = it->second.getNodeID (); JLOG (j_.warn()) << "Removing stale proposal from " << peerID; for (auto& dt : disputes_) - dt.second->unVote (peerID); + dt.second.unVote (peerID); it = peerPositions_.erase (it); } else { // proposal is still fresh - ++closeTimes[effectiveCloseTime(it->second->getCloseTime())]; + ++closeTimes[effectiveCloseTime(it->second.getCloseTime())]; ++it; } } } - // Update votes on disputed transactions - for (auto& it : disputes_) - { - // Because the threshold for inclusion increases, - // time can change our position on a dispute - if (it.second->updateVote (closePercent_, proposing_)) - { - if (!changes) - { - ourPosition = acquired_[ourPosition_->getCurrentHash ()] - ->snapShot (true); - assert (ourPosition); - changes = true; - } + // This will stay unseated unless there are any changes + boost::optional ourSet; - if (it.second->getOurVote ()) // now a yes + // Update votes on disputed transactions + { + boost::optional changedSet; + for (auto& it : disputes_) + { + // Because the threshold for inclusion increases, + // time can change our position on a dispute + if (it.second.updateVote (closePercent_, proposing_)) { - ourPosition->addItem (SHAMapItem (it.first - , it.second->peekTransaction ()), true, false); - // addedTx.push_back(it.first); - } - else // now a no - { - ourPosition->delItem (it.first); - // removedTx.push_back(it.first); + if (! changedSet) + changedSet.emplace (*ourSet_); + + if (it.second.getOurVote ()) + { + // now a yes + changedSet->addEntry (it.second.tx()); + } + else + { + // now a no + changedSet->removeEntry (it.first); + } } } + if (changedSet) + { + ourSet.emplace (*changedSet); + } } int neededWeight; @@ -1518,38 +1511,67 @@ void LedgerConsensusImp::updateOurPositions () // claimed close time. Once the new close time code is deployed // to the full network, this can be relaxed to force a change // only if the rounded close time has changed. - if (!changes && + if (! ourSet && ((closeTime != ourPosition_->getCloseTime()) || ourPosition_->isStale (ourCutoff))) { // close time changed or our position is stale - ourPosition = acquired_[ourPosition_->getCurrentHash ()] - ->snapShot (true); - assert (ourPosition); - changes = true; // We pretend our position changed to force - } // a new proposal + ourSet.emplace (*ourSet_); + } - if (changes) + if (ourSet) { - ourPosition = ourPosition->snapShot (false); + auto newHash = ourSet->getID(); + + // Setting ourSet_ here prevents mapCompleteInternal + // from checking for new disputes. But we only changed + // positions on existing disputes, so no need to. + ourSet_ = ourSet; - auto newHash = ourPosition->getHash ().as_uint256(); JLOG (j_.info()) << "Position change: CTime " << closeTime.time_since_epoch().count() << ", tx " << newHash; - if (ourPosition_->changePosition(newHash, closeTime)) + if (ourPosition_->changePosition (newHash, closeTime, + app_.timeKeeper().closeTime())) { if (proposing_) propose (); - mapCompleteInternal (ourPosition, false); + mapCompleteInternal (*ourSet, false); } } } -void LedgerConsensusImp::playbackProposals () +static void +relay (Application& app, RCLCxPos const& pos) +{ + auto& proposal = pos.peek(); + + protocol::TMProposeSet prop; + + prop.set_proposeseq ( + proposal.getProposeSeq ()); + prop.set_closetime ( + proposal.getCloseTime ().time_since_epoch().count()); + + prop.set_currenttxhash ( + proposal.getCurrentHash().begin(), 256 / 8); + prop.set_previousledger ( + proposal.getPrevLedger().begin(), 256 / 8); + + auto const pk = proposal.getPublicKey().slice(); + prop.set_nodepubkey (pk.data(), pk.size()); + + auto const sig = proposal.getSignature(); + prop.set_signature (sig.data(), sig.size()); + + app.overlay().relay (prop, proposal.getSuppressionID ()); +} + +template +void LedgerConsensusImp::playbackProposals () { auto proposals = consensus_.getStoredProposals (prevLedgerHash_); @@ -1559,31 +1581,13 @@ void LedgerConsensusImp::playbackProposals () { // Now that we know this proposal // is useful, relay it - protocol::TMProposeSet prop; - - prop.set_proposeseq ( - proposal->getProposeSeq ()); - prop.set_closetime ( - proposal->getCloseTime ().time_since_epoch().count()); - - prop.set_currenttxhash ( - proposal->getCurrentHash().begin(), 256 / 8); - prop.set_previousledger ( - proposal->getPrevLedger().begin(), 256 / 8); - - auto const pk = proposal->getPublicKey().slice(); - prop.set_nodepubkey (pk.data(), pk.size()); - - auto const sig = proposal->getSignature(); - prop.set_signature (sig.data(), sig.size()); - - app_.overlay().relay ( - prop, proposal->getSuppressionID ()); + relay (app_, proposal); } } } -void LedgerConsensusImp::closeLedger () +template +void LedgerConsensusImp::closeLedger () { checkOurValidation (); state_ = State::establish; @@ -1595,7 +1599,8 @@ void LedgerConsensusImp::closeLedger () takeInitialPosition (); } -void LedgerConsensusImp::checkOurValidation () +template +void LedgerConsensusImp::checkOurValidation () { // This only covers some cases - Fix for the case where we can't ever // acquire the consensus ledger @@ -1624,7 +1629,7 @@ void LedgerConsensusImp::checkOurValidation () addLoad(v); v->setTrusted (); auto const signingHash = v->sign (valSecret_); - // FIXME: wrong supression + // FIXME: wrong suppression app_.getHashRouter ().addSuppression (signingHash); app_.getValidations ().addValidation (v, "localMissing"); Blob validation = v->getSigned (); @@ -1634,11 +1639,10 @@ void LedgerConsensusImp::checkOurValidation () JLOG (j_.warn()) << "Sending partial validation"; } -void LedgerConsensusImp::beginAccept (bool synchronous) +template +void LedgerConsensusImp::beginAccept (bool synchronous) { - auto consensusSet = acquired_[ourPosition_->getCurrentHash ()]; - - if (!consensusSet) + if (! ourPosition_ || ! ourSet_) { JLOG (j_.fatal()) << "We don't have a consensus set"; @@ -1648,24 +1652,30 @@ void LedgerConsensusImp::beginAccept (bool synchronous) consensus_.newLCL (peerPositions_.size (), roundTime_); if (synchronous) - accept (consensusSet); + accept (*ourSet_); else { app_.getJobQueue().addJob (jtACCEPT, "acceptLedger", - std::bind (&LedgerConsensusImp::accept, shared_from_this (), - consensusSet)); + [that = this->shared_from_this(), + consensusSet = *ourSet_] + (Job &) + { + that->accept (consensusSet); + }); } } -void LedgerConsensusImp::endConsensus (bool correctLCL) +template +void LedgerConsensusImp::endConsensus (bool correctLCL) { app_.getOPs ().endConsensus (correctLCL); } -void LedgerConsensusImp::startRound ( - LedgerHash const& prevLCLHash, +template +void LedgerConsensusImp::startRound ( + LgrID_t const& prevLCLHash, std::shared_ptr const& prevLedger, - NetClock::time_point closeTime, + Time_t closeTime, int previousProposers, std::chrono::milliseconds previousConvergeTime) { @@ -1682,6 +1692,7 @@ void LedgerConsensusImp::startRound ( prevLedgerHash_ = prevLCLHash; previousLedger_ = prevLedger; ourPosition_.reset(); + ourSet_.reset(); consensusFail_ = false; roundTime_ = 0ms; closePercent_ = 0; @@ -1757,7 +1768,8 @@ void LedgerConsensusImp::startRound ( } -void LedgerConsensusImp::addLoad(STValidation::ref val) +template +void LedgerConsensusImp::addLoad(STValidation::ref val) { auto const& feeTrack = app_.getFeeTrack(); std::uint32_t fee = std::max( @@ -1769,7 +1781,7 @@ void LedgerConsensusImp::addLoad(STValidation::ref val) } //------------------------------------------------------------------------------ -std::shared_ptr +std::shared_ptr > make_LedgerConsensus ( Application& app, ConsensusImp& consensus, @@ -1778,7 +1790,7 @@ make_LedgerConsensus ( LedgerMaster& ledgerMaster, FeeVote& feeVote) { - return std::make_shared (app, consensus, + return std::make_shared > (app, consensus, inboundTransactions, localtx, ledgerMaster, feeVote); } @@ -1787,12 +1799,13 @@ make_LedgerConsensus ( CanonicalTXSet applyTransactions ( Application& app, - SHAMap const& set, + RCLTxSet const& cSet, OpenView& view, std::function txFilter) { auto j = app.journal ("LedgerConsensus"); + auto& set = *(cSet.map()); CanonicalTXSet retriableTxs (set.getHash().as_uint256()); for (auto const& item : set) @@ -1872,4 +1885,6 @@ applyTransactions ( return retriableTxs; } +template class LedgerConsensusImp ; + } // ripple diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.h b/src/ripple/app/ledger/impl/LedgerConsensusImp.h index 4b946d6279..c003b37e45 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.h +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.h @@ -37,19 +37,16 @@ namespace ripple { Provides the implementation for LedgerConsensus. Achieves consensus on the next ledger. - This object is created when the consensus process starts, and - is destroyed when the process is complete. - - Nearly everything herein is invoked with the master lock. Two things need consensus: 1. The set of transactions. 2. The close time for the ledger. */ +template class LedgerConsensusImp - : public LedgerConsensus - , public std::enable_shared_from_this - , public CountedObject + : public LedgerConsensus + , public std::enable_shared_from_this > + , public CountedObject > { private: enum class State @@ -70,6 +67,17 @@ private: }; public: + + using typename Traits::Time_t; + using typename Traits::Pos_t; + using typename Traits::TxSet_t; + using typename Traits::Tx_t; + using typename Traits::LgrID_t; + using typename Traits::TxID_t; + using typename Traits::TxSetID_t; + using typename Traits::NodeID_t; + using Dispute_t = DisputedTx ; + /** * The result of applying a transaction to a ledger. */ @@ -82,6 +90,7 @@ public: ~LedgerConsensusImp () = default; + /** @param localtx transactions issued by local clients @param inboundTransactions set of inbound transaction sets @@ -104,9 +113,9 @@ public: @param previousConvergeTime how long the last round took (ms) */ void startRound ( - LedgerHash const& prevLCLHash, + LgrID_t const& prevLCLHash, std::shared_ptr const& prevLedger, - NetClock::time_point closeTime, + Time_t closeTime, int previousProposers, std::chrono::milliseconds previousConvergeTime) override; @@ -120,24 +129,20 @@ public: Json::Value getJson (bool full) override; /* The hash of the last closed ledger */ - uint256 getLCL () override; + LgrID_t getLCL () override; /** We have a complete transaction set, typically acquired from the network @param map the transaction set. - @param acquired true if we have acquired the transaction set. */ - void gotMap ( - std::shared_ptr const& map) override; + void gotMap (TxSet_t const& map) override; /** On timer call the correct handler for each state. */ void timerEntry () override; - std::shared_ptr getTransactionTree (uint256 const& hash); - /** A server has taken a new position, adjust our tracking Called when a peer takes a new postion. @@ -145,11 +150,16 @@ public: @param newPosition the new position @return true if we should do delayed relay of this position. */ - bool peerPosition (LedgerProposal::ref newPosition) override; + bool peerPosition (Pos_t const& newPosition) override; void simulate( boost::optional consensusDelay) override; + /** + Put a transaction set where peers can find it + */ + void shareSet (TxSet_t const&); + private: /** Handle pre-close state. @@ -179,7 +189,7 @@ private: @param lclHash Hash of the last closed ledger. */ - void handleLCL (uint256 const& lclHash); + void handleLCL (LgrID_t const& lclHash); /** We have a complete transaction set, typically acquired from the network @@ -188,14 +198,14 @@ private: @param acquired true if we have acquired the transaction set. */ void mapCompleteInternal ( - std::shared_ptr const& map, + TxSet_t const& map, bool acquired); /** We have a new last closed ledger, process it. Final accept logic @param set Our consensus set */ - void accept (std::shared_ptr set); + void accept (TxSet_t const& set); /** Compare two proposed transaction sets and create disputed @@ -204,17 +214,16 @@ private: @param m1 One transaction set @param m2 The other transaction set */ - void createDisputes (std::shared_ptr const& m1, - std::shared_ptr const& m2); + void createDisputes (TxSet_t const& m1, + TxSet_t const& m2); /** Add a disputed transaction (one that at least one node wants in the consensus set and at least one node does not) to our tracking - @param txID The ID of the disputed transaction - @param tx The data of the disputed transaction + @param tx The disputed transaction */ - void addDisputedTransaction (uint256 const& txID, Blob const& tx); + void addDisputedTransaction (Tx_t const& tx); /** Adjust the votes on all disputed transactions based @@ -223,8 +232,8 @@ private: @param map A disputed position @param peers peers which are taking the position map */ - void adjustCount (std::shared_ptr const& map, - const std::vector& peers); + void adjustCount (TxSet_t const& map, + std::vector const& peers); /** Revoke our outstanding proposal, if any, and @@ -247,7 +256,7 @@ private: /** Determine our initial proposed transaction set based on our open ledger */ - std::shared_ptr makeInitialPosition(); + std::pair makeInitialPosition(); /** Take an initial position on what we think the consensus set should be */ @@ -296,16 +305,18 @@ private: FeeVote& feeVote_; std::recursive_mutex lock_; + NodeID_t ourID_; State state_; // The wall time this ledger closed - NetClock::time_point closeTime_; + Time_t closeTime_; - uint256 prevLedgerHash_; - uint256 acquiringLedger_; + LgrID_t prevLedgerHash_; + LgrID_t acquiringLedger_; std::shared_ptr previousLedger_; - LedgerProposal::pointer ourPosition_; + boost::optional ourPosition_; + boost::optional ourSet_; PublicKey valPublic_; SecretKey valSecret_; bool proposing_, validating_, haveCorrectLCL_, consensusFail_; @@ -328,26 +339,26 @@ private: std::chrono::milliseconds previousRoundTime_; // Convergence tracking, trusted peers indexed by hash of public key - hash_map peerPositions_; + hash_map peerPositions_; // Transaction Sets, indexed by hash of transaction tree - hash_map> acquired_; + hash_map acquired_; // Disputed transactions - hash_map> disputes_; - hash_set compares_; + hash_map disputes_; + hash_set compares_; // Close time estimates, keep ordered for predictable traverse - std::map closeTimes_; + std::map closeTimes_; // nodes that have bowed out of this consensus process - hash_set deadNodes_; + hash_set deadNodes_; beast::Journal j_; }; //------------------------------------------------------------------------------ -std::shared_ptr +std::shared_ptr > make_LedgerConsensus ( Application& app, ConsensusImp& consensus, @@ -370,10 +381,12 @@ make_LedgerConsensus ( CanonicalTXSet applyTransactions ( Application& app, - SHAMap const& set, + RCLTxSet const& set, OpenView& view, std::function txFilter); +extern template class LedgerConsensusImp ; + } // ripple #endif diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 553302136e..8ae32030f0 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -459,8 +459,7 @@ public: , [this](std::shared_ptr const& set, bool fromAcquire) { - if (set) - gotTXSet (set, fromAcquire); + gotTXSet (set, fromAcquire); })) , m_acceptedLedgerCache ("AcceptedLedger", 4, 60, stopwatch(), diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index bcb9969a60..f2932036ee 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -529,7 +530,7 @@ private: DeadlineTimer m_clusterTimer; std::unique_ptr mConsensus; - std::shared_ptr mLedgerConsensus; + std::shared_ptr> mLedgerConsensus; LedgerMaster& m_ledgerMaster; std::shared_ptr mAcquiringLedger; @@ -1503,14 +1504,12 @@ void NetworkOPsImp::processTrustedProposal ( std::shared_ptr set, NodeID const& node) { - { - mConsensus->storeProposal (proposal, node); + mConsensus->storeProposal (proposal, node); - if (mLedgerConsensus->peerPosition (proposal)) - app_.overlay().relay(*set, proposal->getSuppressionID()); - else - JLOG(m_journal.info()) << "Not relaying trusted proposal"; - } + if (mLedgerConsensus->peerPosition (*proposal)) + app_.overlay().relay(*set, proposal->getSuppressionID()); + else + JLOG(m_journal.info()) << "Not relaying trusted proposal"; } void @@ -1531,7 +1530,7 @@ NetworkOPsImp::mapComplete ( // We acquired it because consensus asked us to if (fromAcquire) - mLedgerConsensus->gotMap (map); + mLedgerConsensus->gotMap (RCLTxSet{map}); } void NetworkOPsImp::endConsensus (bool correctLCL) diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 116b5e1a1f..90825f38b5 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1233,7 +1233,7 @@ PeerImp::onMessage (std::shared_ptr const& m) PublicKey const publicKey (makeSlice(set.nodepubkey())); NetClock::time_point const closeTime { NetClock::duration{set.closetime()} }; - Buffer signature (set.signature().data(), set.signature ().size()); + Slice signature (set.signature().data(), set.signature ().size()); uint256 proposeHash, prevLedger; memcpy (proposeHash.begin (), set.currenttxhash ().data (), 32); @@ -1278,8 +1278,8 @@ PeerImp::onMessage (std::shared_ptr const& m) auto proposal = std::make_shared ( prevLedger, set.proposeseq (), proposeHash, closeTime, - publicKey, calcNodeID(publicKey), suppression); - proposal->setSignature (std::move(signature)); + app_.timeKeeper().closeTime(), publicKey, calcNodeID(publicKey), + signature, suppression); std::weak_ptr weak = shared_from_this(); app_.getJobQueue ().addJob ( diff --git a/src/ripple/unity/app_ledger.cpp b/src/ripple/unity/app_ledger.cpp index 7a41ca836d..0b7999b894 100644 --- a/src/ripple/unity/app_ledger.cpp +++ b/src/ripple/unity/app_ledger.cpp @@ -31,7 +31,6 @@ #include #include -#include #include #include #include