Merge branch 'continuousClose'

Conflicts:
	src/LedgerAcquire.cpp
	src/NetworkOPs.h

Merge continuous ledger close into main branch. YAY!
This commit is contained in:
JoelKatz
2012-07-16 14:48:21 -07:00
35 changed files with 750 additions and 546 deletions

View File

@@ -155,7 +155,7 @@ void LCTransaction::setVote(const uint160& peer, bool votesYes)
}
}
bool LCTransaction::updatePosition(int seconds, bool proposing)
bool LCTransaction::updatePosition(int percentTime, bool proposing)
{ // this many seconds after close, should our position change
if (mOurPosition && (mNays == 0))
return false;
@@ -169,9 +169,9 @@ bool LCTransaction::updatePosition(int seconds, bool proposing)
int weight = (mYays * 100 + (mOurPosition ? 100 : 0)) / (mNays + mYays + 1);
// To prevent avalanche stalls, we increase the needed weight slightly over time
if (seconds <= LEDGER_ACCEL_CONVERGE) newPosition = weight > AV_MIN_CONSENSUS;
else if (seconds >= LEDGER_CONVERGE) newPosition = weight > AV_AVG_CONSENSUS;
else newPosition = weight > AV_MAX_CONSENSUS;
if (percentTime < AV_MID_CONSENSUS_TIME) newPosition = weight > AV_INIT_CONSENSUS_PCT;
else if (percentTime < AV_LATE_CONSENSUS_TIME) newPosition = weight > AV_MID_CONSENSUS_PCT;
else newPosition = weight > AV_LATE_CONSENSUS_PCT;
}
else // don't let us outweight a proposing node, just recognize consensus
newPosition = mYays > mNays;
@@ -189,26 +189,41 @@ bool LCTransaction::updatePosition(int seconds, bool proposing)
return true;
}
int LCTransaction::getAgreeLevel()
{ // how much do nodes agree with us
if (mOurPosition) return (mYays * 100 + 100) / (mYays + mNays + 1);
return (mNays * 100 + 100) / (mYays + mNays + 1);
}
LedgerConsensus::LedgerConsensus(const uint256& prevLCLHash, Ledger::pointer previousLedger, uint32 closeTime)
: mState(lcsPRE_CLOSE), mCloseTime(closeTime), mPrevLedgerHash(prevLCLHash), mPreviousLedger(previousLedger)
: mState(lcsPRE_CLOSE), mCloseTime(closeTime), mPrevLedgerHash(prevLCLHash), mPreviousLedger(previousLedger),
mCurrentSeconds(0), mClosePercent(0), mHaveCloseTimeConsensus(false)
{
mValSeed = theConfig.VALIDATION_SEED;
Log(lsDEBUG) << "Creating consensus object";
Log(lsTRACE) << "LCL:" << previousLedger->getHash().GetHex() <<", ct=" << closeTime;
if (previousLedger->getHash() != prevLCLHash)
mPreviousProposers = theApp->getOPs().getPreviousProposers();
mPreviousSeconds = theApp->getOPs().getPreviousSeconds();
assert(mPreviousSeconds);
mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution(
previousLedger->getCloseResolution(), previousLedger->getCloseAgree(), previousLedger->getLedgerSeq() + 1);
mHaveCorrectLCL = previousLedger->getHash() == prevLCLHash;
if (!mHaveCorrectLCL)
{
mHaveCorrectLCL = mProposing = mValidating = false;
mAcquiringLedger = theApp->getMasterLedgerAcquire().findCreate(prevLCLHash);
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
for (std::vector<Peer::pointer>::const_iterator it = peerList.begin(), end = peerList.end(); it != end; ++it)
if ((*it)->hasLedger(prevLCLHash))
mAcquiringLedger->peerHas(*it);
}
else if (mValSeed.isValid())
{
mValidating = true;
mHaveCorrectLCL = mValidating = true;
mProposing = theApp->getOPs().getOperatingMode() == NetworkOPs::omFULL;
}
else mProposing = mValidating = false;
else
{
mHaveCorrectLCL = true;
mProposing = mValidating = false;
}
}
void LedgerConsensus::takeInitialPosition(Ledger::pointer initialLedger)
@@ -233,9 +248,9 @@ void LedgerConsensus::takeInitialPosition(Ledger::pointer initialLedger)
if (mValidating)
mOurPosition = boost::make_shared<LedgerProposal>
(mValSeed, initialLedger->getParentHash(), txSet);
(mValSeed, initialLedger->getParentHash(), txSet, mCloseTime);
else
mOurPosition = boost::make_shared<LedgerProposal>(initialLedger->getParentHash(), txSet);
mOurPosition = boost::make_shared<LedgerProposal>(initialLedger->getParentHash(), txSet, mCloseTime);
mapComplete(txSet, initialSet, false);
if (mProposing) propose(std::vector<uint256>(), std::vector<uint256>());
}
@@ -342,84 +357,70 @@ void LedgerConsensus::statusChange(newcoin::NodeEvent event, Ledger::pointer led
Log(lsINFO) << "send status change to peer";
}
void LedgerConsensus::abort()
{
Log(lsWARNING) << "consensus aborted";
mState = lcsABORTED;
}
int LedgerConsensus::startup()
{
// create wobble ledger in case peers target transactions to it
theApp->getMasterLedger().beginWobble();
return 1;
}
int LedgerConsensus::statePreClose(int secondsSinceClose)
void LedgerConsensus::statePreClose()
{ // it is shortly before ledger close time
if (secondsSinceClose >= 0)
{ // it is time to close the ledger (swap default and wobble ledgers)
Log(lsINFO) << "Closing ledger";
mState = lcsPOST_CLOSE;
theApp->getMasterLedger().closeTime();
statusChange(newcoin::neCLOSING_LEDGER, mPreviousLedger);
}
return 1;
}
bool anyTransactions = theApp->getMasterLedger().getCurrentLedger()->peekTransactionMap()->getHash().isNonZero();
int proposersClosed = mPeerPositions.size();
int LedgerConsensus::statePostClose(int secondsSinceClose)
{ // we are in the transaction wobble time
if (secondsSinceClose > LEDGER_WOBBLE_TIME)
{
Log(lsINFO) << "Wobble is over, it's consensus time";
int sinceClose = theApp->getOPs().getNetworkTimeNC() - theApp->getOPs().getLastCloseNetTime();
if (sinceClose >= ContinuousLedgerTiming::shouldClose(anyTransactions, mPreviousProposers, proposersClosed,
mPreviousSeconds, sinceClose))
{ // it is time to close the ledger (swap default and wobble ledgers)
Log(lsINFO) << "CLC:: closing ledger";
mState = lcsESTABLISH;
Ledger::pointer initial = theApp->getMasterLedger().endWobble();
mConsensusStartTime = boost::posix_time::second_clock::universal_time();
mCloseTime = theApp->getOPs().getNetworkTimeNC();
theApp->getOPs().setLastCloseNetTime(mCloseTime);
statusChange(newcoin::neCLOSING_LEDGER, mPreviousLedger);
Ledger::pointer initial = theApp->getMasterLedger().closeLedger();
assert (initial->getParentHash() == mPreviousLedger->getHash());
takeInitialPosition(initial);
}
return 1;
}
int LedgerConsensus::stateEstablish(int secondsSinceClose)
void LedgerConsensus::stateEstablish()
{ // we are establishing consensus
if (mProposing)
updateOurPositions(secondsSinceClose);
if (secondsSinceClose > LEDGER_MAX_CONVERGE)
if (mCurrentSeconds < LEDGER_MIN_CONSENSUS)
return;
updateOurPositions();
if (!mHaveCloseTimeConsensus)
{
Log(lsINFO) << "No close time consensus";
}
else if (haveConsensus())
{
Log(lsINFO) << "Converge cutoff";
mState = lcsCUTOFF;
}
return 1;
}
int LedgerConsensus::stateCutoff(int secondsSinceClose)
{ // we are making sure everyone else agrees
bool haveConsensus = updateOurPositions(secondsSinceClose);
if (haveConsensus || (secondsSinceClose > LEDGER_CONVERGE))
{
Log(lsINFO) << "Consensus complete (" << haveConsensus << ")";
mState = lcsFINISHED;
beginAccept();
}
return 1;
}
int LedgerConsensus::stateFinished(int secondsSinceClose)
void LedgerConsensus::stateFinished()
{ // we are processing the finished ledger
// logic of calculating next ledger advances us out of this state
return 1;
// CHECKME: Should we count proposers that didn't converge to our consensus set?
int convergeTime = (boost::posix_time::second_clock::universal_time() - mConsensusStartTime).seconds();
if (convergeTime <= 0) convergeTime = 1;
theApp->getOPs().newLCL(mPeerPositions.size(), convergeTime, mNewLedgerHash);
}
int LedgerConsensus::stateAccepted(int secondsSinceClose)
void LedgerConsensus::stateAccepted()
{ // we have accepted a new ledger
endConsensus();
return 4;
}
int LedgerConsensus::timerEntry()
void LedgerConsensus::timerEntry()
{
if (!mHaveCorrectLCL)
{
Log(lsINFO) << "Checking for consensus ledger " << mPrevLedgerHash.GetHex();
Ledger::pointer consensus = theApp->getMasterLedger().getLedgerByHash(mPrevLedgerHash);
if (consensus)
{
@@ -429,45 +430,42 @@ int LedgerConsensus::timerEntry()
mPreviousLedger = consensus;
mHaveCorrectLCL = true;
}
else Log(lsINFO) << "We still don't have it";
}
int sinceClose = theApp->getOPs().getNetworkTimeNC() - mCloseTime;
mCurrentSeconds = (mCloseTime != 0) ? (theApp->getOPs().getNetworkTimeNC() - mCloseTime) : 0;
mClosePercent = mCurrentSeconds * 100 / mPreviousSeconds;
switch (mState)
{
case lcsPRE_CLOSE: return statePreClose(sinceClose);
case lcsPOST_CLOSE: return statePostClose(sinceClose);
case lcsESTABLISH: return stateEstablish(sinceClose);
case lcsCUTOFF: return stateCutoff(sinceClose);
case lcsFINISHED: return stateFinished(sinceClose);
case lcsACCEPTED: return stateAccepted(sinceClose);
case lcsABORTED: return 1;
case lcsPRE_CLOSE: statePreClose(); return;
case lcsESTABLISH: stateEstablish(); return;
case lcsFINISHED: stateFinished(); return;
case lcsACCEPTED: stateAccepted(); return;
}
assert(false);
return 1;
}
bool LedgerConsensus::updateOurPositions(int sinceClose)
{ // returns true if the network has consensus
void LedgerConsensus::updateOurPositions()
{
Log(lsINFO) << "Updating our positions";
bool changes = false;
bool stable = true;
SHAMap::pointer ourPosition;
std::vector<uint256> addedTx, removedTx;
for(boost::unordered_map<uint256, LCTransaction::pointer>::iterator it = mDisputes.begin(),
end = mDisputes.end(); it != end; ++it)
{
if (it->second->updatePosition(sinceClose, mProposing))
if (it->second->updatePosition(mClosePercent, mProposing))
{
if (!changes)
{
ourPosition = mComplete[mOurPosition->getCurrentHash()]->snapShot(true);
changes = true;
stable = false;
}
if (it->second->getOurPosition()) // now a yes
{
ourPosition->addItem(SHAMapItem(it->first, it->second->peekTransaction()), true);
ourPosition->addItem(SHAMapItem(it->first, it->second->peekTransaction()), true, false);
addedTx.push_back(it->first);
}
else // now a no
@@ -476,20 +474,60 @@ bool LedgerConsensus::updateOurPositions(int sinceClose)
removedTx.push_back(it->first);
}
}
else if (it->second->getAgreeLevel() < AV_PCT_STOP)
stable = false;
}
std::map<uint32, int> closeTimes;
for (boost::unordered_map<uint160, LedgerProposal::pointer>::iterator it = mPeerPositions.begin(),
end = mPeerPositions.end(); it != end; ++it)
++closeTimes[it->second->getCloseTime() - (it->second->getCloseTime() % mCloseResolution)];
++closeTimes[mOurPosition->getCloseTime() - (mOurPosition->getCloseTime() % mCloseResolution)];
int neededWeight;
if (mClosePercent < AV_MID_CONSENSUS_TIME)
neededWeight = AV_INIT_CONSENSUS_PCT;
else if (mClosePercent < AV_LATE_CONSENSUS_TIME)
neededWeight = AV_MID_CONSENSUS_PCT;
else neededWeight = AV_LATE_CONSENSUS_PCT;
int thresh = mPeerPositions.size() * neededWeight / 100;
uint32 closeTime = 0;
for (std::map<uint32, int>::iterator it = closeTimes.begin(), end = closeTimes.end(); it != end; ++it)
{
if (it->second > thresh)
{
mHaveCloseTimeConsensus = true;
closeTime = it->first;
}
}
if (closeTime != (mOurPosition->getCloseTime() - (mOurPosition->getCloseTime() % mCloseResolution)))
changes = true;
if (changes)
{
uint256 newHash = ourPosition->getHash();
mOurPosition->changePosition(newHash);
mOurPosition->changePosition(newHash, closeTime);
if (mProposing) propose(addedTx, removedTx);
mapComplete(newHash, ourPosition, false);
Log(lsINFO) << "We change our position to " << newHash.GetHex();
}
}
return stable;
bool LedgerConsensus::haveConsensus()
{
int agree = 0, disagree = 0;
uint256 ourPosition = mOurPosition->getCurrentHash();
for (boost::unordered_map<uint160, LedgerProposal::pointer>::iterator it = mPeerPositions.begin(),
end = mPeerPositions.end(); it != end; ++it)
{
if (it->second->getCurrentHash() == ourPosition)
++agree;
else
++disagree;
}
int currentValidations = theApp->getValidations().getCurrentValidationCount(mPreviousLedger->getCloseTimeNC());
return ContinuousLedgerTiming::haveConsensus(mPreviousProposers, agree + disagree, agree, currentValidations,
mPreviousSeconds, mCurrentSeconds);
}
SHAMap::pointer LedgerConsensus::getTransactionTree(const uint256& hash, bool doAcquire)
@@ -546,6 +584,7 @@ void LedgerConsensus::propose(const std::vector<uint256>& added, const std::vect
newcoin::TMProposeSet prop;
prop.set_currenttxhash(mOurPosition->getCurrentHash().begin(), 256 / 8);
prop.set_proposeseq(mOurPosition->getProposeSeq());
prop.set_closetime(mOurPosition->getCloseTime());
std::vector<unsigned char> pubKey = mOurPosition->getPubKey();
std::vector<unsigned char> sig = mOurPosition->sign();
@@ -586,6 +625,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec
bool LedgerConsensus::peerPosition(LedgerProposal::pointer newPosition)
{
LedgerProposal::pointer& currentPosition = mPeerPositions[newPosition->getPeerID()];
if (currentPosition)
{
assert(newPosition->getPeerID() == currentPosition->getPeerID());
@@ -598,6 +638,10 @@ bool LedgerConsensus::peerPosition(LedgerProposal::pointer newPosition)
return true;
}
}
else if (newPosition->getProposeSeq() == 0)
{ // new initial close time estimate
++mCloseTimes[newPosition->getCloseTime()];
}
Log(lsINFO) << "Processing peer proposal " << newPosition->getProposeSeq() << "/"
<< newPosition->getCurrentHash().GetHex();
currentPosition = newPosition;
@@ -666,7 +710,7 @@ void LedgerConsensus::applyTransaction(TransactionEngine& engine, SerializedTran
try
{
#endif
TransactionEngineResult result = engine.applyTransaction(*txn, parms, 0);
TransactionEngineResult result = engine.applyTransaction(*txn, parms);
if (result > 0)
{
Log(lsINFO) << " retry";
@@ -725,7 +769,7 @@ void LedgerConsensus::applyTransactions(SHAMap::pointer set, Ledger::pointer led
{
try
{
TransactionEngineResult result = engine.applyTransaction(*it->second, parms, 0);
TransactionEngineResult result = engine.applyTransaction(*it->second, parms);
if (result <= 0)
{
if (result == 0) ++successes;
@@ -758,7 +802,16 @@ void LedgerConsensus::accept(SHAMap::pointer set)
CanonicalTXSet failedTransactions(set->getHash());
applyTransactions(set, newLCL, failedTransactions, true);
newLCL->setClosed();
newLCL->setAccepted();
uint32 closeTime = mOurPosition->getCloseTime();
bool closeTimeCorrect = true;
if (closeTime == 0)
{ // we didn't agree
closeTimeCorrect = false;
closeTime = mPreviousLedger->getCloseTimeNC() + 1;
}
newLCL->setAccepted(closeTime, mCloseResolution, closeTimeCorrect);
newLCL->updateHash();
uint256 newLCLHash = newLCL->getHash();
Log(lsTRACE) << "newLCL " << newLCLHash.GetHex();
@@ -804,6 +857,7 @@ void LedgerConsensus::accept(SHAMap::pointer set)
applyTransactions(theApp->getMasterLedger().getCurrentLedger()->peekTransactionMap(), newOL,
failedTransactions, false);
theApp->getMasterLedger().pushLedger(newLCL, newOL);
mNewLedgerHash = newLCL->getHash();
mState = lcsACCEPTED;
sl.unlock();