Consensus refactor preliminary changes (RIPD-1011):

* Standardize names of LedgerConsensusImp members
* Rework visitStoredProposals
* Clean up mapComplete
* Move status helpers out of LedgerConsensusImp
* Move applyTransaction out of LedgerConsensusUmp
* Clean up applyTransactions
This commit is contained in:
JoelKatz
2016-07-11 01:22:16 -07:00
committed by Nik Bougalis
parent 4758050444
commit 33f153fc9a
11 changed files with 729 additions and 669 deletions

View File

@@ -47,8 +47,8 @@ public:
virtual uint256 getLCL () = 0;
virtual void mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map, bool acquired) = 0;
virtual void gotMap (uint256 const& hash,
std::shared_ptr<SHAMap> const& map) = 0;
virtual void timerEntry () = 0;
@@ -74,41 +74,6 @@ public:
boost::optional<std::chrono::milliseconds> consensusDelay) = 0;
};
//------------------------------------------------------------------------------
/** Apply a set of transactions to a ledger
@param set The set of transactions to apply
@param applyLedger The ledger to which the transactions should
be applied.
@param checkLedger A reference ledger for determining error
messages (typically new last closed
ledger).
@param retriableTransactions collect failed transactions in this set
@param openLgr true if applyLedger is open, else false.
*/
void applyTransactions (
Application& app,
SHAMap const* set,
OpenView& view,
ReadView const& checkLedger,
CanonicalTXSet& retriableTransactions,
ApplyFlags flags);
/** Apply a single transaction to a ledger
@param view The open view to apply to
@param txn The transaction to apply
@param retryAssured True if another pass is assured
@param flags Flags for transactor
@return resultSuccess, resultFail or resultRetry
*/
int applyTransaction (
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& txn,
bool retryAssured,
ApplyFlags flags,
beast::Journal j);
} // ripple
#endif

View File

@@ -107,10 +107,6 @@ public:
return signature_;
}
bool isPrevLedger (uint256 const& pl) const
{
return mPreviousLedger == pl;
}
bool isInitial () const
{
return mProposeSeq == seqJoin;

View File

@@ -23,6 +23,7 @@
#include <chrono>
#include <cstdint>
#include <ripple/basics/chrono.h>
#include <ripple/beast/utility/Journal.h>
namespace ripple {
@@ -50,6 +51,80 @@ roundCloseTime (
NetClock::time_point closeTime,
NetClock::duration closeResolution);
/** Determines whether the current ledger should close at this time.
This function should be called when a ledger is open and there is no close
in progress, or when a transaction is received and no close is in progress.
@param anyTransactions indicates whether any transactions have been received
@param previousProposers proposers in the last closing
@param proposersClosed proposers who have currently closed this ledger
@param proposersValidated proposers who have validated the last closed
ledger
@param previousTime time for the previous ledger to reach consensus
@param currentTime time since the previous ledger's
(possibly rounded) close time
@param openTime time waiting to close this ledger
@param idleInterval the network's desired idle interval
@param j journal for logging
*/
bool
shouldCloseLedger (
bool anyTransactions,
int previousProposers,
int proposersClosed,
int proposersValidated,
std::chrono::milliseconds previousTime,
std::chrono::milliseconds currentTime, // Time since last ledger's close time
std::chrono::milliseconds openTime, // Time waiting to close this ledger
std::chrono::seconds idleInterval,
beast::Journal j);
/** Determine if a consensus has been reached
This function determines if a consensus has been reached
@param agreeing count of agreements with our position
@param total count of participants other than us
@param count_self whether we count ourselves
@return True if a consensus has been reached
*/
bool
checkConsensusReached (int agreeing, int total, bool count_self);
/** Whether we have or don't have a consensus */
enum class ConsensusState
{
No, // We do not have consensus
MovedOn, // The network has consensus without us
Yes // We have consensus along with the network
};
/** Determine whether the network reached consensus and whether we joined.
@param previousProposers proposers in the last closing (not including us)
@param currentProposers proposers in this closing so far (not including us)
@param currentAgree proposers who agree with us
@param currentFinished proposers who have validated a ledger after this one
@param previousAgreeTime how long, in milliseconds, it took to agree on the
last ledger
@param currentAgreeTime how long, in milliseconds, we've been trying to
agree
@param proposing whether we should count ourselves
@param j journal for logging
*/
ConsensusState
checkConsensus (
int previousProposers,
int currentProposers,
int currentAgree,
int currentFinished,
std::chrono::milliseconds previousAgreeTime,
std::chrono::milliseconds currentAgreeTime,
bool proposing,
beast::Journal j);
//------------------------------------------------------------------------------
// These are protocol parameters used to control the behavior of the system and

View File

@@ -177,15 +177,22 @@ ConsensusImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position)
}
}
void
ConsensusImp::visitStoredProposals (
std::function<void(LedgerProposal::ref)> const& f)
std::vector <std::shared_ptr <LedgerProposal>>
ConsensusImp::getStoredProposals (uint256 const& prevLedger)
{
std::lock_guard <std::mutex> _(lock_);
for (auto const& it : storedProposals_)
for (auto const& prop : it.second)
f(prop);
std::vector <std::shared_ptr <LedgerProposal>> ret;
{
std::lock_guard <std::mutex> _(lock_);
for (auto const& it : storedProposals_)
for (auto const& prop : it.second)
if (prop->getPrevLedger() == prevLedger)
ret.push_back (prop);
}
return ret;
}
//==============================================================================

