Ledger acquire fixes/cleanups/logging

* Inbound ledger and SHAMapAddNode cleanup
    * Log acquire stats
    * Fix progress logic
    * Remove ledgers we no longer need to acquire
    * Stash stale state data in our fetch pack, it can still be useful
    * Stash in fetch pack if acquire terminated while job was pending
    * Account for duplicate/invalid nodes in a few cases previously missed
    * Dispatch each InboundLedger once (not per data)
    * Trigger only the "best" peer
    * Don't call tryAdvance on failed acquires
This commit is contained in:
JoelKatz
2013-12-14 20:16:54 -08:00
committed by Vinnie Falco
parent 9bdb0774ad
commit b2dbe8ef83
14 changed files with 515 additions and 294 deletions

View File

@@ -499,8 +499,7 @@ public:
return;
// we need to switch the ledger we're working from
Ledger::pointer newLCL =
getApp().getLedgerMaster ().getLedgerByHash (lclHash);
Ledger::pointer newLCL = getApp().getLedgerMaster ().getLedgerByHash (lclHash);
if (newLCL)
{
@@ -510,37 +509,32 @@ public:
mPreviousLedger = newLCL;
mPrevLedgerHash = lclHash;
}
else if (!mAcquiringLedger || (mAcquiringLedger->getHash ()
!= mPrevLedgerHash))
else if (!mAcquiringLedger || (mAcquiringLedger->getHash () != mPrevLedgerHash))
{
// need to start acquiring the correct consensus LCL
WriteLog (lsWARNING, LedgerConsensus)
<< "Need consensus ledger " << mPrevLedgerHash;
WriteLog (lsWARNING, LedgerConsensus) << "Need consensus ledger " << mPrevLedgerHash;
if (mAcquiringLedger)
{
getApp().getInboundLedgers ().dropLedger
(mAcquiringLedger->getHash ());
}
getApp().getInboundLedgers ().dropLedger (mAcquiringLedger->getHash ());
mAcquiringLedger = getApp().getInboundLedgers ().findCreate
(mPrevLedgerHash, 0, true);
mAcquiringLedger = getApp().getInboundLedgers ().findCreateConsensusLedger (mPrevLedgerHash);
mHaveCorrectLCL = false;
return;
}
else
return;
WriteLog (lsINFO, LedgerConsensus)
<< "Have the consensus ledger " << mPrevLedgerHash;
WriteLog (lsINFO, LedgerConsensus) << "Have the consensus ledger " << mPrevLedgerHash;
mHaveCorrectLCL = true;
mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution (
mPreviousLedger->getCloseResolution ()
,mPreviousLedger->getCloseAgree ()
,mPreviousLedger->getLedgerSeq () + 1);
mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseAgree (),
mPreviousLedger->getLedgerSeq () + 1);
}
void timerEntry ()
{
if ((mState != lcsFINISHED) && (mState != lcsACCEPTED))

View File

@@ -33,12 +33,10 @@ InboundLedger::InboundLedger (uint256 const& hash, uint32 seq)
, mAborted (false)
, mSignaled (false)
, mByHash (true)
, mWaitCount (0)
, mSeq (seq)
, mReceiveDispatched (false)
{
#ifdef LA_DEBUG
WriteLog (lsTRACE, InboundLedger) << "Acquiring ledger " << mHash;
#endif
}
bool InboundLedger::checkLocal ()
@@ -55,6 +53,12 @@ bool InboundLedger::checkLocal ()
InboundLedger::~InboundLedger ()
{
BOOST_FOREACH (PeerDataPairType& entry, mReceivedData)
{
if (entry.second->type () == protocol::liAS_NODE)
getApp().getInboundLedgers().gotStaleData(entry.second);
}
}
void InboundLedger::init(ScopedLockType& collectionLock, bool couldBeNew)
@@ -127,7 +131,6 @@ bool InboundLedger::tryLocal ()
if (mLedger->peekTransactionMap ()->fetchRoot (mLedger->getTransHash (), &filter))
{
WriteLog (lsTRACE, InboundLedger) << "Got root txn map locally";
std::vector<uint256> h = mLedger->getNeededTransactionHashes (1, &filter);
if (h.empty ())
@@ -152,7 +155,6 @@ bool InboundLedger::tryLocal ()
if (mLedger->peekAccountStateMap ()->fetchRoot (mLedger->getAccountHash (), &filter))
{
WriteLog (lsTRACE, InboundLedger) << "Got root AS map locally";
std::vector<uint256> h = mLedger->getNeededAccountStateHashes (1, &filter);
if (h.empty ())
@@ -175,17 +177,25 @@ bool InboundLedger::tryLocal ()
return mComplete;
}
/** Called with a lock by the PeerSet when the timer expires
*/
void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
{
mRecentTXNodes.clear ();
mRecentASNodes.clear ();
if (isDone())
{
WriteLog (lsINFO, InboundLedger) << "Already done " << mHash;
return;
}
if (getTimeouts () > LEDGER_TIMEOUT_COUNT)
{
if (mSeq != 0)
WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mSeq;
WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mSeq;
else
WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mHash;
WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mHash;
setFailed ();
done ();
return;
@@ -193,17 +203,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
if (isDone())
{
WriteLog (lsINFO, InboundLedger) << "Already done " << mHash;
return;
}
checkLocal();
if (isDone())
{
WriteLog (lsINFO, InboundLedger) << "Completed fetch " << mHash;
return;
}
mAggressive = true;
mByHash = true;
@@ -216,23 +216,6 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
}
}
void InboundLedger::awaitData ()
{
++mWaitCount;
}
void InboundLedger::noAwaitData ()
{ // subtract one if mWaitCount is greater than zero
do
{
int j = mWaitCount.get();
if (j <= 0)
return;
if (mWaitCount.compareAndSetBool(j - 1, j))
return;
} while (1);
}
void InboundLedger::addPeers ()
{
std::vector<Peer::pointer> peerList = getApp().getPeers ().getPeerVector ();
@@ -264,8 +247,8 @@ void InboundLedger::addPeers ()
{
if (peerHas (peerList[ (i + firstPeer) % vSize]))
++found;
}
if (mSeq != 0)
}
if (mSeq != 0)
WriteLog (lsDEBUG, InboundLedger) << "Chose " << found << " peer(s) for ledger " << mSeq;
else
WriteLog (lsDEBUG, InboundLedger) << "Chose " << found << " peer(s) for ledger " << getHash ().GetHex();
@@ -287,8 +270,10 @@ static void LADispatch (
std::vector< FUNCTION_TYPE<void (InboundLedger::pointer)> > trig)
{
if (la->isComplete() && !la->isFailed())
{
getApp().getLedgerMaster().checkAccept(la->getLedger());
getApp().getLedgerMaster().tryAdvance();
getApp().getLedgerMaster().tryAdvance();
}
for (unsigned int i = 0; i < trig.size (); ++i)
trig[i] (la);
}
@@ -325,14 +310,14 @@ void InboundLedger::done ()
BIND_TYPE (LADispatch, P_1, shared_from_this (), triggers));
}
bool InboundLedger::addOnComplete (FUNCTION_TYPE<void (InboundLedger::pointer)> trigger)
bool InboundLedger::addOnComplete (FUNCTION_TYPE<void (InboundLedger::pointer)> triggerFunc)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (isDone ())
return false;
mOnComplete.push_back (trigger);
mOnComplete.push_back (triggerFunc);
return true;
}
@@ -347,12 +332,6 @@ void InboundLedger::trigger (Peer::ref peer)
return;
}
if ((mWaitCount.get() > 0) && peer)
{
WriteLog (lsTRACE, InboundLedger) << "Skipping peer";
return;
}
if (ShouldLog (lsTRACE, InboundLedger))
{
if (peer)
@@ -550,7 +529,7 @@ void InboundLedger::trigger (Peer::ref peer)
BOOST_FOREACH (SHAMapNode const& it, nodeIDs)
{
* (tmGL.add_nodeids ()) = it.getRawString ();
}
}
WriteLog (lsTRACE, InboundLedger) << "Sending AS node " << nodeIDs.size ()
<< " request to " << (peer ? "selected peer" : "all peers");
CondLog (nodeIDs.size () == 1, lsTRACE, InboundLedger) << "AS node: " << nodeIDs[0];
@@ -638,13 +617,15 @@ void InboundLedger::filterNodes (std::vector<SHAMapNode>& nodeIDs, std::vector<u
}
}
/** Take ledger base data
Call with a lock
*/
bool InboundLedger::takeBase (const std::string& data) // data must not have hash prefix
{
// Return value: true=normal, false=bad data
#ifdef LA_DEBUG
WriteLog (lsTRACE, InboundLedger) << "got base acquiring ledger " << mHash;
#endif
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mComplete || mFailed || mHaveBase)
return true;
@@ -671,26 +652,35 @@ bool InboundLedger::takeBase (const std::string& data) // data must not have has
progress ();
if (!mLedger->getTransHash ())
if (mLedger->getTransHash ().isZero ())
mHaveTransactions = true;
if (!mLedger->getAccountHash ())
if (mLedger->getAccountHash ().isZero ())
mHaveState = true;
mLedger->setAcquiring ();
return true;
}
/** Process TX data received from a peer
Call with a lock
*/
bool InboundLedger::takeTxNode (const std::list<SHAMapNode>& nodeIDs,
const std::list< Blob >& data, SHAMapAddNode& san)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (!mHaveBase)
{
WriteLog (lsWARNING, InboundLedger) << "TX node without base";
san.incInvalid();
return false;
}
if (mHaveTransactions || mFailed)
{
san.incDuplicate();
return true;
}
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin ();
std::list< Blob >::const_iterator nodeDatait = data.begin ();
@@ -700,13 +690,15 @@ bool InboundLedger::takeTxNode (const std::list<SHAMapNode>& nodeIDs,
{
if (nodeIDit->isRoot ())
{
if (!san.combine (mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), *nodeDatait,
snfWIRE, &tFilter)))
san += mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), *nodeDatait,
snfWIRE, &tFilter);
if (!san.isGood())
return false;
}
else
{
if (!san.combine (mLedger->peekTransactionMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter)))
san += mLedger->peekTransactionMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter);
if (!san.isGood())
return false;
}
@@ -729,6 +721,9 @@ bool InboundLedger::takeTxNode (const std::list<SHAMapNode>& nodeIDs,
return true;
}
/** Process AS data received from a peer
Call with a lock
*/
bool InboundLedger::takeAsNode (const std::list<SHAMapNode>& nodeIDs,
const std::list< Blob >& data, SHAMapAddNode& san)
{
@@ -740,11 +735,15 @@ bool InboundLedger::takeAsNode (const std::list<SHAMapNode>& nodeIDs,
if (!mHaveBase)
{
WriteLog (lsWARNING, InboundLedger) << "Don't have ledger base";
san.incInvalid();
return false;
}
if (mHaveState || mFailed)
{
san.incDuplicate();
return true;
}
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin ();
std::list< Blob >::const_iterator nodeDatait = data.begin ();
@@ -754,17 +753,22 @@ bool InboundLedger::takeAsNode (const std::list<SHAMapNode>& nodeIDs,
{
if (nodeIDit->isRoot ())
{
if (!san.combine (mLedger->peekAccountStateMap ()->addRootNode (mLedger->getAccountHash (),
*nodeDatait, snfWIRE, &tFilter)))
san += mLedger->peekAccountStateMap ()
->addRootNode (mLedger->getAccountHash (), *nodeDatait, snfWIRE, &tFilter);
if (!san.isGood ())
{
WriteLog (lsWARNING, InboundLedger) << "Bad ledger base";
return false;
}
}
else if (!san.combine (mLedger->peekAccountStateMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter)))
else
{
WriteLog (lsWARNING, InboundLedger) << "Unable to add AS node";
return false;
san += mLedger->peekAccountStateMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter);
if (!san.isGood ())
{
WriteLog (lsWARNING, InboundLedger) << "Unable to add AS node";
return false;
}
}
++nodeIDit;
@@ -786,34 +790,48 @@ bool InboundLedger::takeAsNode (const std::list<SHAMapNode>& nodeIDs,
return true;
}
/** Process AS root node received from a peer
Call with a lock
*/
bool InboundLedger::takeAsRootNode (Blob const& data, SHAMapAddNode& san)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mFailed || mHaveState)
{
san.incDuplicate();
return true;
}
if (!mHaveBase)
{
san.incInvalid();
return false;
}
AccountStateSF tFilter (mLedger->getLedgerSeq ());
return san.combine (
mLedger->peekAccountStateMap ()->addRootNode (mLedger->getAccountHash (), data, snfWIRE, &tFilter));
san += mLedger->peekAccountStateMap ()->addRootNode (mLedger->getAccountHash (), data, snfWIRE, &tFilter);
return san.isGood();
}
/** Process AS root node received from a peer
Call with a lock
*/
bool InboundLedger::takeTxRootNode (Blob const& data, SHAMapAddNode& san)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mFailed || mHaveState)
{
san.incDuplicate();
return true;
}
if (!mHaveBase)
{
san.incInvalid();
return false;
}
TransactionStateSF tFilter (mLedger->getLedgerSeq ());
return san.combine (
mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), data, snfWIRE, &tFilter));
san += mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), data, snfWIRE, &tFilter);
return san.isGood();
}
std::vector<InboundLedger::neededHash_t> InboundLedger::getNeededHashes ()
@@ -849,6 +867,164 @@ std::vector<InboundLedger::neededHash_t> InboundLedger::getNeededHashes ()
return ret;
}
/** Stash a TMLedgerData received from a peer for later processing
Returns 'true' if we need to dispatch
*/
bool InboundLedger::gotData (boost::weak_ptr<Peer> peer, boost::shared_ptr<protocol::TMLedgerData> data)
{
ScopedLockType sl (mReceivedDataLock, __FILE__, __LINE__);
mReceivedData.push_back (PeerDataPairType (peer, data));
if (mReceiveDispatched)
return false;
mReceiveDispatched = true;
return true;
}
/** Process one TMLedgerData
Returns the number of useful nodes
*/
int InboundLedger::processData (boost::shared_ptr<Peer> peer, protocol::TMLedgerData& packet)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (packet.type () == protocol::liBASE)
{
if (packet.nodes_size () < 1)
{
WriteLog (lsWARNING, InboundLedger) << "Got empty base data";
peer->charge (Resource::feeInvalidRequest);
return -1;
}
if (!mHaveBase && !takeBase (packet.nodes (0).nodedata ()))
{
WriteLog (lsWARNING, InboundLedger) << "Got invalid base data";
peer->charge (Resource::feeInvalidRequest);
return -1;
}
SHAMapAddNode san;
if (!mHaveState && (packet.nodes ().size () > 1) &&
!takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san))
{
WriteLog (lsWARNING, InboundLedger) << "Included ASbase invalid";
}
if (!mHaveTransactions && (packet.nodes ().size () > 2) &&
!takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san))
{
WriteLog (lsWARNING, InboundLedger) << "Included TXbase invalid";
}
if (!san.isInvalid ())
progress ();
else
WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid base data";
return san.getGood ();
}
if ((packet.type () == protocol::liTX_NODE) || (packet.type () == protocol::liAS_NODE))
{
std::list<SHAMapNode> nodeIDs;
std::list< Blob > nodeData;
if (packet.nodes ().size () == 0)
{
WriteLog (lsINFO, InboundLedger) << "Got response with no nodes";
peer->charge (Resource::feeInvalidRequest);
return -1;
}
for (int i = 0; i < packet.nodes ().size (); ++i)
{
const protocol::TMLedgerNode& node = packet.nodes (i);
if (!node.has_nodeid () || !node.has_nodedata ())
{
WriteLog (lsWARNING, InboundLedger) << "Got bad node";
peer->charge (Resource::feeInvalidRequest);
return -1;
}
nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ()));
nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ()));
}
SHAMapAddNode ret;
if (packet.type () == protocol::liTX_NODE)
{
takeTxNode (nodeIDs, nodeData, ret);
WriteLog (lsDEBUG, InboundLedger) << "Ledger TX node stats: " << ret.get();
}
else
{
takeAsNode (nodeIDs, nodeData, ret);
WriteLog (lsDEBUG, InboundLedger) << "Ledger AS node stats: " << ret.get();
}
if (!ret.isInvalid ())
progress ();
else
WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid node data";
return ret.getGood ();
}
return -1;
}
/** Process pending TMLedgerData
Query the 'best' peer
*/
void InboundLedger::runData ()
{
boost::shared_ptr<Peer> chosenPeer;
int chosenPeerCount = -1;
std::vector <PeerDataPairType> data;
do
{
data.clear();
{
ScopedLockType sl (mReceivedDataLock, __FILE__, __LINE__);
if (mReceivedData.empty ())
{
mReceiveDispatched = false;
break;
}
data.swap(mReceivedData);
}
BOOST_FOREACH (PeerDataPairType& entry, data)
{
Peer::pointer peer = entry.first.lock();
if (peer)
{
int count = processData (peer, *(entry.second));
if (count > chosenPeerCount)
{
chosenPeer = peer;
chosenPeerCount = count;
}
}
}
} while (1);
if (chosenPeer)
trigger (chosenPeer);
}
Json::Value InboundLedger::getJson (int)
{
Json::Value ret (Json::objectValue);

View File

@@ -31,6 +31,7 @@ public:
static char const* getCountedObjectName () { return "InboundLedger"; }
typedef boost::shared_ptr <InboundLedger> pointer;
typedef std::pair < boost::weak_ptr<Peer>, boost::shared_ptr<protocol::TMLedgerData> > PeerDataPairType;
public:
InboundLedger (uint256 const& hash, uint32 seq);
@@ -69,21 +70,14 @@ public:
// VFALCO TODO Make this the Listener / Observer pattern
bool addOnComplete (FUNCTION_TYPE<void (InboundLedger::pointer)>);
bool takeBase (const std::string& data);
bool takeTxNode (const std::list<SHAMapNode>& IDs, const std::list<Blob >& data,
SHAMapAddNode&);
bool takeTxRootNode (Blob const& data, SHAMapAddNode&);
bool takeAsNode (const std::list<SHAMapNode>& IDs, const std::list<Blob >& data,
SHAMapAddNode&);
bool takeAsRootNode (Blob const& data, SHAMapAddNode&);
void trigger (Peer::ref);
bool tryLocal ();
void addPeers ();
void awaitData ();
void noAwaitData ();
bool checkLocal ();
void init(ScopedLockType& collectionLock, bool couldBeNew);
bool gotData (boost::weak_ptr<Peer>, boost::shared_ptr<protocol::TMLedgerData>);
typedef std::pair <protocol::TMGetObjectByHash::ObjectType, uint256> neededHash_t;
std::vector<neededHash_t> getNeededHashes ();
@@ -92,6 +86,7 @@ public:
std::set<SHAMapNode>& recentNodes, int max, bool aggressive);
Json::Value getJson (int);
void runData ();
private:
void done ();
@@ -105,6 +100,16 @@ private:
boost::weak_ptr <PeerSet> pmDowncast ();
int processData (boost::shared_ptr<Peer> peer, protocol::TMLedgerData& data);
bool takeBase (const std::string& data);
bool takeTxNode (const std::list<SHAMapNode>& IDs, const std::list<Blob >& data,
SHAMapAddNode&);
bool takeTxRootNode (Blob const& data, SHAMapAddNode&);
bool takeAsNode (const std::list<SHAMapNode>& IDs, const std::list<Blob >& data,
SHAMapAddNode&);
bool takeAsRootNode (Blob const& data, SHAMapAddNode&);
private:
Ledger::pointer mLedger;
bool mHaveBase;
@@ -113,12 +118,17 @@ private:
bool mAborted;
bool mSignaled;
bool mByHash;
beast::Atomic<int> mWaitCount;
uint32 mSeq;
std::set <SHAMapNode> mRecentTXNodes;
std::set <SHAMapNode> mRecentASNodes;
// Data we have received from peers
PeerSet::LockType mReceivedDataLock;
std::vector <PeerDataPairType> mReceivedData;
bool mReceiveDispatched;
std::vector <FUNCTION_TYPE <void (InboundLedger::pointer)> > mOnComplete;
};

