diff --git a/src/Application.cpp b/src/Application.cpp index 758fae4ae..f4f1e99d1 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -18,7 +18,7 @@ #include SETUP_LOG(); - +LogPartition TaggedCachePartition("TaggedCache"); Application* theApp = NULL; DatabaseCon::DatabaseCon(const std::string& strName, const char *initStrings[], int initCount) @@ -39,7 +39,7 @@ DatabaseCon::~DatabaseCon() Application::Application() : mIOWork(mIOService), mAuxWork(mAuxService), mUNL(mIOService), - mNetOps(mIOService, &mMasterLedger), mTempNodeCache(16384, 90), mHashedObjectStore(16384, 300), + mNetOps(mIOService, &mMasterLedger), mTempNodeCache("NodeCache", 16384, 90), mHashedObjectStore(16384, 300), mSNTPClient(mAuxService), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mSweepTimer(mAuxService) diff --git a/src/Application.h b/src/Application.h index 288f505bc..aeca10973 100644 --- a/src/Application.h +++ b/src/Application.h @@ -95,7 +95,11 @@ public: HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; } ValidationCollection& getValidations() { return mValidations; } JobQueue& getJobQueue() { return mJobQueue; } + SuppressionTable& getSuppression() { return mSuppressions; } + bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } + bool isNew(const uint256& s, uint64 p) { return mSuppressions.addSuppressionPeer(s, p); } + bool isNewFlag(const uint256& s, int f) { return mSuppressions.setFlag(s, f); } bool running() { return mTxnDB != NULL; } bool getSystemTimeOffset(int& offset) { return mSNTPClient.getOffset(offset); } diff --git a/src/CallRPC.cpp b/src/CallRPC.cpp index 50896b858..a8337198e 100644 --- a/src/CallRPC.cpp +++ b/src/CallRPC.cpp @@ -36,7 +36,7 @@ std::string EncodeBase64(const std::string& s) BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); bmem = BIO_new(BIO_s_mem()); b64 = BIO_push(b64, bmem); - BIO_write(b64, s.c_str(), s.size()); + BIO_write(b64, s.data(), s.size()); (void) BIO_flush(b64); BIO_get_mem_ptr(b64, &bptr); diff --git a/src/ConnectionPool.cpp b/src/ConnectionPool.cpp index 79f0c82a8..6b01a1439 100644 --- a/src/ConnectionPool.cpp +++ b/src/ConnectionPool.cpp @@ -251,22 +251,30 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m return sentTo; } -int ConnectionPool::relayMessage(const std::set& fromPeers, const PackedMessage::pointer& msg) -{ - int sentTo = 0; +void ConnectionPool::relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg) +{ // Relay message to all but the specified peers boost::mutex::scoped_lock sl(mPeerLock); BOOST_FOREACH(naPeer pair, mConnectedMap) { Peer::ref peer = pair.second; if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0)) - { - ++sentTo; peer->sendPacket(msg); - } } - return sentTo; +} + +void ConnectionPool::relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg) +{ // Relay message to the specified peers + boost::mutex::scoped_lock sl(mPeerLock); + + BOOST_FOREACH(naPeer pair, mConnectedMap) + { + Peer::ref peer = pair.second; + if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) > 0)) + peer->sendPacket(msg); + } + } // Schedule a connection via scanning. @@ -305,7 +313,7 @@ Peer::pointer ConnectionPool::peerConnect(const std::string& strIp, int iPort) if ((it = mIpMap.find(pipPeer)) == mIpMap.end()) { - Peer::pointer ppNew(Peer::create(theApp->getIOService(), mCtx)); + Peer::pointer ppNew(Peer::create(theApp->getIOService(), mCtx, ++mLastPeer)); // Did not find it. Not already connecting or connected. ppNew->connect(strIp, iPort); diff --git a/src/ConnectionPool.h b/src/ConnectionPool.h index f230bd0ec..e3801ef73 100644 --- a/src/ConnectionPool.h +++ b/src/ConnectionPool.h @@ -62,7 +62,8 @@ public: // Send message to network. int relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg); - int relayMessage(const std::set& fromPeers, const PackedMessage::pointer& msg); + void relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg); + void relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg); // Manual connection request. // Queue for immediate scanning. diff --git a/src/DeterministicKeys.cpp b/src/DeterministicKeys.cpp index 1f1e1a0c9..622c98598 100644 --- a/src/DeterministicKeys.cpp +++ b/src/DeterministicKeys.cpp @@ -19,7 +19,7 @@ uint128 CKey::PassPhraseToKey(const std::string& passPhrase) { Serializer s; - s.addRaw(passPhrase.c_str(), passPhrase.size()); + s.addRaw(passPhrase); uint256 hash256 = s.getSHA512Half(); uint128 ret(hash256); diff --git a/src/HashedObject.cpp b/src/HashedObject.cpp index bff9c34c6..9fc95a825 100644 --- a/src/HashedObject.cpp +++ b/src/HashedObject.cpp @@ -12,7 +12,7 @@ SETUP_LOG(); DECLARE_INSTANCE(HashedObject); HashedObjectStore::HashedObjectStore(int cacheSize, int cacheAge) : - mCache(cacheSize, cacheAge), mWritePending(false) + mCache("HashedObjectStore", cacheSize, cacheAge), mWritePending(false) { mWriteSet.reserve(128); } diff --git a/src/Ledger.cpp b/src/Ledger.cpp index 69807417e..ec44a5dc9 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -1073,7 +1073,7 @@ int Ledger::getPendingSaves() void Ledger::pendSave(bool fromConsensus) { - if (!fromConsensus && !theApp->isNew(getHash())) + if (!fromConsensus && !theApp->isNewFlag(getHash(), SF_SAVED)) return; boost::thread thread(boost::bind(&Ledger::saveAcceptedLedger, shared_from_this(), fromConsensus)); diff --git a/src/LedgerAcquire.cpp b/src/LedgerAcquire.cpp index ca26c885c..a9c4cf8ca 100644 --- a/src/LedgerAcquire.cpp +++ b/src/LedgerAcquire.cpp @@ -11,6 +11,7 @@ #include "HashPrefixes.h" SETUP_LOG(); +DECLARE_INSTANCE(PeerSet); #define LA_DEBUG #define LEDGER_ACQUIRE_TIMEOUT 750 diff --git a/src/LedgerAcquire.h b/src/LedgerAcquire.h index 6044d1047..2dcf01885 100644 --- a/src/LedgerAcquire.h +++ b/src/LedgerAcquire.h @@ -14,9 +14,12 @@ #include "Ledger.h" #include "Peer.h" #include "TaggedCache.h" +#include "InstanceCounter.h" #include "../obj/src/ripple.pb.h" -class PeerSet +DEFINE_INSTANCE(PeerSet); + +class PeerSet : private IS_INSTANCE(PeerSet) { protected: uint256 mHash; diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index beeaaa400..15e439784 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -24,6 +24,7 @@ typedef std::pair u160_prop_pair; typedef std::pair u256_lct_pair; SETUP_LOG(); +DECLARE_INSTANCE(LedgerConsensus); TransactionAcquire::TransactionAcquire(const uint256& hash) : PeerSet(hash, TX_ACQUIRE_TIMEOUT), mHaveRoot(false) { @@ -845,7 +846,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec txn->setVote(pit.first, cit->second->hasItem(txID)); } - if (!ourVote && theApp->isNew(txID)) + if (!ourVote && theApp->isNewFlag(txID, SF_RELAYED)) { ripple::TMTransaction msg; msg.set_rawtransaction(&(tx.front()), tx.size()); diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 38a2dde9f..6eae25750 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -17,6 +17,9 @@ #include "Peer.h" #include "CanonicalTXSet.h" #include "TransactionEngine.h" +#include "InstanceCounter.h" + +DEFINE_INSTANCE(LedgerConsensus); class TransactionAcquire : public PeerSet, public boost::enable_shared_from_this { // A transaction set we are trying to acquire @@ -78,7 +81,7 @@ enum LCState lcsACCEPTED, // We have accepted/validated a new last closed ledger }; -class LedgerConsensus : public boost::enable_shared_from_this +class LedgerConsensus : public boost::enable_shared_from_this, IS_INSTANCE(LedgerConsensus) { protected: LCState mState; diff --git a/src/LedgerEntrySet.cpp b/src/LedgerEntrySet.cpp index 3134f1bd0..3224f07eb 100644 --- a/src/LedgerEntrySet.cpp +++ b/src/LedgerEntrySet.cpp @@ -7,6 +7,8 @@ #include "Log.h" SETUP_LOG(); +DECLARE_INSTANCE(LedgerEntrySetEntry); +DECLARE_INSTANCE(LedgerEntrySet) // #define META_DEBUG diff --git a/src/LedgerEntrySet.h b/src/LedgerEntrySet.h index 1554ab89e..51c61b269 100644 --- a/src/LedgerEntrySet.h +++ b/src/LedgerEntrySet.h @@ -7,6 +7,10 @@ #include "TransactionMeta.h" #include "Ledger.h" #include "TransactionErr.h" +#include "InstanceCounter.h" + +DEFINE_INSTANCE(LedgerEntrySetEntry); +DEFINE_INSTANCE(LedgerEntrySet); enum LedgerEntryAction { @@ -17,7 +21,7 @@ enum LedgerEntryAction taaCREATE, // Newly created. }; -class LedgerEntrySetEntry +class LedgerEntrySetEntry : private IS_INSTANCE(LedgerEntrySetEntry) { public: SLE::pointer mEntry; @@ -28,7 +32,7 @@ public: }; -class LedgerEntrySet +class LedgerEntrySet : private IS_INSTANCE(LedgerEntrySet) { protected: Ledger::pointer mLedger; diff --git a/src/LedgerHistory.cpp b/src/LedgerHistory.cpp index 854fc011f..0c084d2f7 100644 --- a/src/LedgerHistory.cpp +++ b/src/LedgerHistory.cpp @@ -10,16 +10,16 @@ #include "Application.h" #ifndef CACHED_LEDGER_NUM -#define CACHED_LEDGER_NUM 512 +#define CACHED_LEDGER_NUM 128 #endif #ifndef CACHED_LEDGER_AGE #define CACHED_LEDGER_AGE 900 #endif -// FIXME: Need to clean up ledgers by index, probably should switch to just mapping sequence to hash +// FIXME: Need to clean up ledgers by index at some point -LedgerHistory::LedgerHistory() : mLedgersByHash(CACHED_LEDGER_NUM, CACHED_LEDGER_AGE) +LedgerHistory::LedgerHistory() : mLedgersByHash("LedgerCache", CACHED_LEDGER_NUM, CACHED_LEDGER_AGE) { ; } void LedgerHistory::addLedger(Ledger::pointer ledger) @@ -36,7 +36,7 @@ void LedgerHistory::addAcceptedLedger(Ledger::pointer ledger, bool fromConsensus assert(ledger); assert(ledger->isAccepted()); assert(ledger->isImmutable()); - mLedgersByIndex.insert(std::make_pair(ledger->getLedgerSeq(), ledger)); + mLedgersByIndex[ledger->getLedgerSeq()] = ledger->getHash(); ledger->pendSave(fromConsensus); } @@ -44,9 +44,13 @@ void LedgerHistory::addAcceptedLedger(Ledger::pointer ledger, bool fromConsensus Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index) { boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex()); - std::map::iterator it(mLedgersByIndex.find(index)); + std::map::iterator it(mLedgersByIndex.find(index)); if (it != mLedgersByIndex.end()) - return it->second; + { + uint256 hash = it->second; + sl.unlock(); + return getLedgerByHash(hash); + } sl.unlock(); Ledger::pointer ret(Ledger::loadByIndex(index)); @@ -56,8 +60,8 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index) sl.lock(); mLedgersByHash.canonicalize(ret->getHash(), ret); - mLedgersByIndex.insert(std::make_pair(index, ret)); - return ret; + mLedgersByIndex[ret->getLedgerSeq()] = ret->getHash(); + return (ret->getLedgerSeq() == index) ? ret : Ledger::pointer(); } Ledger::pointer LedgerHistory::getLedgerByHash(const uint256& hash) @@ -91,7 +95,7 @@ Ledger::pointer LedgerHistory::canonicalizeLedger(Ledger::pointer ledger, bool s boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex()); mLedgersByHash.canonicalize(h, ledger); if (ledger->isAccepted()) - mLedgersByIndex[ledger->getLedgerSeq()] = ledger; + mLedgersByIndex[ledger->getLedgerSeq()] = ledger->getHash(); return ledger; } diff --git a/src/LedgerHistory.h b/src/LedgerHistory.h index 51ecaf714..1709e5954 100644 --- a/src/LedgerHistory.h +++ b/src/LedgerHistory.h @@ -7,7 +7,7 @@ class LedgerHistory { TaggedCache mLedgersByHash; - std::map mLedgersByIndex; // accepted ledgers + std::map mLedgersByIndex; // accepted ledgers public: LedgerHistory(); diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index 088a717f2..966570be6 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -112,7 +112,7 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t return tpTransNew; } -Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, Peer* source) +Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) { Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true); if (dbtx) return dbtx; @@ -160,27 +160,28 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, trans->setStatus(INCLUDED); theApp->getMasterTransaction().canonicalize(trans, true); -// FIXME: Need code to get all accounts affected by a transaction and re-synch -// any of them that affect local accounts cached in memory. Or, we need to -// no cache the account balance information and always get it from the current ledger -// theApp->getWallet().applyTransaction(trans); + std::set peers; + if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) + { + ripple::TMTransaction tx; + Serializer s; + trans->getSTransaction()->add(s); + tx.set_rawtransaction(&s.getData().front(), s.getLength()); + tx.set_status(ripple::tsCURRENT); + tx.set_receivetimestamp(getNetworkTimeNC()); - ripple::TMTransaction tx; - Serializer s; - trans->getSTransaction()->add(s); - tx.set_rawtransaction(&s.getData().front(), s.getLength()); - tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); - - PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - int sentTo = theApp->getConnectionPool().relayMessage(source, packet); - cLog(lsINFO) << "Transaction relayed to " << sentTo << " node(s)"; + PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); + theApp->getConnectionPool().relayMessageBut(peers, packet); + } return trans; } cLog(lsDEBUG) << "Status other than success " << r; - if ((mMode != omFULL) && (mMode != omTRACKING) && (theApp->isNew(trans->getID()))) + std::set peers; + + if ((mMode != omFULL) && (mMode != omTRACKING) && + theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) { ripple::TMTransaction tx; Serializer s; @@ -189,7 +190,7 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, tx.set_status(ripple::tsCURRENT); tx.set_receivetimestamp(getNetworkTimeNC()); PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessage(source, packet); + theApp->getConnectionPool().relayMessageTo(peers, packet); } trans->setStatus(INVALID); @@ -694,50 +695,38 @@ bool NetworkOPs::haveConsensusObject() } // <-- bool: true to relay -bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, - uint32 closeTime, const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic) +bool NetworkOPs::recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, + const uint256& prevLedger, uint32 closeTime, const std::string& signature, + const RippleAddress& nodePublic) { // JED: does mConsensus need to be locked? // XXX Validate key. // XXX Take a vuc for pubkey. - // Get a preliminary hash to use to suppress duplicates - Serializer s(256); - s.add256(proposeHash); - s.add256(prevLedger); - s.add32(proposeSeq); - s.add32(closeTime); - s.addRaw(pubKey); - s.addRaw(signature); - if (!theApp->isNew(s.getSHA512Half())) - return false; - - RippleAddress naPeerPublic = RippleAddress::createNodePublic(strCopy(pubKey)); - if (!haveConsensusObject()) { cLog(lsINFO) << "Received proposal outside consensus window"; return mMode != omFULL; } - if (mConsensus->isOurPubKey(naPeerPublic)) + if (mConsensus->isOurPubKey(nodePublic)) { cLog(lsTRACE) << "Received our own validation"; return false; } // Is this node on our UNL? - if (!theApp->getUNL().nodeInUNL(naPeerPublic)) + if (!theApp->getUNL().nodeInUNL(nodePublic)) { - cLog(lsINFO) << "Untrusted proposal: " << naPeerPublic.humanNodePublic() << " " << proposeHash; + cLog(lsINFO) << "Untrusted proposal: " << nodePublic.humanNodePublic() << " " << proposeHash; return true; } if (prevLedger.isNonZero()) { // proposal includes a previous ledger LedgerProposal::pointer proposal = - boost::make_shared(prevLedger, proposeSeq, proposeHash, closeTime, naPeerPublic); + boost::make_shared(prevLedger, proposeSeq, proposeHash, closeTime, nodePublic); if (!proposal->checkSign(signature)) { cLog(lsWARNING) << "New-style ledger proposal fails signature check"; @@ -750,7 +739,7 @@ bool NetworkOPs::recvPropose(uint32 proposeSeq, const uint256& proposeHash, cons } LedgerProposal::pointer proposal = - boost::make_shared(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, naPeerPublic); + boost::make_shared(mConsensus->getLCL(), proposeSeq, proposeHash, closeTime, nodePublic); if (!proposal->checkSign(signature)) { // Note that if the LCL is different, the signature check will fail cLog(lsWARNING) << "Ledger proposal fails signature check"; diff --git a/src/NetworkOPs.h b/src/NetworkOPs.h index 5c54c3a16..cd5243b26 100644 --- a/src/NetworkOPs.h +++ b/src/NetworkOPs.h @@ -121,7 +121,7 @@ public: // Transaction::pointer submitTransaction(const Transaction::pointer& tpTrans); - Transaction::pointer processTransaction(Transaction::pointer transaction, Peer* source = NULL); + Transaction::pointer processTransaction(Transaction::pointer transaction); Transaction::pointer findTransactionByID(const uint256& transactionID); int findTransactionsBySource(const uint256& uLedger, std::list&, const RippleAddress& sourceAccount, uint32 minSeq, uint32 maxSeq); @@ -168,8 +168,8 @@ public: const std::vector& myNode, std::list< std::vector >& newNodes); // ledger proposal/close functions - bool recvPropose(uint32 proposeSeq, const uint256& proposeHash, const uint256& prevLedger, uint32 closeTime, - const std::string& pubKey, const std::string& signature, const RippleAddress& nodePublic); + bool recvPropose(const uint256& suppression, uint32 proposeSeq, const uint256& proposeHash, + const uint256& prevLedger, uint32 closeTime, const std::string& signature, const RippleAddress& nodePublic); bool gotTXData(const boost::shared_ptr& peer, const uint256& hash, const std::list& nodeIDs, const std::list< std::vector >& nodeData); bool recvValidation(const SerializedValidation::pointer& val); diff --git a/src/Peer.cpp b/src/Peer.cpp index 3b8a10f88..83f8f62ad 100644 --- a/src/Peer.cpp +++ b/src/Peer.cpp @@ -24,14 +24,14 @@ DECLARE_INSTANCE(Peer); // Node has this long to verify its identity from connection accepted or connection attempt. #define NODE_VERIFY_SECONDS 15 -Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) : +Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerID) : mHelloed(false), mDetaching(false), + mPeerId(peerID), mSocketSsl(io_service, ctx), mVerifyTimer(io_service) { cLog(lsDEBUG) << "CREATING PEER: " << ADDRESS(this); - mPeerId = theApp->getConnectionPool().assignPeerId(); } void Peer::handle_write(const boost::system::error_code& error, size_t bytes_transferred) @@ -708,8 +708,12 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) SerializerIterator sit(s); SerializedTransaction::pointer stx = boost::make_shared(boost::ref(sit)); + if (!theApp->isNew(stx->getTransactionID(), mPeerId)) + return; + tx = boost::make_shared(stx, true); - if (tx->getStatus() == INVALID) throw(0); + if (tx->getStatus() == INVALID) + throw(0); #ifndef TRUST_NETWORK } catch (...) @@ -723,7 +727,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) } #endif - tx = theApp->getOPs().processTransaction(tx, this); + tx = theApp->getOPs().processTransaction(tx); if(tx->getStatus() != INCLUDED) { // transaction wasn't accepted into ledger @@ -736,7 +740,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) void Peer::recvPropose(ripple::TMProposeSet& packet) { if ((packet.currenttxhash().size() != 32) || (packet.nodepubkey().size() < 28) || - (packet.signature().size() < 56)) + (packet.signature().size() < 56) || (packet.nodepubkey().size() > 128) || (packet.signature().size() > 128)) { cLog(lsWARNING) << "Received proposal is malformed"; return; @@ -748,8 +752,23 @@ void Peer::recvPropose(ripple::TMProposeSet& packet) if ((packet.has_previousledger()) && (packet.previousledger().size() == 32)) memcpy(prevLedger.begin(), packet.previousledger().data(), 32); - if(theApp->getOPs().recvPropose(packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(), - packet.nodepubkey(), packet.signature(), mNodePublic)) + Serializer s(512); + s.add256(currentTxHash); + s.add256(prevLedger); + s.add32(packet.proposeseq()); + s.add32(packet.closetime()); + s.addVL(packet.nodepubkey()); + s.addVL(packet.signature()); + uint256 suppression = s.getSHA512Half(); + + if (!theApp->isNew(suppression, mPeerId)) + return; + + RippleAddress nodePublic = RippleAddress::createNodePublic(strCopy(packet.nodepubkey())); +// bool isTrusted = theApp->getUNL().nodeInUNL(nodePublic); + + if(theApp->getOPs().recvPropose(suppression, packet.proposeseq(), currentTxHash, prevLedger, packet.closetime(), + packet.signature(), nodePublic)) { // FIXME: Not all nodes will want proposals PackedMessage::pointer message = boost::make_shared(packet, ripple::mtPROPOSE_LEDGER); theApp->getConnectionPool().relayMessage(this, message); @@ -788,11 +807,11 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig return; } - if (theApp->getOPs().recvValidation(val)) + std::set peers; + if (theApp->getOPs().recvValidation(val) && theApp->getSuppression().swapSet(signingHash, peers, SF_RELAYED)) { - Peer::pointer pp = peer.lock(); PackedMessage::pointer message = boost::make_shared(*packet, ripple::mtVALIDATION); - theApp->getConnectionPool().relayMessage(pp ? pp.get() : NULL, message); + theApp->getConnectionPool().relayMessageBut(peers, message); } } #ifndef TRUST_NETWORK @@ -822,7 +841,7 @@ void Peer::recvValidation(const boost::shared_ptr& packet) SerializedValidation::pointer val = boost::make_shared(boost::ref(sit), false); uint256 signingHash = val->getSigningHash(); - if (!theApp->isNew(signingHash)) + if (!theApp->isNew(signingHash, mPeerId)) { cLog(lsTRACE) << "Validation is duplicate"; return; @@ -831,7 +850,7 @@ void Peer::recvValidation(const boost::shared_ptr& packet) bool isTrusted = theApp->getUNL().nodeInUNL(val->getSignerPublic()); theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, boost::bind(&checkValidation, _1, val, signingHash, isTrusted, packet, - boost::weak_ptr(shared_from_this()))); + boost::weak_ptr(shared_from_this()))); } #ifndef TRUST_NETWORK catch (...) diff --git a/src/Peer.h b/src/Peer.h index 49857e3e6..256319c83 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "../obj/src/ripple.pb.h" @@ -72,7 +73,7 @@ protected: ripple::TMStatusChange mLastStatus; ripple::TMHello mHello; - Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx); + Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerId); void handleShutdown(const boost::system::error_code& error) { ; } static void sHandleShutdown(Peer::ref ptr, const boost::system::error_code& error) @@ -132,9 +133,9 @@ public: void setIpPort(const std::string& strIP, int iPort); - static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx) + static pointer create(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 id) { - return pointer(new Peer(io_service, ctx)); + return pointer(new Peer(io_service, ctx, id)); } boost::asio::ssl::stream::lowest_layer_type& getSocket() diff --git a/src/PeerDoor.cpp b/src/PeerDoor.cpp index 354481672..a3740846b 100644 --- a/src/PeerDoor.cpp +++ b/src/PeerDoor.cpp @@ -39,7 +39,8 @@ PeerDoor::PeerDoor(boost::asio::io_service& io_service) : void PeerDoor::startListening() { - Peer::pointer new_connection = Peer::create(mAcceptor.get_io_service(), mCtx); + Peer::pointer new_connection = Peer::create(mAcceptor.get_io_service(), mCtx, + theApp->getConnectionPool().assignPeerId()); mAcceptor.async_accept(new_connection->getSocket(), boost::bind(&PeerDoor::handleConnect, this, new_connection, diff --git a/src/RPCHandler.cpp b/src/RPCHandler.cpp index 36fcfdffd..b4a22162b 100644 --- a/src/RPCHandler.cpp +++ b/src/RPCHandler.cpp @@ -411,7 +411,7 @@ Json::Value RPCHandler::doAccountEmailSet(const Json::Value ¶ms) boost::to_lower(strEmail); std::vector vucMD5(128/8, 0); - MD5(reinterpret_cast(strEmail.c_str()), strEmail.size(), &vucMD5.front()); + MD5(reinterpret_cast(strEmail.data()), strEmail.size(), &vucMD5.front()); uint128 uEmailHash(vucMD5); std::vector vucDomain; @@ -2607,7 +2607,7 @@ Json::Value RPCHandler::doLogin(const Json::Value& params) Json::Value RPCHandler::doGetCounts(const Json::Value& params) { - int minCount = 1; + int minCount = 10; if (params.size() > 0) minCount = params[0u].asInt(); diff --git a/src/Serializer.cpp b/src/Serializer.cpp index 8b67d3b65..016c6e2a6 100644 --- a/src/Serializer.cpp +++ b/src/Serializer.cpp @@ -305,7 +305,7 @@ uint256 Serializer::getSHA512Half(const unsigned char *data, int len) uint256 Serializer::getSHA512Half(const std::string& strData) { - return getSHA512Half(reinterpret_cast(strData.c_str()), strData.size()); + return getSHA512Half(reinterpret_cast(strData.data()), strData.size()); } uint256 Serializer::getPrefixHash(uint32 prefix, const unsigned char *data, int len) @@ -367,7 +367,16 @@ int Serializer::addVL(const std::vector& vector) int Serializer::addVL(const void *ptr, int len) { int ret = addRaw(encodeVL(len)); - if (len) addRaw(ptr, len); + if (len) + addRaw(ptr, len); + return ret; +} + +int Serializer::addVL(const std::string& string) +{ + int ret = addRaw(string.size()); + if (!string.empty()) + addRaw(string.data(), string.size()); return ret; } diff --git a/src/Serializer.h b/src/Serializer.h index fc1f1df49..7b7b67fe2 100644 --- a/src/Serializer.h +++ b/src/Serializer.h @@ -44,6 +44,7 @@ public: int addZeros(size_t uBytes); int addVL(const std::vector &vector); + int addVL(const std::string& string); int addVL(const void *ptr, int len); int addTaggedList(const std::list&); int addTaggedList(const std::vector&); @@ -87,7 +88,7 @@ public: static uint256 getPrefixHash(uint32 prefix, const std::vector& data) { return getPrefixHash(prefix, &(data.front()), data.size()); } static uint256 getPrefixHash(uint32 prefix, const std::string& strData) - { return getPrefixHash(prefix, reinterpret_cast(strData.c_str()), strData.size()); } + { return getPrefixHash(prefix, reinterpret_cast(strData.data()), strData.size()); } // totality functions const std::vector& peekData() const { return mData; } diff --git a/src/Suppression.cpp b/src/Suppression.cpp index 2576a2aed..ad96d4b15 100644 --- a/src/Suppression.cpp +++ b/src/Suppression.cpp @@ -66,3 +66,35 @@ bool SuppressionTable::addSuppressionFlags(const uint256& index, int flag) findCreateEntry(index, created).setFlag(flag); return created; } + +bool SuppressionTable::setFlag(const uint256& index, int flag) +{ // return: true = changed, false = unchanged + assert(flag != 0); + + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + Suppression &s = findCreateEntry(index, created); + + if ((s.getFlags() & flag) == flag) + return false; + + s.setFlag(flag); + return true; +} + +bool SuppressionTable::swapSet(const uint256& index, std::set& peers, int flag) +{ + boost::mutex::scoped_lock sl(mSuppressionMutex); + + bool created; + Suppression &s = findCreateEntry(index, created); + + if ((s.getFlags() & flag) == flag) + return false; + + s.swapSet(peers); + s.setFlag(flag); + + return true; +} \ No newline at end of file diff --git a/src/Suppression.h b/src/Suppression.h index 18abb3a98..14b611ede 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -14,6 +14,11 @@ DEFINE_INSTANCE(Suppression); +#define SF_RELAYED 0x01 +#define SF_SIGBAD 0x02 +#define SF_SIGGOOD 0x04 +#define SF_SAVED 0x08 + class Suppression : private IS_INSTANCE(Suppression) { protected: @@ -27,9 +32,11 @@ public: void addPeer(uint64 peer) { mPeers.insert(peer); } bool hasPeer(uint64 peer) { return mPeers.count(peer) > 0; } + int getFlags(void) { return mFlags; } bool hasFlag(int f) { return (mFlags & f) != 0; } void setFlag(int f) { mFlags |= f; } void clearFlag(int f) { mFlags &= ~f; } + void swapSet(std::set& s) { mPeers.swap(s); } }; class SuppressionTable @@ -55,8 +62,11 @@ public: bool addSuppressionPeer(const uint256& index, uint64 peer); bool addSuppressionFlags(const uint256& index, int flag); + bool setFlag(const uint256& index, int flag); Suppression getEntry(const uint256&); + + bool swapSet(const uint256& index, std::set& peers, int flag); }; #endif diff --git a/src/TaggedCache.h b/src/TaggedCache.h index fe7e03d24..f7a0781a1 100644 --- a/src/TaggedCache.h +++ b/src/TaggedCache.h @@ -1,12 +1,17 @@ #ifndef __TAGGEDCACHE__ #define __TAGGEDCACHE__ +#include + #include #include #include #include #include +#include "Log.h" +extern LogPartition TaggedCachePartition; + // This class implemented a cache and a map. The cache keeps objects alive // in the map. The map allows multiple code paths that reference objects // with the same tag to get the same actual object. @@ -30,6 +35,7 @@ public: protected: mutable boost::recursive_mutex mLock; + std::string mName; int mTargetSize, mTargetAge; boost::unordered_map mCache; // Hold strong reference to recent objects @@ -38,7 +44,8 @@ protected: boost::unordered_map mMap; // Track stored objects public: - TaggedCache(int size, int age) : mTargetSize(size), mTargetAge(age), mLastSweep(time(NULL)) { ; } + TaggedCache(const char *name, int size, int age) + : mName(name), mTargetSize(size), mTargetAge(age), mLastSweep(time(NULL)) { ; } int getTargetSize() const; int getTargetAge() const; @@ -89,35 +96,40 @@ template void TaggedCache::sweep { boost::recursive_mutex::scoped_lock sl(mLock); - if (mCache.size() < mTargetSize) - return; - - time_t now = time(NULL); - if ((mLastSweep + 10) < now) - return; - - mLastSweep = now; - time_t target = now - mTargetAge; + mLastSweep = time(NULL); + time_t target = mLastSweep - mTargetAge; // Pass 1, remove old objects from cache + int cacheRemovals = 0; typename boost::unordered_map::iterator cit = mCache.begin(); while (cit != mCache.end()) { if (cit->second.first < target) + { + ++cacheRemovals; mCache.erase(cit++); + } else ++cit; } // Pass 2, remove dead objects from map + int mapRemovals = 0; typename boost::unordered_map::iterator mit = mMap.begin(); while (mit != mMap.end()) { if (mit->second.expired()) + { + ++mapRemovals; mMap.erase(mit++); + } else ++mit; } + + if (TaggedCachePartition.doLog(lsTRACE) && (mapRemovals || cacheRemovals)) + Log(lsTRACE, TaggedCachePartition) << mName << ": cache = " << mCache.size() << "-" << cacheRemovals << + ", map = " << mMap.size() << "-" << mapRemovals; } template bool TaggedCache::touch(const key_type& key) diff --git a/src/TransactionEngine.cpp b/src/TransactionEngine.cpp index 04f4ecf09..0b29d4e85 100644 --- a/src/TransactionEngine.cpp +++ b/src/TransactionEngine.cpp @@ -15,6 +15,7 @@ #include "utils.h" SETUP_LOG(); +DECLARE_INSTANCE(TransactionEngine); void TransactionEngine::txnWrite() { diff --git a/src/TransactionEngine.h b/src/TransactionEngine.h index 70d464b7a..e16755d30 100644 --- a/src/TransactionEngine.h +++ b/src/TransactionEngine.h @@ -9,6 +9,9 @@ #include "SerializedLedger.h" #include "LedgerEntrySet.h" #include "TransactionErr.h" +#include "InstanceCounter.h" + +DEFINE_INSTANCE(TransactionEngine); // A TransactionEngine applies serialized transactions to a ledger // It can also, verify signatures, verify fees, and give rejection reasons @@ -29,7 +32,7 @@ enum TransactionEngineParams // One instance per ledger. // Only one transaction applied at a time. -class TransactionEngine +class TransactionEngine : private IS_INSTANCE(TransactionEngine) { private: LedgerEntrySet mNodes; diff --git a/src/TransactionMaster.cpp b/src/TransactionMaster.cpp index eabf17622..e260ccde2 100644 --- a/src/TransactionMaster.cpp +++ b/src/TransactionMaster.cpp @@ -13,7 +13,7 @@ #define CACHED_TRANSACTION_AGE 1800 #endif -TransactionMaster::TransactionMaster() : mCache(CACHED_TRANSACTION_NUM, CACHED_TRANSACTION_AGE) +TransactionMaster::TransactionMaster() : mCache("TransactionCache", CACHED_TRANSACTION_NUM, CACHED_TRANSACTION_AGE) { ; } diff --git a/src/rpc.cpp b/src/rpc.cpp index 15f53061c..2beec4c66 100644 --- a/src/rpc.cpp +++ b/src/rpc.cpp @@ -189,7 +189,7 @@ std::string DecodeBase64(std::string s) b64 = BIO_new(BIO_f_base64()); BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); - bmem = BIO_new_mem_buf(const_cast(s.c_str()), s.size()); + bmem = BIO_new_mem_buf(const_cast(s.data()), s.size()); bmem = BIO_push(b64, bmem); BIO_read(bmem, buffer, s.size()); BIO_free_all(bmem);