From bcff9fad177a4711c8a4bca9b8f1ae1202c21b98 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 31 Oct 2012 17:32:26 -0700 Subject: [PATCH 1/3] Next generation supression code. --- src/Application.h | 1 - src/Suppression.cpp | 75 +++++++++++++++++++++++++++++++-------------- src/Suppression.h | 38 ++++++++++++++++++++--- 3 files changed, 85 insertions(+), 29 deletions(-) diff --git a/src/Application.h b/src/Application.h index 8193408327..288f505bcd 100644 --- a/src/Application.h +++ b/src/Application.h @@ -96,7 +96,6 @@ public: ValidationCollection& getValidations() { return mValidations; } JobQueue& getJobQueue() { return mJobQueue; } bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } - bool isNew(const uint160& s) { return mSuppressions.addSuppression(s); } bool running() { return mTxnDB != NULL; } bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); } diff --git a/src/Suppression.cpp b/src/Suppression.cpp index 4b7cad5711..2576a2aede 100644 --- a/src/Suppression.cpp +++ b/src/Suppression.cpp @@ -1,39 +1,68 @@ + #include "Suppression.h" #include -bool SuppressionTable::addSuppression(const uint160& suppression) -{ - boost::mutex::scoped_lock sl(mSuppressionMutex); +DECLARE_INSTANCE(Suppression); - if (mSuppressionMap.find(suppression) != mSuppressionMap.end()) - return false; +Suppression& SuppressionTable::findCreateEntry(const uint256& index, bool& created) +{ + boost::unordered_map::iterator fit = mSuppressionMap.find(index); + + if (fit != mSuppressionMap.end()) + { + created = false; + return fit->second; + } + created = true; time_t now = time(NULL); time_t expireTime = now - mHoldTime; - boost::unordered_map< time_t, std::list >::iterator - it = mSuppressionTimes.begin(), end = mSuppressionTimes.end(); - while (it != end) + // See if any supressions need to be expired + std::map< time_t, std::list >::iterator it = mSuppressionTimes.begin(); + if ((it != mSuppressionTimes.end()) && (it->first <= expireTime)) { - if (it->first <= expireTime) - { - BOOST_FOREACH(const uint160& lit, it->second) - mSuppressionMap.erase(lit); - it = mSuppressionTimes.erase(it); - } - else ++it; + BOOST_FOREACH(const uint256& lit, it->second) + mSuppressionMap.erase(lit); + mSuppressionTimes.erase(it); } - mSuppressionMap[suppression] = now; - mSuppressionTimes[now].push_back(suppression); - - return true; + mSuppressionTimes[now].push_back(index); + return mSuppressionMap.insert(std::make_pair(index, Suppression())).first->second; } -bool SuppressionTable::addSuppression(const uint256& suppression) +bool SuppressionTable::addSuppression(const uint256& index) { - uint160 u; - memcpy(u.begin(), suppression.begin() + (suppression.size() - u.size()), u.size()); - return addSuppression(u); + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + findCreateEntry(index, created); + return created; +} + +Suppression SuppressionTable::getEntry(const uint256& index) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + return findCreateEntry(index, created); +} + +bool SuppressionTable::addSuppressionPeer(const uint256& index, uint64 peer) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + findCreateEntry(index, created).addPeer(peer); + return created; +} + +bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + findCreateEntry(index, created).setFlag(flag); + return created; } diff --git a/src/Suppression.h b/src/Suppression.h index 608a57d733..18abb3a982 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -1,14 +1,36 @@ #ifndef __SUPPRESSION__ #define __SUPPRESSION__ +#include +#include #include #include #include #include "uint256.h" +#include "types.h" +#include "InstanceCounter.h" -extern std::size_t hash_value(const uint160& u); +DEFINE_INSTANCE(Suppression); + +class Suppression : private IS_INSTANCE(Suppression) +{ +protected: + int mFlags; + std::set mPeers; + +public: + Suppression() : mFlags(0) { ; } + + const std::set& peekPeers() { return mPeers; } + void addPeer(uint64 peer) { mPeers.insert(peer); } + bool hasPeer(uint64 peer) { return mPeers.count(peer) > 0; } + + bool hasFlag(int f) { return (mFlags & f) != 0; } + void setFlag(int f) { mFlags |= f; } + void clearFlag(int f) { mFlags &= ~f; } +}; class SuppressionTable { @@ -17,18 +39,24 @@ protected: boost::mutex mSuppressionMutex; // Stores all suppressed hashes and their expiration time - boost::unordered_map mSuppressionMap; + boost::unordered_map mSuppressionMap; // Stores all expiration times and the hashes indexed for them - boost::unordered_map< time_t, std::list > mSuppressionTimes; + std::map< time_t, std::list > mSuppressionTimes; int mHoldTime; + Suppression& findCreateEntry(const uint256&, bool& created); + public: SuppressionTable(int holdTime = 120) : mHoldTime(holdTime) { ; } - bool addSuppression(const uint256& suppression); - bool addSuppression(const uint160& suppression); + bool addSuppression(const uint256& index); + + bool addSuppressionPeer(const uint256& index, uint64 peer); + bool addSuppressionFlags(const uint256& index, int flag); + + Suppression getEntry(const uint256&); }; #endif From 0830cf4ca8a94d36669526a654b8e8f7ee81495f Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 31 Oct 2012 19:25:05 -0700 Subject: [PATCH 2/3] Change count to 'SerializedValue' to better reflect what it is. --- src/SerializedTypes.cpp | 2 +- src/SerializedTypes.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/SerializedTypes.cpp b/src/SerializedTypes.cpp index 5fc0ebbe3f..d147a3fdfe 100644 --- a/src/SerializedTypes.cpp +++ b/src/SerializedTypes.cpp @@ -14,7 +14,7 @@ #include "TransactionErr.h" SETUP_LOG(); -DECLARE_INSTANCE(SerializedEntry); +DECLARE_INSTANCE(SerializedValue); STAmount saZero(CURRENCY_ONE, ACCOUNT_ONE, 0); STAmount saOne(CURRENCY_ONE, ACCOUNT_ONE, 1); diff --git a/src/SerializedTypes.h b/src/SerializedTypes.h index 6e1267448f..4832b4621e 100644 --- a/src/SerializedTypes.h +++ b/src/SerializedTypes.h @@ -30,9 +30,9 @@ enum PathFlags #define ACCOUNT_XNS uint160(0) #define ACCOUNT_ONE uint160(1) // Used as a place holder -DEFINE_INSTANCE(SerializedEntry); +DEFINE_INSTANCE(SerializedValue); -class SerializedType : private IS_INSTANCE(SerializedEntry) +class SerializedType : private IS_INSTANCE(SerializedValue) { protected: SField::ptr fName; From 41da9c740f253ecea6ce9695471f3364b19e805a Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 31 Oct 2012 19:46:30 -0700 Subject: [PATCH 3/3] Support for 64-bit peer IDs. --- src/ConnectionPool.cpp | 27 ++++++++++++++++++++++++++- src/ConnectionPool.h | 9 ++++++++- src/Peer.cpp | 3 ++- src/Peer.h | 3 +++ 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index a6e1a99241..79f0c82a8b 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -29,6 +29,7 @@ void splitIpPort(const std::string& strIpPort, std::string& strIp, int& iPort) } ConnectionPool::ConnectionPool(boost::asio::io_service& io_service) : + mLastPeer(0), mCtx(boost::asio::ssl::context::sslv23), mScanTimer(io_service), mPolicyTimer(io_service) @@ -237,7 +238,7 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m BOOST_FOREACH(naPeer pair, mConnectedMap) { - Peer::pointer peer = pair.second; + Peer::ref peer = pair.second; if (!peer) std::cerr << "CP::RM null peer in list" << std::endl; else if ((!fromPeer || !(peer.get() == fromPeer)) && peer->isConnected()) @@ -250,6 +251,24 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m return sentTo; } +int ConnectionPool::relayMessage(const std::set& fromPeers, const PackedMessage::pointer& msg) +{ + int sentTo = 0; + boost::mutex::scoped_lock sl(mPeerLock); + + BOOST_FOREACH(naPeer pair, mConnectedMap) + { + Peer::ref peer = pair.second; + if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0)) + { + ++sentTo; + peer->sendPacket(msg); + } + } + + return sentTo; +} + // Schedule a connection via scanning. // // Add or modify into PeerIps as a manual entry for immediate scanning. @@ -354,6 +373,12 @@ std::vector ConnectionPool::getPeerVector() return ret; } +uint64 ConnectionPool::assignPeerId() +{ + boost::mutex::scoped_lock sl(mPeerLock); + return ++mLastPeer; +} + // Now know peer's node public key. Determine if we want to stay connected. // <-- bNew: false = redundant bool ConnectionPool::peerConnected(Peer::ref peer, const RippleAddress& naPeer, diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index 1b909f626a..f230bd0eca 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -1,6 +1,8 @@ #ifndef __CONNECTION_POOL__ #define __CONNECTION_POOL__ +#include + #include #include @@ -14,7 +16,8 @@ class ConnectionPool { private: - boost::mutex mPeerLock; + boost::mutex mPeerLock; + uint64 mLastPeer; typedef std::pair naPeer; typedef std::pair pipPeer; @@ -59,6 +62,7 @@ public: // Send message to network. int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg); + int relayMessage(const std::set& fromPeers, const PackedMessage::pointer& msg); // Manual connection request. // Queue for immediate scanning. @@ -87,6 +91,9 @@ public: Json::Value getPeersJson(); std::vector getPeerVector(); + // Peer 64-bit ID function + uint64 assignPeerId(); + // // Scanning // diff --git a/src/Peer.cpp b/src/Peer.cpp index 491991c23f..3b8a10f88c 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -30,7 +30,8 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) mSocketSsl(io_service, ctx), mVerifyTimer(io_service) { - // cLog(lsDEBUG) << "CREATING PEER: " << ADDRESS(this); + cLog(lsDEBUG) << "CREATING PEER: " << ADDRESS(this); + mPeerId = theApp->getConnectionPool().assignPeerId(); } void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred) diff --git a/src/Peer.h b/src/Peer.h index cc3a525c5b..49857e3e6f 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -46,6 +46,7 @@ private: ipPort mIpPort; ipPort mIpPortConnect; uint256 mCookieHash; + uint64 mPeerId; uint256 mClosedLedgerHash, mPreviousLedgerHash; std::list mRecentLedgers; @@ -162,6 +163,8 @@ public: uint256 getClosedLedgerHash() const { return mClosedLedgerHash; } bool hasLedger(const uint256& hash) const; bool hasTxSet(const uint256& hash) const; + uint64 getPeerId() const { return mPeerId; } + RippleAddress getNodePublic() const { return mNodePublic; } void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); } };