diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index ef2f7a9f00..0bdc14d353 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -3,11 +3,14 @@ #include +#include #include #include "key.h" #include "Transaction.h" #include "LedgerAcquire.h" +#include "LedgerProposal.h" +#include "Peer.h" class LCPosition { // A position taken by one of our trusted peers @@ -63,6 +66,7 @@ class LedgerConsensus { protected: Ledger::pointer mPreviousLedger, mCurrentLedger; + LedgerProposal::pointer mCurrentProposal; // Convergence tracking, trusted peers indexed by hash of public key boost::unordered_map mPeerPositions; @@ -74,9 +78,13 @@ protected: // Peer sets boost::unordered_map > > mPeerData; + void startup(); + void weHave(const uint256& id, Peer::pointer avoidPeer); + public: LedgerConsensus(Ledger::pointer previousLedger, Ledger::pointer currentLedger) : - mPreviousLedger(previousLedger), mCurrentLedger(currentLedger) { ; } + mPreviousLedger(previousLedger), mCurrentLedger(currentLedger) + { startup(); } Ledger::pointer peekPreviousLedger() { return mPreviousLedger; } Ledger::pointer peekCurrentLedger() { return mCurrentLedger; } @@ -90,6 +98,7 @@ public: LCPosition::pointer getPeerPosition(const uint256& peer); // high-level functions + void abort(); bool peerPosition(Peer::pointer peer, const Serializer& report); bool peerHasSet(Peer::pointer peer, const std::vector& sets); bool peerGaveNodes(Peer::pointer peer, const uint256& setHash, diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index c7ff5e825c..d46a9bbf5a 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -7,6 +7,7 @@ #include "utils.h" #include "Application.h" #include "Transaction.h" +#include "LedgerConsensus.h" // This is the primary interface into the "client" portion of the program. @@ -155,7 +156,7 @@ void NetworkOPs::setStateTimer(int sec) uint64 closedTime = theApp->getMasterLedger().getCurrentLedger()->getCloseTimeNC(); uint64 now = getNetworkTimeNC(); - if (mMode == omFULL) + if ((mMode == omFULL) && !mConsensus) { if (now >= closedTime) sec = 0; else if (sec > (closedTime - now)) sec = (closedTime - now); @@ -289,7 +290,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result) } consensus = acq->getLedger(); } - switchLastClosedLedger(consensus, false); + switchLastClosedLedger(consensus); } if (mMode == omCONNECTED) @@ -300,6 +301,7 @@ void NetworkOPs::checkState(const boost::system::error_code& result) if (mMode == omTRACKING) { // check if the ledger is good enough to go to omFULL + // Note: Do not go to omFULL if we don't have the previous ledger // check if the ledger is bad enough to go to omCONNECTED } @@ -309,29 +311,25 @@ void NetworkOPs::checkState(const boost::system::error_code& result) } Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger(); - if (getNetworkTimeNC() >= currentLedger->getCloseTimeNC()) - { - currentLedger->setClosed(); - switchLastClosedLedger(currentLedger, true); - } + if ( (getNetworkTimeNC() >= currentLedger->getCloseTimeNC()) && !mConsensus) + beginConsensus(currentLedger); setStateTimer(10); } -void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger, bool normal) -{ // set the newledger as our last closed ledger - // FIXME: Correct logic is: - // 1) Mark this ledger closed, schedule it to be saved - // 2) If normal, reprocess transactions - // 3) Open a new subsequent ledger - // 4) Walk back the previous ledger chain from our current ledger and the new last closed ledger - // find a common previous ledger, if possible. Try to insert any transactions in our ledger - // chain into the new open ledger. Broadcast any that make it in. +void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger) +{ // set the newledger as our last closed ledger -- this is abnormal code #ifdef DEBUG std::cerr << "Switching last closed ledger to " << newLedger->getHash().GetHex() << std::endl; #endif + if (mConsensus) + { + mConsensus->abort(); + mConsensus = boost::shared_ptr(); + } + newLedger->setClosed(); Ledger::pointer openLedger = boost::make_shared(newLedger); theApp->getMasterLedger().switchLedgers(newLedger, openLedger); @@ -340,24 +338,40 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger, bool normal) { // this ledger has already closed } - boost::shared_ptr s = boost::make_shared(); +} +// vim:ts=4 - s->set_newevent(normal ? newcoin::neACCEPTED_LEDGER : newcoin::neSWITCHED_LEDGER); - s->set_ledgerseq(newLedger->getLedgerSeq()); - s->set_networktime(getNetworkTimeNC()); +void NetworkOPs::beginConsensus(Ledger::pointer closingLedger) +{ + if (mMode != omFULL) + { // We just close this ledger and start a new one + switchLastClosedLedger(closingLedger); + return; + } + Ledger::pointer prevLedger = theApp->getMasterLedger().getLedgerByHash(closingLedger->getParentHash()); + if (!prevLedger) + { // this shouldn't happen if we jump ledgers + mMode = omTRACKING; + return; + } - uint256 lhash = newLedger->getHash(); - s->set_ledgerhash(lhash.begin(), lhash.size()); - lhash = newLedger->getParentHash(); - s->set_previousledgerhash(lhash.begin(), lhash.size()); + // Create a new ledger to be the open ledger + theApp->getMasterLedger().pushLedger(boost::make_shared(closingLedger)); + // Create a consensus object to get consensus on this ledger + if (!!mConsensus) mConsensus->abort(); + mConsensus = boost::make_shared(prevLedger, closingLedger); #ifdef DEBUG - std::cerr << "Broadcasting ledger change" << std::endl; + std::cerr << "Broadcasting ledger close" << std::endl; #endif - + boost::shared_ptr s = boost::make_shared(); + s->set_newevent(newcoin::neCLOSING_LEDGER); + s->set_ledgerseq(closingLedger->getLedgerSeq()); + s->set_networktime(getNetworkTimeNC()); + uint256 plhash = closingLedger->getParentHash(); + s->set_previousledgerhash(plhash.begin(), plhash.size()); PackedMessage::pointer packet = boost::make_shared(PackedMessage::MessagePointer(s), newcoin::mtSTATUS_CHANGE); theApp->getConnectionPool().relayMessage(NULL, packet); } -// vim:ts=4 diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 5b8ce6c804..c8792db760 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -11,6 +11,7 @@ // Master operational handler, server sequencer, network tracker class Peer; +class LedgerConsensus; class NetworkOPs { @@ -30,8 +31,9 @@ public: }; protected: - OperatingMode mMode; - boost::asio::deadline_timer mNetTimer; + OperatingMode mMode; + boost::asio::deadline_timer mNetTimer; + boost::shared_ptr mConsensus; public: NetworkOPs(boost::asio::io_service& io_service); @@ -70,7 +72,8 @@ public: // network state machine void checkState(const boost::system::error_code& result); - void switchLastClosedLedger(Ledger::pointer newLedger, bool normal); + void switchLastClosedLedger(Ledger::pointer newLedger); // Used for the "jump" case + void beginConsensus(Ledger::pointer closingLedger); void setStateTimer(int seconds); }; diff --git a/src/Peer.h b/src/Peer.h index 8ed5d0adce..62b30c8a9b 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -85,6 +85,8 @@ protected: void recvGetLedger(newcoin::TMGetLedger& packet); void recvLedger(newcoin::TMLedgerData& packet); void recvStatus(newcoin::TMStatusChange& packet); + void recvPropose(newcoin::TMProposeSet& packet); + void recvHaveTxSet(newcoin::TMHaveTransactionSet& packet); void getSessionCookie(std::string& strDst); diff --git a/src/newcoin.proto b/src/newcoin.proto index ada5c59433..1fd4d225a4 100644 --- a/src/newcoin.proto +++ b/src/newcoin.proto @@ -83,7 +83,7 @@ enum NodeStatus { } enum NodeEvent { - neCLOSED_LEDGER = 1; // closing a ledger because its close time has come + neCLOSING_LEDGER = 1; // closing a ledger because its close time has come neACCEPTED_LEDGER = 2; // accepting a closed ledger, we have finished computing it neSWITCHED_LEDGER = 3; // switching ledgers due to network consensus neSHUTTING_DOWN = 4; @@ -91,7 +91,7 @@ enum NodeEvent { message TMStatusChange { optional NodeStatus newStatus = 1; - optional NodeEvent newEvent = 2; + optional NodeEvent newEvent = 2; optional uint32 ledgerSeq = 3; optional bytes ledgerHash = 4; optional bytes previousLedgerHash = 5; @@ -99,43 +99,31 @@ message TMStatusChange { } -message TMPeerPosition { - required uint32 ledgerSequence = 1; - required bytes pubKey = 2; - required uint32 sequence = 3; - required bytes transactionHash = 4; - required bytes signature = 5; +// Announce to the network our position on a closing ledger +message TMProposeSet { + required uint32 closingSeq = 1; + required uint32 proposeSeq = 2; + required bytes previousTxHash = 3; // 0 if first proposal, hash we no longer propose + required bytes currentTxHash = 4; // the hash of the ledger we are proposing + required bytes nodePubKey = 5; + required bytes signature = 6; // signature of above fields + repeated bytes addedTransactions = 7; // not required if number is large + repeated bytes removedTransactions = 8; // not required if number is large } +// Announce to a peer that we have fully acquired a transaction set message TMHaveTransactionSet { repeated bytes hashes = 1; } -message TMProposeLedger { - required uint32 closingSeq = 1; - required uint32 proposeSeq = 2; - required bytes previousLedgerHash = 3; // 0 if first proposal, hash we no longer propose - required bytes currentLedgerHash = 4; // the hash of the ledger we are proposing - required bytes hanko = 5; - repeated bytes addedTransactions = 6; - repeated bytes removedTransactions = 7; - required bytes signature = 8; -} - - -// Used to propose/validate during ledger close +// Used to sign a final closed ledger after reprocessing message TMValidation { - required uint32 ledgerIndex = 1; - required bytes ledgerHash = 2; - optional uint64 timestamp = 3; // only in proposed ledgers - optional uint32 confidence = 4; // only in proposed ledgers - required bytes hanko = 5; - required bytes sig = 6; + required bytes validation = 1; // in SerializedValidation form + required bytes signature = 2; } - message TMGetValidations { required uint32 ledgerIndex = 1; repeated bytes hanko = 2;