View File

@@ -96,8 +96,8 @@ public:
void takePosition (int seq, std::shared_ptr<SHAMap> const& position);
void
visitStoredProposals (std::function<void(LedgerProposal::ref)> const&);
std::vector <std::shared_ptr <LedgerProposal>>
getStoredProposals (uint256 const& previousLedger);
private:
beast::Journal journal_;

File diff suppressed because it is too large Load Diff

View File

@@ -129,10 +129,9 @@ public:
@param map the transaction set.
@param acquired true if we have acquired the transaction set.
*/
void mapComplete (
void gotMap (
uint256 const& hash,
std::shared_ptr<SHAMap> const& map,
bool acquired) override;
std::shared_ptr<SHAMap> const& map) override;
/**
On timer call the correct handler for each state.
@@ -303,56 +302,57 @@ private:
Application& app_;
ConsensusImp& consensus_;
InboundTransactions& inboundTransactions_;
LocalTxs& m_localTX;
LocalTxs& localTX_;
LedgerMaster& ledgerMaster_;
FeeVote& m_feeVote;
FeeVote& feeVote_;
std::recursive_mutex lock_;
State state_;
// The wall time this ledger closed
NetClock::time_point mCloseTime;
NetClock::time_point closeTime_;
uint256 mPrevLedgerHash;
uint256 mAcquiringLedger;
uint256 prevLedgerHash_;
uint256 acquiringLedger_;
std::shared_ptr<Ledger const> mPreviousLedger;
LedgerProposal::pointer mOurPosition;
PublicKey mValPublic;
SecretKey mValSecret;
bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail;
std::shared_ptr<Ledger const> previousLedger_;
LedgerProposal::pointer ourPosition_;
PublicKey valPublic_;
SecretKey valSecret_;
bool proposing_, validating_, haveCorrectLCL_, consensusFail_;
std::chrono::milliseconds mCurrentMSeconds;
// How much time has elapsed since the round started
std::chrono::milliseconds roundTime_;
// How long the close has taken, expressed as a percentage of the time that
// we expected it to take.
int mClosePercent;
int closePercent_;
NetClock::duration mCloseResolution;
NetClock::duration closeResolution_;
bool mHaveCloseTimeConsensus;
bool haveCloseTimeConsensus_;
std::chrono::steady_clock::time_point mConsensusStartTime;
int mPreviousProposers;
std::chrono::steady_clock::time_point consensusStartTime_;
int previousProposers_;
// The time it took for the last consensus process to converge
std::chrono::milliseconds mPreviousMSeconds;
// Time it took for the last consensus round to converge
std::chrono::milliseconds previousRoundTime_;
// Convergence tracking, trusted peers indexed by hash of public key
hash_map<NodeID, LedgerProposal::pointer> mPeerPositions;
hash_map<NodeID, LedgerProposal::pointer> peerPositions_;
// Transaction Sets, indexed by hash of transaction tree
hash_map<uint256, std::shared_ptr<SHAMap>> mAcquired;
hash_map<uint256, std::shared_ptr<SHAMap>> acquired_;
// Disputed transactions
hash_map<uint256, std::shared_ptr <DisputedTx>> mDisputes;
hash_set<uint256> mCompares;
hash_map<uint256, std::shared_ptr <DisputedTx>> disputes_;
hash_set<uint256> compares_;
// Close time estimates, keep ordered for predictable traverse
std::map<NetClock::time_point, int> mCloseTimes;
std::map<NetClock::time_point, int> closeTimes_;
// nodes that have bowed out of this consensus process
hash_set<NodeID> mDeadNodes;
hash_set<NodeID> deadNodes_;
beast::Journal j_;
};
@@ -367,6 +367,24 @@ make_LedgerConsensus (
LedgerMaster& ledgerMaster,
FeeVote& feeVote);
//------------------------------------------------------------------------------
/** Apply a set of transactions to a ledger
Typically the txFilter is used to reject transactions
that already got in the prior ledger
@param set set of transactions to apply
@param view ledger to apply to
@param txFilter callback, return false to reject txn
@return retriable transactions
*/
CanonicalTXSet
applyTransactions (
Application& app,
SHAMap const& set,
OpenView& view,
std::function<bool(uint256 const&)> txFilter);
} // ripple
#endif