View File

@@ -38,8 +38,7 @@ public:
// VFALCO TODO Should this be called findOrAdd ?
//
InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq,
bool couldBeNew)
InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq, bool couldBeNew)
{
assert (hash.isNonZero ());
InboundLedger::pointer ret;
@@ -68,6 +67,51 @@ public:
return ret;
}
InboundLedger::pointer findCreateConsensusLedger (uint256 const& hash)
{
// We do not want to destroy the ledger while holding the collection lock
InboundLedger::pointer oldLedger;
{
// If there was an old consensus inbound ledger, remove it
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mConsensusLedger.isNonZero() && (mValidationLedger != mConsensusLedger) && (hash != mConsensusLedger))
{
boost::unordered_map<uint256, InboundLedger::pointer>::iterator it = mLedgers.find (mConsensusLedger);
if (it != mLedgers.end ())
{
oldLedger = it->second;
mLedgers.erase (it);
}
}
mConsensusLedger = hash;
}
return findCreate (hash, 0, true);
}
InboundLedger::pointer findCreateValidationLedger (uint256 const& hash)
{
InboundLedger::pointer oldLedger;
{
// If there was an old validation inbound ledger, remove it
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mValidationLedger.isNonZero() && (mValidationLedger != mConsensusLedger) && (hash != mValidationLedger))
{
boost::unordered_map<uint256, InboundLedger::pointer>::iterator it = mLedgers.find (mValidationLedger);
if (it != mLedgers.end ())
{
oldLedger = it->second;
mLedgers.erase (it);
}
}
mValidationLedger = hash;
}
return findCreate (hash, 0, true);
}
InboundLedger::pointer find (uint256 const& hash)
{
assert (hash.isNonZero ());
@@ -104,16 +148,6 @@ public:
}
bool awaitLedgerData (LedgerHash const& ledgerHash)
{
InboundLedger::pointer ledger = find (ledgerHash);
if (!ledger)
return false;
ledger->awaitData ();
return true;
}
/*
This gets called when
"We got some data from an inbound ledger"
@@ -127,14 +161,13 @@ public:
// VFALCO TODO Why is hash passed by value?
// VFALCO TODO Remove the dependency on the Peer object.
//
void gotLedgerData (Job& job,
LedgerHash hash,
boost::shared_ptr<protocol::TMLedgerData> packet_ptr,
boost::weak_ptr<Peer> wPeer)
/** We received a TMLedgerData from a peer.
*/
bool gotLedgerData (LedgerHash const& hash,
boost::shared_ptr<Peer> peer,
boost::shared_ptr<protocol::TMLedgerData> packet_ptr)
{
protocol::TMLedgerData& packet = *packet_ptr;
Peer::pointer peer = wPeer.lock ();
WriteLog (lsTRACE, InboundLedger) << "Got data (" << packet.nodes ().size () << ") for acquiring ledger: " << hash;
@@ -142,107 +175,24 @@ public:
if (!ledger)
{
WriteLog (lsTRACE, InboundLedger) << "Got data for ledger we're not acquiring";
WriteLog (lsTRACE, InboundLedger) << "Got data for ledger we're no longer acquiring";
if (peer)
// If it's state node data, stash it because it still might be useful
if (packet.type () == protocol::liAS_NODE)
{
peer->charge (Resource::feeInvalidRequest);
getApp().getJobQueue().addJob(jtLEDGER_DATA, "gotStaleData",
BIND_TYPE(&InboundLedgers::gotStaleData, this, packet_ptr));
}
return;
return false;
}
ledger->noAwaitData ();
// Stash the data for later processing and see if we need to dispatch
if (ledger->gotData(boost::weak_ptr<Peer>(peer), packet_ptr))
getApp().getJobQueue().addJob (jtLEDGER_DATA, "processLedgerData",
BIND_TYPE (&InboundLedgers::doLedgerData, this, P_1, hash));
if (!peer)
return;
if (packet.type () == protocol::liBASE)
{
if (packet.nodes_size () < 1)
{
WriteLog (lsWARNING, InboundLedger) << "Got empty base data";
peer->charge (Resource::feeInvalidRequest);
return;
}
if (!ledger->takeBase (packet.nodes (0).nodedata ()))
{
WriteLog (lsWARNING, InboundLedger) << "Got invalid base data";
peer->charge (Resource::feeInvalidRequest);
return;
}
SHAMapAddNode san = SHAMapAddNode::useful ();
if ((packet.nodes ().size () > 1) && !ledger->takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san))
{
WriteLog (lsWARNING, InboundLedger) << "Included ASbase invalid";
}
if ((packet.nodes ().size () > 2) && !ledger->takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san))
{
WriteLog (lsWARNING, InboundLedger) << "Included TXbase invalid";
}
if (!san.isInvalid ())
{
ledger->progress ();
ledger->trigger (peer);
}
else
WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid base data";
return;
}
if ((packet.type () == protocol::liTX_NODE) || (packet.type () == protocol::liAS_NODE))
{
std::list<SHAMapNode> nodeIDs;
std::list< Blob > nodeData;
if (packet.nodes ().size () <= 0)
{
WriteLog (lsINFO, InboundLedger) << "Got response with no nodes";
peer->charge (Resource::feeInvalidRequest);
return;
}
for (int i = 0; i < packet.nodes ().size (); ++i)
{
const protocol::TMLedgerNode& node = packet.nodes (i);
if (!node.has_nodeid () || !node.has_nodedata ())
{
WriteLog (lsWARNING, InboundLedger) << "Got bad node";
peer->charge (Resource::feeInvalidRequest);
return;
}
nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ()));
nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ()));
}
SHAMapAddNode ret;
if (packet.type () == protocol::liTX_NODE)
ledger->takeTxNode (nodeIDs, nodeData, ret);
else
ledger->takeAsNode (nodeIDs, nodeData, ret);
if (!ret.isInvalid ())
{
ledger->progress ();
ledger->trigger (peer);
}
else
WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid node data";
return;
}
WriteLog (lsWARNING, InboundLedger) << "Not sure what ledger data we got";
peer->charge (Resource::feeInvalidRequest);
return true;
}
int getFetchCount (int& timeoutCount)
@@ -283,6 +233,47 @@ public:
return mRecentFailures.isPresent (h, false);
}
void doLedgerData (Job&, LedgerHash hash)
{
InboundLedger::pointer ledger = find (hash);
if (ledger)
ledger->runData ();
}
/** We got some data for a ledger we are no longer acquiring
Since we paid the price to receive it, we might as well stash it in case we need it.
Nodes are received in wire format and must be stashed/hashed in prefix format
*/
void gotStaleData (boost::shared_ptr<protocol::TMLedgerData> packet_ptr)
{
const uint256 uZero;
try
{
for (int i = 0; i < packet_ptr->nodes ().size (); ++i)
{
const protocol::TMLedgerNode& node = packet_ptr->nodes (i);
if (!node.has_nodeid () || !node.has_nodedata ())
return;
Serializer s;
SHAMapTreeNode newNode(
SHAMapNode (node.nodeid().data(), node.nodeid().size()),
Blob (node.nodedata().begin(), node.nodedata().end()),
0, snfWIRE, uZero, false);
newNode.addRaw(s, snfPREFIX);
boost::shared_ptr<Blob> blob = boost::make_shared<Blob> (s.begin(), s.end());
getApp().getOPs().addFetchPack (newNode.getNodeHash(), blob);
}
}
catch (...)
{
}
}
void clearFailures ()
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
@@ -399,6 +390,9 @@ private:
MapType mLedgers;
KeyCache <uint256, UptimeTimerAdapter> mRecentFailures;
uint256 mConsensusLedger;
uint256 mValidationLedger;
};
//------------------------------------------------------------------------------

