diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 2a5f41ea2a..207406388b 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -1,4 +1,3 @@ - #include "LedgerConsensus.h" #include "Application.h" @@ -150,21 +149,16 @@ LedgerConsensus::LedgerConsensus(Ledger::pointer previousLedger, uint32 closeTim { } -void LedgerConsensus::closeTime(Ledger::pointer& current) +void LedgerConsensus::takeInitialPosition(Ledger::pointer initialLedger) { - if (mState != lcsPRE_CLOSE) - { - assert(false); - return; - } - CKey::pointer nodePrivKey = boost::make_shared(); nodePrivKey->MakeNewKey(); // FIXME - current->updateHash(); - uint256 txSet = current->getTransHash(); - mOurPosition = boost::make_shared(nodePrivKey, current->getParentHash(), txSet); - mapComplete(txSet, current->peekTransactionMap()->snapShot()); + SHAMap::pointer initialSet = initialLedger->peekTransactionMap()->snapShot(); + uint256 txSet = initialSet->getHash(); + mOurPosition = boost::make_shared(nodePrivKey, initialLedger->getParentHash(), txSet); + mapComplete(txSet, initialSet); + propose(std::vector(), std::vector()); } void LedgerConsensus::mapComplete(const uint256& hash, SHAMap::pointer map) @@ -237,6 +231,18 @@ void LedgerConsensus::adjustCount(SHAMap::pointer map, const std::vectorgetLedgerSeq()); + s.set_networktime(theApp->getOPs().getNetworkTimeNC()); + uint256 plhash = ledger->getParentHash(); + s.set_previousledgerhash(plhash.begin(), plhash.size()); + PackedMessage::pointer packet = boost::make_shared(s, newcoin::mtSTATUS_CHANGE); + theApp->getConnectionPool().relayMessage(NULL, packet); +} + void LedgerConsensus::abort() { mState = lcsABORTED; @@ -247,64 +253,119 @@ int LedgerConsensus::startup() return 1; } +int LedgerConsensus::statePreClose(int secondsSinceClose) +{ // it is shortly before ledger close time + if (secondsSinceClose >= 0) + { // it is time to close the ledger + mState = lcsPOST_CLOSE; + closeLedger(); + } + return 1; +} + +int LedgerConsensus::statePostClose(int secondsSinceClose) +{ // we are in the transaction wobble time + if (secondsSinceClose > LEDGER_WOBBLE_TIME) + mState = lcsESTABLISH; + return 1; +} + +int LedgerConsensus::stateEstablish(int secondsSinceClose) +{ // we are establishing consensus + updateOurPositions(secondsSinceClose); + if (secondsSinceClose > LEDGER_CONVERGE) + mState = lcsCUTOFF; + return 1; +} + +int LedgerConsensus::stateCutoff(int secondsSinceClose) +{ // we are making sure everyone else agrees + bool haveConsensus = updateOurPositions(secondsSinceClose); + if (haveConsensus || (secondsSinceClose > LEDGER_FORCE_CONVERGE)) + { + mState = lcsFINISHED; + beginAccept(); + } + return 1; +} + +int LedgerConsensus::stateFinished(int secondsSinceClose) +{ // we are processing the finished ledger + // logic of calculating next ledger advances us out of this state + return 1; +} + +int LedgerConsensus::stateAccepted(int secondsSinceClose) +{ // we have accepted a new ledger + statusChange(newcoin::neACCEPTED_LEDGER, theApp->getMasterLedger().getClosedLedger()); + endConsensus(); + return 4; +} + int LedgerConsensus::timerEntry() { int sinceClose = theApp->getOPs().getNetworkTimeNC() - mCloseTime; - if ((mState == lcsESTABLISH) || (mState == lcsCUTOFF)) + switch (mState) { - if (sinceClose >= LEDGER_FORCE_CONVERGE) - { - mState = lcsCUTOFF; - sinceClose = LEDGER_FORCE_CONVERGE; - } - - bool changes = false; - bool stable = true; - SHAMap::pointer ourPosition; - std::vector addedTx, removedTx; - - for(boost::unordered_map::iterator it = mDisputes.begin(), - end = mDisputes.end(); it != end; ++it) - { - if (it->second->updatePosition(sinceClose)) - { - if (changes) - { - ourPosition = mComplete[mOurPosition->getCurrentHash()]->snapShot(); - changes = true; - stable = false; - } - if (it->second->getOurPosition()) // now a yes - { - ourPosition->addItem(SHAMapItem(it->first, it->second->peekTransaction()), true); - addedTx.push_back(it->first); - } - else // now a no - { - ourPosition->delItem(it->first); - removedTx.push_back(it->first); - } - } - else if (it->second->getAgreeLevel() < AV_PCT_STOP) - stable = false; - } - - if (changes) - { - uint256 newHash = ourPosition->getHash(); - mOurPosition->changePosition(newHash); - propose(addedTx, removedTx); - std::vector hashes; - hashes.push_back(newHash); - sendHaveTxSet(hashes); - } - else if (stable && (mState == lcsCUTOFF)) - mState = lcsFINISHED; + case lcsPRE_CLOSE: return statePreClose(sinceClose); + case lcsPOST_CLOSE: return statePostClose(sinceClose); + case lcsESTABLISH: return stateEstablish(sinceClose); + case lcsCUTOFF: return stateCutoff(sinceClose); + case lcsFINISHED: return stateFinished(sinceClose); + case lcsACCEPTED: return stateAccepted(sinceClose); + case lcsABORTED: return stateAccepted(sinceClose); } + assert(false); return 1; } +bool LedgerConsensus::updateOurPositions(int sinceClose) +{ // returns true if the network has consensus + bool changes = false; + bool stable = true; + SHAMap::pointer ourPosition; + std::vector addedTx, removedTx; + + for(boost::unordered_map::iterator it = mDisputes.begin(), + end = mDisputes.end(); it != end; ++it) + { + if (it->second->updatePosition(sinceClose)) + { + if (changes) + { + ourPosition = mComplete[mOurPosition->getCurrentHash()]->snapShot(); + changes = true; + stable = false; + } + if (it->second->getOurPosition()) // now a yes + { + ourPosition->addItem(SHAMapItem(it->first, it->second->peekTransaction()), true); + addedTx.push_back(it->first); + } + else // now a no + { + ourPosition->delItem(it->first); + removedTx.push_back(it->first); + } + } + else if (it->second->getAgreeLevel() < AV_PCT_STOP) + stable = false; + } + + if (changes) + { + uint256 newHash = ourPosition->getHash(); + mOurPosition->changePosition(newHash); + propose(addedTx, removedTx); + std::vector hashes; + hashes.push_back(newHash); + sendHaveTxSet(hashes); + } + + return stable; +} + SHAMap::pointer LedgerConsensus::getTransactionTree(const uint256& hash, bool doAcquire) { boost::unordered_map::iterator it = mComplete.find(hash); @@ -324,6 +385,15 @@ SHAMap::pointer LedgerConsensus::getTransactionTree(const uint256& hash, bool do return it->second; } +void LedgerConsensus::closeLedger() +{ + Ledger::pointer initial = theApp->getMasterLedger().getCurrentLedger(); + statusChange(newcoin::neCLOSING_LEDGER, initial); + takeInitialPosition(initial); + initial->bumpSeq(); + statusChange(newcoin::neCLOSING_LEDGER, mPreviousLedger); +} + void LedgerConsensus::startAcquiring(TransactionAcquire::pointer acquire) { boost::unordered_map< uint256, std::vector< boost::weak_ptr > >::iterator it = @@ -449,3 +519,21 @@ bool LedgerConsensus::peerGaveNodes(Peer::pointer peer, const uint256& setHash, if (acq == mAcquiring.end()) return false; return acq->second->takeNodes(nodeIDs, nodeData, peer); } + +void LedgerConsensus::beginAccept() +{ + // WRITEME + // 1) Extract the consensus transaction set + // 2) Snapshot the last closed ledger + // 3) Dispatch a thread to: + // A) apply the consensus transaction set in canonical order + // B) Apply the consensus transaction set and replace the last closed ledger + // C) Rebuild the current ledger, applying as many transactions as possible + // D) Send a network state change + // E) Change the consensus state to lcsACCEPTED +} + +void LedgerConsensus::endConsensus() +{ + theApp->getOPs().endConsensus(); +} diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 2041351fa4..48842d2b26 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -106,11 +106,18 @@ protected: void addPosition(LedgerProposal&, bool ours); void removePosition(LedgerProposal&, bool ours); void sendHaveTxSet(const std::vector& txSetHashes); + void closeLedger(); + + // manipulating our own position + void takeInitialPosition(Ledger::pointer initialLedger); + bool updateOurPositions(int sinceClose); + void statusChange(newcoin::NodeEvent, Ledger::pointer ledger); int getThreshold(); + void beginAccept(); + void endConsensus(); public: LedgerConsensus(Ledger::pointer previousLedger, uint32 closeTime); - void closeTime(Ledger::pointer& currentLedger); int startup(); @@ -121,7 +128,15 @@ public: void mapComplete(const uint256& hash, SHAMap::pointer map); void abort(); - int timerEntry(void); + int timerEntry(); + + // state handlers + int statePreClose(int secondsSinceClose); + int statePostClose(int secondsSinceClose); + int stateEstablish(int secondsSinceClose); + int stateCutoff(int secondsSinceClose); + int stateFinished(int secondsSinceClose); + int stateAccepted(int secondsSinceClose); bool peerPosition(LedgerProposal::pointer); diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 31da23af49..29e3cd1b03 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -8,7 +8,7 @@ #include "Application.h" #include "Transaction.h" #include "LedgerConsensus.h" - +#include "LedgerTiming.h" // This is the primary interface into the "client" portion of the program. // Code that wants to do normal operations on the network such as @@ -152,13 +152,13 @@ AccountState::pointer NetworkOPs::getAccountState(const NewcoinAddress& accountI void NetworkOPs::setStateTimer(int sec) { // set timer early if ledger is closing - uint64 closedTime = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC(); + uint64 consensusTime = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC() - LEDGER_WOBBLE_TIME; uint64 now = getNetworkTimeNC(); if ((mMode == omFULL) && !mConsensus) { - if (now >= closedTime) sec = 0; - else if (sec > (closedTime - now)) sec = (closedTime - now); + if (now >= consensusTime) sec = 0; + else if (sec > (consensusTime - now)) sec = (consensusTime - now); } mNetTimer.expires_from_now(boost::posix_time::seconds(sec)); @@ -309,20 +309,13 @@ void NetworkOPs::checkState(const boost::system::error_code& result) // check if the ledger is bad enough to go to omTRACKING } + int secondsToClose = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC() - + theApp->getOPs().getNetworkTimeNC(); + if ((!mConsensus) && (secondsToClose < LEDGER_WOBBLE_TIME)) // pre close wobble + beginConsensus(theApp->getMasterLedger().getCurrentLedger()); if (mConsensus) - { setStateTimer(mConsensus->timerEntry()); - return; - } - - Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger(); - if (getNetworkTimeNC() >= currentLedger->getCloseTimeNC()) - { - setStateTimer(beginConsensus(currentLedger, false)); - return; - } - - setStateTimer(10); + else setStateTimer(10); } void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger) @@ -350,7 +343,7 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger) } // vim:ts=4 -int NetworkOPs::beginConsensus(Ledger::pointer closingLedger, bool isEarly) +int NetworkOPs::beginConsensus(Ledger::pointer closingLedger) { #ifdef DEBUG std::cerr << "Ledger close time for ledger " << closingLedger->getLedgerSeq() << std::endl; @@ -362,34 +355,21 @@ int NetworkOPs::beginConsensus(Ledger::pointer closingLedger, bool isEarly) return 3; } - // Create a new ledger to be the open ledger - theApp->getMasterLedger().pushLedger(boost::make_shared(closingLedger)); - // Create a consensus object to get consensus on this ledger if (!!mConsensus) mConsensus->abort(); - mConsensus = boost::make_shared(prevLedger, - theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC()); - mConsensus->closeTime(closingLedger); // FIXME: Create consensus a few seconds before close time + mConsensus = boost::make_shared + (prevLedger, theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC()); #ifdef DEBUG std::cerr << "Broadcasting ledger close" << std::endl; #endif - newcoin::TMStatusChange s; - s.set_newevent(newcoin::neCLOSING_LEDGER); - s.set_ledgerseq(closingLedger->getLedgerSeq()); - s.set_networktime(getNetworkTimeNC()); - uint256 plhash = closingLedger->getParentHash(); - s.set_previousledgerhash(plhash.begin(), plhash.size()); - PackedMessage::pointer packet = boost::make_shared(s, newcoin::mtSTATUS_CHANGE); - theApp->getConnectionPool().relayMessage(NULL, packet); - return mConsensus->startup(); } -bool NetworkOPs::proposeLedger(const uint256& prevLgr, uint32 proposeSeq, const uint256& proposeHash, +bool NetworkOPs::recvPropose(const uint256& prevLgr, uint32 proposeSeq, const uint256& proposeHash, const std::string& pubKey, const std::string& signature) { - if (mMode != omFULL) + if (mMode != omFULL) // FIXME: Should we relay? return true; LedgerProposal::pointer proposal = @@ -403,15 +383,7 @@ bool NetworkOPs::proposeLedger(const uint256& prevLgr, uint32 proposeSeq, const // Is this node on our UNL? // WRITEME - Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger(); - - if (!mConsensus) - { - if ((getNetworkTimeNC() + 2) >= currentLedger->getCloseTimeNC()) - setStateTimer(beginConsensus(currentLedger, true)); - if (!mConsensus) return false; - } - + if (!mConsensus) return false; return mConsensus->peerPosition(proposal); } @@ -440,4 +412,9 @@ void NetworkOPs::mapComplete(const uint256& hash, SHAMap::pointer map) mConsensus->mapComplete(hash, map); } +void NetworkOPs::endConsensus() +{ + mConsensus = boost::shared_ptr(); +} + // vim:ts=4 diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 1d7c63dd69..8537e36c48 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -77,7 +77,7 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - bool proposeLedger(const uint256& prevLgrHash, uint32 proposeSeq, const uint256& proposeHash, + bool recvPropose(const uint256& prevLgrHash, uint32 proposeSeq, const uint256& proposeHash, const std::string& pubKey, const std::string& signature); bool gotTXData(boost::shared_ptr peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); @@ -88,7 +88,8 @@ public: // network state machine void checkState(const boost::system::error_code& result); void switchLastClosedLedger(Ledger::pointer newLedger); // Used for the "jump" case - int beginConsensus(Ledger::pointer closingLedger, bool isEarly); + int beginConsensus(Ledger::pointer closingLedger); + void endConsensus(); void setStateTimer(int seconds); }; diff --git a/src/Peer.cpp b/src/Peer.cpp index dca72707e8..68e18de43f 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -621,7 +621,7 @@ void Peer::recvPropose(newcoin::TMProposeSet& packet) memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32); memcpy(prevLgrHash.begin(), packet.prevclosedhash().data(), 32); - if(theApp->getOPs().proposeLedger(prevLgrHash, proposeSeq, currentTxHash, + if(theApp->getOPs().recvPropose(prevLgrHash, proposeSeq, currentTxHash, packet.nodepubkey(), packet.signature())) { // FIXME: Not all nodes will want proposals PackedMessage::pointer message = boost::make_shared(packet, newcoin::mtPROPOSE_LEDGER);