View File

@@ -73,4 +73,137 @@ roundCloseTime (
closeTime += (closeResolution / 2);
return closeTime - (closeTime.time_since_epoch() % closeResolution);
}
bool
shouldCloseLedger (
bool anyTransactions,
int previousProposers,
int proposersClosed,
int proposersValidated,
std::chrono::milliseconds previousTime,
std::chrono::milliseconds currentTime, // Time since last ledger's close time
std::chrono::milliseconds openTime, // Time waiting to close this ledger
std::chrono::seconds idleInterval,
beast::Journal j)
{
using namespace std::chrono_literals;
if ((previousTime < -1s) || (previousTime > 10min) ||
(currentTime > 10min))
{
// These are unexpected cases, we just close the ledger
JLOG (j.warn()) <<
"shouldCloseLedger Trans=" << (anyTransactions ? "yes" : "no") <<
" Prop: " << previousProposers << "/" << proposersClosed <<
" Secs: " << currentTime.count() << " (last: " <<
previousTime.count() << ")";
return true;
}
if ((proposersClosed + proposersValidated) > (previousProposers / 2))
{
// If more than half of the network has closed, we close
JLOG (j.trace()) << "Others have closed";
return true;
}
if (!anyTransactions)
{
// Only close at the end of the idle interval
return currentTime >= idleInterval; // normal idle
}
// Preserve minimum ledger open time
if (openTime < LEDGER_MIN_CLOSE)
{
JLOG (j.debug()) <<
"Must wait minimum time before closing";
return false;
}
// Don't let this ledger close more than twice as fast as the previous
// ledger reached consensus so that slower validators can slow down
// the network
if (openTime < (previousTime / 2))
{
JLOG (j.debug()) <<
"Ledger has not been open long enough";
return false;
}
// Close the ledger
return true;
}
bool
checkConsensusReached (int agreeing, int total, bool count_self)
{
// If we are alone, we have a consensus
if (total == 0)
return true;
if (count_self)
{
++agreeing;
++total;
}
int currentPercentage = (agreeing * 100) / total;
return currentPercentage > minimumConsensusPercentage;
}
ConsensusState
checkConsensus (
int previousProposers,
int currentProposers,
int currentAgree,
int currentFinished,
std::chrono::milliseconds previousAgreeTime,
std::chrono::milliseconds currentAgreeTime,
bool proposing,
beast::Journal j)
{
JLOG (j.trace()) <<
"checkConsensus: prop=" << currentProposers <<
"/" << previousProposers <<
" agree=" << currentAgree << " validated=" << currentFinished <<
" time=" << currentAgreeTime.count() << "/" << previousAgreeTime.count();
if (currentAgreeTime <= LEDGER_MIN_CONSENSUS)
return ConsensusState::No;
if (currentProposers < (previousProposers * 3 / 4))
{
// Less than 3/4 of the last ledger's proposers are present; don't
// rush: we may need more time.
if (currentAgreeTime < (previousAgreeTime + LEDGER_MIN_CONSENSUS))
{
JLOG (j.trace()) <<
"too fast, not enough proposers";
return ConsensusState::No;
}
}
// Have we, together with the nodes on our UNL list, reached the threshold
// to declare consensus?
if (checkConsensusReached (currentAgree, currentProposers, proposing))
{
JLOG (j.debug()) << "normal consensus";
return ConsensusState::Yes;
}
// Have sufficient nodes on our UNL list moved on and reached the threshold
// to declare consensus?
if (checkConsensusReached (currentFinished, currentProposers, false))
{
JLOG (j.warn()) <<
"We see no consensus, but 80% of nodes have moved on";
return ConsensusState::MovedOn;
}
// no consensus yet
JLOG (j.trace()) << "no consensus";
return ConsensusState::No;
}
} // ripple