View File

@@ -34,24 +34,30 @@ public:
// VFALCO TODO Should this be called findOrAdd ?
//
virtual InboundLedger::pointer findCreate (uint256 const& hash,
uint32 seq,
bool bCouldBeNew) = 0;
uint32 seq, bool bCouldBeNew) = 0;
virtual InboundLedger::pointer find (uint256 const& hash) = 0;
virtual InboundLedger::pointer find (LedgerHash const& hash) = 0;
virtual bool hasLedger (LedgerHash const& ledgerHash) = 0;
virtual void dropLedger (LedgerHash const& ledgerHash) = 0;
virtual InboundLedger::pointer findCreateConsensusLedger (
LedgerHash const& hash) = 0;
virtual InboundLedger::pointer findCreateValidationLedger (
LedgerHash const& hash) = 0;
virtual bool awaitLedgerData (LedgerHash const& ledgerHash) = 0;
virtual void dropLedger (LedgerHash const& ledgerHash) = 0;
// VFALCO TODO Why is hash passed by value?
// VFALCO TODO Remove the dependency on the Peer object.
//
virtual void gotLedgerData (Job& job,
LedgerHash hash,
boost::shared_ptr <protocol::TMLedgerData> packet,
boost::weak_ptr<Peer> peer) = 0;
virtual bool gotLedgerData (LedgerHash const& ledgerHash,
boost::shared_ptr<Peer>,
boost::shared_ptr <protocol::TMLedgerData>) = 0;
virtual void doLedgerData (Job&, LedgerHash hash) = 0;
virtual void gotStaleData (
boost::shared_ptr <protocol::TMLedgerData> packet) = 0;
virtual int getFetchCount (int& timeoutCount) = 0;
@@ -66,6 +72,7 @@ public:
virtual void gotFetchPack (Job&) = 0;
virtual void sweep () = 0;
virtual void onStop() = 0;
};
#endif

