From 9a4862a5d97572cad7258e9fec808d088baadc8b Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 21 May 2012 06:05:32 -0700 Subject: [PATCH] Commit some ledger close/consensus work. --- src/LedgerConsensus.cpp | 8 +++++- src/LedgerConsensus.h | 22 ++++++++++---- src/NetworkOPs.cpp | 46 +++++++++++++++++++++++------- src/NetworkOPs.h | 6 +++- src/Peer.cpp | 63 +++++++++++++++++++++++------------------ src/Peer.h | 3 +- 6 files changed, 101 insertions(+), 47 deletions(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index c9b06e2a28..9e069aff94 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -5,6 +5,12 @@ void LedgerConsensus::abort() { } -void LedgerConsensus::startup() +int LedgerConsensus::startup() { + return 5; } + +int LedgerConsensus::timerEntry() +{ + return 5; +} \ No newline at end of file diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 0bdc14d353..c6f3b36936 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -17,14 +17,21 @@ class LCPosition protected: uint256 mPubKeyHash; CKey::pointer mPubKey; - uint256 mCurrentPosition; + uint256 mPreviousPosition, mCurrentPosition; uint32 mSequence; public: typedef boost::shared_ptr pointer; - LCPosition(CKey::pointer pubKey, const uint256& currentPosition, uint32 seq) : - mPubKey(pubKey), mCurrentPosition(currentPosition), mSequence(seq) { ; } + // for remote positions + LCPosition(uint32 closingSeq, uint32 proposeSeq, const uint256& previousTxHash, + const uint256& currentTxHash, CKey::pointer nodePubKey, const std::string& signature); + + // for our initial position + LCPosition(CKey::pointer privKey, uint32 ledgerSeq, const uint256& currentPosition); + + // for our subsequent positions + LCPosition(LCPosition::pointer previousPosition, CKey::pointer privKey, const uint256& newPosition); const uint256& getPubKeyHash() const { return mPubKeyHash; } const uint256& getCurrentPosition() const { return mCurrentPosition; } @@ -68,6 +75,8 @@ protected: Ledger::pointer mPreviousLedger, mCurrentLedger; LedgerProposal::pointer mCurrentProposal; + LCPosition::pointer mOurPosition; + // Convergence tracking, trusted peers indexed by hash of public key boost::unordered_map mPeerPositions; @@ -78,13 +87,13 @@ protected: // Peer sets boost::unordered_map > > mPeerData; - void startup(); void weHave(const uint256& id, Peer::pointer avoidPeer); public: LedgerConsensus(Ledger::pointer previousLedger, Ledger::pointer currentLedger) : - mPreviousLedger(previousLedger), mCurrentLedger(currentLedger) - { startup(); } + mPreviousLedger(previousLedger), mCurrentLedger(currentLedger) { ; } + + int startup(); Ledger::pointer peekPreviousLedger() { return mPreviousLedger; } Ledger::pointer peekCurrentLedger() { return mCurrentLedger; } @@ -103,6 +112,7 @@ public: bool peerHasSet(Peer::pointer peer, const std::vector& sets); bool peerGaveNodes(Peer::pointer peer, const uint256& setHash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); + int timerEntry(void); }; diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index d46a9bbf5a..9b665f25da 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -310,9 +310,18 @@ void NetworkOPs::checkState(const boost::system::error_code& result) // check if the ledger is bad enough to go to omTRACKING } + if (mConsensus) + { + setStateTimer(mConsensus->timerEntry()); + return; + } + Ledger::pointer currentLedger = theApp->getMasterLedger().getCurrentLedger(); - if ( (getNetworkTimeNC() >= currentLedger->getCloseTimeNC()) && !mConsensus) - beginConsensus(currentLedger); + if (getNetworkTimeNC() >= currentLedger->getCloseTimeNC()) + { + setStateTimer(beginConsensus(currentLedger)); + return; + } setStateTimer(10); } @@ -321,7 +330,8 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger) { // set the newledger as our last closed ledger -- this is abnormal code #ifdef DEBUG - std::cerr << "Switching last closed ledger to " << newLedger->getHash().GetHex() << std::endl; + assert(false); + std::cerr << "ABNORMAL Switching last closed ledger to " << newLedger->getHash().GetHex() << std::endl; #endif if (mConsensus) @@ -341,18 +351,16 @@ void NetworkOPs::switchLastClosedLedger(Ledger::pointer newLedger) } // vim:ts=4 -void NetworkOPs::beginConsensus(Ledger::pointer closingLedger) +int NetworkOPs::beginConsensus(Ledger::pointer closingLedger) { - if (mMode != omFULL) - { // We just close this ledger and start a new one - switchLastClosedLedger(closingLedger); - return; - } +#ifdef DEBUG + std::cerr << "Ledger close time for ledger " << closingLedger->getLedgerSeq() << std::endl; +#endif Ledger::pointer prevLedger = theApp->getMasterLedger().getLedgerByHash(closingLedger->getParentHash()); if (!prevLedger) { // this shouldn't happen if we jump ledgers mMode = omTRACKING; - return; + return 3; } // Create a new ledger to be the open ledger @@ -374,4 +382,22 @@ void NetworkOPs::beginConsensus(Ledger::pointer closingLedger) PackedMessage::pointer packet = boost::make_shared(PackedMessage::MessagePointer(s), newcoin::mtSTATUS_CHANGE); theApp->getConnectionPool().relayMessage(NULL, packet); + + return mConsensus->startup(); +} + +bool NetworkOPs::proposeLedger(uint32 closingSeq, uint32 proposeSeq, + const uint256& prevHash, const uint256& proposeHash, const std::string& pubKey, const std::string& signature) +{ + uint256 nodeID = Serializer::getSHA512Half(pubKey); + + // Is this node on our UNL? + // WRITEME + + // Are we currently closing? + + // Yes: Is it an update? + // WRITEME + + return true; } diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index c8792db760..f679a2df92 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -70,10 +70,14 @@ public: bool getAccountStateNodes(uint32 ledgerSeq, const uint256& myNodeId, const std::vector& myNode, std::list >& newNodes); + // ledger proposal/close functions + bool proposeLedger(uint32 closingSeq, uint32 proposeSeq, const uint256& prevHash, const uint256& proposeHash, + const std::string& pubKey, const std::string& signature); + // network state machine void checkState(const boost::system::error_code& result); void switchLastClosedLedger(Ledger::pointer newLedger); // Used for the "jump" case - void beginConsensus(Ledger::pointer closingLedger); + int beginConsensus(Ledger::pointer closingLedger); void setStateTimer(int seconds); }; diff --git a/src/Peer.cpp b/src/Peer.cpp index 25a9a48855..f5b15abd47 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -310,7 +310,7 @@ void Peer::handle_read_body(const boost::system::error_code& error) void Peer::processReadBuffer() { - int type=PackedMessage::getType(mReadbuf); + int type = PackedMessage::getType(mReadbuf); #ifdef DEBUG std::cerr << "PRB(" << type << "), len=" << (mReadbuf.size()-HEADER_SIZE) << std::endl; #endif @@ -417,6 +417,15 @@ void Peer::processReadBuffer() } break; + case newcoin::mtPROPOSE_LEDGER: + { + boost::shared_ptr msg = boost::make_shared(); + if(msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + recvPropose(msg); + else std::cerr << "parse error: " << type << std::endl; + } + break; + case newcoin::mtGET_LEDGER: @@ -438,15 +447,6 @@ void Peer::processReadBuffer() break; #if 0 - case newcoin::mtPROPOSE_LEDGER: - { - newcoin::TM msg; - if(msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) - recv(msg); - else std::cerr << "parse error: " << type << std::endl; - } - break; - case newcoin::mtCLOSE_LEDGER: { newcoin::TM msg; @@ -593,7 +593,7 @@ void Peer::recvTransaction(newcoin::TMTransaction& packet) tx = theApp->getOPs().processTransaction(tx, this); - if(tx->getStatus()!=INCLUDED) + if(tx->getStatus() != INCLUDED) { // transaction wasn't accepted into ledger #ifdef DEBUG std::cerr << "Transaction from peer won't go in ledger" << std::endl; @@ -601,6 +601,25 @@ void Peer::recvTransaction(newcoin::TMTransaction& packet) } } +void Peer::recvPropose(boost::shared_ptr packet) +{ + if ((packet->previoustxhash().size() != 32) || (packet->currenttxhash().size() != 32) || + (packet->nodepubkey().size() < 28) || (packet->signature().size() < 56)) + return; + + uint32 closingSeq = packet->closingseq(), proposeSeq = packet->proposeseq(); + uint256 previousTxHash, currentTxHash; + memcpy(previousTxHash.begin(), packet->previoustxhash().data(), 32); + memcpy(currentTxHash.begin(), packet->currenttxhash().data(), 32); + + if(theApp->getOPs().proposeLedger(closingSeq, proposeSeq, previousTxHash, currentTxHash, + packet->nodepubkey(), packet->signature())) + { // FIXME: Not all nodes will want proposals + PackedMessage::pointer message = boost::make_shared(packet, newcoin::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessage(this, message); + } +} + void Peer::recvValidation(newcoin::TMValidation& packet) { } @@ -654,6 +673,10 @@ void Peer::recvStatus(newcoin::TMStatusChange& packet) #ifdef DEBUG std::cerr << "Received status change from peer" << std::endl; #endif + if (!packet.has_networktime()) + packet.set_networktime(theApp->getOPs().getNetworkTimeNC()); + mLastStatus = packet; + if (packet.has_ledgerhash() && (packet.ledgerhash().size() == (256 / 8))) { // a peer has changed ledgers if (packet.has_previousledgerhash() && (packet.previousledgerhash().size() == (256 / 8))) @@ -661,10 +684,7 @@ void Peer::recvStatus(newcoin::TMStatusChange& packet) else mPreviousLedgerHash = mClosedLedgerHash; memcpy(mClosedLedgerHash.begin(), packet.ledgerhash().data(), 256 / 8); - if (packet.has_networktime()) - mClosedLedgerTime = ptFromSeconds(packet.networktime()); - else - mClosedLedgerTime = theApp->getOPs().getNetworkTimePT(); + mClosedLedgerTime = ptFromSeconds(packet.networktime()); #ifdef DEBUG std::cerr << "peer LCL is " << mClosedLedgerHash.GetHex() << std::endl; #endif @@ -869,19 +889,6 @@ PackedMessage::pointer Peer::createFullLedger(Ledger::pointer ledger) return(PackedMessage::pointer()); }*/ -PackedMessage::pointer Peer::createLedgerProposal(Ledger::pointer ledger) -{ - uint256& hash=ledger->getHash(); - newcoin::ProposeLedger* prop=new newcoin::ProposeLedger(); - prop->set_ledgerindex(ledger->getIndex()); - prop->set_hash(hash.begin(), hash.GetSerializeSize()); - prop->set_numtransactions(ledger->getNumTransactions()); - - PackedMessage::pointer packet=boost::make_shared - (PackedMessage::MessagePointer(prop), newcoin::PROPOSE_LEDGER); - return(packet); -} - PackedMessage::pointer Peer::createValidation(Ledger::pointer ledger) { uint256 hash=ledger->getHash(); diff --git a/src/Peer.h b/src/Peer.h index 62b30c8a9b..b8a33378c6 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -53,6 +53,7 @@ protected: std::vector mReadbuf; std::list mSendQ; PackedMessage::pointer mSendingPacket; + newcoin::TMStatusChange mLastStatus; Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx); @@ -85,7 +86,7 @@ protected: void recvGetLedger(newcoin::TMGetLedger& packet); void recvLedger(newcoin::TMLedgerData& packet); void recvStatus(newcoin::TMStatusChange& packet); - void recvPropose(newcoin::TMProposeSet& packet); + void recvPropose(boost::shared_ptr packet); void recvHaveTxSet(newcoin::TMHaveTransactionSet& packet); void getSessionCookie(std::string& strDst);