diff --git a/src/Application.h b/src/Application.h index 288f505bc..aeca10973 100644 --- a/src/Application.h +++ b/src/Application.h @@ -95,7 +95,11 @@ public: HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; } ValidationCollection& getValidations() { return mValidations; } JobQueue& getJobQueue() { return mJobQueue; } + SuppressionTable& getSuppression() { return mSuppressions; } + bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } + bool isNew(const uint256& s, uint64 p) { return mSuppressions.addSuppressionPeer(s, p); } + bool isNewFlag(const uint256& s, int f) { return mSuppressions.setFlag(s, f); } bool running() { return mTxnDB != NULL; } bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); } diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 79f0c82a8..f95c0a970 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -251,22 +251,30 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m return sentTo; } -int ConnectionPool::relayMessage(const std::set& fromPeers, const PackedMessage::pointer& msg) -{ - int sentTo = 0; +void ConnectionPool::relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg) +{ // Relay message to all but the specified peers boost::mutex::scoped_lock sl(mPeerLock); BOOST_FOREACH(naPeer pair, mConnectedMap) { Peer::ref peer = pair.second; if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0)) - { - ++sentTo; peer->sendPacket(msg); - } } - return sentTo; +} + +void ConnectionPool::relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg) +{ // Relay message to the specified peers + boost::mutex::scoped_lock sl(mPeerLock); + + BOOST_FOREACH(naPeer pair, mConnectedMap) + { + Peer::ref peer = pair.second; + if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) > 0)) + peer->sendPacket(msg); + } + } // Schedule a connection via scanning. diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index f230bd0ec..e3801ef73 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -62,7 +62,8 @@ public: // Send message to network. int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg); - int relayMessage(const std::set& fromPeers, const PackedMessage::pointer& msg); + void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg); + void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg); // Manual connection request. // Queue for immediate scanning. diff --git a/src/Ledger.cpp b/src/Ledger.cpp index 69807417e..ec44a5dc9 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -1073,7 +1073,7 @@ int Ledger::getPendingSaves() void Ledger::pendSave(bool fromConsensus) { - if (!fromConsensus && !theApp->isNew(getHash())) + if (!fromConsensus && !theApp->isNewFlag(getHash(), SF_SAVED)) return; boost::thread thread(boost::bind(&Ledger::saveAcceptedLedger, shared_from_this(), fromConsensus)); diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index beeaaa400..9f55c2243 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -845,7 +845,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec txn->setVote(pit.first, cit->second->hasItem(txID)); } - if (!ourVote && theApp->isNew(txID)) + if (!ourVote && theApp->isNewFlag(txID, SF_RELAYED)) { ripple::TMTransaction msg; msg.set_rawtransaction(&(tx.front()), tx.size()); diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index dbeb8fd1d..f39faa6d8 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -103,7 +103,7 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t return tpTransNew; } -Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, Peer* source) +Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) { Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true); if (dbtx) return dbtx; @@ -151,27 +151,28 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, trans->setStatus(INCLUDED); theApp->getMasterTransaction().canonicalize(trans, true); -// FIXME: Need code to get all accounts affected by a transaction and re-synch -// any of them that affect local accounts cached in memory. Or, we need to -// no cache the account balance information and always get it from the current ledger -// theApp->getWallet().applyTransaction(trans); + std::set peers; + if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) + { + ripple::TMTransaction tx; + Serializer s; + trans->getSTransaction()->add(s); + tx.set_rawtransaction(&s.getData().front(), s.getLength()); + tx.set_status(ripple::tsCURRENT); + tx.set_receivetimestamp(getNetworkTimeNC()); - ripple::TMTransaction tx; - Serializer s; - trans->getSTransaction()->add(s); - tx.set_rawtransaction(&s.getData().front(), s.getLength()); - tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); - - PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - int sentTo = theApp->getConnectionPool().relayMessage(source, packet); - cLog(lsINFO) << "Transaction relayed to " << sentTo << " node(s)"; + PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); + theApp->getConnectionPool().relayMessageBut(peers, packet); + } return trans; } cLog(lsDEBUG) << "Status other than success " << r; - if ((mMode != omFULL) && (mMode != omTRACKING) && (theApp->isNew(trans->getID()))) + std::set peers; + + if ((mMode != omFULL) && (mMode != omTRACKING) && + theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) { ripple::TMTransaction tx; Serializer s; @@ -180,7 +181,7 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, tx.set_status(ripple::tsCURRENT); tx.set_receivetimestamp(getNetworkTimeNC()); PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessage(source, packet); + theApp->getConnectionPool().relayMessageTo(peers, packet); } trans->setStatus(INVALID); @@ -685,7 +686,7 @@ bool NetworkOPs::haveConsensusObject() } // <-- bool: true to relay -bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, +bool NetworkOPs::recvPropose(uint64 peerId, uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, uint32 closeTime, const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic) { // JED: does mConsensus need to be locked? @@ -701,7 +702,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons s.add32(closeTime); s.addRaw(pubKey); s.addRaw(signature); - if (!theApp->isNew(s.getSHA512Half())) + if (!theApp->isNew(s.getSHA512Half(), peerId)) return false; RippleAddress naPeerPublic = RippleAddress::createNodePublic(strCopy(pubKey)); diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 5c54c3a16..e2d476c43 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -121,7 +121,7 @@ public: // Transaction::pointer submitTransaction(const Transaction::pointer& tpTrans); - Transaction::pointer processTransaction(Transaction::pointer transaction, Peer* source = NULL); + Transaction::pointer processTransaction(Transaction::pointer transaction); Transaction::pointer findTransactionByID(const uint256& transactionID); int findTransactionsBySource(const uint256& uLedger, std::list&, const RippleAddress& sourceAccount, uint32 minSeq, uint32 maxSeq); @@ -168,8 +168,8 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - bool recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, uint32 closeTime, - const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic); + bool recvPropose(uint64 peerId, uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, + uint32 closeTime, const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic); bool gotTXData(const boost::shared_ptr& peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); bool recvValidation(const SerializedValidation::pointer& val); diff --git a/src/Peer.cpp b/src/Peer.cpp index 3b8a10f88..d441876dc 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -708,8 +708,12 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) SerializerIterator sit(s); SerializedTransaction::pointer stx = boost::make_shared(boost::ref(sit)); + if (!theApp->isNew(stx->getTransactionID(), mPeerId)) + return; + tx = boost::make_shared(stx, true); - if (tx->getStatus() == INVALID) throw(0); + if (tx->getStatus() == INVALID) + throw(0); #ifndef TRUST_NETWORK } catch (...) @@ -723,7 +727,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } #endif - tx = theApp->getOPs().processTransaction(tx, this); + tx = theApp->getOPs().processTransaction(tx); if(tx->getStatus() != INCLUDED) { // transaction wasn't accepted into ledger @@ -748,7 +752,7 @@ void Peer::recvPropose(ripple::TMProposeSet& packet) if ((packet.has_previousledger()) && (packet.previousledger().size() == 32)) memcpy(prevLedger.begin(), packet.previousledger().data(), 32); - if(theApp->getOPs().recvPropose(packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(), + if(theApp->getOPs().recvPropose(mPeerId, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(), packet.nodepubkey(), packet.signature(), mNodePublic)) { // FIXME: Not all nodes will want proposals PackedMessage::pointer message = boost::make_shared(packet, ripple::mtPROPOSE_LEDGER); @@ -822,7 +826,7 @@ void Peer::recvValidation(const boost::shared_ptr& packet) SerializedValidation::pointer val = boost::make_shared(boost::ref(sit), false); uint256 signingHash = val->getSigningHash(); - if (!theApp->isNew(signingHash)) + if (!theApp->isNew(signingHash, mPeerId)) { cLog(lsTRACE) << "Validation is duplicate"; return; diff --git a/src/Suppression.cpp b/src/Suppression.cpp index 2576a2aed..ad96d4b15 100644 --- a/src/Suppression.cpp +++ b/src/Suppression.cpp @@ -66,3 +66,35 @@ bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag) findCreateEntry(index, created).setFlag(flag); return created; } + +bool SuppressionTable::setFlag(const uint256& index, int flag) +{ // return: true = changed, false = unchanged + assert(flag != 0); + + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + Suppression &s = findCreateEntry(index, created); + + if ((s.getFlags() & flag) == flag) + return false; + + s.setFlag(flag); + return true; +} + +bool SuppressionTable::swapSet(const uint256& index, std::set& peers, int flag) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + Suppression &s = findCreateEntry(index, created); + + if ((s.getFlags() & flag) == flag) + return false; + + s.swapSet(peers); + s.setFlag(flag); + + return true; +} \ No newline at end of file diff --git a/src/Suppression.h b/src/Suppression.h index 18abb3a98..14b611ede 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -14,6 +14,11 @@ DEFINE_INSTANCE(Suppression); +#define SF_RELAYED 0x01 +#define SF_SIGBAD 0x02 +#define SF_SIGGOOD 0x04 +#define SF_SAVED 0x08 + class Suppression : private IS_INSTANCE(Suppression) { protected: @@ -27,9 +32,11 @@ public: void addPeer(uint64 peer) { mPeers.insert(peer); } bool hasPeer(uint64 peer) { return mPeers.count(peer) > 0; } + int getFlags(void) { return mFlags; } bool hasFlag(int f) { return (mFlags & f) != 0; } void setFlag(int f) { mFlags |= f; } void clearFlag(int f) { mFlags &= ~f; } + void swapSet(std::set& s) { mPeers.swap(s); } }; class SuppressionTable @@ -55,8 +62,11 @@ public: bool addSuppressionPeer(const uint256& index, uint64 peer); bool addSuppressionFlags(const uint256& index, int flag); + bool setFlag(const uint256& index, int flag); Suppression getEntry(const uint256&); + + bool swapSet(const uint256& index, std::set& peers, int flag); }; #endif