View File

@@ -628,7 +628,7 @@ public:
if (!ledger)
{
InboundLedger::pointer l = getApp().getInboundLedgers().findCreate(hash, 0, false);
InboundLedger::pointer l = getApp().getInboundLedgers().findCreateValidationLedger(hash);
if (l && l->isComplete() && !l->isFailed())
ledger = l->getLedger();
else

View File

@@ -50,7 +50,7 @@ public:
, mLastCloseConvergeTime (1000 * LEDGER_IDLE_INTERVAL)
, mLastCloseTime (0)
, mLastValidationTime (0)
, mFetchPack ("FetchPack", 2048, 20)
, mFetchPack ("FetchPack", 65536, 45)
, mFetchSeq (0)
, mLastLoadBase (256)
, mLastLoadFactor (256)
@@ -3066,6 +3066,11 @@ int NetworkOPsImp::getFetchSize ()
void NetworkOPsImp::gotFetchPack (bool progress, uint32 seq)
{
// FIXME: Calling this function more than once will result in
// InboundLedgers::gotFetchPack being called more than once
// which is expensive. A flag should track whether we've already dispatched
getApp().getJobQueue ().addJob (jtLEDGER_DATA, "gotFetchPack",
BIND_TYPE (&InboundLedgers::gotFetchPack, &getApp().getInboundLedgers (), P_1));
}

View File

@@ -122,7 +122,7 @@ private:
WriteLog (lsDEBUG, Validations) << "Val for " << hash << " from " << signer.humanNodePublic ()
<< " added " << (val->isTrusted () ? "trusted/" : "UNtrusted/") << (isCurrent ? "current" : "stale");
if (val->isTrusted ())
if (val->isTrusted () && isCurrent)
getApp().getLedgerMaster ().checkAccept (hash);
// FIXME: This never forwards untrusted validations

View File

@@ -2428,14 +2428,9 @@ void PeerImp::recvLedger (const boost::shared_ptr<protocol::TMLedgerData>& packe
return;
}
if (getApp().getInboundLedgers ().awaitLedgerData (hash))
{
getApp().getJobQueue ().addJob (jtLEDGER_DATA, "gotLedgerData",
BIND_TYPE (&InboundLedgers::gotLedgerData, &getApp().getInboundLedgers (),
P_1, hash, packet_ptr, boost::weak_ptr<Peer> (shared_from_this ())));
}
else
if (!getApp().getInboundLedgers ().gotLedgerData (hash, shared_from_this(), packet_ptr))
{
WriteLog (lsINFO, Peer) << "Got data for unwanted ledger";
charge (Resource::feeUnwantedData);
}
}

