From 75ad54591651b3042bc354377a4956b8c9dbd864 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 3 Jun 2012 15:30:39 -0700 Subject: [PATCH] Finish the wobble rework. --- src/LedgerConsensus.cpp | 9 +-- src/LedgerConsensus.h | 1 + src/LedgerMaster.cpp | 150 ++++++---------------------------------- src/LedgerMaster.h | 9 +++ src/NetworkOPs.cpp | 20 ++++-- src/NetworkOPs.h | 2 +- src/Peer.cpp | 13 ++-- src/newcoin.proto | 13 ++-- 8 files changed, 62 insertions(+), 155 deletions(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 01dc99b49e..3ebcab3a3d 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -284,16 +284,18 @@ void LedgerConsensus::abort() int LedgerConsensus::startup() { + // create wobble ledger in case peers target transactions to it + theApp->getMasterLedger().beginWobble(); return 1; } int LedgerConsensus::statePreClose(int secondsSinceClose) { // it is shortly before ledger close time if (secondsSinceClose >= 0) - { // it is time to close the ledger + { // it is time to close the ledger (swap default and wobble ledgers) Log(lsINFO) << "Closing ledger"; mState = lcsPOST_CLOSE; - Ledger::pointer initial = theApp->getMasterLedger().getCurrentLedger(); + Ledger::pointer initial = theApp->getMasterLedger().closeTime(); statusChange(newcoin::neCLOSING_LEDGER, mPreviousLedger); } return 1; @@ -305,7 +307,7 @@ int LedgerConsensus::statePostClose(int secondsSinceClose) { Log(lsINFO) << "Wobble is over, it's consensus time"; mState = lcsESTABLISH; - Ledger::pointer initial = theApp->getMasterLedger().getCurrentLedger(); + Ledger::pointer initial = theApp->getMasterLedger().endWobble(); takeInitialPosition(initial); initial->bumpSeq(); } @@ -459,7 +461,6 @@ void LedgerConsensus::propose(const std::vector& added, const std::vect Log(lsDEBUG) << "We propose: " << mOurPosition->getCurrentHash().GetHex(); newcoin::TMProposeSet prop; prop.set_currenttxhash(mOurPosition->getCurrentHash().begin(), 256 / 8); - prop.set_prevclosedhash(mOurPosition->getPrevLedger().begin(), 256 / 8); prop.set_proposeseq(mOurPosition->getProposeSeq()); std::vector pubKey = mOurPosition->getPubKey(); diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 9c21973fee..e2b69e8fd2 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -129,6 +129,7 @@ public: int startup(); Ledger::pointer peekPreviousLedger() { return mPreviousLedger; } + uint256 getLCL() { return mPreviousLedger->getHash(); } SHAMap::pointer getTransactionTree(const uint256& hash, bool doAcquire); TransactionAcquire::pointer getAcquiring(const uint256& hash); diff --git a/src/LedgerMaster.cpp b/src/LedgerMaster.cpp index ca1d4274c5..0dca9c5489 100644 --- a/src/LedgerMaster.cpp +++ b/src/LedgerMaster.cpp @@ -63,143 +63,33 @@ void LedgerMaster::switchLedgers(Ledger::pointer lastClosed, Ledger::pointer cur mEngine.setLedger(mCurrentLedger); } -#if 0 - -void LedgerMaster::startFinalization() +Ledger::pointer LedgerMaster::closeTime() { - mFinalizedLedger=mCurrentLedger; - mCurrentLedger=Ledger::pointer(new Ledger(mCurrentLedger->getIndex()+1)); - - applyFutureProposals( mFinalizedLedger->getIndex() ); - applyFutureTransactions( mCurrentLedger->getIndex() ); + boost::recursive_mutex::scoped_lock sl(mLock); + assert(mCurrentLedger && mWobbleLedger); + Ledger::pointer ret = mCurrentLedger; + mCurrentLedger = mWobbleLedger; + mEngine.setLedger(mCurrentLedger); + mWobbleLedger = ret; + return ret; } -void LedgerMaster::sendProposal() +void LedgerMaster::beginWobble() { - PackedMessage::pointer packet=Peer::createLedgerProposal(mFinalizedLedger); - theApp->getConnectionPool().relayMessage(NULL,packet); + boost::recursive_mutex::scoped_lock sl(mLock); + assert(!mWobbleLedger); + mWobbleLedger = mCurrentLedger; + mCurrentLedger = boost::make_shared(mCurrentLedger); + mEngine.setLedger(mCurrentLedger); } - -void LedgerMaster::endFinalization() +Ledger::pointer LedgerMaster::endWobble() { - mFinalizedLedger->publishValidation(); - mLedgerHistory.addAcceptedLedger(mFinalizedLedger); - mLedgerHistory.addLedger(mFinalizedLedger); - - mFinalizedLedger=Ledger::pointer(); + boost::recursive_mutex::scoped_lock sl(mLock); + assert(mWobbleLedger && mCurrentLedger); + Ledger::pointer ret = mWobbleLedger; + mWobbleLedger = Ledger::pointer(); + return ret; } -void LedgerMaster::addFutureProposal(Peer::pointer peer,newcoin::ProposeLedger& otherLedger) -{ - mFutureProposals.push_front(pair(peer,otherLedger)); -} - -void LedgerMaster::applyFutureProposals(uint32 ledgerIndex) -{ - for(list< pair >::iterator iter=mFutureProposals.begin(); iter !=mFutureProposals.end(); ) - { - if ((*iter).second.ledgerindex() == ledgerIndex) - { - checkLedgerProposal((*iter).first,(*iter).second); - mFutureProposals.erase(iter); - }else iter++; - } -} - -void LedgerMaster::checkLedgerProposal(Peer::pointer peer, newcoin::ProposeLedger& otherLedger) -{ - // see if this matches yours - // if you haven't finalized yet save it for when you do - // if doesn't match and you have <= transactions ask for the complete ledger - // if doesn't match and you have > transactions send your complete ledger - - - if(otherLedger.ledgerindex()getIndex()) - { - if( (!mFinalizedLedger) || - otherLedger.ledgerindex()getIndex()) - { // you have already closed this ledger - Ledger::pointer oldLedger=mLedgerHistory.getAcceptedLedger(otherLedger.ledgerindex()); - if(oldLedger) - { - if( (oldLedger->getHash()!=protobufTo256(otherLedger.hash())) && - (oldLedger->getNumTransactions()>=otherLedger.numtransactions())) - { - peer->sendLedgerProposal(oldLedger); - } - } - }else - { // you guys are on the same page - uint256 otherHash=protobufTo256(otherLedger.hash()); - if(mFinalizedLedger->getHash()!= otherHash) - { - if( mFinalizedLedger->getNumTransactions()>=otherLedger.numtransactions()) - { - peer->sendLedgerProposal(mFinalizedLedger); - }else - { - peer->sendGetFullLedger(otherHash); - } - } - } - }else - { // you haven't started finalizde this one yet save it for when you do - addFutureProposal(peer,otherLedger); - } -} - - -// TODO: optimize. this is expensive so limit the amount it is run -void LedgerMaster::checkConsensus(uint32 ledgerIndex) -{ - Ledger::pointer ourAcceptedLedger=mLedgerHistory.getAcceptedLedger(ledgerIndex); - if(ourAcceptedLedger) - { - Ledger::pointer consensusLedger; - uint256 consensusHash; - - if( theApp->getValidationCollection().getConsensusLedger(ledgerIndex,ourAcceptedLedger->getHash(), consensusLedger, consensusHash) ) - { // our accepted ledger isn't compatible with the consensus - if(consensusLedger) - { // switch to this ledger. Re-validate - mLedgerHistory.addAcceptedLedger(consensusLedger); - consensusLedger->publishValidation(); - }else - { // we don't know the consensus one. Ask peers for it - // TODO: make sure this isn't sent many times before we have a chance to get a reply - PackedMessage::pointer msg=Peer::createGetFullLedger(consensusHash); - theApp->getConnectionPool().relayMessage(NULL,msg); - } - } - } -} -/* -if( consensusHash && -(ourAcceptedLedger->getHash()!= *consensusHash)) -{ -Ledger::pointer consensusLedger=mLedgerHistory.getLedger(*consensusHash); -if(consensusLedger) -{ // see if these are compatible -if(ourAcceptedLedger->isCompatible(consensusLedger)) -{ // try to merge any transactions from the consensus one into ours -ourAcceptedLedger->mergeIn(consensusLedger); -// Ledger::pointer child=ourAcceptedLedger->getChild(); -Ledger::pointer child=mLedgerHistory.getAcceptedLedger(ledgerIndex+1); -if(child) child->recalculate(); -}else -{ // switch to this ledger. Re-validate -mLedgerHistory.addAcceptedLedger(consensusLedger); -consensusLedger->publishValidation(); -} - -}else -{ // we don't know the consensus one. Ask peers for it -PackedMessage::pointer msg=Peer::createGetFullLedger(*consensusHash); -theApp->getConnectionPool().relayMessage(NULL,msg); -} -} -*/ - -#endif // vim:ts=4 diff --git a/src/LedgerMaster.h b/src/LedgerMaster.h index 47ee5320b3..a420b05519 100644 --- a/src/LedgerMaster.h +++ b/src/LedgerMaster.h @@ -38,8 +38,13 @@ public: ScopedLock getLock() { return ScopedLock(mLock); } + // The current ledger is the ledger we believe new transactions should go in Ledger::pointer getCurrentLedger() { return mCurrentLedger; } + + // The wobble ledger is a ledger that new transactions can go in if requested Ledger::pointer getWobbleLedger() { return mWobbleLedger; } + + // The finalized ledger is the last closed/accepted ledger Ledger::pointer getClosedLedger() { return mFinalizedLedger; } TransactionEngineResult doTransaction(const SerializedTransaction& txn, uint32 targetLedger, @@ -51,6 +56,10 @@ public: void switchLedgers(Ledger::pointer lastClosed, Ledger::pointer newCurrent); + Ledger::pointer closeTime(); + void beginWobble(); + Ledger::pointer endWobble(); + Ledger::pointer getLedgerBySeq(uint32 index) { if (mCurrentLedger && (mCurrentLedger->getLedgerSeq() == index)) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index eb3cdc49b9..ccb6d1abdd 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -442,20 +442,27 @@ int NetworkOPs::beginConsensus(Ledger::pointer closingLedger) return mConsensus->startup(); } -bool NetworkOPs::recvPropose(const uint256& prevLgr, uint32 proposeSeq, const uint256& proposeHash, +bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, const std::string& pubKey, const std::string& signature) { if (mMode != omFULL) // FIXME: Should we relay? - Log(lsWARNING) << "Received proposal when not full: " << mMode; - if (!mConsensus) { + Log(lsWARNING) << "Received proposal when not full: " << mMode; return true; } + if (!mConsensus) + { + Log(lsWARNING) << "Received proposal when full but not during consensus window"; + return false; + } + + boost::shared_ptr consensus = mConsensus; + if (!consensus) return false; LedgerProposal::pointer proposal = - boost::make_shared(prevLgr, proposeSeq, proposeHash, pubKey); + boost::make_shared(consensus->getLCL(), proposeSeq, proposeHash, pubKey); if (!proposal->checkSign(signature)) - { + { // Note that if the LCL is different, the signature check will fail Log(lsWARNING) << "Ledger proposal fails signature check"; return false; } @@ -463,8 +470,7 @@ bool NetworkOPs::recvPropose(const uint256& prevLgr, uint32 proposeSeq, const ui // Is this node on our UNL? // WRITEME - if (!mConsensus) return false; - return mConsensus->peerPosition(proposal); + return consensus->peerPosition(proposal); } SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash) diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 5d6f48b044..789ff97663 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -101,7 +101,7 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - bool recvPropose(const uint256& prevLgrHash, uint32 proposeSeq, const uint256& proposeHash, + bool recvPropose(uint32 proposeSeq, const uint256& proposeHash, const std::string& pubKey, const std::string& signature); bool gotTXData(boost::shared_ptr peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); diff --git a/src/Peer.cpp b/src/Peer.cpp index 3d6717ffbe..02b975cae5 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -616,17 +616,18 @@ void Peer::recvTransaction(newcoin::TMTransaction& packet) void Peer::recvPropose(newcoin::TMProposeSet& packet) { Log(lsINFO) << "Received proposal from peer"; - if ((packet.currenttxhash().size() != 32) || (packet.prevclosedhash().size() != 32) || - (packet.nodepubkey().size() < 28) || (packet.signature().size() < 56)) + if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) || + (packet.signature().size() < 56)) + { + Log(lsWARNING) << "Proposal is malformed"; return; + } uint32 proposeSeq = packet.proposeseq(); - uint256 currentTxHash, prevLgrHash; + uint256 currentTxHash; memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32); - memcpy(prevLgrHash.begin(), packet.prevclosedhash().data(), 32); - if(theApp->getOPs().recvPropose(prevLgrHash, proposeSeq, currentTxHash, - packet.nodepubkey(), packet.signature())) + if(theApp->getOPs().recvPropose(proposeSeq, currentTxHash, packet.nodepubkey(), packet.signature())) { // FIXME: Not all nodes will want proposals PackedMessage::pointer message = boost::make_shared(packet, newcoin::mtPROPOSE_LEDGER); theApp->getConnectionPool().relayMessage(this, message); diff --git a/src/newcoin.proto b/src/newcoin.proto index f69aee626d..5355f52a11 100644 --- a/src/newcoin.proto +++ b/src/newcoin.proto @@ -100,13 +100,12 @@ message TMStatusChange { // Announce to the network our position on a closing ledger message TMProposeSet { - required bytes prevClosedHash = 1; - required uint32 proposeSeq = 2; - required bytes currentTxHash = 3; // the hash of the ledger we are proposing - required bytes nodePubKey = 4; - required bytes signature = 5; // signature of above fields - repeated bytes addedTransactions = 6; // not required if number is large - repeated bytes removedTransactions = 7; // not required if number is large + required uint32 proposeSeq = 1; + required bytes currentTxHash = 2; // the hash of the ledger we are proposing + required bytes nodePubKey = 3; + required bytes signature = 4; // signature of above fields + repeated bytes addedTransactions = 5; // not required if number is large + repeated bytes removedTransactions = 6; // not required if number is large } // Announce to a peer that we have fully acquired a transaction set