Refactor LedgerConsensus code. Add new LC work.

Properly sequence LC states
Properly handle wobble time.
Complete LC sequence correctly and exit it.
This commit is contained in:
JoelKatz
2012-05-29 18:02:17 -07:00
parent 0c4c27880d
commit 6123e3886a
5 changed files with 190 additions and 109 deletions

View File

@@ -1,4 +1,3 @@
#include "LedgerConsensus.h"
#include "Application.h"
@@ -150,21 +149,16 @@ LedgerConsensus::LedgerConsensus(Ledger::pointer previousLedger, uint32 closeTim
{
}
void LedgerConsensus::closeTime(Ledger::pointer& current)
void LedgerConsensus::takeInitialPosition(Ledger::pointer initialLedger)
{
if (mState != lcsPRE_CLOSE)
{
assert(false);
return;
}
CKey::pointer nodePrivKey = boost::make_shared<CKey>();
nodePrivKey->MakeNewKey(); // FIXME
current->updateHash();
uint256 txSet = current->getTransHash();
mOurPosition = boost::make_shared<LedgerProposal>(nodePrivKey, current->getParentHash(), txSet);
mapComplete(txSet, current->peekTransactionMap()->snapShot());
SHAMap::pointer initialSet = initialLedger->peekTransactionMap()->snapShot();
uint256 txSet = initialSet->getHash();
mOurPosition = boost::make_shared<LedgerProposal>(nodePrivKey, initialLedger->getParentHash(), txSet);
mapComplete(txSet, initialSet);
propose(std::vector<uint256>(), std::vector<uint256>());
}
void LedgerConsensus::mapComplete(const uint256& hash, SHAMap::pointer map)
@@ -237,6 +231,18 @@ void LedgerConsensus::adjustCount(SHAMap::pointer map, const std::vector<uint256
}
}
void LedgerConsensus::statusChange(newcoin::NodeEvent event, Ledger::pointer ledger)
{
newcoin::TMStatusChange s;
s.set_newevent(event);
s.set_ledgerseq(ledger->getLedgerSeq());
s.set_networktime(theApp->getOPs().getNetworkTimeNC());
uint256 plhash = ledger->getParentHash();
s.set_previousledgerhash(plhash.begin(), plhash.size());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(s, newcoin::mtSTATUS_CHANGE);
theApp->getConnectionPool().relayMessage(NULL, packet);
}
void LedgerConsensus::abort()
{
mState = lcsABORTED;
@@ -247,64 +253,119 @@ int LedgerConsensus::startup()
return 1;
}
int LedgerConsensus::statePreClose(int secondsSinceClose)
{ // it is shortly before ledger close time
if (secondsSinceClose >= 0)
{ // it is time to close the ledger
mState = lcsPOST_CLOSE;
closeLedger();
}
return 1;
}
int LedgerConsensus::statePostClose(int secondsSinceClose)
{ // we are in the transaction wobble time
if (secondsSinceClose > LEDGER_WOBBLE_TIME)
mState = lcsESTABLISH;
return 1;
}
int LedgerConsensus::stateEstablish(int secondsSinceClose)
{ // we are establishing consensus
updateOurPositions(secondsSinceClose);
if (secondsSinceClose > LEDGER_CONVERGE)
mState = lcsCUTOFF;
return 1;
}
int LedgerConsensus::stateCutoff(int secondsSinceClose)
{ // we are making sure everyone else agrees
bool haveConsensus = updateOurPositions(secondsSinceClose);
if (haveConsensus || (secondsSinceClose > LEDGER_FORCE_CONVERGE))
{
mState = lcsFINISHED;
beginAccept();
}
return 1;
}
int LedgerConsensus::stateFinished(int secondsSinceClose)
{ // we are processing the finished ledger
// logic of calculating next ledger advances us out of this state
return 1;
}
int LedgerConsensus::stateAccepted(int secondsSinceClose)
{ // we have accepted a new ledger
statusChange(newcoin::neACCEPTED_LEDGER, theApp->getMasterLedger().getClosedLedger());
endConsensus();
return 4;
}
int LedgerConsensus::timerEntry()
{
int sinceClose = theApp->getOPs().getNetworkTimeNC() - mCloseTime;
if ((mState == lcsESTABLISH) || (mState == lcsCUTOFF))
switch (mState)
{
if (sinceClose >= LEDGER_FORCE_CONVERGE)
{
mState = lcsCUTOFF;
sinceClose = LEDGER_FORCE_CONVERGE;
}
bool changes = false;
bool stable = true;
SHAMap::pointer ourPosition;
std::vector<uint256> addedTx, removedTx;
for(boost::unordered_map<uint256, LCTransaction::pointer>::iterator it = mDisputes.begin(),
end = mDisputes.end(); it != end; ++it)
{
if (it->second->updatePosition(sinceClose))
{
if (changes)
{
ourPosition = mComplete[mOurPosition->getCurrentHash()]->snapShot();
changes = true;
stable = false;
}
if (it->second->getOurPosition()) // now a yes
{
ourPosition->addItem(SHAMapItem(it->first, it->second->peekTransaction()), true);
addedTx.push_back(it->first);
}
else // now a no
{
ourPosition->delItem(it->first);
removedTx.push_back(it->first);
}
}
else if (it->second->getAgreeLevel() < AV_PCT_STOP)
stable = false;
}
if (changes)
{
uint256 newHash = ourPosition->getHash();
mOurPosition->changePosition(newHash);
propose(addedTx, removedTx);
std::vector<uint256> hashes;
hashes.push_back(newHash);
sendHaveTxSet(hashes);
}
else if (stable && (mState == lcsCUTOFF))
mState = lcsFINISHED;
case lcsPRE_CLOSE: return statePreClose(sinceClose);
case lcsPOST_CLOSE: return statePostClose(sinceClose);
case lcsESTABLISH: return stateEstablish(sinceClose);
case lcsCUTOFF: return stateCutoff(sinceClose);
case lcsFINISHED: return stateFinished(sinceClose);
case lcsACCEPTED: return stateAccepted(sinceClose);
case lcsABORTED: return stateAccepted(sinceClose);
}
assert(false);
return 1;
}
bool LedgerConsensus::updateOurPositions(int sinceClose)
{ // returns true if the network has consensus
bool changes = false;
bool stable = true;
SHAMap::pointer ourPosition;
std::vector<uint256> addedTx, removedTx;
for(boost::unordered_map<uint256, LCTransaction::pointer>::iterator it = mDisputes.begin(),
end = mDisputes.end(); it != end; ++it)
{
if (it->second->updatePosition(sinceClose))
{
if (changes)
{
ourPosition = mComplete[mOurPosition->getCurrentHash()]->snapShot();
changes = true;
stable = false;
}
if (it->second->getOurPosition()) // now a yes
{
ourPosition->addItem(SHAMapItem(it->first, it->second->peekTransaction()), true);
addedTx.push_back(it->first);
}
else // now a no
{
ourPosition->delItem(it->first);
removedTx.push_back(it->first);
}
}
else if (it->second->getAgreeLevel() < AV_PCT_STOP)
stable = false;
}
if (changes)
{
uint256 newHash = ourPosition->getHash();
mOurPosition->changePosition(newHash);
propose(addedTx, removedTx);
std::vector<uint256> hashes;
hashes.push_back(newHash);
sendHaveTxSet(hashes);
}
return stable;
}
SHAMap::pointer LedgerConsensus::getTransactionTree(const uint256& hash, bool doAcquire)
{
boost::unordered_map<uint256, SHAMap::pointer>::iterator it = mComplete.find(hash);
@@ -324,6 +385,15 @@ SHAMap::pointer LedgerConsensus::getTransactionTree(const uint256& hash, bool do
return it->second;
}
void LedgerConsensus::closeLedger()
{
Ledger::pointer initial = theApp->getMasterLedger().getCurrentLedger();
statusChange(newcoin::neCLOSING_LEDGER, initial);
takeInitialPosition(initial);
initial->bumpSeq();
statusChange(newcoin::neCLOSING_LEDGER, mPreviousLedger);
}
void LedgerConsensus::startAcquiring(TransactionAcquire::pointer acquire)
{
boost::unordered_map< uint256, std::vector< boost::weak_ptr<Peer> > >::iterator it =
@@ -449,3 +519,21 @@ bool LedgerConsensus::peerGaveNodes(Peer::pointer peer, const uint256& setHash,
if (acq == mAcquiring.end()) return false;
return acq->second->takeNodes(nodeIDs, nodeData, peer);
}
void LedgerConsensus::beginAccept()
{
// WRITEME
// 1) Extract the consensus transaction set
// 2) Snapshot the last closed ledger
// 3) Dispatch a thread to:
// A) apply the consensus transaction set in canonical order
// B) Apply the consensus transaction set and replace the last closed ledger
// C) Rebuild the current ledger, applying as many transactions as possible
// D) Send a network state change
// E) Change the consensus state to lcsACCEPTED
}
void LedgerConsensus::endConsensus()
{
theApp->getOPs().endConsensus();
}

View File

@@ -106,11 +106,18 @@ protected:
void addPosition(LedgerProposal&, bool ours);
void removePosition(LedgerProposal&, bool ours);
void sendHaveTxSet(const std::vector<uint256>& txSetHashes);
void closeLedger();
// manipulating our own position
void takeInitialPosition(Ledger::pointer initialLedger);
bool updateOurPositions(int sinceClose);
void statusChange(newcoin::NodeEvent, Ledger::pointer ledger);
int getThreshold();
void beginAccept();
void endConsensus();
public:
LedgerConsensus(Ledger::pointer previousLedger, uint32 closeTime);
void closeTime(Ledger::pointer& currentLedger);
int startup();
@@ -121,7 +128,15 @@ public:
void mapComplete(const uint256& hash, SHAMap::pointer map);
void abort();
int timerEntry(void);
int timerEntry();
// state handlers
int statePreClose(int secondsSinceClose);
int statePostClose(int secondsSinceClose);
int stateEstablish(int secondsSinceClose);
int stateCutoff(int secondsSinceClose);
int stateFinished(int secondsSinceClose);
int stateAccepted(int secondsSinceClose);
bool peerPosition(LedgerProposal::pointer);

View File

@@ -8,7 +8,7 @@
#include "Application.h"
#include "Transaction.h"
#include "LedgerConsensus.h"
#include "LedgerTiming.h"
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
@@ -152,13 +152,13 @@ AccountState::pointer NetworkOPs::getAccountState(const NewcoinAddress& accountI
void NetworkOPs::setStateTimer(int sec)
{ // set timer early if ledger is closing
uint64 closedTime = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC();
uint64 consensusTime = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC() - LEDGER_WOBBLE_TIME;
uint64 now = getNetworkTimeNC();
if ((mMode == omFULL) && !mConsensus)
{
if (now >= closedTime) sec = 0;
else if (sec > (closedTime - now)) sec = (closedTime - now);
if (now >= consensusTime) sec = 0;
else if (sec > (consensusTime - now)) sec = (consensusTime - now);
}
mNetTimer.expires_from_now(boost::posix_time::seconds(sec));
@@ -309,20 +309,13 @@ void NetworkOPs::checkState(const boost::system::error_code& result)
// check if the ledger is bad enough to go to omTRACKING
}
int secondsToClose = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC() -
theApp->getOPs().getNetworkTimeNC();
if ((!mConsensus) && (secondsToClose < LEDGER_WOBBLE_TIME)) // pre close wobble
beginConsensus(theApp->getMasterLedger().getCurrentLedger());
if (mConsensus)
{
setStateTimer(mConsensus->timerEntry());
return;
}
Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger();
if (getNetworkTimeNC() >= currentLedger->getCloseTimeNC())
{
setStateTimer(beginConsensus(currentLedger, false));
return;
}
setStateTimer(10);
else setStateTimer(10);
}
void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger)
@@ -350,7 +343,7 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger)
}
// vim:ts=4
int NetworkOPs::beginConsensus(Ledger::pointer closingLedger, bool isEarly)
int NetworkOPs::beginConsensus(Ledger::pointer closingLedger)
{
#ifdef DEBUG
std::cerr << "Ledger close time for ledger " << closingLedger->getLedgerSeq() << std::endl;
@@ -362,34 +355,21 @@ int NetworkOPs::beginConsensus(Ledger::pointer closingLedger, bool isEarly)
return 3;
}
// Create a new ledger to be the open ledger
theApp->getMasterLedger().pushLedger(boost::make_shared<Ledger>(closingLedger));
// Create a consensus object to get consensus on this ledger
if (!!mConsensus) mConsensus->abort();
mConsensus = boost::make_shared<LedgerConsensus>(prevLedger,
theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC());
mConsensus->closeTime(closingLedger); // FIXME: Create consensus a few seconds before close time
mConsensus = boost::make_shared<LedgerConsensus>
(prevLedger, theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC());
#ifdef DEBUG
std::cerr << "Broadcasting ledger close" << std::endl;
#endif
newcoin::TMStatusChange s;
s.set_newevent(newcoin::neCLOSING_LEDGER);
s.set_ledgerseq(closingLedger->getLedgerSeq());
s.set_networktime(getNetworkTimeNC());
uint256 plhash = closingLedger->getParentHash();
s.set_previousledgerhash(plhash.begin(), plhash.size());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(s, newcoin::mtSTATUS_CHANGE);
theApp->getConnectionPool().relayMessage(NULL, packet);
return mConsensus->startup();
}
bool NetworkOPs::proposeLedger(const uint256& prevLgr, uint32 proposeSeq, const uint256& proposeHash,
bool NetworkOPs::recvPropose(const uint256& prevLgr, uint32 proposeSeq, const uint256& proposeHash,
const std::string& pubKey, const std::string& signature)
{
if (mMode != omFULL)
if (mMode != omFULL) // FIXME: Should we relay?
return true;
LedgerProposal::pointer proposal =
@@ -403,15 +383,7 @@ bool NetworkOPs::proposeLedger(const uint256& prevLgr, uint32 proposeSeq, const
// Is this node on our UNL?
// WRITEME
Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger();
if (!mConsensus)
{
if ((getNetworkTimeNC() + 2) >= currentLedger->getCloseTimeNC())
setStateTimer(beginConsensus(currentLedger, true));
if (!mConsensus) return false;
}
if (!mConsensus) return false;
return mConsensus->peerPosition(proposal);
}
@@ -440,4 +412,9 @@ void NetworkOPs::mapComplete(const uint256& hash, SHAMap::pointer map)
mConsensus->mapComplete(hash, map);
}
void NetworkOPs::endConsensus()
{
mConsensus = boost::shared_ptr<LedgerConsensus>();
}
// vim:ts=4

View File

@@ -77,7 +77,7 @@ public:
const std::vector<unsigned char>& myNode, std::list< std::vector<unsigned char> >& newNodes);
// ledger proposal/close functions
bool proposeLedger(const uint256& prevLgrHash, uint32 proposeSeq, const uint256& proposeHash,
bool recvPropose(const uint256& prevLgrHash, uint32 proposeSeq, const uint256& proposeHash,
const std::string& pubKey, const std::string& signature);
bool gotTXData(boost::shared_ptr<Peer> peer, const uint256& hash,
const std::list<SHAMapNode>& nodeIDs, const std::list< std::vector<unsigned char> >& nodeData);
@@ -88,7 +88,8 @@ public:
// network state machine
void checkState(const boost::system::error_code& result);
void switchLastClosedLedger(Ledger::pointer newLedger); // Used for the "jump" case
int beginConsensus(Ledger::pointer closingLedger, bool isEarly);
int beginConsensus(Ledger::pointer closingLedger);
void endConsensus();
void setStateTimer(int seconds);
};

View File

@@ -621,7 +621,7 @@ void Peer::recvPropose(newcoin::TMProposeSet& packet)
memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32);
memcpy(prevLgrHash.begin(), packet.prevclosedhash().data(), 32);
if(theApp->getOPs().proposeLedger(prevLgrHash, proposeSeq, currentTxHash,
if(theApp->getOPs().recvPropose(prevLgrHash, proposeSeq, currentTxHash,
packet.nodepubkey(), packet.signature()))
{ // FIXME: Not all nodes will want proposals
PackedMessage::pointer message = boost::make_shared<PackedMessage>(packet, newcoin::mtPROPOSE_LEDGER);