From eb58c6abf15c4ec26acbb98b8553cd05ca7fe38f Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 14:39:30 -0800 Subject: [PATCH 01/23] Whitespace fix. --- src/LedgerAcquire.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/LedgerAcquire.cpp b/src/LedgerAcquire.cpp index f9d9f1bef..50497b3cd 100644 --- a/src/LedgerAcquire.cpp +++ b/src/LedgerAcquire.cpp @@ -170,7 +170,7 @@ void LedgerAcquire::addPeers() if (!found) { BOOST_FOREACH(Peer::ref peer, peerList) - peerHas(peer); + peerHas(peer); } } From 5134c9d5744ecce72bd5f62cf830b917e95b0577 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 14:39:50 -0800 Subject: [PATCH 02/23] Suppression helper function. --- src/Application.h | 1 + src/Suppression.cpp | 11 +++++++++++ src/Suppression.h | 8 +++++--- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/Application.h b/src/Application.h index aeca10973..ed6a25f00 100644 --- a/src/Application.h +++ b/src/Application.h @@ -99,6 +99,7 @@ public: bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } bool isNew(const uint256& s, uint64 p) { return mSuppressions.addSuppressionPeer(s, p); } + bool isNew(const uint256& s, uint64 p, int& f) { return mSuppressions.addSuppressionPeer(s, p, f); } bool isNewFlag(const uint256& s, int f) { return mSuppressions.setFlag(s, f); } bool running() { return mTxnDB != NULL; } bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); } diff --git a/src/Suppression.cpp b/src/Suppression.cpp index ad96d4b15..38f10f4a6 100644 --- a/src/Suppression.cpp +++ b/src/Suppression.cpp @@ -58,6 +58,17 @@ bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer) return created; } +bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer, int& flags) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + Suppression &s = findCreateEntry(index, created); + s.addPeer(peer); + flags = s.getFlags(); + return created; +} + bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag) { boost::mutex::scoped_lock sl(mSuppressionMutex); diff --git a/src/Suppression.h b/src/Suppression.h index 14b611ede..4b08c7c57 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -14,10 +14,11 @@ DEFINE_INSTANCE(Suppression); -#define SF_RELAYED 0x01 -#define SF_SIGBAD 0x02 -#define SF_SIGGOOD 0x04 +#define SF_RELAYED 0x01 // Has already been relayed to other nodes +#define SF_BAD 0x02 // Signature/format is bad +#define SF_SIGGOOD 0x04 // Signature is good #define SF_SAVED 0x08 +#define SF_RETRY 0x10 // Transaction can be retried class Suppression : private IS_INSTANCE(Suppression) { @@ -61,6 +62,7 @@ public: bool addSuppression(const uint256& index); bool addSuppressionPeer(const uint256& index, uint64 peer); + bool addSuppressionPeer(const uint256& index, uint64 peer, int& flags); bool addSuppressionFlags(const uint256& index, int flag); bool setFlag(const uint256& index, int flag); From 3a1fb14bbc47fdb8d5a8c38206d8a747c68b112f Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 14:40:05 -0800 Subject: [PATCH 03/23] Don't clear mPeerData on LCL view change. Correctly add peers to tx set fetches. --- src/LedgerConsensus.cpp | 25 ++++++++++++++++++++++++- src/LedgerConsensus.h | 2 +- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 876a14cc6..ce36f0688 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -45,6 +45,29 @@ void TransactionAcquire::done() } } +void TransactionAcquire::onTimer() +{ + if (!getPeerCount()) + { // out of peers + bool found = false; + std::vector peerList = theApp->getConnectionPool().getPeerVector(); + BOOST_FOREACH(Peer::ref peer, peerList) + { + if (peer->hasTxSet(getHash())) + { + found = true; + peerHas(peer); + } + } + if (!found) + { + BOOST_FOREACH(Peer::ref peer, peerList) + peerHas(peer); + } + } + trigger(Peer::pointer(), true); +} + boost::weak_ptr TransactionAcquire::pmDowncast() { return boost::shared_polymorphic_downcast(shared_from_this()); @@ -59,6 +82,7 @@ void TransactionAcquire::trigger(Peer::ref peer, bool timer) } if (!mHaveRoot) { + cLog(lsTRACE) << "TransactionAcquire::trigger " << (peer ? "havePeer" : "noPeer") << " no root"; ripple::TMGetLedger tmGL; tmGL.set_ledgerhash(mHash.begin(), mHash.size()); tmGL.set_itype(ripple::liTS_CANDIDATE); @@ -323,7 +347,6 @@ void LedgerConsensus::handleLCL(const uint256& lclHash) mProposing = false; mValidating = false; mPeerPositions.clear(); - mPeerData.clear(); mDisputes.clear(); mCloseTimes.clear(); mDeadNodes.clear(); diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 6eae25750..8ac72634a 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -30,7 +30,7 @@ protected: SHAMap::pointer mMap; bool mHaveRoot; - void onTimer() { trigger(Peer::pointer(), true); } + void onTimer(); void newPeer(Peer::ref peer) { trigger(peer, false); } void done(); From c3ad3bb873dcbfeea481c7b04b8d5c65772f6746 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 14:40:44 -0800 Subject: [PATCH 04/23] Next phase of improving the way we handle transactions received from peers. --- src/Peer.cpp | 63 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/src/Peer.cpp b/src/Peer.cpp index c9c6df5db..7a06d0f86 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -694,6 +694,41 @@ void Peer::recvHello(ripple::TMHello& packet) } } +static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx, boost::weak_ptr peer) +{ + +#ifndef TRUST_NETWORK + try + { +#endif + Transaction::pointer tx; + + if ((flags & SF_SIGGOOD) != 0) + { + tx = boost::make_shared(stx, true); + if (tx->getStatus() == INVALID) + { + theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD); + return; + } + else + theApp->getSuppression().setFlag(stx->getTransactionID(), SF_SIGGOOD); + } + else + tx = boost::make_shared(stx, false); + + theApp->getIOService().post(boost::bind(&NetworkOPs::processTransaction, &theApp->getOPs(), tx)); + +#ifndef TRUST_NETWORK + } + catch (...) + { + theApp->getSuppression().setFlags(stx->getTransactionID(), SF_BAD); + punishPeer(peer, PP_INVALID_REQUEST); + } +#endif +} + void Peer::recvTransaction(ripple::TMTransaction& packet) { cLog(lsDEBUG) << "Got transaction from peer"; @@ -708,12 +743,22 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) SerializerIterator sit(s); SerializedTransaction::pointer stx = boost::make_shared(boost::ref(sit)); - if (!theApp->isNew(stx->getTransactionID(), mPeerId)) - return; + int flags; + if (!theApp->isNew(stx->getTransactionID(), mPeerId, flags)) + { // we have seen this transaction recently + if ((flags & SF_BAD) != 0) + { + punishPeer(PP_INVALID_REQUEST); + return; + } + + if ((flags & SF_RETRY) == 0) + return; + } + + theApp->getJobQueue().addJob(jtTRANSACTION, + boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); - tx = boost::make_shared(stx, true); - if (tx->getStatus() == INVALID) - throw(0); #ifndef TRUST_NETWORK } catch (...) @@ -727,14 +772,6 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } #endif - tx = theApp->getOPs().processTransaction(tx); - - if(tx->getStatus() != INCLUDED) - { // transaction wasn't accepted into ledger -#ifdef DEBUG - std::cerr << "Transaction from peer won't go in ledger" << std::endl; -#endif - } } void Peer::recvPropose(ripple::TMProposeSet& packet) From 51793d1801fc649c724678e22208c9350cd5cbd4 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 14:43:50 -0800 Subject: [PATCH 05/23] Cleanups. --- src/HashedObject.cpp | 2 +- src/SHAMap.cpp | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/HashedObject.cpp b/src/HashedObject.cpp index 9fc95a825..ee992abdb 100644 --- a/src/HashedObject.cpp +++ b/src/HashedObject.cpp @@ -21,7 +21,6 @@ HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) : bool HashedObjectStore::store(HashedObjectType type, uint32 index, const std::vector& data, const uint256& hash) { // return: false = already in cache, true = added to cache - assert(hash == Serializer::getSHA512Half(data)); if (!theApp->getHashNodeDB()) { cLog(lsTRACE) << "HOS: no db"; @@ -32,6 +31,7 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index, cLog(lsTRACE) << "HOS: " << hash << " store: incache"; return false; } + assert(hash == Serializer::getSHA512Half(data)); HashedObject::pointer object = boost::make_shared(type, index, data, hash); if (!mCache.canonicalize(hash, object)) diff --git a/src/SHAMap.cpp b/src/SHAMap.cpp index e3de616d2..def66e4f3 100644 --- a/src/SHAMap.cpp +++ b/src/SHAMap.cpp @@ -716,23 +716,22 @@ SHAMapTreeNode::pointer SHAMap::fetchNodeExternal(const SHAMapNode& id, const ui // Log(lsTRACE) << "fetchNodeExternal: missing " << hash; throw SHAMapMissingNode(mType, id, hash); } - assert(Serializer::getSHA512Half(obj->getData()) == hash); try { SHAMapTreeNode::pointer ret = boost::make_shared(id, obj->getData(), mSeq - 1, snfPREFIX); -#ifdef DEBUG if (id != *ret) { Log(lsFATAL) << "id:" << id << ", got:" << *ret; assert(false); + return SHAMapTreeNode::pointer(); } if (ret->getNodeHash() != hash) { Log(lsFATAL) << "Hashes don't match"; assert(false); + return SHAMapTreeNode::pointer(); } -#endif return ret; } catch (...) From d1acf6953dc5033f5180ad2ee78bed4738d0e671 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 18:59:16 -0800 Subject: [PATCH 06/23] Mark that an issue was fixed. --- src/LedgerProposal.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/LedgerProposal.cpp b/src/LedgerProposal.cpp index 75e286e9e..383014382 100644 --- a/src/LedgerProposal.cpp +++ b/src/LedgerProposal.cpp @@ -26,7 +26,7 @@ LedgerProposal::LedgerProposal(const RippleAddress& naPub, const RippleAddress& const uint256& prevLgr, const uint256& position, uint32 closeTime) : mPreviousLedger(prevLgr), mCurrentHash(position), mCloseTime(closeTime), mProposeSeq(0), mPublicKey(naPub), mPrivateKey(naPriv) -{ // OPTIMIZEME: This is expensive. We create both the public and private keys separately each time +{ mPeerID = mPublicKey.getNodeID(); mTime = boost::posix_time::second_clock::universal_time(); } From d065231d8dd57dc275e28d8e7e997d3adc48ccbc Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 18:59:50 -0800 Subject: [PATCH 07/23] Rewrite of ledger proposal receiving code. Dispatch to thread pool for signature check. Re-dispatch to main I/O service for handover to consensus engine. --- src/NetworkOPs.cpp | 73 ++++++++++++------------- src/NetworkOPs.h | 6 ++- src/Peer.cpp | 129 ++++++++++++++++++++++++++++++++++++--------- src/Peer.h | 3 +- 4 files changed, 143 insertions(+), 68 deletions(-) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index ce52fde7b..fc030f2c9 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -694,60 +694,55 @@ bool NetworkOPs::haveConsensusObject() return mConsensus; } -// <-- bool: true to relay -bool NetworkOPs::recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, - const uint256& prevLedger, uint32 closeTime, const std::string& signature, - const RippleAddress& nodePublic) +uint256 NetworkOPs::getConsensusLCL() { - // JED: does mConsensus need to be locked? + if (!haveConsensusObject()) + return uint256(); + return mConsensus->getLCL(); +} - // XXX Validate key. - // XXX Take a vuc for pubkey. +void NetworkOPs::processTrustedProposal(uint256 suppression, LedgerProposal::pointer proposal, + boost::shared_ptr set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood) +{ + bool relay = true; if (!haveConsensusObject()) { cLog(lsINFO) << "Received proposal outside consensus window"; - return mMode != omFULL; + if (mMode == omFULL) + relay = false; } - - if (mConsensus->isOurPubKey(nodePublic)) + else { - cLog(lsTRACE) << "Received our own validation"; - return false; - } + storeProposal(proposal, nodePublic); - // Is this node on our UNL? - if (!theApp->getUNL().nodeInUNL(nodePublic)) - { - cLog(lsINFO) << "Untrusted proposal: " << nodePublic.humanNodePublic() << " " << proposeHash; - return true; - } + uint256 consensusLCL = mConsensus->getLCL(); - if (prevLedger.isNonZero()) - { // proposal includes a previous ledger - LedgerProposal::pointer proposal = - boost::make_shared(prevLedger, proposeSeq, proposeHash, closeTime, nodePublic); - if (!proposal->checkSign(signature)) + if (!set->has_previousledger() && (checkLedger != consensusLCL)) { - cLog(lsWARNING) << "New-style ledger proposal fails signature check"; - return false; + cLog(lsWARNING) << "Have to re-check proposal signature due to consensus view change"; + assert(proposal->hasSignature()); + proposal->setPrevLedger(consensusLCL); + if (proposal->checkSign()) + sigGood = true; + } + + if (sigGood && (consensusLCL == proposal->getPrevLedger())) + { + relay = mConsensus->peerPosition(proposal); + cLog(lsTRACE) << "Proposal processing finished, relay=" << relay; } - if (prevLedger == mConsensus->getLCL()) - return mConsensus->peerPosition(proposal); - storeProposal(proposal, nodePublic); - return false; } - LedgerProposal::pointer proposal = - boost::make_shared(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, nodePublic); - if (!proposal->checkSign(signature)) - { // Note that if the LCL is different, the signature check will fail - cLog(lsWARNING) << "Ledger proposal fails signature check"; - proposal->setSignature(signature); - storeProposal(proposal, nodePublic); - return false; + if (relay) + { + std::set peers; + theApp->getSuppression().swapSet(suppression, peers, SF_RELAYED); + PackedMessage::pointer message = boost::make_shared(*set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); } - return mConsensus->peerPosition(proposal); + else + cLog(lsINFO) << "Not relaying trusted proposal"; } SHAMap::pointer NetworkOPs::getTXMap(const uint256& hash) diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index cd5243b26..07b9d6cab 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -168,8 +168,9 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - bool recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, - const uint256& prevLedger, uint32 closeTime, const std::string& signature, const RippleAddress& nodePublic); + void processTrustedProposal(uint256 supression, LedgerProposal::pointer proposal, + boost::shared_ptr set, + RippleAddress nodePublic, uint256 checkLedger, bool sigGood); bool gotTXData(const boost::shared_ptr& peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); bool recvValidation(const SerializedValidation::pointer& val); @@ -199,6 +200,7 @@ public: boost::unordered_map >& peekStoredProposals() { return mStoredProposals; } void storeProposal(const LedgerProposal::pointer& proposal, const RippleAddress& peerPublic); + uint256 getConsensusLCL(); // client information retrieval functions std::vector< std::pair > diff --git a/src/Peer.cpp b/src/Peer.cpp index 7a06d0f86..8cc12b2ff 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -500,8 +500,8 @@ void Peer::processReadBuffer() case ripple::mtPROPOSE_LEDGER: { - ripple::TMProposeSet msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + boost::shared_ptr msg = boost::make_shared(); + if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvPropose(msg); else cLog(lsWARNING) << "parse error: " << type; @@ -709,6 +709,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx if (tx->getStatus() == INVALID) { theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD); + punshPeer(peer, PP_BAD_SIGNATURE); return; } else @@ -748,7 +749,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) { // we have seen this transaction recently if ((flags & SF_BAD) != 0) { - punishPeer(PP_INVALID_REQUEST); + punishPeer(PP_BAD_SIGNATURE); return; } @@ -757,7 +758,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } theApp->getJobQueue().addJob(jtTRANSACTION, - boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); + boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); #ifndef TRUST_NETWORK } @@ -774,42 +775,118 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } -void Peer::recvPropose(ripple::TMProposeSet& packet) +static void checkPropose(Job& job, boost::shared_ptr packet, + uint256 suppression, LedgerProposal::pointer proposal, uint256 consensusLCL, + RippleAddress nodePublic, boost::weak_ptr peer) +{ // FIXME: Suppress relaying proposals with incorrect LCLs + bool sigGood = false; + bool isTrusted = (job.getType() == jtPROPOSAL_t); + + cLog(lsTRACE) << "Checking " << (isTrusted ? "trusted" : "UNtrusted") << " proposal"; + + assert(packet); + ripple::TMProposeSet& set = *packet; + + uint256 prevLedger; + if (set.has_previousledger()) + { // proposal includes a previous ledger + cLog(lsTRACE) << "proposal with previous ledger"; + memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8); + if (!proposal->checkSign(set.signature())) + { + cLog(lsWARNING) << "proposal with previous ledger fails signature check"; + Peer::punishPeer(peer, PP_BAD_SIGNATURE); + return; + } + else + sigGood = true; + } + else + { + if (consensusLCL.isNonZero() && proposal->checkSign(set.signature())) + { + prevLedger = consensusLCL; + sigGood = true; + } + else + { + cLog(lsWARNING) << "Ledger proposal fails signature check"; + proposal->setSignature(set.signature()); + } + } + + if (isTrusted) + { + theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(), + suppression, proposal, packet, nodePublic, prevLedger, sigGood)); + } + else + { // untrusted proposal, just relay it + cLog(lsTRACE) << "relaying untrusted proposal"; + std::set peers; + theApp->getSuppression().swapSet(suppression, peers, SF_RELAYED); + PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); + } +} + +void Peer::recvPropose(const boost::shared_ptr& packet) { - if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) || - (packet.signature().size() < 56) || (packet.nodepubkey().size() > 128) || (packet.signature().size() > 128)) + assert(packet); + ripple::TMProposeSet& set = *packet; + + if ((set.currenttxhash().size() != 32) || (set.nodepubkey().size() < 28) || + (set.signature().size() < 56) || (set.nodepubkey().size() > 128) || (set.signature().size() > 128)) { cLog(lsWARNING) << "Received proposal is malformed"; return; } - uint256 currentTxHash, prevLedger; - memcpy(currentTxHash.begin(), packet.currenttxhash().data(), 32); + if (set.has_previousledger() && (set.previousledger().size() != 32)) + { + cLog(lsWARNING) << "Received proposal is malformed"; + return; + } - if ((packet.has_previousledger()) && (packet.previousledger().size() == 32)) - memcpy(prevLedger.begin(), packet.previousledger().data(), 32); + uint256 proposeHash, prevLedger; + memcpy(proposeHash.begin(), set.currenttxhash().data(), 32); + + if ((set.has_previousledger()) && (set.previousledger().size() == 32)) + memcpy(prevLedger.begin(), set.previousledger().data(), 32); Serializer s(512); - s.add256(currentTxHash); - s.add256(prevLedger); - s.add32(packet.proposeseq()); - s.add32(packet.closetime()); - s.addVL(packet.nodepubkey()); - s.addVL(packet.signature()); + s.add256(proposeHash); + s.add32(set.proposeseq()); + s.add32(set.closetime()); + s.addVL(set.nodepubkey()); + s.addVL(set.signature()); + if (set.has_previousledger()) + s.add256(prevLedger); uint256 suppression = s.getSHA512Half(); if (!theApp->isNew(suppression, mPeerId)) + { + cLog(lsTRACE) << "Received duplicate proposal from peer " << mPeerId; return; - - RippleAddress nodePublic = RippleAddress::createNodePublic(strCopy(packet.nodepubkey())); -// bool isTrusted = theApp->getUNL().nodeInUNL(nodePublic); - - if(theApp->getOPs().recvPropose(suppression, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(), - packet.signature(), nodePublic)) - { // FIXME: Not all nodes will want proposals - PackedMessage::pointer message = boost::make_shared(packet, ripple::mtPROPOSE_LEDGER); - theApp->getConnectionPool().relayMessage(this, message); } + + RippleAddress signerPublic = RippleAddress::createNodePublic(strCopy(set.nodepubkey())); + if (signerPublic == theConfig.VALIDATION_PUB) + { + cLog(lsTRACE) << "Received our own proposal from peer " << mPeerId; + return; + } + bool isTrusted = theApp->getUNL().nodeInUNL(signerPublic); + cLog(lsTRACE) << "Received " << (isTrusted ? "trusted" : "UNtrusted") << " proposal from " << mPeerId; + + uint256 consensusLCL = theApp->getOPs().getConsensusLCL(); + LedgerProposal::pointer proposal = boost::make_shared( + prevLedger.isNonZero() ? prevLedger : consensusLCL, + set.proposeseq(), proposeHash, set.closetime(), signerPublic); + + theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, + boost::bind(&checkPropose, _1, packet, suppression, proposal, consensusLCL, + mNodePublic, boost::weak_ptr(shared_from_this()))); } void Peer::recvHaveTxSet(ripple::TMHaveTransactionSet& packet) diff --git a/src/Peer.h b/src/Peer.h index 256319c83..e70d702fb 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -19,6 +19,7 @@ enum PeerPunish PP_INVALID_REQUEST = 1, // The peer sent a request that makes no sense PP_UNKNOWN_REQUEST = 2, // The peer sent a request that might be garbage PP_UNWANTED_DATA = 3, // The peer sent us data we didn't want/need + PP_BAD_SIGNATURE = 4, // Object had bad signature }; typedef std::pair ipPort; @@ -116,7 +117,7 @@ protected: void recvGetLedger(ripple::TMGetLedger& packet); void recvLedger(ripple::TMLedgerData& packet); void recvStatus(ripple::TMStatusChange& packet); - void recvPropose(ripple::TMProposeSet& packet); + void recvPropose(const boost::shared_ptr& packet); void recvHaveTxSet(ripple::TMHaveTransactionSet& packet); void getSessionCookie(std::string& strDst); From b381c057d816693aabb89a0c37642f420b77ba14 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 19:01:22 -0800 Subject: [PATCH 08/23] Newer fix one last thing and then commit. :/ --- src/Peer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Peer.cpp b/src/Peer.cpp index 8cc12b2ff..f039d6244 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -709,7 +709,7 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx if (tx->getStatus() == INVALID) { theApp->getSuppression().setFlag(stx->getTransactionID(), SF_BAD); - punshPeer(peer, PP_BAD_SIGNATURE); + Peer::punishPeer(peer, PP_BAD_SIGNATURE); return; } else From 4ad910f293fc7b99cb2436aa59a651a02436455e Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 19:40:55 -0800 Subject: [PATCH 09/23] Helper function. --- src/Suppression.cpp | 8 ++++++++ src/Suppression.h | 1 + 2 files changed, 9 insertions(+) diff --git a/src/Suppression.cpp b/src/Suppression.cpp index 38f10f4a6..5eee0ab7f 100644 --- a/src/Suppression.cpp +++ b/src/Suppression.cpp @@ -69,6 +69,14 @@ bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer, int return created; } +int SuppressionTable::getFlags(const uint256& index) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + return findCreateEntry(index, created).getFlags(); +} + bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag) { boost::mutex::scoped_lock sl(mSuppressionMutex); diff --git a/src/Suppression.h b/src/Suppression.h index 4b08c7c57..9744f1488 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -65,6 +65,7 @@ public: bool addSuppressionPeer(const uint256& index, uint64 peer, int& flags); bool addSuppressionFlags(const uint256& index, int flag); bool setFlag(const uint256& index, int flag); + int getFlags(const uint256& index); Suppression getEntry(const uint256&); From ebc0b05ecba4ce0d271d48dc200060ef50512592 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 19:41:51 -0800 Subject: [PATCH 10/23] Avoid redundant signature checks. --- src/NetworkOPs.cpp | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index fc030f2c9..820b1168a 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -115,19 +115,36 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) { Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true); - if (dbtx) return dbtx; + if (dbtx) + return dbtx; - if (!trans->checkSign()) - { - cLog(lsINFO) << "Transaction has bad signature"; + int newFlags = theApp->getSuppression().getFlags(trans->getID()); + if ((newFlags & SF_BAD) != 0) + { // cached bad trans->setStatus(INVALID); return trans; } - TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER); + if ((newFlags & SF_SIGGOOD) == 0) + { // signature not checked + if (!trans->checkSign()) + { + cLog(lsINFO) << "Transaction has bad signature"; + trans->setStatus(INVALID); + theApp->isNewFlag(trans->getID(), SF_BAD); + return trans; + } + theApp->isNewFlag(trans->getID(), SF_SIGGOOD); + } + TER r = mLedgerMaster->doTransaction(*trans->getSTransaction(), tapOPEN_LEDGER | tapNO_CHECK_SIGN); trans->setResult(r); + if (isTemMalformed(r)) // malformed, cache bad + theApp->isNewFlag(trans->getID(), SF_BAD); + else if(isTelLocal(r) || isTerRetry(r)) // can be retried + theApp->isNewFlag(trans->getID(), SF_RETRY); + #ifdef DEBUG if (r != tesSUCCESS) { @@ -180,8 +197,8 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) cLog(lsDEBUG) << "Status other than success " << r; std::set peers; - if ((mMode != omFULL) && (mMode != omTRACKING) && - theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) + if ((mMode != omFULL) && (mMode != omTRACKING) + && theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) { ripple::TMTransaction tx; Serializer s; From 9432f79ed25ed37b8414d4f580bf62793768d06b Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 20:05:01 -0800 Subject: [PATCH 11/23] Better debugging. --- src/LedgerConsensus.cpp | 5 ++++- src/Peer.cpp | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index ce36f0688..9042b0455 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -49,6 +49,8 @@ void TransactionAcquire::onTimer() { if (!getPeerCount()) { // out of peers + cLog(lsWARNING) << "Out of peers for TX set " << getHash(); + bool found = false; std::vector peerList = theApp->getConnectionPool().getPeerVector(); BOOST_FOREACH(Peer::ref peer, peerList) @@ -65,7 +67,8 @@ void TransactionAcquire::onTimer() peerHas(peer); } } - trigger(Peer::pointer(), true); + else + trigger(Peer::pointer(), true); } boost::weak_ptr TransactionAcquire::pmDowncast() diff --git a/src/Peer.cpp b/src/Peer.cpp index f039d6244..545856a0b 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -1162,6 +1162,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) if ((!packet.has_ledgerhash() || packet.ledgerhash().size() != 32)) { punishPeer(PP_INVALID_REQUEST); + cLog(lsWARNING) << "invalid request"; return; } uint256 txHash; @@ -1285,6 +1286,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) SHAMapNode mn(packet.nodeids(i).data(), packet.nodeids(i).size()); if(!mn.isValid()) { + cLog(lsWARNING) << "Request for invalid node"; punishPeer(PP_INVALID_REQUEST); return; } @@ -1292,6 +1294,8 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) std::list< std::vector > rawNodes; if(map->getNodeFat(mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) { + assert(nodeIDs.size() == rawNodes.size()); + cLog(lsDEBUG) << "getNodeFat got " << rawNodes.size() << " nodes"; std::vector::iterator nodeIDIterator; std::list< std::vector >::iterator rawNodeIterator; for(nodeIDIterator = nodeIDs.begin(), rawNodeIterator = rawNodes.begin(); @@ -1304,6 +1308,8 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) node->set_nodedata(&rawNodeIterator->front(), rawNodeIterator->size()); } } + else + cLog(lsWARNING) << "getNodeFat returns false"; } PackedMessage::pointer oPacket = boost::make_shared(reply, ripple::mtLEDGER_DATA); sendPacket(oPacket); From 66825d0e9da1124b3253006846c8b788953af288 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 22:49:24 -0800 Subject: [PATCH 12/23] Track the suppression in the ledger proposal. --- src/LedgerProposal.h | 5 +++-- src/NetworkOPs.h | 3 +-- src/Peer.cpp | 30 ++++++++++++++++-------------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/LedgerProposal.h b/src/LedgerProposal.h index a531de214..b5176c7c9 100644 --- a/src/LedgerProposal.h +++ b/src/LedgerProposal.h @@ -18,7 +18,7 @@ class LedgerProposal : private IS_INSTANCE(LedgerProposal) { protected: - uint256 mPreviousLedger, mCurrentHash; + uint256 mPreviousLedger, mCurrentHash, mSuppression; uint32 mCloseTime, mProposeSeq; uint160 mPeerID; @@ -35,7 +35,7 @@ public: // proposal from peer LedgerProposal(const uint256& prevLgr, uint32 proposeSeq, const uint256& propose, - uint32 closeTime, const RippleAddress& naPeerPublic); + uint32 closeTime, const RippleAddress& naPeerPublic, const uint256& suppress); // our first proposal LedgerProposal(const RippleAddress& pubKey, const RippleAddress& privKey, @@ -52,6 +52,7 @@ public: const uint160& getPeerID() const { return mPeerID; } const uint256& getCurrentHash() const { return mCurrentHash; } const uint256& getPrevLedger() const { return mPreviousLedger; } + const uint256& getSuppression() const { return mSuppression; } uint32 getProposeSeq() const { return mProposeSeq; } uint32 getCloseTime() const { return mCloseTime; } const RippleAddress& peekPublic() const { return mPublicKey; } diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 07b9d6cab..6b5659b66 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -168,8 +168,7 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - void processTrustedProposal(uint256 supression, LedgerProposal::pointer proposal, - boost::shared_ptr set, + void processTrustedProposal(LedgerProposal::pointer proposal, boost::shared_ptr set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood); bool gotTXData(const boost::shared_ptr& peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); diff --git a/src/Peer.cpp b/src/Peer.cpp index 545856a0b..89a196e79 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -776,9 +776,8 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } static void checkPropose(Job& job, boost::shared_ptr packet, - uint256 suppression, LedgerProposal::pointer proposal, uint256 consensusLCL, - RippleAddress nodePublic, boost::weak_ptr peer) -{ // FIXME: Suppress relaying proposals with incorrect LCLs + LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, boost::weak_ptr peer) +{ bool sigGood = false; bool isTrusted = (job.getType() == jtPROPOSAL_t); @@ -818,16 +817,18 @@ static void checkPropose(Job& job, boost::shared_ptr packe if (isTrusted) { theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(), - suppression, proposal, packet, nodePublic, prevLedger, sigGood)); + proposal, packet, nodePublic, prevLedger, sigGood)); } - else - { // untrusted proposal, just relay it + else if (sigGood && (prevLedger == consensusLCL)) + { // relay untrusted proposal cLog(lsTRACE) << "relaying untrusted proposal"; - std::set peers; - theApp->getSuppression().swapSet(suppression, peers, SF_RELAYED); + std::set peers; + theApp->getSuppression().swapSet(proposal->getSuppression(), peers, SF_RELAYED); PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); theApp->getConnectionPool().relayMessageBut(peers, message); } + else + cLog(lsDEBUG) << "Not relaying untrusted proposal"; } void Peer::recvPropose(const boost::shared_ptr& packet) @@ -839,19 +840,20 @@ void Peer::recvPropose(const boost::shared_ptr& packet) (set.signature().size() < 56) || (set.nodepubkey().size() > 128) || (set.signature().size() > 128)) { cLog(lsWARNING) << "Received proposal is malformed"; + punishPeer(PP_INVALID_REQUEST); return; } if (set.has_previousledger() && (set.previousledger().size() != 32)) { cLog(lsWARNING) << "Received proposal is malformed"; + punishPeer(PP_INVALID_REQUEST); return; } uint256 proposeHash, prevLedger; memcpy(proposeHash.begin(), set.currenttxhash().data(), 32); - - if ((set.has_previousledger()) && (set.previousledger().size() == 32)) + if (set.has_previousledger()) memcpy(prevLedger.begin(), set.previousledger().data(), 32); Serializer s(512); @@ -882,10 +884,10 @@ void Peer::recvPropose(const boost::shared_ptr& packet) uint256 consensusLCL = theApp->getOPs().getConsensusLCL(); LedgerProposal::pointer proposal = boost::make_shared( prevLedger.isNonZero() ? prevLedger : consensusLCL, - set.proposeseq(), proposeHash, set.closetime(), signerPublic); + set.proposeseq(), proposeHash, set.closetime(), signerPublic, suppression); theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, - boost::bind(&checkPropose, _1, packet, suppression, proposal, consensusLCL, + boost::bind(&checkPropose, _1, packet, proposal, consensusLCL, mNodePublic, boost::weak_ptr(shared_from_this()))); } @@ -1157,7 +1159,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) reply.set_requestcookie(packet.requestcookie()); if (packet.itype() == ripple::liTS_CANDIDATE) - { // Request is for a transaction candidate set + { // Request is for a transaction candidate set cLog(lsINFO) << "Received request for TX candidate set data " << getIP(); if ((!packet.has_ledgerhash() || packet.ledgerhash().size() != 32)) { @@ -1488,7 +1490,7 @@ Json::Value Peer::getJson() if (mHello.has_protoversion() && (mHello.protoversion() != MAKE_VERSION_INT(PROTO_VERSION_MAJOR, PROTO_VERSION_MINOR))) - ret["protocol"] = boost::lexical_cast(GET_VERSION_MAJOR(mHello.protoversion())) + "." + + ret["protocol"] = boost::lexical_cast(GET_VERSION_MAJOR(mHello.protoversion())) + "." + boost::lexical_cast(GET_VERSION_MINOR(mHello.protoversion())); if (!!mClosedLedgerHash) From 88c88148fb9296eec761d268ee77686e9e5ca96c Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 22:49:41 -0800 Subject: [PATCH 13/23] Track the supresssion. --- src/LedgerProposal.cpp | 5 +++-- src/NetworkOPs.cpp | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/LedgerProposal.cpp b/src/LedgerProposal.cpp index 383014382..5f5f05873 100644 --- a/src/LedgerProposal.cpp +++ b/src/LedgerProposal.cpp @@ -10,8 +10,9 @@ DECLARE_INSTANCE(LedgerProposal); LedgerProposal::LedgerProposal(const uint256& pLgr, uint32 seq, const uint256& tx, uint32 closeTime, - const RippleAddress& naPeerPublic) : - mPreviousLedger(pLgr), mCurrentHash(tx), mCloseTime(closeTime), mProposeSeq(seq), mPublicKey(naPeerPublic) + const RippleAddress& naPeerPublic, const uint256& suppression) : + mPreviousLedger(pLgr), mCurrentHash(tx), mSuppression(suppression), mCloseTime(closeTime), + mProposeSeq(seq), mPublicKey(naPeerPublic) { // XXX Validate key. // if (!mKey->SetPubKey(pubKey)) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 820b1168a..d5e19d4ef 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -718,7 +718,7 @@ uint256 NetworkOPs::getConsensusLCL() return mConsensus->getLCL(); } -void NetworkOPs::processTrustedProposal(uint256 suppression, LedgerProposal::pointer proposal, +void NetworkOPs::processTrustedProposal(LedgerProposal::pointer proposal, boost::shared_ptr set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood) { bool relay = true; @@ -754,7 +754,7 @@ void NetworkOPs::processTrustedProposal(uint256 suppression, LedgerProposal::poi if (relay) { std::set peers; - theApp->getSuppression().swapSet(suppression, peers, SF_RELAYED); + theApp->getSuppression().swapSet(proposal->getSuppression(), peers, SF_RELAYED); PackedMessage::pointer message = boost::make_shared(*set, ripple::mtPROPOSE_LEDGER); theApp->getConnectionPool().relayMessageBut(peers, message); } From aae34de65f514e88f5bb3413a82d3d4d668b9b37 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 4 Nov 2012 22:49:50 -0800 Subject: [PATCH 14/23] Mark a FIXME. --- src/LedgerConsensus.cpp | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 9042b0455..86b073b7c 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -982,6 +982,7 @@ void LedgerConsensus::playbackProposals() for (boost::unordered_map< uint160, std::list >::iterator it = storedProposals.begin(), end = storedProposals.end(); it != end; ++it) { + bool relay = false; BOOST_FOREACH(const LedgerProposal::pointer& proposal, it->second) { if (proposal->hasSignature()) @@ -990,11 +991,33 @@ void LedgerConsensus::playbackProposals() if (proposal->checkSign()) { cLog(lsINFO) << "Applying stored proposal"; - peerPosition(proposal); + relay = peerPosition(proposal); } } else if (proposal->isPrevLedger(mPrevLedgerHash)) - peerPosition(proposal); + relay = peerPosition(proposal); + + if (relay) + { + cLog(lsWARNING) << "We should do delayed relay of this proposal, but we cannot"; + } +#if 0 // FIXME: We can't do delayed relay because we don't have the signature + std::set peers + if (relay && theApp->getSuppression().swapSet(proposal.getSuppress(), set, SF_RELAYED)) + { + cLog(lsDEBUG) << "Stored proposal delayed relay"; + ripple::TMProposeSet set; + set.set_proposeseq + set.set_currenttxhash(, 256 / 8); + previousledger + closetime + nodepubkey + signature + PackedMessage::pointer message = boost::make_shared(set, ripple::mtPROPOSE_LEDGER); + theApp->getConnectionPool().relayMessageBut(peers, message); + } +#endif + } } } From 17a911121d2867a5b8ca7864e26c19665e0a92f8 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 01:45:07 -0800 Subject: [PATCH 15/23] Fix handling of retriable transactions that don't go in the open ledger immediately. --- src/CanonicalTXSet.h | 7 +++++++ src/LedgerConsensus.cpp | 4 ++-- src/LedgerMaster.cpp | 28 +++++++++++++++++++++++++--- src/LedgerMaster.h | 9 +++++---- src/NetworkOPs.cpp | 39 ++++++++++++++++----------------------- src/Suppression.h | 1 + 6 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/CanonicalTXSet.h b/src/CanonicalTXSet.h index c75fb6b1b..fead540db 100644 --- a/src/CanonicalTXSet.h +++ b/src/CanonicalTXSet.h @@ -39,6 +39,13 @@ public: CanonicalTXSet(const uint256& lclHash) : mSetHash(lclHash) { ; } void push_back(SerializedTransaction::ref txn); + + void reset(const uint256& newLCL) + { + mSetHash = newLCL; + mMap.clear(); + } + iterator erase(const iterator& it); iterator begin() { return mMap.begin(); } diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 86b073b7c..f75f21720 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -563,7 +563,7 @@ void LedgerConsensus::closeLedger() mCloseTime = theApp->getOPs().getCloseTimeNC(); theApp->getOPs().setLastCloseTime(mCloseTime); statusChange(ripple::neCLOSING_LEDGER, *mPreviousLedger); - takeInitialPosition(*theApp->getMasterLedger().closeLedger()); + takeInitialPosition(*theApp->getMasterLedger().closeLedger(true)); } void LedgerConsensus::stateEstablish() @@ -868,7 +868,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec { boost::unordered_map::const_iterator cit = mAcquired.find(pit.second->getCurrentHash()); - if (cit != mAcquired.end() && cit->second) + if ((cit != mAcquired.end()) && cit->second) txn->setVote(pit.first, cit->second->hasItem(txID)); } diff --git a/src/LedgerMaster.cpp b/src/LedgerMaster.cpp index cd477b21c..b07c86e27 100644 --- a/src/LedgerMaster.cpp +++ b/src/LedgerMaster.cpp @@ -14,10 +14,10 @@ uint32 LedgerMaster::getCurrentLedgerIndex() return mCurrentLedger->getLedgerSeq(); } -bool LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction) +void LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction) { // returns true if transaction was added boost::recursive_mutex::scoped_lock ml(mLock); - return mHeldTransactionsByID.insert(std::make_pair(transaction->getID(), transaction)).second; + mHeldTransactions.push_back(transaction->getSTransaction()); } void LedgerMaster::pushLedger(Ledger::ref newLedger) @@ -79,12 +79,34 @@ void LedgerMaster::storeLedger(Ledger::ref ledger) } -Ledger::pointer LedgerMaster::closeLedger() +Ledger::pointer LedgerMaster::closeLedger(bool recover) { boost::recursive_mutex::scoped_lock sl(mLock); Ledger::pointer closingLedger = mCurrentLedger; + + if (recover) + { + int recovers = 0; + for (CanonicalTXSet::iterator it = mHeldTransactions.begin(), end = mHeldTransactions.end(); it != end; ++it) + { + try + { + TER result = mEngine.applyTransaction(*it->second, tapOPEN_LEDGER); + if (isTepSuccess(result)) + ++recovers; + } + catch (...) + { + cLog(lsWARNING) << "Held transaction throws"; + } + } + tLog(recovers != 0, lsINFO) << "Recovered " << recovers << " held transactions"; + mHeldTransactions.reset(closingLedger->getHash()); + } + mCurrentLedger = boost::make_shared(boost::ref(*closingLedger), true); mEngine.setLedger(mCurrentLedger); + return closingLedger; } diff --git a/src/LedgerMaster.h b/src/LedgerMaster.h index e3270710e..152952b50 100644 --- a/src/LedgerMaster.h +++ b/src/LedgerMaster.h @@ -9,6 +9,7 @@ #include "Transaction.h" #include "TransactionEngine.h" #include "RangeSet.h" +#include "CanonicalTXSet.h" // Tracks the current ledger and any ledgers in the process of closing // Tracks ledger history @@ -25,7 +26,7 @@ class LedgerMaster LedgerHistory mLedgerHistory; - std::map mHeldTransactionsByID; + CanonicalTXSet mHeldTransactions; RangeSet mCompleteLedgers; LedgerAcquire::pointer mMissingLedger; @@ -40,7 +41,7 @@ class LedgerMaster public: - LedgerMaster() : mMissingSeq(0) { ; } + LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0) { ; } uint32 getCurrentLedgerIndex(); @@ -64,7 +65,7 @@ public: std::string getCompleteLedgers() { return mCompleteLedgers.toString(); } - Ledger::pointer closeLedger(); + Ledger::pointer closeLedger(bool recoverHeldTransactions); Ledger::pointer getLedgerBySeq(uint32 index) { @@ -90,7 +91,7 @@ public: void setLedgerRangePresent(uint32 minV, uint32 maxV) { mCompleteLedgers.setRange(minV, maxV); } - bool addHeldTransaction(const Transaction::pointer& trans); + void addHeldTransaction(const Transaction::pointer& trans); void sweep(void) { mLedgerHistory.sweep(); } }; diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index d5e19d4ef..d9fa2ddc9 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -156,9 +156,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) if (r == tefFAILURE) throw Fault(IO_ERROR); - if (r == terPRE_SEQ) + if (isTerRetry(r)) { // transaction should be held - cLog(lsDEBUG) << "Transaction should be held"; + cLog(lsDEBUG) << "Transaction should be held: " << r; trans->setStatus(HELD); theApp->getMasterTransaction().canonicalize(trans, true); mLedgerMaster->addHeldTransaction(trans); @@ -171,12 +171,24 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) return trans; } + bool relay = true; + if (r == tesSUCCESS) { - cLog(lsINFO) << "Transaction is now included"; + cLog(lsINFO) << "Transaction is now included in open ledger"; trans->setStatus(INCLUDED); theApp->getMasterTransaction().canonicalize(trans, true); + } + else + { + cLog(lsDEBUG) << "Status other than success " << r; + if (mMode == omFULL) + relay = false; + trans->setStatus(INVALID); + } + if (relay) + { std::set peers; if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) { @@ -185,32 +197,13 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) trans->getSTransaction()->add(s); tx.set_rawtransaction(&s.getData().front(), s.getLength()); tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); + tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); theApp->getConnectionPool().relayMessageBut(peers, packet); } - - return trans; } - cLog(lsDEBUG) << "Status other than success " << r; - std::set peers; - - if ((mMode != omFULL) && (mMode != omTRACKING) - && theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) - { - ripple::TMTransaction tx; - Serializer s; - trans->getSTransaction()->add(s); - tx.set_rawtransaction(&s.getData().front(), s.getLength()); - tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); - PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessageTo(peers, packet); - } - - trans->setStatus(INVALID); return trans; } diff --git a/src/Suppression.h b/src/Suppression.h index 9744f1488..fe07e522b 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -70,6 +70,7 @@ public: Suppression getEntry(const uint256&); bool swapSet(const uint256& index, std::set& peers, int flag); + bool swapSet(const uint256& index, std::set& peers); }; #endif From e4b954c30cd64a09b27d893f080e7c6e9faa8dd9 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 02:05:51 -0800 Subject: [PATCH 16/23] Logging improvements. --- src/LedgerConsensus.cpp | 18 +++++++++++++++++- src/NetworkOPs.cpp | 3 +++ src/Peer.cpp | 1 + 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index f75f21720..fa20269e4 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -35,11 +35,12 @@ void TransactionAcquire::done() { if (mFailed) { - cLog(lsWARNING) << "Failed to acquire TXs " << mHash; + cLog(lsWARNING) << "Failed to acquire TX set " << mHash; theApp->getOPs().mapComplete(mHash, SHAMap::pointer()); } else { + cLog(lsINFO) << "Acquired TX set " << mHash; mMap->setImmutable(); theApp->getOPs().mapComplete(mHash, mMap); } @@ -125,9 +126,15 @@ bool TransactionAcquire::takeNodes(const std::list& nodeIDs, const std::list< std::vector >& data, Peer::ref peer) { if (mComplete) + { + cLog(lsTRACE) << "TX set complete"; return true; + } if (mFailed) + { + cLog(lsTRACE) << "TX set failed"; return false; + } try { std::list::const_iterator nodeIDit = nodeIDs.begin(); @@ -143,12 +150,18 @@ bool TransactionAcquire::takeNodes(const std::list& nodeIDs, return false; } if (!mMap->addRootNode(getHash(), *nodeDatait, snfWIRE, NULL)) + { + cLog(lsWARNING) << "TX acquire got bad root node"; return false; + } else mHaveRoot = true; } else if (!mMap->addKnownNode(*nodeIDit, *nodeDatait, &sf)) + { + cLog(lsWARNING) << "TX acquire got bad non-root node"; return false; + } ++nodeIDit; ++nodeDatait; } @@ -953,7 +966,10 @@ bool LedgerConsensus::peerGaveNodes(Peer::ref peer, const uint256& setHash, { boost::unordered_map::iterator acq = mAcquiring.find(setHash); if (acq == mAcquiring.end()) + { + cLog(lsINFO) << "Got TX data for set not acquiring: " << setHash; return false; + } TransactionAcquire::pointer set = acq->second; // We must keep the set around during the function return set->takeNodes(nodeIDs, nodeData, peer); } diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index d9fa2ddc9..bbb17dfee 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -766,7 +766,10 @@ bool NetworkOPs::gotTXData(const boost::shared_ptr& peer, const uint256& h const std::list& nodeIDs, const std::list< std::vector >& nodeData) { if (!haveConsensusObject()) + { + cLog(lsWARNING) << "Got TX data with no consensus object"; return false; + } return mConsensus->peerGaveNodes(peer, hash, nodeIDs, nodeData); } diff --git a/src/Peer.cpp b/src/Peer.cpp index 89a196e79..f6858864f 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -1331,6 +1331,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet) uint256 hash; if(packet.ledgerhash().size() != 32) { + cLog(lsWARNING) << "TX candidate reply with invalid hash size"; punishPeer(PP_INVALID_REQUEST); return; } From 51c79f74fa0d67b4fb8fccd3f9c16f3fdd65e9b5 Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Mon, 5 Nov 2012 02:18:56 -0800 Subject: [PATCH 17/23] added local events library --- js/events.js | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 js/events.js diff --git a/js/events.js b/js/events.js new file mode 100644 index 000000000..09d0ce899 --- /dev/null +++ b/js/events.js @@ -0,0 +1,57 @@ +var EventEmitter = function () { + this._events = {}; +}; + +EventEmitter.prototype.on = function (e, f) { + console.log('on', e, f) + if (e in this._events) { + if (this._events[e].indexOf(f) < 0) { + this._events[e].push(f); + } + } else { + this._events[e] = [f]; + } + return this; +}; + +EventEmitter.prototype.off = function (e, f) { + if (f) { + function eq(x) { return function (y) { return x === y; } } + this._events[e] = this.listeners(e).filter(eq(f)); + } else { + delete this._events[e]; + } +}; + +EventEmitter.prototype.removeListener = function (e, f) { + this.off(e, f); +}; + +EventEmitter.prototype.removeAllListeners = function (e) { + this.off(e); +}; + +EventEmitter.prototype.emit = function (e) { + var args = Array.prototype.slice.call(arguments, 1), + fs = this.listeners(e); + console.log('emit', e, args) + + for (var i = 0; i < fs.length; i++) { + fs[i].apply(e, args); + } +}; + +EventEmitter.prototype.listeners = function (e) { + return this._events[e] || []; +}; + +EventEmitter.prototype.once = function (e, f) { + var that = this; + this.on(e, function g() { + f.apply(es, arguments); + that.off(e, g); + }); + return this; +}; + +exports.EventEmitter = EventEmitter; \ No newline at end of file From c54913f0dbb6a260cf0faf9a3f2fb20c4426331d Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Mon, 5 Nov 2012 02:21:36 -0800 Subject: [PATCH 18/23] . --- js/events.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/events.js b/js/events.js index 09d0ce899..7a2098f98 100644 --- a/js/events.js +++ b/js/events.js @@ -48,7 +48,7 @@ EventEmitter.prototype.listeners = function (e) { EventEmitter.prototype.once = function (e, f) { var that = this; this.on(e, function g() { - f.apply(es, arguments); + f.apply(e, arguments); that.off(e, g); }); return this; From d895858d0ee994c3c8a8d6f815d5557ec9105e18 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 03:05:39 -0800 Subject: [PATCH 19/23] Fix a cosmetic bug. --- src/LedgerEntrySet.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/LedgerEntrySet.cpp b/src/LedgerEntrySet.cpp index 3224f07eb..fa3872ea9 100644 --- a/src/LedgerEntrySet.cpp +++ b/src/LedgerEntrySet.cpp @@ -447,9 +447,8 @@ void LedgerEntrySet::calcRawMeta(Serializer& s, TER result) it != end; ++it) entryModify(it->second); - cLog(lsTRACE) << "Metadata:" << mSet.getJson(0); - mSet.addRaw(s, result); + cLog(lsTRACE) << "Metadata:" << mSet.getJson(0); } // <-- uNodeDir: For deletion, present to make dirDelete efficient. From f4912163028f5b78740b3794229897de9c71427b Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 03:05:46 -0800 Subject: [PATCH 20/23] Fix a bug that breaks TX acquire logic. --- src/NetworkOPs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index bbb17dfee..a4757caea 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -785,7 +785,7 @@ bool NetworkOPs::hasTXSet(const boost::shared_ptr& peer, const uint256& se void NetworkOPs::mapComplete(const uint256& hash, SHAMap::ref map) { - if (!haveConsensusObject()) + if (haveConsensusObject()) mConsensus->mapComplete(hash, map, true); } From 6b006e1e98d782b10fcf99ec85570395272491bd Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 03:06:10 -0800 Subject: [PATCH 21/23] Cosmetic change. --- src/Peer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Peer.cpp b/src/Peer.cpp index f6858864f..5e4b12128 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -1321,7 +1321,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet) { if (packet.nodes().size() <= 0) { - cLog(lsWARNING) << "Ledger data with no nodes"; + cLog(lsWARNING) << "Ledger/TXset data with no nodes"; punishPeer(PP_INVALID_REQUEST); return; } From 10e3153091a1cbca99b7677d7d08461af80d4b58 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 03:06:27 -0800 Subject: [PATCH 22/23] Cleanup. --- src/TransactionEngine.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/TransactionEngine.cpp b/src/TransactionEngine.cpp index 053502939..3c6789ce5 100644 --- a/src/TransactionEngine.cpp +++ b/src/TransactionEngine.cpp @@ -455,7 +455,7 @@ TER TransactionEngine::applyTransaction(const SerializedTransaction& txn, Transa terResult = terRETRY; } - if (tesSUCCESS == terResult || isTepPartial(terResult)) + if ((tesSUCCESS == terResult) || isTepPartial(terResult)) { // Transaction succeeded fully or (retries are not allowed and the transaction succeeded partially). Serializer m; From 5f7c6c4a6b67481006d5d3afbe0c24ab368c1df0 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 03:06:37 -0800 Subject: [PATCH 23/23] Add an assert. --- src/TransactionMeta.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/TransactionMeta.cpp b/src/TransactionMeta.cpp index f8fdff08d..8866b039f 100644 --- a/src/TransactionMeta.cpp +++ b/src/TransactionMeta.cpp @@ -121,6 +121,7 @@ static bool compare(const STObject& o1, const STObject& o2) STObject TransactionMetaSet::getAsObject() const { STObject metaData(sfTransactionMetaData); + assert(mResult != 255); metaData.setFieldU8(sfTransactionResult, mResult); metaData.addObject(mNodes); return metaData;