View File

@@ -28,9 +28,10 @@ PeerSet::PeerSet (uint256 const& hash, int interval, bool txnData)
, mFailed (false)
, mAggressive (false)
, mTxnData (txnData)
, mProgress (false)
, mTimer (getApp().getIOService ())
{
mLastAction = mLastProgress = UptimeTimer::getInstance ().getElapsedSeconds ();
mLastAction = UptimeTimer::getInstance ().getElapsedSeconds ();
assert ((mTimerInterval > 10) && (mTimerInterval < 30000));
}
@@ -71,7 +72,10 @@ void PeerSet::invokeOnTimer ()
onTimer (false, sl);
}
else
{
clearProgress ();
onTimer (true, sl);
}
if (!isDone ())
setTimer ();
@@ -88,7 +92,7 @@ void PeerSet::TimerEntry (boost::weak_ptr<PeerSet> wptr, const boost::system::er
{
if (ptr->mTxnData)
{
getApp().getJobQueue ().addJob (jtTXN_DATA, "timerEntry",
getApp().getJobQueue ().addJob (jtTXN_DATA, "timerEntryTxn",
BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
}
else
@@ -101,7 +105,7 @@ void PeerSet::TimerEntry (boost::weak_ptr<PeerSet> wptr, const boost::system::er
ptr->setTimer ();
}
else
getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntry",
getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntryLgr",
BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
}
}

View File

@@ -47,12 +47,16 @@ public:
bool isActive ();
void progress ()
{
mLastProgress = UptimeTimer::getInstance().getElapsedSeconds();
mProgress = true;
mAggressive = false;
}
void clearProgress ()
{
mProgress = false;
}
bool isProgress ()
{
return (mLastProgress + (mTimerInterval / 1000)) > UptimeTimer::getInstance().getElapsedSeconds();
return mProgress;
}
void touch ()
{
@@ -114,7 +118,7 @@ protected:
bool mAggressive;
bool mTxnData;
int mLastAction;
int mLastProgress;
bool mProgress;
// VFALCO TODO move the responsibility for the timer to a higher level

View File

@@ -25,77 +25,110 @@ class SHAMapAddNode
{
public:
SHAMapAddNode ()
: mInvalid (false)
, mUseful (false)
: mGood (0)
, mBad (0)
, mDuplicate (0)
{
}
void setInvalid ()
SHAMapAddNode (int good, int bad, int duplicate)
: mGood(good)
, mBad(bad)
, mDuplicate(duplicate)
{
mInvalid = true;
}
void setUseful ()
void incInvalid ()
{
mUseful = true;
++mBad;
}
void incUseful ()
{
++mGood;
}
void incDuplicate ()
{
++mDuplicate;
}
void reset ()
{
mInvalid = false;
mUseful = false;
mGood = mBad = mDuplicate = 0;
}
int getGood ()
{
return mGood;
}
bool isInvalid () const
{
return mInvalid;
return mBad > 0;
}
bool isUseful () const
{
return mUseful;
return mGood > 0;
}
bool combine (SHAMapAddNode const& n)
SHAMapAddNode& operator+= (SHAMapAddNode const& n)
{
// VFALCO NOTE What is the meaning of these lines?
mGood += n.mGood;
mBad += n.mBad;
mDuplicate += n.mDuplicate;
if (n.mInvalid)
{
mInvalid = true;
return false;
}
if (n.mUseful)
mUseful = true;
return true;
return *this;
}
operator bool () const
bool isGood () const
{
return !mInvalid;
return (mGood + mDuplicate) > mBad;
}
static SHAMapAddNode okay ()
static SHAMapAddNode duplicate ()
{
return SHAMapAddNode (false, false);
return SHAMapAddNode (0, 0, 1);
}
static SHAMapAddNode useful ()
{
return SHAMapAddNode (false, true);
return SHAMapAddNode (1, 0, 0);
}
static SHAMapAddNode invalid ()
{
return SHAMapAddNode (true, false);
return SHAMapAddNode (0, 1, 0);
}
std::string get ()
{
std::string ret;
if (mGood > 0)
{
ret.append("good:");
ret.append(lexicalCastThrow<std::string>(mGood));
}
if (mBad > 0)
{
if (!ret.empty())
ret.append(" ");
ret.append("bad:");
ret.append(lexicalCastThrow<std::string>(mBad));
}
if (mDuplicate > 0)
{
if (!ret.empty())
ret.append(" ");
ret.append("dupe:");
ret.append(lexicalCastThrow<std::string>(mDuplicate));
}
if (ret.empty ())
ret = "no nodes processed";
return ret;
}
private:
SHAMapAddNode (bool i, bool u)
: mInvalid (i)
, mUseful (u)
{
}
bool mInvalid;
bool mUseful;
int mGood;
int mBad;
int mDuplicate;
};
#endif

View File

@@ -101,6 +101,7 @@ void SHAMap::getMissingNodes (std::vector<SHAMapNode>& nodeIDs, std::vector<uint
ScopedLockType sl (mLock, __FILE__, __LINE__);
assert (root->isValid ());
assert (root->getNodeHash().isNonZero ());
if (root->isFullBelow ())
{
@@ -260,7 +261,7 @@ SHAMapAddNode SHAMap::addRootNode (Blob const& rootNode, SHANodeFormat format,
if (root->getNodeHash ().isNonZero ())
{
WriteLog (lsTRACE, SHAMap) << "got root node, already have one";
return SHAMapAddNode::okay ();
return SHAMapAddNode::duplicate ();
}
assert (mSeq >= 1);
@@ -277,12 +278,10 @@ SHAMapAddNode SHAMap::addRootNode (Blob const& rootNode, SHANodeFormat format,
root = node;
mTNByID[*root] = root;
if (root->getNodeHash ().isZero ())
{
root->setFullBelow ();
if (root->isLeaf())
clearSynching ();
}
else if (filter)
if (filter)
{
Serializer s;
root->addRaw (s, snfPREFIX);
@@ -302,7 +301,7 @@ SHAMapAddNode SHAMap::addRootNode (uint256 const& hash, Blob const& rootNode, SH
{
WriteLog (lsTRACE, SHAMap) << "got root node, already have one";
assert (root->getNodeHash () == hash);
return SHAMapAddNode::okay ();
return SHAMapAddNode::duplicate ();
}
assert (mSeq >= 1);
@@ -315,12 +314,10 @@ SHAMapAddNode SHAMap::addRootNode (uint256 const& hash, Blob const& rootNode, SH
root = node;
mTNByID[*root] = root;
if (root->getNodeHash ().isZero ())
{
root->setFullBelow ();
if (root->isLeaf())
clearSynching ();
}
else if (filter)
if (filter)
{
Serializer s;
root->addRaw (s, snfPREFIX);
@@ -338,13 +335,13 @@ SHAMapAddNode SHAMap::addKnownNode (const SHAMapNode& node, Blob const& rawNode,
if (!isSynching ())
{
WriteLog (lsTRACE, SHAMap) << "AddKnownNode while not synching";
return SHAMapAddNode::okay ();
return SHAMapAddNode::duplicate ();
}
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (checkCacheNode (node)) // Do we already have this node?
return SHAMapAddNode::okay ();
return SHAMapAddNode::duplicate ();
SHAMapTreeNode::pointer parent = checkCacheNode(node.getParentNodeID());
SHAMapTreeNode* iNode = parent ? parent.get() : root.get ();
@@ -361,7 +358,7 @@ SHAMapAddNode SHAMap::addKnownNode (const SHAMapNode& node, Blob const& rawNode,
}
if (fullBelowCache.isPresent (iNode->getChildHash (branch)))
return SHAMapAddNode::okay ();
return SHAMapAddNode::duplicate ();
SHAMapTreeNode *nextNode = getNodePointerNT (iNode->getChildNodeID (branch), iNode->getChildHash (branch), filter);
if (!nextNode)
@@ -400,7 +397,7 @@ SHAMapAddNode SHAMap::addKnownNode (const SHAMapNode& node, Blob const& rawNode,
}
WriteLog (lsTRACE, SHAMap) << "got node, already had it (late)";
return SHAMapAddNode::okay ();
return SHAMapAddNode::duplicate ();
}
bool SHAMap::deepCompare (SHAMap& other)
@@ -737,7 +734,7 @@ public:
unexpected (gotNodes.size () < 1, "NodeSize");
unexpected (!destination.addRootNode (*gotNodes.begin (), snfWIRE, NULL), "AddRootNode");
unexpected (!destination.addRootNode (*gotNodes.begin (), snfWIRE, NULL).isGood(), "AddRootNode");
nodeIDs.clear ();
gotNodes.clear ();
@@ -791,7 +788,7 @@ public:
bytes += rawNodeIterator->size ();
#endif
if (!destination.addKnownNode (*nodeIDIterator, *rawNodeIterator, NULL))
if (!destination.addKnownNode (*nodeIDIterator, *rawNodeIterator, NULL).isGood ())
{
WriteLog (lsTRACE, SHAMap) << "AddKnownNode fails";
fail ("AddKnownNode");

View File

@@ -122,9 +122,14 @@ boost::weak_ptr<PeerSet> TransactionAcquire::pmDowncast ()
void TransactionAcquire::trigger (Peer::ref peer)
{
if (mComplete || mFailed)
if (mComplete)
{
WriteLog (lsINFO, TransactionAcquire) << "complete or failed";
WriteLog (lsINFO, TransactionAcquire) << "trigger after complete";
return;
}
if (mFailed)
{
WriteLog (lsINFO, TransactionAcquire) << "trigger after fail";
return;
}
@@ -167,7 +172,9 @@ void TransactionAcquire::trigger (Peer::ref peer)
tmGL.set_querytype (protocol::qtINDIRECT);
BOOST_FOREACH (SHAMapNode & it, nodeIDs)
* (tmGL.add_nodeids ()) = it.getRawString ();
{
* (tmGL.add_nodeids ()) = it.getRawString ();
}
sendRequest (tmGL, peer);
}
}
@@ -201,20 +208,15 @@ SHAMapAddNode TransactionAcquire::takeNodes (const std::list<SHAMapNode>& nodeID
if (nodeIDit->isRoot ())
{
if (mHaveRoot)
{
WriteLog (lsWARNING, TransactionAcquire) << "Got root TXS node, already have it";
return SHAMapAddNode ();
}
if (!mMap->addRootNode (getHash (), *nodeDatait, snfWIRE, NULL))
WriteLog (lsDEBUG, TransactionAcquire) << "Got root TXS node, already have it";
else if (!mMap->addRootNode (getHash (), *nodeDatait, snfWIRE, NULL).isGood())
{
WriteLog (lsWARNING, TransactionAcquire) << "TX acquire got bad root node";
return SHAMapAddNode::invalid ();
}
else
mHaveRoot = true;
}
else if (!mMap->addKnownNode (*nodeIDit, *nodeDatait, &sf))
else if (!mMap->addKnownNode (*nodeIDit, *nodeDatait, &sf).isGood())
{
WriteLog (lsWARNING, TransactionAcquire) << "TX acquire got bad non-root node";
return SHAMapAddNode::invalid ();