From 9b61a8372146d6c80e68521df37ffbbae984e85b Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 17 Jan 2014 20:32:58 -0500 Subject: [PATCH] Refactor, tidy up: * Fix for msvc std::function return types * Convert macros to constants * Add Journal to PeerSet and use in InboundLedger * Break lines and add annotations * Remove some warnings --- .../VisualStudio2012/RippleD.vcxproj.filters | 12 +- src/ripple/peerfinder/impl/Logic.h | 4 + src/ripple_app/consensus/LedgerConsensus.cpp | 3 +- src/ripple_app/ledger/InboundLedger.cpp | 373 ++++++++++++------ src/ripple_app/ledger/InboundLedger.h | 12 +- src/ripple_app/ledger/InboundLedgers.cpp | 2 +- src/ripple_app/peers/PeerSet.cpp | 27 +- src/ripple_app/peers/PeerSet.h | 13 +- src/ripple_app/tx/TransactionAcquire.cpp | 15 +- src/ripple_app/tx/TransactionAcquire.h | 2 +- src/ripple_core/functional/JobQueue.cpp | 3 +- src/ripple_core/functional/JobQueue.h | 9 +- 12 files changed, 315 insertions(+), 160 deletions(-) diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index d89508969..d4143c712 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -702,9 +702,6 @@ [2] Old Ripple\ripple_app\peers - - [2] Old Ripple\ripple_app\peers - [2] Old Ripple\ripple_app\peers @@ -1437,6 +1434,9 @@ [2] Old Ripple\ripple_core\nodestore\impl + + [2] Old Ripple\ripple_app\peers + @@ -1895,9 +1895,6 @@ [2] Old Ripple\ripple_app\peers - - [2] Old Ripple\ripple_app\peers - [2] Old Ripple\ripple_app\peers @@ -2916,6 +2913,9 @@ [2] Old Ripple\ripple_core\nodestore\impl + + [2] Old Ripple\ripple_app\peers + diff --git a/src/ripple/peerfinder/impl/Logic.h b/src/ripple/peerfinder/impl/Logic.h index 13f1d8526..9d7207d84 100644 --- a/src/ripple/peerfinder/impl/Logic.h +++ b/src/ripple/peerfinder/impl/Logic.h @@ -501,7 +501,9 @@ public: // void sendEndpoints (PeerInfo const& peer, Giveaways &giveaway) { +#if 0 typedef std::vector List; +#endif std::vector endpoints; // Add us to the list if we want incoming @@ -754,7 +756,9 @@ public: if (! results.error) { std::size_t newEntries (0); +#if 0 DiscreteTime now (get_now()); +#endif for (std::vector ::const_iterator iter (results.list.begin()); iter != results.list.end(); ++iter) diff --git a/src/ripple_app/consensus/LedgerConsensus.cpp b/src/ripple_app/consensus/LedgerConsensus.cpp index 78d6863a9..c7b015fd0 100644 --- a/src/ripple_app/consensus/LedgerConsensus.cpp +++ b/src/ripple_app/consensus/LedgerConsensus.cpp @@ -281,8 +281,7 @@ public: return empty; } - acquiring = boost::make_shared ( - std::ref (m_clock), hash); + acquiring = boost::make_shared (hash, std::ref (m_clock)); startAcquiring (acquiring); } } diff --git a/src/ripple_app/ledger/InboundLedger.cpp b/src/ripple_app/ledger/InboundLedger.cpp index c541a8b6e..7020f4cf4 100644 --- a/src/ripple_app/ledger/InboundLedger.cpp +++ b/src/ripple_app/ledger/InboundLedger.cpp @@ -17,16 +17,25 @@ */ //============================================================================== -SETUP_LOG (InboundLedger) +//SETUP_LOG (InboundLedger) +template <> char const* LogPartition::getPartitionName () { return "InLedger"; } -// VFALCO TODO replace macros -#define LA_DEBUG -#define LEDGER_ACQUIRE_TIMEOUT 6000 // millisecond for each ledger timeout -#define LEDGER_TIMEOUT_COUNT 10 // how many timeouts before we giveup -#define LEDGER_TIMEOUT_AGGRESSIVE 6 // how many timeouts before we get aggressive +enum +{ + // millisecond for each ledger timeout + ledgerAcquireTimeoutMillis = 6000 -InboundLedger::InboundLedger (clock_type& clock, uint256 const& hash, uint32 seq) - : PeerSet (clock, hash, LEDGER_ACQUIRE_TIMEOUT, false) + // how many timeouts before we giveup + ,ledgerTimeoutRetriesMax = 10 + + // how many timeouts before we get aggressive + ,ledgerBecomeAggressiveThreshold = 6 +}; + +InboundLedger::InboundLedger (uint256 const& hash, uint32 seq, + clock_type& clock) + : PeerSet (hash, ledgerAcquireTimeoutMillis, false, clock, + LogPartition::getJournal ()) , mHaveBase (false) , mHaveState (false) , mHaveTransactions (false) @@ -36,7 +45,9 @@ InboundLedger::InboundLedger (clock_type& clock, uint256 const& hash, uint32 seq , mSeq (seq) , mReceiveDispatched (false) { - WriteLog (lsTRACE, InboundLedger) << "Acquiring ledger " << mHash; + + if (m_journal.trace) m_journal.trace << + "Acquiring ledger " << mHash; } bool InboundLedger::checkLocal () @@ -75,7 +86,8 @@ void InboundLedger::init(ScopedLockType& collectionLock, bool couldBeNew) } else if (!isFailed ()) { - WriteLog (lsDEBUG, InboundLedger) << "Acquiring ledger we already have locally: " << getHash (); + if (m_journal.debug) m_journal.debug << + "Acquiring ledger we already have locally: " << getHash (); mLedger->setClosed (); mLedger->setImmutable (); getApp ().getLedgerMaster ().storeLedger (mLedger); @@ -103,19 +115,23 @@ bool InboundLedger::tryLocal () if (!getApp().getOPs ().getFetchPack (mHash, data)) return false; - WriteLog (lsTRACE, InboundLedger) << "Ledger base found in fetch pack"; + if (m_journal.trace) m_journal.trace << + "Ledger base found in fetch pack"; mLedger = boost::make_shared (data, true); - getApp().getNodeStore ().store (hotLEDGER, mLedger->getLedgerSeq (), data, mHash); + getApp().getNodeStore ().store (hotLEDGER, + mLedger->getLedgerSeq (), data, mHash); } else { - mLedger = boost::make_shared (strCopy (node->getData ()), true); + mLedger = boost::make_shared ( + strCopy (node->getData ()), true); } if (mLedger->getHash () != mHash) { // We know for a fact the ledger can never be acquired - WriteLog (lsWARNING, InboundLedger) << mHash << " cannot be a ledger"; + if (m_journal.warning) m_journal.warning << + mHash << " cannot be a ledger"; mFailed = true; return true; } @@ -127,20 +143,24 @@ bool InboundLedger::tryLocal () { if (mLedger->getTransHash ().isZero ()) { - WriteLog (lsTRACE, InboundLedger) << "No TXNs to fetch"; + if (m_journal.trace) m_journal.trace << + "No TXNs to fetch"; mHaveTransactions = true; } else { TransactionStateSF filter (mLedger->getLedgerSeq ()); - if (mLedger->peekTransactionMap ()->fetchRoot (mLedger->getTransHash (), &filter)) + if (mLedger->peekTransactionMap ()->fetchRoot ( + mLedger->getTransHash (), &filter)) { - std::vector h = mLedger->getNeededTransactionHashes (1, &filter); + std::vector h (mLedger->getNeededTransactionHashes ( + 1, &filter)); if (h.empty ()) { - WriteLog (lsTRACE, InboundLedger) << "Had full txn map locally"; + if (m_journal.trace) m_journal.trace << + "Had full txn map locally"; mHaveTransactions = true; } } @@ -151,7 +171,8 @@ bool InboundLedger::tryLocal () { if (mLedger->getAccountHash ().isZero ()) { - WriteLog (lsFATAL, InboundLedger) << "We are acquiring a ledger with a zero account hash"; + if (m_journal.fatal) m_journal.fatal << + "We are acquiring a ledger with a zero account hash"; mFailed = true; return true; } @@ -159,13 +180,16 @@ bool InboundLedger::tryLocal () { AccountStateSF filter (mLedger->getLedgerSeq ()); - if (mLedger->peekAccountStateMap ()->fetchRoot (mLedger->getAccountHash (), &filter)) + if (mLedger->peekAccountStateMap ()->fetchRoot ( + mLedger->getAccountHash (), &filter)) { - std::vector h = mLedger->getNeededAccountStateHashes (1, &filter); + std::vector h (mLedger->getNeededAccountStateHashes ( + 1, &filter)); if (h.empty ()) { - WriteLog (lsTRACE, InboundLedger) << "Had full AS map locally"; + if (m_journal.trace) m_journal.trace << + "Had full AS map locally"; mHaveState = true; } } @@ -174,7 +198,8 @@ bool InboundLedger::tryLocal () if (mHaveTransactions && mHaveState) { - WriteLog (lsDEBUG, InboundLedger) << "Had everything locally"; + if (m_journal.debug) m_journal.debug << + "Had everything locally"; mComplete = true; mLedger->setClosed (); mLedger->setImmutable (); @@ -192,16 +217,23 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) if (isDone()) { - WriteLog (lsINFO, InboundLedger) << "Already done " << mHash; + if (m_journal.info) m_journal.info << + "Already done " << mHash; return; } - if (getTimeouts () > LEDGER_TIMEOUT_COUNT) + if (getTimeouts () > ledgerTimeoutRetriesMax) { if (mSeq != 0) - WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mSeq; + { + if (m_journal.warning) m_journal.warning << + getTimeouts() << " timeouts for ledger " << mSeq; + } else - WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mHash; + { + if (m_journal.warning) m_journal.warning << + getTimeouts() << " timeouts for ledger " << mHash; + } setFailed (); done (); return; @@ -214,7 +246,8 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) mAggressive = true; mByHash = true; int pc = getPeerCount (); - WriteLog (lsDEBUG, InboundLedger) << "No progress(" << pc << ") for ledger " << mHash; + if (m_journal.debug) m_journal.debug << + "No progress(" << pc << ") for ledger " << mHash; trigger (Peer::pointer ()); if (pc < 4) @@ -222,8 +255,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) } } -/** Add more peers to the set, if possible -*/ +/** Add more peers to the set, if possible */ void InboundLedger::addPeers () { std::vector peerList = getApp().getPeers ().getPeerVector (); @@ -233,7 +265,12 @@ void InboundLedger::addPeers () if (vSize == 0) return; - // We traverse the peer list in random order so as not to favor any particular peer + // We traverse the peer list in random order so as not to favor + // any particular peer + // + // VFALCO Use random_shuffle + // http://en.cppreference.com/w/cpp/algorithm/random_shuffle + // int firstPeer = rand () % vSize; int found = 0; @@ -258,14 +295,28 @@ void InboundLedger::addPeers () ++found; } if (mSeq != 0) - WriteLog (lsDEBUG, InboundLedger) << "Chose " << found << " peer(s) for ledger " << mSeq; + { + if (m_journal.debug) m_journal.debug << + "Chose " << found << " peer(s) for ledger " << mSeq; + } else - WriteLog (lsDEBUG, InboundLedger) << "Chose " << found << " peer(s) for ledger " << getHash ().GetHex(); + { + if (m_journal.debug) m_journal.debug << + "Chose " << found << " peer(s) for ledger " << + getHash ().GetHex(); + } } else if (mSeq != 0) - WriteLog (lsDEBUG, InboundLedger) << "Found " << found << " peer(s) with ledger " << mSeq; + { + if (m_journal.debug) m_journal.debug << + "Found " << found << " peer(s) with ledger " << mSeq; + } else - WriteLog (lsDEBUG, InboundLedger) << "Found " << found << " peer(s) with ledger " << getHash ().GetHex(); + { + if (m_journal.debug) m_journal.debug << + "Found " << found << " peer(s) with ledger " << + getHash ().GetHex(); + } } boost::weak_ptr InboundLedger::pmDowncast () @@ -297,7 +348,8 @@ void InboundLedger::done () mSignaled = true; touch (); - WriteLog (lsTRACE, InboundLedger) << "Done acquiring ledger " << mHash; + if (m_journal.trace) m_journal.trace << + "Done acquiring ledger " << mHash; assert (isComplete () || isFailed ()); @@ -318,10 +370,11 @@ void InboundLedger::done () // We hold the PeerSet lock, so must dispatch getApp().getJobQueue ().addJob (jtLEDGER_DATA, "triggers", - BIND_TYPE (LADispatch, P_1, shared_from_this (), triggers)); + BIND_TYPE (LADispatch, P_1, shared_from_this (), triggers)); } -bool InboundLedger::addOnComplete (std::function triggerFunc) +bool InboundLedger::addOnComplete ( + std::function triggerFunc) { ScopedLockType sl (mLock, __FILE__, __LINE__); @@ -340,22 +393,29 @@ void InboundLedger::trigger (Peer::ref peer) if (isDone ()) { - WriteLog (lsDEBUG, InboundLedger) << "Trigger on ledger: " << mHash << - (mAborted ? " aborted" : "") << (mComplete ? " completed" : "") << (mFailed ? " failed" : ""); + if (m_journal.debug) m_journal.debug << + "Trigger on ledger: " << mHash << (mAborted ? " aborted" : "") << + (mComplete ? " completed" : "") << (mFailed ? " failed" : ""); return; } - if (ShouldLog (lsTRACE, InboundLedger)) + if (m_journal.trace) { if (peer) - WriteLog (lsTRACE, InboundLedger) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP (); + m_journal.trace << + "Trigger acquiring ledger " << mHash << " from " << + peer->getIP (); else - WriteLog (lsTRACE, InboundLedger) << "Trigger acquiring ledger " << mHash; + m_journal.trace << + "Trigger acquiring ledger " << mHash; if (mComplete || mFailed) - WriteLog (lsTRACE, InboundLedger) << "complete=" << mComplete << " failed=" << mFailed; + m_journal.trace << + "complete=" << mComplete << " failed=" << mFailed; else - WriteLog (lsTRACE, InboundLedger) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState; + m_journal.trace << + "base=" << mHaveBase << " tx=" << mHaveTransactions << + " as=" << mHaveState; } if (!mHaveBase) @@ -364,7 +424,8 @@ void InboundLedger::trigger (Peer::ref peer) if (mFailed) { - WriteLog (lsWARNING, InboundLedger) << " failed local for " << mHash; + if (m_journal.warning) m_journal.warning << + " failed local for " << mHash; return; } } @@ -376,7 +437,8 @@ void InboundLedger::trigger (Peer::ref peer) { // Be more aggressive if we've timed out at least once tmGL.set_querytype (protocol::qtINDIRECT); - if (!isProgress () && !mFailed && mByHash && (getTimeouts () > LEDGER_TIMEOUT_AGGRESSIVE)) + if (!isProgress () && !mFailed && mByHash && ( + getTimeouts () > ledgerBecomeAggressiveThreshold)) { std::vector need = getNeededHashes (); @@ -388,7 +450,8 @@ void InboundLedger::trigger (Peer::ref peer) bool typeSet = false; BOOST_FOREACH (neededHash_t & p, need) { - WriteLog (lsWARNING, InboundLedger) << "Want: " << p.second; + if (m_journal.warning) m_journal.warning + << "Want: " << p.second; if (!typeSet) { @@ -402,14 +465,17 @@ void InboundLedger::trigger (Peer::ref peer) io->set_hash (p.second.begin (), p.second.size ()); } } - PackedMessage::pointer packet = boost::make_shared (tmBH, protocol::mtGET_OBJECTS); + + PackedMessage::pointer packet (boost::make_shared ( + tmBH, protocol::mtGET_OBJECTS)); { ScopedLockType sl (mLock, __FILE__, __LINE__); for (boost::unordered_map::iterator it = mPeers.begin (), end = mPeers.end (); it != end; ++it) { - Peer::pointer iPeer = getApp().getPeers ().getPeerById (it->first); + Peer::pointer iPeer ( + getApp().getPeers ().getPeerById (it->first)); if (iPeer) { @@ -418,11 +484,13 @@ void InboundLedger::trigger (Peer::ref peer) } } } - WriteLog (lsINFO, InboundLedger) << "Attempting by hash fetch for ledger " << mHash; + if (m_journal.info) m_journal.info << + "Attempting by hash fetch for ledger " << mHash; } else { - WriteLog (lsINFO, InboundLedger) << "getNeededHashes says acquire is complete"; + if (m_journal.info) m_journal.info << + "getNeededHashes says acquire is complete"; mHaveBase = true; mHaveTransactions = true; mHaveState = true; @@ -436,7 +504,8 @@ void InboundLedger::trigger (Peer::ref peer) if (!mHaveBase && !mFailed) { tmGL.set_itype (protocol::liBASE); - WriteLog (lsTRACE, InboundLedger) << "Sending base request to " << (peer ? "selected peer" : "all peers"); + if (m_journal.trace) m_journal.trace << + "Sending base request to " << (peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } @@ -455,7 +524,8 @@ void InboundLedger::trigger (Peer::ref peer) // we need the root node tmGL.set_itype (protocol::liAS_NODE); * (tmGL.add_nodeids ()) = SHAMapNode ().getRawString (); - WriteLog (lsTRACE, InboundLedger) << "Sending AS root request to " << (peer ? "selected peer" : "all peers"); + if (m_journal.trace) m_journal.trace << + "Sending AS root request to " << (peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } @@ -463,10 +533,12 @@ void InboundLedger::trigger (Peer::ref peer) { std::vector nodeIDs; std::vector nodeHashes; + // VFALCO Why 256? Make this a constant nodeIDs.reserve (256); nodeHashes.reserve (256); AccountStateSF filter (mSeq); - mLedger->peekAccountStateMap ()->getMissingNodes (nodeIDs, nodeHashes, 256, &filter); + mLedger->peekAccountStateMap ()->getMissingNodes ( + nodeIDs, nodeHashes, 256, &filter); if (nodeIDs.empty ()) { @@ -482,8 +554,10 @@ void InboundLedger::trigger (Peer::ref peer) } else { + // VFALCO Why 128? Make this a constant if (!mAggressive) - filterNodes (nodeIDs, nodeHashes, mRecentASNodes, 128, !isProgress ()); + filterNodes (nodeIDs, nodeHashes, mRecentASNodes, + 128, !isProgress ()); if (!nodeIDs.empty ()) { @@ -492,14 +566,18 @@ void InboundLedger::trigger (Peer::ref peer) { * (tmGL.add_nodeids ()) = it.getRawString (); } - WriteLog (lsTRACE, InboundLedger) << "Sending AS node " << nodeIDs.size () - << " request to " << (peer ? "selected peer" : "all peers"); - CondLog (nodeIDs.size () == 1, lsTRACE, InboundLedger) << "AS node: " << nodeIDs[0]; + if (m_journal.trace) m_journal.trace << + "Sending AS node " << nodeIDs.size () << + " request to " << ( + peer ? "selected peer" : "all peers"); + if (nodeIDs.size () == 1 && m_journal.trace) m_journal.trace << + "AS node: " << nodeIDs[0]; sendRequest (tmGL, peer); return; } else - WriteLog (lsTRACE, InboundLedger) << "All AS nodes filtered"; + if (m_journal.trace) m_journal.trace << + "All AS nodes filtered"; } } } @@ -513,7 +591,9 @@ void InboundLedger::trigger (Peer::ref peer) // we need the root node tmGL.set_itype (protocol::liTX_NODE); * (tmGL.add_nodeids ()) = SHAMapNode ().getRawString (); - WriteLog (lsTRACE, InboundLedger) << "Sending TX root request to " << (peer ? "selected peer" : "all peers"); + if (m_journal.trace) m_journal.trace << + "Sending TX root request to " << ( + peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } @@ -524,7 +604,8 @@ void InboundLedger::trigger (Peer::ref peer) nodeIDs.reserve (256); nodeHashes.reserve (256); TransactionStateSF filter (mSeq); - mLedger->peekTransactionMap ()->getMissingNodes (nodeIDs, nodeHashes, 256, &filter); + mLedger->peekTransactionMap ()->getMissingNodes ( + nodeIDs, nodeHashes, 256, &filter); if (nodeIDs.empty ()) { @@ -541,7 +622,8 @@ void InboundLedger::trigger (Peer::ref peer) else { if (!mAggressive) - filterNodes (nodeIDs, nodeHashes, mRecentTXNodes, 128, !isProgress ()); + filterNodes (nodeIDs, nodeHashes, mRecentTXNodes, + 128, !isProgress ()); if (!nodeIDs.empty ()) { @@ -550,28 +632,34 @@ void InboundLedger::trigger (Peer::ref peer) { * (tmGL.add_nodeids ()) = it.getRawString (); } - WriteLog (lsTRACE, InboundLedger) << "Sending TX node " << nodeIDs.size () - << " request to " << (peer ? "selected peer" : "all peers"); + if (m_journal.trace) m_journal.trace << + "Sending TX node " << nodeIDs.size () << + " request to " << ( + peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } else - WriteLog (lsTRACE, InboundLedger) << "All TX nodes filtered"; + if (m_journal.trace) m_journal.trace << + "All TX nodes filtered"; } } } if (mComplete || mFailed) { - WriteLog (lsDEBUG, InboundLedger) << "Done:" << (mComplete ? " complete" : "") << (mFailed ? " failed " : " ") - << mLedger->getLedgerSeq (); + if (m_journal.debug) m_journal.debug << + "Done:" << (mComplete ? " complete" : "") << + (mFailed ? " failed " : " ") << + mLedger->getLedgerSeq (); sl.unlock (); done (); } } -void InboundLedger::filterNodes (std::vector& nodeIDs, std::vector& nodeHashes, - std::set& recentNodes, int max, bool aggressive) +void InboundLedger::filterNodes (std::vector& nodeIDs, + std::vector& nodeHashes, std::set& recentNodes, + int max, bool aggressive) { // ask for new nodes in preference to ones we've already asked for assert (nodeIDs.size () == nodeHashes.size ()); @@ -599,7 +687,8 @@ void InboundLedger::filterNodes (std::vector& nodeIDs, std::vector& nodeIDs, std::vector& nodeIDs, std::vectorgetHash () != mHash) { - WriteLog (lsWARNING, InboundLedger) << "Acquire hash mismatch"; - WriteLog (lsWARNING, InboundLedger) << mLedger->getHash () << "!=" << mHash; + if (m_journal.warning) m_journal.warning << + "Acquire hash mismatch"; + if (m_journal.warning) m_journal.warning << + mLedger->getHash () << "!=" << mHash; mLedger.reset (); #ifdef TRUST_NETWORK assert (false); @@ -669,7 +761,8 @@ bool InboundLedger::takeBase (const std::string& data) // data must not have has Serializer s (data.size () + 4); s.add32 (HashPrefix::ledgerMaster); s.addRaw (data); - getApp().getNodeStore ().store (hotLEDGER, mLedger->getLedgerSeq (), s.modData (), mHash); + getApp().getNodeStore ().store (hotLEDGER, + mLedger->getLedgerSeq (), s.modData (), mHash); progress (); @@ -687,12 +780,12 @@ bool InboundLedger::takeBase (const std::string& data) // data must not have has Call with a lock */ bool InboundLedger::takeTxNode (const std::list& nodeIDs, - const std::list< Blob >& data, SHAMapAddNode& san) + const std::list< Blob >& data, SHAMapAddNode& san) { - if (!mHaveBase) { - WriteLog (lsWARNING, InboundLedger) << "TX node without base"; + if (m_journal.warning) m_journal.warning << + "TX node without base"; san.incInvalid(); return false; } @@ -711,14 +804,15 @@ bool InboundLedger::takeTxNode (const std::list& nodeIDs, { if (nodeIDit->isRoot ()) { - san += mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), *nodeDatait, - snfWIRE, &tFilter); + san += mLedger->peekTransactionMap ()->addRootNode ( + mLedger->getTransHash (), *nodeDatait, snfWIRE, &tFilter); if (!san.isGood()) return false; } else { - san += mLedger->peekTransactionMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter); + san += mLedger->peekTransactionMap ()->addKnownNode ( + *nodeIDit, *nodeDatait, &tFilter); if (!san.isGood()) return false; } @@ -746,16 +840,19 @@ bool InboundLedger::takeTxNode (const std::list& nodeIDs, Call with a lock */ bool InboundLedger::takeAsNode (const std::list& nodeIDs, - const std::list< Blob >& data, SHAMapAddNode& san) + const std::list< Blob >& data, SHAMapAddNode& san) { - WriteLog (lsTRACE, InboundLedger) << "got ASdata (" << nodeIDs.size () << ") acquiring ledger " << mHash; - CondLog (nodeIDs.size () == 1, lsTRACE, InboundLedger) << "got AS node: " << nodeIDs.front (); + if (m_journal.trace) m_journal.trace << + "got ASdata (" << nodeIDs.size () << ") acquiring ledger " << mHash; + if (nodeIDs.size () == 1 && m_journal.trace) m_journal.trace << + "got AS node: " << nodeIDs.front (); ScopedLockType sl (mLock, __FILE__, __LINE__); if (!mHaveBase) { - WriteLog (lsWARNING, InboundLedger) << "Don't have ledger base"; + if (m_journal.warning) m_journal.warning << + "Don't have ledger base"; san.incInvalid(); return false; } @@ -774,20 +871,23 @@ bool InboundLedger::takeAsNode (const std::list& nodeIDs, { if (nodeIDit->isRoot ()) { - san += mLedger->peekAccountStateMap () - ->addRootNode (mLedger->getAccountHash (), *nodeDatait, snfWIRE, &tFilter); + san += mLedger->peekAccountStateMap ()->addRootNode ( + mLedger->getAccountHash (), *nodeDatait, snfWIRE, &tFilter); if (!san.isGood ()) { - WriteLog (lsWARNING, InboundLedger) << "Bad ledger base"; + if (m_journal.warning) m_journal.warning << + "Bad ledger base"; return false; } } else { - san += mLedger->peekAccountStateMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter); + san += mLedger->peekAccountStateMap ()->addKnownNode ( + *nodeIDit, *nodeDatait, &tFilter); if (!san.isGood ()) { - WriteLog (lsWARNING, InboundLedger) << "Unable to add AS node"; + if (m_journal.warning) m_journal.warning << + "Unable to add AS node"; return false; } } @@ -830,7 +930,8 @@ bool InboundLedger::takeAsRootNode (Blob const& data, SHAMapAddNode& san) } AccountStateSF tFilter (mLedger->getLedgerSeq ()); - san += mLedger->peekAccountStateMap ()->addRootNode (mLedger->getAccountHash (), data, snfWIRE, &tFilter); + san += mLedger->peekAccountStateMap ()->addRootNode ( + mLedger->getAccountHash (), data, snfWIRE, &tFilter); return san.isGood(); } @@ -853,7 +954,8 @@ bool InboundLedger::takeTxRootNode (Blob const& data, SHAMapAddNode& san) } TransactionStateSF tFilter (mLedger->getLedgerSeq ()); - san += mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), data, snfWIRE, &tFilter); + san += mLedger->peekTransactionMap ()->addRootNode ( + mLedger->getTransHash (), data, snfWIRE, &tFilter); return san.isGood(); } @@ -863,27 +965,34 @@ std::vector InboundLedger::getNeededHashes () if (!mHaveBase) { - ret.push_back (std::make_pair (protocol::TMGetObjectByHash::otLEDGER, mHash)); + ret.push_back (std::make_pair ( + protocol::TMGetObjectByHash::otLEDGER, mHash)); return ret; } if (!mHaveState) { AccountStateSF filter (mLedger->getLedgerSeq ()); - std::vector v = mLedger->getNeededAccountStateHashes (4, &filter); + // VFALCO NOTE What's the number 4? + std::vector v = mLedger->getNeededAccountStateHashes ( + 4, &filter); BOOST_FOREACH (uint256 const & h, v) { - ret.push_back (std::make_pair (protocol::TMGetObjectByHash::otSTATE_NODE, h)); + ret.push_back (std::make_pair ( + protocol::TMGetObjectByHash::otSTATE_NODE, h)); } } if (!mHaveTransactions) { TransactionStateSF filter (mLedger->getLedgerSeq ()); - std::vector v = mLedger->getNeededTransactionHashes (4, &filter); + // VFALCO NOTE What's the number 4? + std::vector v = mLedger->getNeededTransactionHashes ( + 4, &filter); BOOST_FOREACH (uint256 const & h, v) { - ret.push_back (std::make_pair (protocol::TMGetObjectByHash::otTRANSACTION_NODE, h)); + ret.push_back (std::make_pair ( + protocol::TMGetObjectByHash::otTRANSACTION_NODE, h)); } } @@ -893,7 +1002,9 @@ std::vector InboundLedger::getNeededHashes () /** Stash a TMLedgerData received from a peer for later processing Returns 'true' if we need to dispatch */ -bool InboundLedger::gotData (boost::weak_ptr peer, boost::shared_ptr data) +// VFALCO TODO Why isn't the shared_ptr passed by const& ? +bool InboundLedger::gotData (boost::weak_ptr peer, + boost::shared_ptr data) { ScopedLockType sl (mReceivedDataLock, __FILE__, __LINE__); @@ -909,7 +1020,13 @@ bool InboundLedger::gotData (boost::weak_ptr peer, boost::shared_ptr peer, protocol::TMLedgerData& packet) +// VFALCO NOTE, it is not necessary to pass the entire Peer, +// we can get away with just a Resource::Consumer endpoint. +// +// TODO Change peer to Consumer +// +int InboundLedger::processData (boost::shared_ptr peer, + protocol::TMLedgerData& packet) { ScopedLockType sl (mLock, __FILE__, __LINE__); @@ -917,7 +1034,8 @@ int InboundLedger::processData (boost::shared_ptr peer, protocol::TMLedger { if (packet.nodes_size () < 1) { - WriteLog (lsWARNING, InboundLedger) << "Got empty base data"; + if (m_journal.warning) m_journal.warning << + "Got empty base data"; peer->charge (Resource::feeInvalidRequest); return -1; } @@ -930,7 +1048,8 @@ int InboundLedger::processData (boost::shared_ptr peer, protocol::TMLedger san.incUseful (); else { - WriteLog (lsWARNING, InboundLedger) << "Got invalid base data"; + if (m_journal.warning) m_journal.warning << + "Got invalid base data"; peer->charge (Resource::feeInvalidRequest); return -1; } @@ -940,31 +1059,36 @@ int InboundLedger::processData (boost::shared_ptr peer, protocol::TMLedger if (!mHaveState && (packet.nodes ().size () > 1) && !takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san)) { - WriteLog (lsWARNING, InboundLedger) << "Included ASbase invalid"; + if (m_journal.warning) m_journal.warning << + "Included ASbase invalid"; } if (!mHaveTransactions && (packet.nodes ().size () > 2) && !takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san)) { - WriteLog (lsWARNING, InboundLedger) << "Included TXbase invalid"; + if (m_journal.warning) m_journal.warning << + "Included TXbase invalid"; } if (!san.isInvalid ()) progress (); else - WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid base data"; + if (m_journal.debug) m_journal.debug << + "Peer sends invalid base data"; return san.getGood (); } - if ((packet.type () == protocol::liTX_NODE) || (packet.type () == protocol::liAS_NODE)) + if ((packet.type () == protocol::liTX_NODE) || ( + packet.type () == protocol::liAS_NODE)) { std::list nodeIDs; std::list< Blob > nodeData; if (packet.nodes ().size () == 0) { - WriteLog (lsINFO, InboundLedger) << "Got response with no nodes"; + if (m_journal.info) m_journal.info << + "Got response with no nodes"; peer->charge (Resource::feeInvalidRequest); return -1; } @@ -975,13 +1099,16 @@ int InboundLedger::processData (boost::shared_ptr peer, protocol::TMLedger if (!node.has_nodeid () || !node.has_nodedata ()) { - WriteLog (lsWARNING, InboundLedger) << "Got bad node"; + if (m_journal.warning) m_journal.warning << + "Got bad node"; peer->charge (Resource::feeInvalidRequest); return -1; } - nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ())); - nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); + nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), + node.nodeid ().size ())); + nodeData.push_back (Blob (node.nodedata ().begin (), + node.nodedata ().end ())); } SHAMapAddNode ret; @@ -989,18 +1116,21 @@ int InboundLedger::processData (boost::shared_ptr peer, protocol::TMLedger if (packet.type () == protocol::liTX_NODE) { takeTxNode (nodeIDs, nodeData, ret); - WriteLog (lsDEBUG, InboundLedger) << "Ledger TX node stats: " << ret.get(); + if (m_journal.debug) m_journal.debug << + "Ledger TX node stats: " << ret.get(); } else { takeAsNode (nodeIDs, nodeData, ret); - WriteLog (lsDEBUG, InboundLedger) << "Ledger AS node stats: " << ret.get(); + if (m_journal.debug) m_journal.debug << + "Ledger AS node stats: " << ret.get(); } if (!ret.isInvalid ()) progress (); else - WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid node data"; + if (m_journal.debug) m_journal.debug << + "Peer sends invalid node data"; return ret.getGood (); } @@ -1019,7 +1149,6 @@ void InboundLedger::runData () std::vector data; do { - data.clear(); { ScopedLockType sl (mReceivedDataLock, __FILE__, __LINE__); @@ -1052,10 +1181,8 @@ void InboundLedger::runData () if (chosenPeer) trigger (chosenPeer); - } - Json::Value InboundLedger::getJson (int) { Json::Value ret (Json::objectValue); @@ -1089,7 +1216,9 @@ Json::Value InboundLedger::getJson (int) if (mHaveBase && !mHaveState) { Json::Value hv (Json::arrayValue); - std::vector v = mLedger->getNeededAccountStateHashes (16, NULL); + // VFALCO Why 16? + std::vector v = mLedger->getNeededAccountStateHashes ( + 16, NULL); BOOST_FOREACH (uint256 const & h, v) { hv.append (h.GetHex ()); @@ -1100,7 +1229,9 @@ Json::Value InboundLedger::getJson (int) if (mHaveBase && !mHaveTransactions) { Json::Value hv (Json::arrayValue); - std::vector v = mLedger->getNeededTransactionHashes (16, NULL); + // VFALCO Why 16? + std::vector v = mLedger->getNeededTransactionHashes ( + 16, NULL); BOOST_FOREACH (uint256 const & h, v) { hv.append (h.GetHex ()); diff --git a/src/ripple_app/ledger/InboundLedger.h b/src/ripple_app/ledger/InboundLedger.h index 1efa332db..f021be607 100644 --- a/src/ripple_app/ledger/InboundLedger.h +++ b/src/ripple_app/ledger/InboundLedger.h @@ -34,7 +34,7 @@ public: typedef std::pair < boost::weak_ptr, boost::shared_ptr > PeerDataPairType; public: - InboundLedger (clock_type& clock, uint256 const& hash, uint32 seq); + InboundLedger (uint256 const& hash, uint32 seq, clock_type& clock); ~InboundLedger (); @@ -82,7 +82,8 @@ public: std::vector getNeededHashes (); - static void filterNodes (std::vector& nodeIDs, std::vector& nodeHashes, + // VFALCO TODO Replace uint256 with something semanticallyh meaningful + void filterNodes (std::vector& nodeIDs, std::vector& nodeHashes, std::set& recentNodes, int max, bool aggressive); Json::Value getJson (int); @@ -106,6 +107,11 @@ private: bool takeTxNode (const std::list& IDs, const std::list& data, SHAMapAddNode&); bool takeTxRootNode (Blob const& data, SHAMapAddNode&); + + // VFALCO TODO Rename to receiveAccountStateNode + // Don't use acronyms, but if we are going to use them at least + // capitalize them correctly. + // bool takeAsNode (const std::list& IDs, const std::list& data, SHAMapAddNode&); bool takeAsRootNode (Blob const& data, SHAMapAddNode&); @@ -133,5 +139,3 @@ private: }; #endif - -// vim:ts=4 diff --git a/src/ripple_app/ledger/InboundLedgers.cpp b/src/ripple_app/ledger/InboundLedgers.cpp index e54b085cf..55b1f48ab 100644 --- a/src/ripple_app/ledger/InboundLedgers.cpp +++ b/src/ripple_app/ledger/InboundLedgers.cpp @@ -57,7 +57,7 @@ public: } else { - ret = boost::make_shared (std::ref (m_clock), hash, seq); + ret = boost::make_shared (hash, seq, std::ref (m_clock)); assert (ret); mLedgers.insert (std::make_pair (hash, ret)); ret->init (sl, couldBeNew); diff --git a/src/ripple_app/peers/PeerSet.cpp b/src/ripple_app/peers/PeerSet.cpp index f2feb8e25..c479fb264 100644 --- a/src/ripple_app/peers/PeerSet.cpp +++ b/src/ripple_app/peers/PeerSet.cpp @@ -19,9 +19,18 @@ class InboundLedger; -PeerSet::PeerSet (clock_type& clock_, - uint256 const& hash, int interval, bool txnData) - : mLock (this, "PeerSet", __FILE__, __LINE__) +// VFALCO NOTE The txnData constructor parameter is a code smell. +// It is true if we are the base of a TransactionAcquire, +// or false if we are base of InboundLedger. All it does +// is change the behavior of the timer depending on the +// derived class. Why not just make the timer callback +// function pure virtual? +// +PeerSet::PeerSet (uint256 const& hash, int interval, bool txnData, + clock_type& clock, Journal journal) + : m_journal (journal) + , m_clock (clock) + , mLock (this, "PeerSet", __FILE__, __LINE__) , mHash (hash) , mTimerInterval (interval) , mTimeouts (0) @@ -31,9 +40,8 @@ PeerSet::PeerSet (clock_type& clock_, , mTxnData (txnData) , mProgress (false) , mTimer (getApp().getIOService ()) - , m_clock (clock_) { - mLastAction = clock().now(); + mLastAction = m_clock.now(); assert ((mTimerInterval > 10) && (mTimerInterval < 30000)); } @@ -41,11 +49,6 @@ PeerSet::~PeerSet () { } -PeerSet::clock_type& PeerSet::clock () -{ - return m_clock; -} - bool PeerSet::peerHas (Peer::ref ptr) { ScopedLockType sl (mLock, __FILE__, __LINE__); @@ -101,6 +104,10 @@ void PeerSet::TimerEntry (boost::weak_ptr wptr, const boost::system::er if (ptr) { + // VFALCO NOTE So this function is really two different functions depending on + // the value of mTxnData, which is directly tied to whether we are + // a base class of IncomingLedger or TransactionAcquire + // if (ptr->mTxnData) { getApp().getJobQueue ().addJob (jtTXN_DATA, "timerEntryTxn", diff --git a/src/ripple_app/peers/PeerSet.h b/src/ripple_app/peers/PeerSet.h index a3a394c13..8e81c13bc 100644 --- a/src/ripple_app/peers/PeerSet.h +++ b/src/ripple_app/peers/PeerSet.h @@ -69,7 +69,7 @@ public: void touch () { - mLastAction = clock().now(); + mLastAction = m_clock.now(); } clock_type::time_point getLastAction () const @@ -103,13 +103,12 @@ private: static void TimerJobEntry (Job&, boost::shared_ptr); protected: - clock_type& clock (); - // VFALCO TODO try to make some of these private typedef RippleRecursiveMutex LockType; typedef LockType::ScopedLockType ScopedLockType; - PeerSet (clock_type& clock, uint256 const& hash, int interval, bool txnData); + PeerSet (uint256 const& hash, int interval, bool txnData, + clock_type& clock, Journal journal); virtual ~PeerSet () = 0; virtual void newPeer (Peer::ref) = 0; @@ -130,6 +129,9 @@ protected: void sendRequest (const protocol::TMGetLedger& message, Peer::ref peer); protected: + Journal m_journal; + clock_type& m_clock; + LockType mLock; uint256 mHash; @@ -151,9 +153,6 @@ protected: typedef int ReceivedChunkCount; typedef boost::unordered_map Map; Map mPeers; - -private: - clock_type& m_clock; }; #endif diff --git a/src/ripple_app/tx/TransactionAcquire.cpp b/src/ripple_app/tx/TransactionAcquire.cpp index 2ac993973..b013904ef 100644 --- a/src/ripple_app/tx/TransactionAcquire.cpp +++ b/src/ripple_app/tx/TransactionAcquire.cpp @@ -17,15 +17,22 @@ */ //============================================================================== -SETUP_LOG (TransactionAcquire) +//SETUP_LOG (TransactionAcquire) +template <> char const* LogPartition::getPartitionName () { return "TxAcquire"; } -#define TX_ACQUIRE_TIMEOUT 250 +enum +{ + // VFALCO NOTE This should be a std::chrono::duration constant. + // TODO Document this. Is it seconds? Milliseconds? WTF? + TX_ACQUIRE_TIMEOUT = 250 +}; typedef std::map::value_type u160_prop_pair; typedef std::map::value_type u256_lct_pair; -TransactionAcquire::TransactionAcquire (clock_type& clock, uint256 const& hash) - : PeerSet (clock, hash, TX_ACQUIRE_TIMEOUT, true) +TransactionAcquire::TransactionAcquire (uint256 const& hash, clock_type& clock) + : PeerSet (hash, TX_ACQUIRE_TIMEOUT, true, clock, + LogPartition::getJournal ()) , mHaveRoot (false) { mMap = boost::make_shared (smtTRANSACTION, hash); diff --git a/src/ripple_app/tx/TransactionAcquire.h b/src/ripple_app/tx/TransactionAcquire.h index 9ca7673b8..a44a398a4 100644 --- a/src/ripple_app/tx/TransactionAcquire.h +++ b/src/ripple_app/tx/TransactionAcquire.h @@ -33,7 +33,7 @@ public: typedef boost::shared_ptr pointer; public: - TransactionAcquire (clock_type& clock, uint256 const& hash); + TransactionAcquire (uint256 const& hash, clock_type& clock); ~TransactionAcquire (); SHAMap::ref getMap () diff --git a/src/ripple_core/functional/JobQueue.cpp b/src/ripple_core/functional/JobQueue.cpp index b00ea9506..2de7c2000 100644 --- a/src/ripple_core/functional/JobQueue.cpp +++ b/src/ripple_core/functional/JobQueue.cpp @@ -129,7 +129,8 @@ public: m_metrics.job_count = m_jobSet.size (); } - void addJob (JobType type, const std::string& name, const std::function& jobFunc) + void addJob (JobType type, std::string const& name, + boost::function const& jobFunc) { bassert (type != jtINVALID); diff --git a/src/ripple_core/functional/JobQueue.h b/src/ripple_core/functional/JobQueue.h index e50e90373..c320c9e60 100644 --- a/src/ripple_core/functional/JobQueue.h +++ b/src/ripple_core/functional/JobQueue.h @@ -31,10 +31,13 @@ public: virtual ~JobQueue () { } - // VFALCO TODO make convenience functions that allow the caller to not - // have to call bind. + // VFALCO NOTE Using boost::function here because Visual Studio 2012 + // std::function doesn't swallow return types. // - virtual void addJob (JobType type, const std::string& name, const std::function& job) = 0; + // TODO Replace with std::function + // + virtual void addJob (JobType type, + std::string const& name, boost::function const& job) = 0; // Jobs waiting at this priority virtual int getJobCount (JobType t) = 0;