View File

@@ -1514,7 +1514,7 @@ void
NetworkOPsImp::mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map)
{
mLedgerConsensus->mapComplete (hash, map, true);
mLedgerConsensus->gotMap (hash, map);
}
void NetworkOPsImp::endConsensus (bool correctLCL)

View File

@@ -95,6 +95,25 @@ apply (Application& app, OpenView& view,
STTx const& tx, ApplyFlags flags,
beast::Journal journal);
/** Class for return value from applyTransaction */
enum class ApplyResult
{
Success, // Applied to this ledger
Fail, // Should not be retried in this ledger
Retry // Should be retried in this ledger
};
/** Transaction application helper
Provides more detailed logging and decodes the
correct behavior based on the TER type
*/
ApplyResult
applyTransaction(Application& app, OpenView& view,
STTx const& tx, bool retryAssured, ApplyFlags flags,
beast::Journal journal);
} // ripple
#endif

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <BeastConfig.h>
#include <ripple/basics/Log.h>
#include <ripple/app/tx/apply.h>
#include <ripple/app/tx/applySteps.h>
#include <ripple/app/misc/HashRouter.h>
@@ -114,4 +115,51 @@ apply (Application& app, OpenView& view,
return doApply(pcresult, app, view);
}
ApplyResult
applyTransaction (Application& app, OpenView& view,
STTx const& txn,
bool retryAssured, ApplyFlags flags,
beast::Journal j)
{
// Returns false if the transaction has need not be retried.
if (retryAssured)
flags = flags | tapRETRY;
JLOG (j.debug()) << "TXN "
<< txn.getTransactionID ()
//<< (engine.view().open() ? " open" : " closed")
// because of the optional in engine
<< (retryAssured ? "/retry" : "/final");
try
{
auto const result = apply(app,
view, txn, flags, j);
if (result.second)
{
JLOG (j.debug())
<< "Transaction applied: " << transHuman (result.first);
return ApplyResult::Success;
}
if (isTefFailure (result.first) || isTemMalformed (result.first) ||
isTelLocal (result.first))
{
// failure
JLOG (j.debug())
<< "Transaction failure: " << transHuman (result.first);
return ApplyResult::Fail;
}
JLOG (j.debug())
<< "Transaction retry: " << transHuman (result.first);
return ApplyResult::Retry;
}
catch (std::exception const&)
{
JLOG (j.warn()) << "Throws";
return ApplyResult::Fail;
}
}
} // ripple