Refactor NetworkOPs:

* Reduce public API
* Use LedgerMaster interface from RPC
* Move fetch pack management to LedgerMaster
This commit is contained in:
Nik Bougalis
2015-06-30 18:27:05 -07:00
parent 163e8eb8fc
commit 761f218c0a
46 changed files with 552 additions and 609 deletions

View File

@@ -36,9 +36,14 @@
#include <ripple/app/paths/PathRequests.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/basics/TaggedCache.h>
#include <ripple/basics/UptimeTimer.h>
#include <ripple/core/LoadFeeTrack.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/Peer.h>
#include <ripple/protocol/digest.h>
#include <ripple/protocol/HashPrefix.h>
#include <ripple/resource/Fees.h>
#include <ripple/validators/Manager.h>
#include <algorithm>
#include <cassert>
@@ -55,8 +60,6 @@ class LedgerMasterImp
: public LedgerMaster
{
public:
using callback = std::function <void (Ledger::ref)>;
using LockType = RippleRecursiveMutex;
using ScopedLockType = std::lock_guard <LockType>;
using ScopedUnlockType = beast::GenericScopedUnlock <LockType>;
@@ -84,7 +87,6 @@ public:
int mMinValidations; // The minimum validations to publish a ledger
uint256 mLastValidateHash;
std::uint32_t mLastValidateSeq;
std::list<callback> mOnValidate; // Called when a ledger has enough validations
bool mAdvanceThread; // Publish thread is running
bool mAdvanceWork; // Publish thread has work to do
@@ -110,9 +112,14 @@ public:
int const ledger_fetch_size_;
TaggedCache<uint256, Blob> fetch_packs_;
std::uint32_t fetch_seq_;
//--------------------------------------------------------------------------
LedgerMasterImp (Config const& config, Stoppable& parent,
LedgerMasterImp (Config const& config, Stopwatch& stopwatch,
Stoppable& parent,
beast::insight::Collector::ptr const& collector, beast::Journal journal)
: LedgerMaster (parent)
, m_journal (journal)
@@ -136,6 +143,9 @@ public:
, fetch_depth_ (getApp ().getSHAMapStore ().clampFetchDepth (config.FETCH_DEPTH))
, ledger_history_ (config.LEDGER_HISTORY)
, ledger_fetch_size_ (config.getSize (siLedgerFetch))
, fetch_packs_ ("FetchPack", 65536, 45, stopwatch,
deprecatedLogs().journal("TaggedCache"))
, fetch_seq_ (0)
{
}
@@ -377,7 +387,7 @@ public:
#endif
{
OpenView view(&*ledger);
OpenView view(&*ledger);
for (auto const& it : mHeldTransactions)
{
ApplyFlags tepFlags = tapNONE;
@@ -975,145 +985,20 @@ public:
return *ret;
}
// Try to publish ledgers, acquire missing ledgers
void doAdvance ()
bool shouldFetchPack (std::uint32_t seq) const
{
do
{
mAdvanceWork = false; // If there's work to do, we'll make progress
bool progress = false;
auto const pubLedgers = findNewLedgersToPublish ();
if (pubLedgers.empty())
{
if (!standalone_ && !getApp().getFeeTrack().isLoadedLocal() &&
(getApp().getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) &&
(mValidLedgerSeq == mPubLedgerSeq) &&
(getValidatedLedgerAge() < MAX_LEDGER_AGE_ACQUIRE))
{ // We are in sync, so can acquire
std::uint32_t missing;
{
ScopedLockType sl (mCompleteLock);
missing = mCompleteLedgers.prevMissing(mPubLedger->getLedgerSeq());
}
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance discovered missing " << missing;
if ((missing != RangeSet::absent) && (missing > 0) &&
shouldAcquire (mValidLedgerSeq, ledger_history_,
getApp ().getSHAMapStore ().getCanDelete (), missing) &&
((mFillInProgress == 0) || (missing > mFillInProgress)))
{
WriteLog (lsTRACE, LedgerMaster) << "advanceThread should acquire";
{
ScopedUnlockType sl(m_mutex);
uint256 hash = getLedgerHashForHistory (missing);
if (hash.isNonZero())
{
Ledger::pointer ledger = getLedgerByHash (hash);
if (!ledger)
{
if (!getApp().getInboundLedgers().isFailure (hash))
{
ledger =
getApp().getInboundLedgers().acquire(hash,
missing,
InboundLedger::fcHISTORY);
if (! ledger && (missing > 32600) && getApp().getOPs().shouldFetchPack (missing))
{
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance want fetch pack " << missing;
getFetchPack(hash, missing);
}
else
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance no fetch pack for " << missing;
}
else
WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found failed acquire";
}
if (ledger)
{
assert(ledger->getLedgerSeq() == missing);
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance acquired " << ledger->getLedgerSeq();
setFullLedger(ledger, false, false);
mHistLedger = ledger;
if ((mFillInProgress == 0) && (Ledger::getHashByIndex(ledger->getLedgerSeq() - 1) == ledger->getParentHash()))
{
// Previous ledger is in DB
ScopedLockType lock (m_mutex);
mFillInProgress = ledger->getLedgerSeq();
getApp().getJobQueue().addJob(jtADVANCE, "tryFill", std::bind (
&LedgerMasterImp::tryFill, this,
std::placeholders::_1, ledger));
}
progress = true;
}
else
{
try
{
for (int i = 0; i < ledger_fetch_size_; ++i)
{
std::uint32_t seq = missing - i;
uint256 hash = getLedgerHashForHistory (seq);
if (hash.isNonZero())
getApp().getInboundLedgers().acquire(hash,
seq, InboundLedger::fcHISTORY);
}
}
catch (...)
{
WriteLog (lsWARNING, LedgerMaster) << "Threw while prefetching";
}
}
}
else
{
WriteLog (lsFATAL, LedgerMaster) << "Unable to find ledger following prevMissing " << missing;
WriteLog (lsFATAL, LedgerMaster) << "Pub:" << mPubLedgerSeq << " Val:" << mValidLedgerSeq;
WriteLog (lsFATAL, LedgerMaster) << "Ledgers: " << getApp().getLedgerMaster().getCompleteLedgers();
clearLedger (missing + 1);
progress = true;
}
}
if (mValidLedgerSeq != mPubLedgerSeq)
{
WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found last valid changed";
progress = true;
}
}
}
else
{
mHistLedger.reset();
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance not fetching history";
}
}
else
{
WriteLog (lsTRACE, LedgerMaster) <<
"tryAdvance found " << pubLedgers.size() <<
" ledgers to publish";
for(auto ledger : pubLedgers)
{
{
ScopedUnlockType sul (m_mutex);
WriteLog(lsDEBUG, LedgerMaster) <<
"tryAdvance publishing seq " << ledger->getLedgerSeq();
setFullLedger(ledger, true, true);
getApp().getOPs().pubLedger(ledger);
}
setPubLedger(ledger);
progress = true;
}
getApp().getOPs().clearNeedNetworkLedger();
newPFWork ("pf:newLedger");
}
if (progress)
mAdvanceWork = true;
} while (mAdvanceWork);
return (fetch_seq_ != seq);
}
bool shouldAcquire (
std::uint32_t const currentLedger,
std::uint32_t const ledgerHistory,
std::uint32_t const ledgerHistoryIndex,
std::uint32_t const candidateLedger) const;
// Try to publish ledgers, acquire missing ledgers
void doAdvance ();
std::vector<Ledger::pointer> findNewLedgersToPublish ()
{
std::vector<Ledger::pointer> ret;
@@ -1555,6 +1440,7 @@ public:
// Periodic maintenance: expire stale entries from the caches this object
// owns -- the ledger history cache and the fetch pack cache.
void sweep ()
{
mLedgerHistory.sweep ();
// Fetch packs are short-lived acquisition aids; sweep on the same cadence.
fetch_packs_.sweep ();
}
float getCacheHitRate ()
@@ -1562,11 +1448,6 @@ public:
return mLedgerHistory.getCacheHitRate ();
}
// Register a hook to run when a ledger gains enough validations.
// The callback is copied into mOnValidate; the caller's object is not
// retained by reference.
void addValidateCallback (callback& c)
{
mOnValidate.push_back (c);
}
beast::PropertyStream::Source& getPropertySource ()
{
return *mLedgerCleaner;
@@ -1586,22 +1467,34 @@ public:
{
mLedgerHistory.clearLedgerCachePrior (seq);
}
// Fetch packs:
void gotFetchPack (
bool progress,
std::uint32_t seq) override;
void addFetchPack (
uint256 const& hash,
std::shared_ptr<Blob>& data) override;
bool getFetchPack (
uint256 const& hash,
Blob& data) override;
void makeFetchPack (
Job&, std::weak_ptr<Peer> const& wPeer,
std::shared_ptr<protocol::TMGetObjectByHash> const& request,
uint256 haveLedgerHash,
std::uint32_t uUptime) override;
std::size_t getFetchPackCacheSize () const;
};
//------------------------------------------------------------------------------
// Interface constructor: registers this component in the Stoppable tree
// under its parent so start/stop is coordinated application-wide.
LedgerMaster::LedgerMaster (Stoppable& parent)
: Stoppable ("LedgerMaster", parent)
{
}
// Out-of-line empty destructor; keeps the interface's destructor emission
// in this translation unit.
LedgerMaster::~LedgerMaster ()
{
}
bool LedgerMaster::shouldAcquire (std::uint32_t const currentLedger,
std::uint32_t const ledgerHistory, std::uint32_t const ledgerHistoryIndex,
std::uint32_t const candidateLedger)
bool LedgerMasterImp::shouldAcquire (
std::uint32_t const currentLedger,
std::uint32_t const ledgerHistory,
std::uint32_t const ledgerHistoryIndex,
std::uint32_t const candidateLedger) const
{
bool ret (candidateLedger >= currentLedger ||
candidateLedger > ledgerHistoryIndex ||
@@ -1615,11 +1508,342 @@ bool LedgerMaster::shouldAcquire (std::uint32_t const currentLedger,
return ret;
}
std::unique_ptr <LedgerMaster>
make_LedgerMaster (Config const& config, beast::Stoppable& parent,
beast::insight::Collector::ptr const& collector, beast::Journal journal)
// Try to publish ledgers, acquire missing ledgers
// Publish any newly-validated ledgers and, when the node is otherwise in
// sync, try to acquire ledgers missing from our history. Loops until a
// full pass makes no further progress. Appears to be called with m_mutex
// held: the body temporarily releases it (ScopedUnlockType) around
// expensive acquisition and publication work.
void LedgerMasterImp::doAdvance ()
{
// NOTE(review): the next line looks like stray text interleaved from the
// old make_LedgerMaster body by the diff rendering -- confirm against the
// repository; it cannot be part of this function's logic.
return std::make_unique <LedgerMasterImp> (config, parent, collector, journal);
// TODO NIKB: simplify and unindent this a bit!
do
{
mAdvanceWork = false; // If there's work to do, we'll make progress
bool progress = false;
auto const pubLedgers = findNewLedgersToPublish ();
if (pubLedgers.empty())
{
// Nothing to publish. Consider back-filling history, but only when we
// are keeping up: not standalone, not load-shedding, publish caught up
// with validation, and the validated ledger is recent.
if (!standalone_ && !getApp().getFeeTrack().isLoadedLocal() &&
(getApp().getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) &&
(mValidLedgerSeq == mPubLedgerSeq) &&
(getValidatedLedgerAge() < MAX_LEDGER_AGE_ACQUIRE))
{ // We are in sync, so can acquire
std::uint32_t missing;
{
ScopedLockType sl (mCompleteLock);
missing = mCompleteLedgers.prevMissing(mPubLedger->getLedgerSeq());
}
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance discovered missing " << missing;
// Acquire only if the gap is inside our configured history window and
// no lower back-fill is already in progress for it.
if ((missing != RangeSet::absent) && (missing > 0) &&
shouldAcquire (mValidLedgerSeq, ledger_history_,
getApp ().getSHAMapStore ().getCanDelete (), missing) &&
((mFillInProgress == 0) || (missing > mFillInProgress)))
{
WriteLog (lsTRACE, LedgerMaster) << "advanceThread should acquire";
{
// Drop the master lock while doing network/disk work.
ScopedUnlockType sl(m_mutex);
uint256 hash = getLedgerHashForHistory (missing);
if (hash.isNonZero())
{
Ledger::pointer ledger = getLedgerByHash (hash);
if (!ledger)
{
if (!getApp().getInboundLedgers().isFailure (hash))
{
ledger =
getApp().getInboundLedgers().acquire(hash,
missing,
InboundLedger::fcHISTORY);
// For deep history, prefer a bulk fetch pack over
// node-by-node acquisition.
if (! ledger && (missing > 32600) && shouldFetchPack (missing))
{
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance want fetch pack " << missing;
fetch_seq_ = missing;
getFetchPack(hash, missing);
}
else
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance no fetch pack for " << missing;
}
else
WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found failed acquire";
}
if (ledger)
{
assert(ledger->getLedgerSeq() == missing);
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance acquired " << ledger->getLedgerSeq();
setFullLedger(ledger, false, false);
mHistLedger = ledger;
if ((mFillInProgress == 0) && (Ledger::getHashByIndex(ledger->getLedgerSeq() - 1) == ledger->getParentHash()))
{
// Previous ledger is in DB
ScopedLockType lock (m_mutex);
mFillInProgress = ledger->getLedgerSeq();
getApp().getJobQueue().addJob(jtADVANCE, "tryFill", std::bind (
&LedgerMasterImp::tryFill, this,
std::placeholders::_1, ledger));
}
progress = true;
}
else
{
// Could not get the missing ledger yet; prefetch a batch of
// preceding ledgers so they are in flight for later passes.
try
{
for (int i = 0; i < ledger_fetch_size_; ++i)
{
std::uint32_t seq = missing - i;
uint256 hash = getLedgerHashForHistory (seq);
if (hash.isNonZero())
getApp().getInboundLedgers().acquire(hash,
seq, InboundLedger::fcHISTORY);
}
}
catch (...)
{
WriteLog (lsWARNING, LedgerMaster) << "Threw while prefetching";
}
}
}
else
{
// We know a ledger is missing but cannot even learn its hash;
// clear it so we do not spin on the same gap.
WriteLog (lsFATAL, LedgerMaster) << "Unable to find ledger following prevMissing " << missing;
WriteLog (lsFATAL, LedgerMaster) << "Pub:" << mPubLedgerSeq << " Val:" << mValidLedgerSeq;
WriteLog (lsFATAL, LedgerMaster) << "Ledgers: " << getApp().getLedgerMaster().getCompleteLedgers();
clearLedger (missing + 1);
progress = true;
}
}
// Validation advanced while we were unlocked; loop again.
if (mValidLedgerSeq != mPubLedgerSeq)
{
WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found last valid changed";
progress = true;
}
}
}
else
{
// Not in a state to back-fill history right now.
mHistLedger.reset();
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance not fetching history";
}
}
else
{
// Publish each newly-validated ledger, releasing the master lock
// around the (potentially slow) full-ledger store and pub fan-out.
WriteLog (lsTRACE, LedgerMaster) <<
"tryAdvance found " << pubLedgers.size() <<
" ledgers to publish";
for(auto ledger : pubLedgers)
{
{
ScopedUnlockType sul (m_mutex);
WriteLog(lsDEBUG, LedgerMaster) <<
"tryAdvance publishing seq " << ledger->getLedgerSeq();
setFullLedger(ledger, true, true);
getApp().getOPs().pubLedger(ledger);
}
setPubLedger(ledger);
progress = true;
}
getApp().getOPs().clearNeedNetworkLedger();
newPFWork ("pf:newLedger");
}
if (progress)
mAdvanceWork = true;
} while (mAdvanceWork);
}
// Cache a fetch pack entry received from the network, keyed by its hash.
// NOTE(review): canonicalize is expected to either insert `data` or swap it
// for the already-cached copy so callers share one canonical Blob -- confirm
// against TaggedCache's documented semantics.
void LedgerMasterImp::addFetchPack (
uint256 const& hash,
std::shared_ptr< Blob >& data)
{
fetch_packs_.canonicalize (hash, data);
}
// Fetch (and consume) a cached fetch pack entry.
// On success `data` holds the payload and the function returns true only
// if the payload hashes back to `hash`; entries are single-use and are
// removed from the cache once retrieved.
bool LedgerMasterImp::getFetchPack (
uint256 const& hash,
Blob& data)
{
bool const found = fetch_packs_.retrieve (hash, data);
if (found)
{
// Single-use: drop the cached entry now that it has been handed out.
fetch_packs_.del (hash, false);
// Report success only when the data is internally consistent.
return sha512Half(makeSlice(data)) == hash;
}
return false;
}
void LedgerMasterImp::gotFetchPack (
bool progress,
std::uint32_t seq)
{
// FIXME: Calling this function more than once will result in
// InboundLedgers::gotFetchPack being called more than once
// which is expensive. A flag should track whether we've already dispatched
getApp().getJobQueue().addJob (
jtLEDGER_DATA, "gotFetchPack",
std::bind (&InboundLedgers::gotFetchPack,
&getApp().getInboundLedgers (), std::placeholders::_1));
}
/** Build a fetch pack for a peer and send it.

    A fetch pack bundles ledger headers plus state- and transaction-map
    nodes so the peer can reconstruct the ledgers preceding
    @a haveLedgerHash without many individual node requests.

    @param wPeer          the requesting peer (weak: it may disconnect)
    @param request        the original TMGetObjectByHash query; its seq and
                          ledgerhash are echoed in the reply
    @param haveLedgerHash hash of a closed ledger the peer already has; we
                          pack its parent and, time permitting, earlier ones
    @param uUptime        uptime (seconds) when the request arrived; stale
                          requests are dropped
*/
void LedgerMasterImp::makeFetchPack (
    Job&, std::weak_ptr<Peer> const& wPeer,
    std::shared_ptr<protocol::TMGetObjectByHash> const& request,
    uint256 haveLedgerHash,
    std::uint32_t uUptime)
{
    // Drop requests that sat in the job queue for more than a second.
    if (UptimeTimer::getInstance ().getElapsedSeconds () > (uUptime + 1))
    {
        m_journal.info << "Fetch pack request got stale";
        return;
    }
    // Serving fetch packs is optional work; skip it when we are load
    // shedding or our own validated ledger is old.
    if (getApp().getFeeTrack ().isLoadedLocal () ||
        (getValidatedLedgerAge() > 40))
    {
        m_journal.info << "Too busy to make fetch pack";
        return;
    }
    Peer::ptr peer = wPeer.lock ();
    if (!peer)
        return;
    auto haveLedger = getLedgerByHash (haveLedgerHash);
    if (!haveLedger)
    {
        // Log the hash: haveLedger is null here, so streaming the pointer
        // would not identify which ledger was requested.
        m_journal.info
            << "Peer requests fetch pack for ledger we don't have: "
            << haveLedgerHash;
        peer->charge (Resource::feeRequestNoReply);
        return;
    }
    if (!haveLedger->isClosed ())
    {
        m_journal.warning
            << "Peer requests fetch pack from open ledger: "
            << haveLedger;
        peer->charge (Resource::feeInvalidRequest);
        return;
    }
    if (haveLedger->getLedgerSeq() < getEarliestFetch())
    {
        m_journal.debug << "Peer requests fetch pack that is too early";
        peer->charge (Resource::feeInvalidRequest);
        return;
    }
    auto wantLedger = getLedgerByHash (haveLedger->getParentHash ());
    if (!wantLedger)
    {
        m_journal.info
            << "Peer requests fetch pack for ledger whose predecessor we "
            << "don't have: " << haveLedger;
        peer->charge (Resource::feeRequestNoReply);
        return;
    }
    // Appends one SHAMap node (hash + serialized data) to the reply.
    auto fpAppender = [](
        protocol::TMGetObjectByHash* reply,
        std::uint32_t ledgerSeq,
        uint256 const& hash,
        const Blob& blob)
    {
        protocol::TMIndexedObject& newObj = * (reply->add_objects ());
        newObj.set_ledgerseq (ledgerSeq);
        newObj.set_hash (hash.begin (), 256 / 8);
        // data() is well-defined even for an empty blob; &blob[0] is not.
        newObj.set_data (blob.data (), blob.size ());
    };
    try
    {
        protocol::TMGetObjectByHash reply;
        reply.set_query (false);
        if (request->has_seq ())
            reply.set_seq (request->seq ());
        reply.set_ledgerhash (request->ledgerhash ());
        reply.set_type (protocol::TMGetObjectByHash::otFETCH_PACK);
        // Building a fetch pack:
        //  1. Add the header for the requested ledger.
        //  2. Add the nodes for the AccountStateMap of that ledger.
        //  3. If there are transactions, add the nodes for the
        //     transactions of the ledger.
        //  4. If the FetchPack now contains at least 512 entries then stop.
        //  5. If not very much time has elapsed, then loop back and repeat
        //     the same process adding the previous ledger to the FetchPack.
        do
        {
            std::uint32_t lSeq = wantLedger->getLedgerSeq ();
            // The ledger header itself is one object in the pack.
            protocol::TMIndexedObject& newObj = *reply.add_objects ();
            newObj.set_hash (wantLedger->getHash ().begin (), 256 / 8);
            Serializer s (256);
            s.add32 (HashPrefix::ledgerMaster);
            wantLedger->addRaw (s);
            newObj.set_data (s.getDataPtr (), s.getLength ());
            newObj.set_ledgerseq (lSeq);
            // State-map delta against the ledger the peer already has.
            wantLedger->stateMap().getFetchPack
                (&haveLedger->stateMap(), true, 16384,
                    std::bind (fpAppender, &reply, lSeq, std::placeholders::_1,
                        std::placeholders::_2));
            if (wantLedger->getTransHash ().isNonZero ())
                wantLedger->txMap().getFetchPack (
                    nullptr, true, 512,
                    std::bind (fpAppender, &reply, lSeq, std::placeholders::_1,
                        std::placeholders::_2));
            if (reply.objects ().size () >= 512)
                break;
            // move may save a ref/unref
            haveLedger = std::move (wantLedger);
            wantLedger = getLedgerByHash (haveLedger->getParentHash ());
        }
        while (wantLedger &&
            UptimeTimer::getInstance ().getElapsedSeconds () <= uUptime + 1);
        m_journal.info
            << "Built fetch pack with " << reply.objects ().size () << " nodes";
        auto msg = std::make_shared<Message> (reply, protocol::mtGET_OBJECTS);
        peer->send (msg);
    }
    catch (...)
    {
        m_journal.warning << "Exception building fetch pack";
    }
}
// Number of entries currently held in the fetch pack cache (monitoring).
std::size_t LedgerMasterImp::getFetchPackCacheSize () const
{
return fetch_packs_.getCacheSize ();
}
//------------------------------------------------------------------------------
// Interface constructor: registers this component in the Stoppable tree
// under its parent so start/stop is coordinated application-wide.
LedgerMaster::LedgerMaster (Stoppable& parent)
: Stoppable ("LedgerMaster", parent)
{
}
// Factory: the concrete LedgerMasterImp type stays private to this
// translation unit; callers receive and hold the LedgerMaster interface.
std::unique_ptr <LedgerMaster>
make_LedgerMaster (
Config const& config,
Stopwatch& stopwatch,
beast::Stoppable& parent,
beast::insight::Collector::ptr const& collector,
beast::Journal journal)
{
return std::make_unique <LedgerMasterImp> (
config, stopwatch, parent, collector, journal);
}
} // ripple