From d065231d8dd57dc275e28d8e7e997d3adc48ccbc Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 18:59:50 -0800 Subject: [PATCH] Rewrite of ledger proposal receiving code. Dispatch to thread pool for signature check. Re-dispatch to main I/O service for handover to consensus engine. --- src/NetworkOPs.cpp | 73 ++++++++++++------------- src/NetworkOPs.h | 6 ++- src/Peer.cpp | 129 ++++++++++++++++++++++++++++++++++++--------- src/Peer.h | 3 +- 4 files changed, 143 insertions(+), 68 deletions(-) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index ce52fde7b..fc030f2c9 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -694,60 +694,55 @@ bool NetworkOPs::haveConsensusObject() return mConsensus; } -// <-- bool: true to relay -bool NetworkOPs::recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, - const uint256& prevLedger, uint32 closeTime, const std::string& signature, - const RippleAddress& nodePublic) +uint256 NetworkOPs::getConsensusLCL() { - // JED: does mConsensus need to be locked? + if (!haveConsensusObject()) + return uint256(); + return mConsensus->getLCL(); +} - // XXX Validate key. - // XXX Take a vuc for pubkey. +void NetworkOPs::processTrustedProposal(uint256 suppression, LedgerProposal::pointer proposal, + boost::shared_ptr set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood) +{ + bool relay = true; if (!haveConsensusObject()) { cLog(lsINFO) << "Received proposal outside consensus window"; - return mMode != omFULL; + if (mMode == omFULL) + relay = false; } - - if (mConsensus->isOurPubKey(nodePublic)) + else { - cLog(lsTRACE) << "Received our own validation"; - return false; - } + storeProposal(proposal, nodePublic); - // Is this node on our UNL? - if (!theApp->getUNL().nodeInUNL(nodePublic)) - { - cLog(lsINFO) << "Untrusted proposal: " << nodePublic.humanNodePublic() << " " << proposeHash; - return true; - } + uint256 consensusLCL = mConsensus->getLCL(); - if (prevLedger.isNonZero()) - { // proposal includes a previous ledger - LedgerProposal::pointer proposal = - boost::make_shared(prevLedger, proposeSeq, proposeHash, closeTime, nodePublic); - if (!proposal->checkSign(signature)) + if (!set->has_previousledger() && (checkLedger != consensusLCL)) { - cLog(lsWARNING) << "New-style ledger proposal fails signature check"; - return false; + cLog(lsWARNING) << "Have to re-check proposal signature due to consensus view change"; + assert(proposal->hasSignature()); + proposal->setPrevLedger(consensusLCL); + if (proposal->checkSign()) + sigGood = true; + } + + if (sigGood && (consensusLCL == proposal->getPrevLedger())) + { + relay = mConsensus->peerPosition(proposal); + cLog(lsTRACE) << "Proposal processing finished, relay=" << relay; } - if (prevLedger == mConsensus->getLCL()) - return mConsensus->peerPosition(proposal); - storeProposal(proposal, nodePublic); - return false; } - LedgerProposal::pointer proposal = - boost::make_shared(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, nodePublic); - if (!proposal->checkSign(signature)) - { // Note that if the LCL is different, the signature check will fail - cLog(lsWARNING) << "Ledger proposal fails signature check"; - proposal->setSignature(signature); - storeProposal(proposal, nodePublic); - return false; + if (relay) + { + std::set peers; + theApp->getSuppression().swapSet(suppression, peers, SF_RELAYED); + PackedMessage::pointer message = boost::make_shared(*set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); } - return mConsensus->peerPosition(proposal); + else + cLog(lsINFO) << "Not relaying trusted proposal"; } SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash) diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index cd5243b26..07b9d6cab 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -168,8 +168,9 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - bool recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, - const uint256& prevLedger, uint32 closeTime, const std::string& signature, const RippleAddress& nodePublic); + void processTrustedProposal(uint256 supression, LedgerProposal::pointer proposal, + boost::shared_ptr set, + RippleAddress nodePublic, uint256 checkLedger, bool sigGood); bool gotTXData(const boost::shared_ptr& peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); bool recvValidation(const SerializedValidation::pointer& val); @@ -199,6 +200,7 @@ public: boost::unordered_map >& peekStoredProposals() { return mStoredProposals; } void storeProposal(const LedgerProposal::pointer& proposal, const RippleAddress& peerPublic); + uint256 getConsensusLCL(); // client information retrieval functions std::vector< std::pair > diff --git a/src/Peer.cpp b/src/Peer.cpp index 7a06d0f86..8cc12b2ff 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -500,8 +500,8 @@ void Peer::processReadBuffer() case ripple::mtPROPOSE_LEDGER: { - ripple::TMProposeSet msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + boost::shared_ptr msg = boost::make_shared(); + if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvPropose(msg); else cLog(lsWARNING) << "parse error: " << type; @@ -709,6 +709,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx if (tx->getStatus() == INVALID) { theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD); + punshPeer(peer, PP_BAD_SIGNATURE); return; } else @@ -748,7 +749,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) { // we have seen this transaction recently if ((flags & SF_BAD) != 0) { - punishPeer(PP_INVALID_REQUEST); + punishPeer(PP_BAD_SIGNATURE); return; } @@ -757,7 +758,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } theApp->getJobQueue().addJob(jtTRANSACTION, - boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); + boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); #ifndef TRUST_NETWORK } @@ -774,42 +775,118 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } -void Peer::recvPropose(ripple::TMProposeSet& packet) +static void checkPropose(Job& job, boost::shared_ptr packet, + uint256 suppression, LedgerProposal::pointer proposal, uint256 consensusLCL, + RippleAddress nodePublic, boost::weak_ptr peer) +{ // FIXME: Suppress relaying proposals with incorrect LCLs + bool sigGood = false; + bool isTrusted = (job.getType() == jtPROPOSAL_t); + + cLog(lsTRACE) << "Checking " << (isTrusted ? "trusted" : "UNtrusted") << " proposal"; + + assert(packet); + ripple::TMProposeSet& set = *packet; + + uint256 prevLedger; + if (set.has_previousledger()) + { // proposal includes a previous ledger + cLog(lsTRACE) << "proposal with previous ledger"; + memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8); + if (!proposal->checkSign(set.signature())) + { + cLog(lsWARNING) << "proposal with previous ledger fails signature check"; + Peer::punishPeer(peer, PP_BAD_SIGNATURE); + return; + } + else + sigGood = true; + } + else + { + if (consensusLCL.isNonZero() && proposal->checkSign(set.signature())) + { + prevLedger = consensusLCL; + sigGood = true; + } + else + { + cLog(lsWARNING) << "Ledger proposal fails signature check"; + proposal->setSignature(set.signature()); + } + } + + if (isTrusted) + { + theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(), + suppression, proposal, packet, nodePublic, prevLedger, sigGood)); + } + else + { // untrusted proposal, just relay it + cLog(lsTRACE) << "relaying untrusted proposal"; + std::set peers; + theApp->getSuppression().swapSet(suppression, peers, SF_RELAYED); + PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); + } +} + +void Peer::recvPropose(const boost::shared_ptr& packet) { - if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) || - (packet.signature().size() < 56) || (packet.nodepubkey().size() > 128) || (packet.signature().size() > 128)) + assert(packet); + ripple::TMProposeSet& set = *packet; + + if ((set.currenttxhash().size() != 32) || (set.nodepubkey().size() < 28) || + (set.signature().size() < 56) || (set.nodepubkey().size() > 128) || (set.signature().size() > 128)) { cLog(lsWARNING) << "Received proposal is malformed"; return; } - uint256 currentTxHash, prevLedger; - memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32); + if (set.has_previousledger() && (set.previousledger().size() != 32)) + { + cLog(lsWARNING) << "Received proposal is malformed"; + return; + } - if ((packet.has_previousledger()) && (packet.previousledger().size() == 32)) - memcpy(prevLedger.begin(), packet.previousledger().data(), 32); + uint256 proposeHash, prevLedger; + memcpy(proposeHash.begin(), set.currenttxhash().data(), 32); + + if ((set.has_previousledger()) && (set.previousledger().size() == 32)) + memcpy(prevLedger.begin(), set.previousledger().data(), 32); Serializer s(512); - s.add256(currentTxHash); - s.add256(prevLedger); - s.add32(packet.proposeseq()); - s.add32(packet.closetime()); - s.addVL(packet.nodepubkey()); - s.addVL(packet.signature()); + s.add256(proposeHash); + s.add32(set.proposeseq()); + s.add32(set.closetime()); + s.addVL(set.nodepubkey()); + s.addVL(set.signature()); + if (set.has_previousledger()) + s.add256(prevLedger); uint256 suppression = s.getSHA512Half(); if (!theApp->isNew(suppression, mPeerId)) + { + cLog(lsTRACE) << "Received duplicate proposal from peer " << mPeerId; return; - - RippleAddress nodePublic = RippleAddress::createNodePublic(strCopy(packet.nodepubkey())); -// bool isTrusted = theApp->getUNL().nodeInUNL(nodePublic); - - if(theApp->getOPs().recvPropose(suppression, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(), - packet.signature(), nodePublic)) - { // FIXME: Not all nodes will want proposals - PackedMessage::pointer message = boost::make_shared(packet, ripple::mtPROPOSE_LEDGER); - theApp->getConnectionPool().relayMessage(this, message); } + + RippleAddress signerPublic = RippleAddress::createNodePublic(strCopy(set.nodepubkey())); + if (signerPublic == theConfig.VALIDATION_PUB) + { + cLog(lsTRACE) << "Received our own proposal from peer " << mPeerId; + return; + } + bool isTrusted = theApp->getUNL().nodeInUNL(signerPublic); + cLog(lsTRACE) << "Received " << (isTrusted ? "trusted" : "UNtrusted") << " proposal from " << mPeerId; + + uint256 consensusLCL = theApp->getOPs().getConsensusLCL(); + LedgerProposal::pointer proposal = boost::make_shared( + prevLedger.isNonZero() ? prevLedger : consensusLCL, + set.proposeseq(), proposeHash, set.closetime(), signerPublic); + + theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, + boost::bind(&checkPropose, _1, packet, suppression, proposal, consensusLCL, + mNodePublic, boost::weak_ptr(shared_from_this()))); } void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet) diff --git a/src/Peer.h b/src/Peer.h index 256319c83..e70d702fb 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -19,6 +19,7 @@ enum PeerPunish PP_INVALID_REQUEST = 1, // The peer sent a request that makes no sense PP_UNKNOWN_REQUEST = 2, // The peer sent a request that might be garbage PP_UNWANTED_DATA = 3, // The peer sent us data we didn't want/need + PP_BAD_SIGNATURE = 4, // Object had bad signature }; typedef std::pair ipPort; @@ -116,7 +117,7 @@ protected: void recvGetLedger(ripple::TMGetLedger& packet); void recvLedger(ripple::TMLedgerData& packet); void recvStatus(ripple::TMStatusChange& packet); - void recvPropose(ripple::TMProposeSet& packet); + void recvPropose(const boost::shared_ptr& packet); void recvHaveTxSet(ripple::TMHaveTransactionSet& packet); void getSessionCookie(std::string& strDst);