Cleanup ledger fetching

This commit is contained in:
Nik Bougalis
2016-02-02 23:06:16 -08:00
parent ce31e26f58
commit 35ed095dbf
10 changed files with 219 additions and 355 deletions

View File

@@ -26,6 +26,7 @@
#include <ripple/basics/CountedObject.h>
#include <mutex>
#include <set>
#include <utility>
namespace ripple {
@@ -65,34 +66,15 @@ public:
{
return mHaveHeader;
}
bool isAcctStComplete () const
{
return mHaveState;
}
bool isTransComplete () const
{
return mHaveTransactions;
}
bool isDone () const
{
return mAborted || isComplete () || isFailed ();
}
Ledger::ref getLedger () const
{
return mLedger;
}
void abort ()
{
mAborted = true;
}
std::uint32_t getSeq () const
{
return mSeq;
}
// VFALCO TODO Make this the Listener / Observer pattern
bool addOnComplete (std::function<void (InboundLedger::pointer)>);
enum class TriggerReason { trAdded, trReply, trTimeout };
void trigger (Peer::ptr const&, TriggerReason);
@@ -108,16 +90,15 @@ public:
std::vector<neededHash_t> getNeededHashes ();
// VFALCO TODO Replace uint256 with something semantically meaningful
void filterNodes (
std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>& nodeHashes,
TriggerReason reason);
/** Return a Json::objectValue. */
Json::Value getJson (int);
void runData ();
private:
void filterNodes (
std::vector<std::pair<SHAMapNodeID, uint256>>& nodes,
TriggerReason reason);
void done ();
void onTimer (bool progress, ScopedLockType& peerSetLock);
@@ -154,7 +135,6 @@ private:
bool mHaveHeader;
bool mHaveState;
bool mHaveTransactions;
bool mAborted;
bool mSignaled;
bool mByHash;
std::uint32_t mSeq;
@@ -168,8 +148,6 @@ private:
std::recursive_mutex mReceivedDataLock;
std::vector <PeerDataPairType> mReceivedData;
bool mReceiveDispatched;
std::vector <std::function <void (InboundLedger::pointer)> > mOnComplete;
};
} // ripple

View File

@@ -43,7 +43,7 @@ public:
virtual Ledger::pointer acquire (uint256 const& hash,
std::uint32_t seq, InboundLedger::fcReason) = 0;
virtual InboundLedger::pointer find (LedgerHash const& hash) = 0;
virtual std::shared_ptr<InboundLedger> find (LedgerHash const& hash) = 0;
virtual bool hasLedger (LedgerHash const& ledgerHash) = 0;

View File

@@ -33,6 +33,7 @@
#include <ripple/protocol/HashPrefix.h>
#include <ripple/protocol/JsonFields.h>
#include <ripple/nodestore/Database.h>
#include <algorithm>
namespace ripple {
@@ -70,7 +71,6 @@ InboundLedger::InboundLedger (
, mHaveHeader (false)
, mHaveState (false)
, mHaveTransactions (false)
, mAborted (false)
, mSignaled (false)
, mByHash (true)
, mSeq (seq)
@@ -140,7 +140,7 @@ void InboundLedger::init (ScopedLockType& collectionLock)
}
else if (!isFailed ())
{
if (m_journal.debug) m_journal.debug <<
JLOG (m_journal.debug) <<
"Acquiring ledger we already have locally: " << getHash ();
mLedger->setClosed ();
mLedger->setImmutable (app_.config());
@@ -177,7 +177,7 @@ bool InboundLedger::tryLocal ()
if (!app_.getLedgerMaster ().getFetchPack (mHash, data))
return false;
if (m_journal.trace) m_journal.trace <<
JLOG (m_journal.trace) <<
"Ledger header found in fetch pack";
mLedger = std::make_shared<Ledger> (
data.data(), data.size(), true,
@@ -195,7 +195,7 @@ bool InboundLedger::tryLocal ()
if (mLedger->getHash () != mHash)
{
// We know for a fact the ledger can never be acquired
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
mHash << " cannot be a ledger";
mFailed = true;
return true;
@@ -208,7 +208,7 @@ bool InboundLedger::tryLocal ()
{
if (mLedger->info().txHash.isZero ())
{
if (m_journal.trace) m_journal.trace <<
JLOG (m_journal.trace) <<
"No TXNs to fetch";
mHaveTransactions = true;
}
@@ -223,7 +223,7 @@ bool InboundLedger::tryLocal ()
if (h.empty ())
{
if (m_journal.trace) m_journal.trace <<
JLOG (m_journal.trace) <<
"Had full txn map locally";
mHaveTransactions = true;
}
@@ -235,7 +235,7 @@ bool InboundLedger::tryLocal ()
{
if (mLedger->info().accountHash.isZero ())
{
if (m_journal.fatal) m_journal.fatal <<
JLOG (m_journal.fatal) <<
"We are acquiring a ledger with a zero account hash";
mFailed = true;
return true;
@@ -251,7 +251,7 @@ bool InboundLedger::tryLocal ()
if (h.empty ())
{
if (m_journal.trace) m_journal.trace <<
JLOG (m_journal.trace) <<
"Had full AS map locally";
mHaveState = true;
}
@@ -261,7 +261,7 @@ bool InboundLedger::tryLocal ()
if (mHaveTransactions && mHaveState)
{
if (m_journal.debug) m_journal.debug <<
JLOG (m_journal.debug) <<
"Had everything locally";
mComplete = true;
mLedger->setClosed ();
@@ -279,7 +279,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
if (isDone())
{
if (m_journal.info) m_journal.info <<
JLOG (m_journal.info) <<
"Already done " << mHash;
return;
}
@@ -288,12 +288,12 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
{
if (mSeq != 0)
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
getTimeouts() << " timeouts for ledger " << mSeq;
}
else
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
getTimeouts() << " timeouts for ledger " << mHash;
}
setFailed ();
@@ -337,24 +337,6 @@ std::weak_ptr<PeerSet> InboundLedger::pmDowncast ()
return std::dynamic_pointer_cast<PeerSet> (shared_from_this ());
}
/** Dispatch acquire completion
*/
static void LADispatch (
InboundLedger::pointer la,
std::vector< std::function<void (InboundLedger::pointer)> > trig)
{
if (la->isComplete() && !la->isFailed())
{
la->app().getLedgerMaster().checkAccept(la->getLedger());
la->app().getLedgerMaster().tryAdvance();
}
else
la->app().getInboundLedgers().logFailure (la->getHash(), la->getSeq());
for (unsigned int i = 0; i < trig.size (); ++i)
trig[i] (la);
}
void InboundLedger::done ()
{
if (mSignaled)
@@ -373,12 +355,6 @@ void InboundLedger::done ()
assert (isComplete () || isFailed ());
std::vector< std::function<void (InboundLedger::pointer)> > triggers;
{
ScopedLockType sl (mLock);
triggers.swap (mOnComplete);
}
if (isComplete () && !isFailed () && mLedger)
{
mLedger->setClosed ();
@@ -389,22 +365,20 @@ void InboundLedger::done ()
}
// We hold the PeerSet lock, so must dispatch
auto that = shared_from_this ();
app_.getJobQueue ().addJob (
jtLEDGER_DATA, "triggers",
[that, triggers] (Job&) { LADispatch(that, triggers); });
}
bool InboundLedger::addOnComplete (
std::function <void (InboundLedger::pointer)> triggerFunc)
{
ScopedLockType sl (mLock);
if (isDone ())
return false;
mOnComplete.push_back (triggerFunc);
return true;
jtLEDGER_DATA, "AcquisitionDone",
[self = shared_from_this()](Job&)
{
if (self->isComplete() && !self->isFailed())
{
self->app().getLedgerMaster().checkAccept(
self->getLedger());
self->app().getLedgerMaster().tryAdvance();
}
else
self->app().getInboundLedgers().logFailure (
self->getHash(), self->getSeq());
});
}
/** Request more nodes, perhaps from a specific peer
@@ -415,9 +389,10 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
if (isDone ())
{
if (m_journal.debug) m_journal.debug <<
"Trigger on ledger: " << mHash << (mAborted ? " aborted" : "") <<
(mComplete ? " completed" : "") << (mFailed ? " failed" : "");
JLOG (m_journal.debug) <<
"Trigger on ledger: " << mHash <<
(mComplete ? " completed" : "") <<
(mFailed ? " failed" : "");
return;
}
@@ -445,7 +420,7 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
if (mFailed)
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
" failed local for " << mHash;
return;
}
@@ -471,8 +446,8 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
bool typeSet = false;
for (auto& p : need)
{
if (m_journal.warning) m_journal.warning
<< "Want: " << p.second;
JLOG (m_journal.warning) <<
"Want: " << p.second;
if (!typeSet)
{
@@ -503,12 +478,12 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
}
}
}
if (m_journal.info) m_journal.info <<
JLOG (m_journal.info) <<
"Attempting by hash fetch for ledger " << mHash;
}
else
{
if (m_journal.info) m_journal.info <<
JLOG (m_journal.info) <<
"getNeededHashes says acquire is complete";
mHaveHeader = true;
mHaveTransactions = true;
@@ -523,9 +498,9 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
if (!mHaveHeader && !mFailed)
{
tmGL.set_itype (protocol::liBASE);
if (m_journal.trace) m_journal.trace
<< "Sending header request to "
<< (peer ? "selected peer" : "all peers");
JLOG (m_journal.trace) <<
"Sending header request to " <<
(peer ? "selected peer" : "all peers");
sendRequest (tmGL, peer);
return;
}
@@ -561,30 +536,26 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
// we need the root node
tmGL.set_itype (protocol::liAS_NODE);
*tmGL.add_nodeids () = SHAMapNodeID ().getRawString ();
if (m_journal.trace) m_journal.trace
<< "Sending AS root request to "
<< (peer ? "selected peer" : "all peers");
JLOG (m_journal.trace) <<
"Sending AS root request to " <<
(peer ? "selected peer" : "all peers");
sendRequest (tmGL, peer);
return;
}
else
{
std::vector<SHAMapNodeID> nodeIDs;
std::vector<uint256> nodeHashes;
nodeIDs.reserve (missingNodesFind);
nodeHashes.reserve (missingNodesFind);
AccountStateSF filter(app_);
// Release the lock while we process the large state map
sl.unlock();
mLedger->stateMap().getMissingNodes (
nodeIDs, nodeHashes, missingNodesFind, &filter);
auto nodes = mLedger->stateMap().getMissingNodes (
missingNodesFind, &filter);
sl.lock();
// Make sure nothing happened while we released the lock
if (!mFailed && !mComplete && !mHaveState)
{
if (nodeIDs.empty ())
if (nodes.empty ())
{
if (!mLedger->stateMap().isValid ())
mFailed = true;
@@ -598,28 +569,28 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
}
else
{
filterNodes (nodeIDs, nodeHashes, reason);
filterNodes (nodes, reason);
if (!nodeIDs.empty ())
if (!nodes.empty ())
{
tmGL.set_itype (protocol::liAS_NODE);
for (auto const& id : nodeIDs)
for (auto const& id : nodes)
{
* (tmGL.add_nodeids ()) = id.getRawString ();
* (tmGL.add_nodeids ()) = id.first.getRawString ();
}
if (m_journal.trace) m_journal.trace <<
"Sending AS node " << nodeIDs.size () <<
" request to " << (
peer ? "selected peer" : "all peers");
if (nodeIDs.size () == 1 && m_journal.trace)
m_journal.trace << "AS node: " << nodeIDs[0];
JLOG (m_journal.trace) <<
"Sending AS node request (" <<
nodes.size () << ") to " <<
(peer ? "selected peer" : "all peers");
sendRequest (tmGL, peer);
return;
}
else
if (m_journal.trace) m_journal.trace <<
{
JLOG (m_journal.trace) <<
"All AS nodes filtered";
}
}
}
}
@@ -638,7 +609,7 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
// we need the root node
tmGL.set_itype (protocol::liTX_NODE);
* (tmGL.add_nodeids ()) = SHAMapNodeID ().getRawString ();
if (m_journal.trace) m_journal.trace <<
JLOG (m_journal.trace) <<
"Sending TX root request to " << (
peer ? "selected peer" : "all peers");
sendRequest (tmGL, peer);
@@ -646,15 +617,12 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
}
else
{
std::vector<SHAMapNodeID> nodeIDs;
std::vector<uint256> nodeHashes;
nodeIDs.reserve (missingNodesFind);
nodeHashes.reserve (missingNodesFind);
TransactionStateSF filter(app_);
mLedger->txMap().getMissingNodes (
nodeIDs, nodeHashes, missingNodesFind, &filter);
if (nodeIDs.empty ())
auto nodes = mLedger->txMap().getMissingNodes (
missingNodesFind, &filter);
if (nodes.empty ())
{
if (!mLedger->txMap().isValid ())
mFailed = true;
@@ -668,32 +636,34 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
}
else
{
filterNodes (nodeIDs, nodeHashes, reason);
filterNodes (nodes, reason);
if (!nodeIDs.empty ())
if (!nodes.empty ())
{
tmGL.set_itype (protocol::liTX_NODE);
for (auto const& id : nodeIDs)
for (auto const& n : nodes)
{
* (tmGL.add_nodeids ()) = id.getRawString ();
* (tmGL.add_nodeids ()) = n.first.getRawString ();
}
if (m_journal.trace) m_journal.trace <<
"Sending TX node " << nodeIDs.size () <<
" request to " << (
peer ? "selected peer" : "all peers");
JLOG (m_journal.trace) <<
"Sending TX node request (" <<
nodes.size () << ") to " <<
(peer ? "selected peer" : "all peers");
sendRequest (tmGL, peer);
return;
}
else
if (m_journal.trace) m_journal.trace <<
{
JLOG (m_journal.trace) <<
"All TX nodes filtered";
}
}
}
}
if (mComplete || mFailed)
{
if (m_journal.debug) m_journal.debug <<
JLOG (m_journal.debug) <<
"Done:" << (mComplete ? " complete" : "") <<
(mFailed ? " failed " : " ") <<
mLedger->info().seq;
@@ -702,82 +672,50 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
}
}
void InboundLedger::filterNodes (std::vector<SHAMapNodeID>& nodeIDs,
std::vector<uint256>& nodeHashes, TriggerReason reason)
void InboundLedger::filterNodes (
std::vector<std::pair<SHAMapNodeID, uint256>>& nodes,
TriggerReason reason)
{
// ask for new nodes in preference to ones we've already asked for
assert (nodeIDs.size () == nodeHashes.size ());
int const max = (reason == TriggerReason::trReply) ?
reqNodesReply : reqNodes;
bool const aggressive =
(reason == TriggerReason::trTimeout);
std::vector<bool> duplicates;
duplicates.reserve (nodeIDs.size ());
int dupCount = 0;
for (auto const& nodeHash : nodeHashes)
{
if (mRecentNodes.count (nodeHash) != 0)
// Sort nodes so that the ones we haven't recently
// requested come before the ones we have.
auto dup = std::stable_partition (
nodes.begin(), nodes.end(),
[this](auto const& item)
{
duplicates.push_back (true);
++dupCount;
}
else
duplicates.push_back (false);
}
return mRecentNodes.count (item.second) == 0;
});
if (dupCount == nodeIDs.size ())
// If everything is a duplicate we don't want to send
// any query at all except on a timeout where we need
// to query everyone:
if (dup == nodes.begin ())
{
// all duplicates
// we don't want to send any query at all
// except on a timeout, where we need to query everyone
if (! aggressive)
JLOG (m_journal.trace) <<
"filterNodes: all duplicates";
if (reason != TriggerReason::trTimeout)
{
nodeIDs.clear ();
nodeHashes.clear ();
if (m_journal.trace) m_journal.trace <<
"filterNodes: all are duplicates";
nodes.clear ();
return;
}
}
else if (dupCount > 0)
else
{
// some, but not all, duplicates
int insertPoint = 0;
JLOG (m_journal.trace) <<
"filterNodes: pruning duplicates";
for (unsigned int i = 0; i < nodeIDs.size (); ++i)
if (!duplicates[i])
{
// Keep this node
if (insertPoint != i)
{
nodeIDs[insertPoint] = nodeIDs[i];
nodeHashes[insertPoint] = nodeHashes[i];
}
if (++insertPoint >= max)
break;
}
if (m_journal.trace) m_journal.trace <<
"filterNodes " << nodeIDs.size () << " to " << insertPoint;
nodeIDs.resize (insertPoint);
nodeHashes.resize (insertPoint);
nodes.erase (dup, nodes.end());
}
if (nodeIDs.size () > max)
{
nodeIDs.resize (max);
nodeHashes.resize (max);
}
std::size_t const limit = (reason == TriggerReason::trReply)
? reqNodesReply
: reqNodes;
for (auto const& nodeHash : nodeHashes)
{
mRecentNodes.insert (nodeHash);
}
if (nodes.size () > limit)
nodes.resize (limit);
for (auto const& n : nodes)
mRecentNodes.insert (n.second);
}
/** Take ledger header data
@@ -787,7 +725,7 @@ void InboundLedger::filterNodes (std::vector<SHAMapNodeID>& nodeIDs,
bool InboundLedger::takeHeader (std::string const& data)
{
// Return value: true=normal, false=bad data
if (m_journal.trace) m_journal.trace <<
JLOG (m_journal.trace) <<
"got header acquiring ledger " << mHash;
if (mComplete || mFailed || mHaveHeader)
@@ -799,10 +737,9 @@ bool InboundLedger::takeHeader (std::string const& data)
if (mLedger->getHash () != mHash)
{
if (m_journal.warning) m_journal.warning <<
"Acquire hash mismatch";
if (m_journal.warning) m_journal.warning <<
mLedger->getHash () << "!=" << mHash;
JLOG (m_journal.warning) <<
"Acquire hash mismatch: " << mLedger->getHash () <<
"!=" << mHash;
mLedger.reset ();
return false;
}
@@ -833,7 +770,7 @@ bool InboundLedger::takeTxNode (const std::vector<SHAMapNodeID>& nodeIDs,
{
if (!mHaveHeader)
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"TX node without header";
san.incInvalid();
return false;
@@ -890,8 +827,9 @@ bool InboundLedger::takeTxNode (const std::vector<SHAMapNodeID>& nodeIDs,
bool InboundLedger::takeAsNode (const std::vector<SHAMapNodeID>& nodeIDs,
const std::vector< Blob >& data, SHAMapAddNode& san)
{
if (m_journal.trace) m_journal.trace <<
"got ASdata (" << nodeIDs.size () << ") acquiring ledger " << mHash;
JLOG (m_journal.trace) <<
"got ASdata (" << nodeIDs.size () <<
") acquiring ledger " << mHash;
if (nodeIDs.size () == 1 && m_journal.trace) m_journal.trace <<
"got AS node: " << nodeIDs.front ();
@@ -899,7 +837,7 @@ bool InboundLedger::takeAsNode (const std::vector<SHAMapNodeID>& nodeIDs,
if (!mHaveHeader)
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Don't have ledger header";
san.incInvalid();
return false;
@@ -923,7 +861,7 @@ bool InboundLedger::takeAsNode (const std::vector<SHAMapNodeID>& nodeIDs,
SHAMapHash{mLedger->info().accountHash}, *nodeDatait, snfWIRE, &tFilter);
if (!san.isGood ())
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Bad ledger header";
return false;
}
@@ -934,7 +872,7 @@ bool InboundLedger::takeAsNode (const std::vector<SHAMapNodeID>& nodeIDs,
*nodeIDit, *nodeDatait, &tFilter);
if (!san.isGood ())
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Unable to add AS node";
return false;
}
@@ -1078,7 +1016,7 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
{
if (packet.nodes_size () < 1)
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Got empty header data";
peer->charge (Resource::feeInvalidRequest);
return -1;
@@ -1092,7 +1030,7 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
san.incUseful ();
else
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Got invalid header data";
peer->charge (Resource::feeInvalidRequest);
return -1;
@@ -1103,14 +1041,14 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
if (!mHaveState && (packet.nodes ().size () > 1) &&
!takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san))
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Included AS root invalid";
}
if (!mHaveTransactions && (packet.nodes ().size () > 2) &&
!takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san))
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Included TX root invalid";
}
@@ -1126,7 +1064,7 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
{
if (packet.nodes ().size () == 0)
{
if (m_journal.info) m_journal.info <<
JLOG (m_journal.info) <<
"Got response with no nodes";
peer->charge (Resource::feeInvalidRequest);
return -1;
@@ -1143,7 +1081,7 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
if (!node.has_nodeid () || !node.has_nodedata ())
{
if (m_journal.warning) m_journal.warning <<
JLOG (m_journal.warning) <<
"Got bad node";
peer->charge (Resource::feeInvalidRequest);
return -1;
@@ -1160,13 +1098,13 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
if (packet.type () == protocol::liTX_NODE)
{
takeTxNode (nodeIDs, nodeData, san);
if (m_journal.debug) m_journal.debug <<
JLOG (m_journal.debug) <<
"Ledger TX node stats: " << san.get();
}
else
{
takeAsNode (nodeIDs, nodeData, san);
if (m_journal.debug) m_journal.debug <<
JLOG (m_journal.debug) <<
"Ledger AS node stats: " << san.get();
}
@@ -1207,8 +1145,7 @@ void InboundLedger::runData ()
// breaking ties in favor of the peer that responded first.
for (auto& entry : data)
{
Peer::ptr peer = entry.first.lock();
if (peer)
if (auto peer = entry.first.lock())
{
int count = processData (peer, *(entry.second));
if (count > chosenPeerCount)
@@ -1250,9 +1187,6 @@ Json::Value InboundLedger::getJson (int)
ret[jss::have_transactions] = mHaveTransactions;
}
if (mAborted)
ret[jss::aborted] = true;
ret[jss::timeouts] = getTimeouts ();
if (mHaveHeader && !mHaveState)

View File

@@ -45,7 +45,10 @@ private:
beast::Journal j_;
public:
using u256_acq_pair = std::pair<uint256, InboundLedger::pointer>;
using u256_acq_pair = std::pair<
uint256,
std::shared_ptr <InboundLedger>>;
// How long before we try again to acquire the same ledger
static const std::chrono::minutes kReacquireInterval;
@@ -66,7 +69,7 @@ public:
{
assert (hash.isNonZero ());
bool isNew = true;
InboundLedger::pointer inbound;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl (mLock);
@@ -96,11 +99,11 @@ public:
return {};
}
InboundLedger::pointer find (uint256 const& hash)
std::shared_ptr<InboundLedger> find (uint256 const& hash)
{
assert (hash.isNonZero ());
InboundLedger::pointer ret;
std::shared_ptr<InboundLedger> ret;
{
ScopedLockType sl (mLock);
@@ -157,7 +160,7 @@ public:
<< "Got data (" << packet.nodes ().size ()
<< ") for acquiring ledger: " << hash;
InboundLedger::pointer ledger = find (hash);
auto ledger = find (hash);
if (!ledger)
{
@@ -230,9 +233,7 @@ public:
void doLedgerData (LedgerHash hash)
{
InboundLedger::pointer ledger = find (hash);
if (ledger)
if (auto ledger = find (hash))
ledger->runData ();
}
@@ -341,7 +342,7 @@ public:
void gotFetchPack ()
{
std::vector<InboundLedger::pointer> acquires;
std::vector<std::shared_ptr<InboundLedger>> acquires;
{
ScopedLockType sl (mLock);
@@ -418,7 +419,7 @@ private:
using ScopedLockType = std::unique_lock <std::recursive_mutex>;
std::recursive_mutex mLock;
using MapType = hash_map <uint256, InboundLedger::pointer>;
using MapType = hash_map <uint256, std::shared_ptr<InboundLedger>>;
MapType mLedgers;
beast::aged_map <uint256, std::uint32_t> mRecentFailures;

View File

@@ -65,8 +65,6 @@ class InboundTransactionsImp
public:
Application& app_;
using u256_acq_pair = std::pair<uint256, TransactionAcquire::pointer>;
InboundTransactionsImp (
Application& app,
clock_type& clock,

View File

@@ -141,12 +141,10 @@ void TransactionAcquire::trigger (Peer::ptr const& peer)
}
else
{
std::vector<SHAMapNodeID> nodeIDs;
std::vector<uint256> nodeHashes;
ConsensusTransSetSF sf (app_, app_.getTempNodeCache ());
mMap->getMissingNodes (nodeIDs, nodeHashes, 256, &sf);
auto nodes = mMap->getMissingNodes (256, &sf);
if (nodeIDs.empty ())
if (nodes.empty ())
{
if (mMap->isValid ())
mComplete = true;
@@ -164,9 +162,9 @@ void TransactionAcquire::trigger (Peer::ptr const& peer)
if (getTimeouts () != 0)
tmGL.set_querytype (protocol::qtINDIRECT);
for (SHAMapNodeID& it : nodeIDs)
for (auto const& node : nodes)
{
*tmGL.add_nodeids () = it.getRawString ();
*tmGL.add_nodeids () = node.first.getRawString ();
}
sendRequest (tmGL, peer);
}

View File

@@ -522,7 +522,7 @@ private:
std::shared_ptr<LedgerConsensus> mLedgerConsensus;
LedgerMaster& m_ledgerMaster;
InboundLedger::pointer mAcquiringLedger;
std::shared_ptr<InboundLedger> mAcquiringLedger;
SubInfoMapType mSubAccount;
SubInfoMapType mSubRTAccount;

View File

@@ -163,8 +163,10 @@ public:
std::function<void(std::shared_ptr<SHAMapItem const> const&)> const&) const;
// comparison/sync functions
void getMissingNodes (std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>& hashes, int max,
SHAMapSyncFilter * filter);
std::vector<std::pair<SHAMapNodeID, uint256>>
getMissingNodes (
std::size_t max,
SHAMapSyncFilter *filter);
bool getNodeFat (SHAMapNodeID node,
std::vector<SHAMapNodeID>& nodeIDs,

View File

@@ -115,13 +115,14 @@ void SHAMap::visitNodes(std::function<bool (SHAMapAbstractNode&)> const& functio
but not available locally. The filter can hold alternate sources of
nodes that are not permanently stored locally
*/
void
SHAMap::getMissingNodes(std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>& hashes,
int max, SHAMapSyncFilter* filter)
std::vector<std::pair<SHAMapNodeID, uint256>>
SHAMap::getMissingNodes(std::size_t max, SHAMapSyncFilter* filter)
{
assert (root_->isValid ());
assert (root_->getNodeHash().isNonZero ());
std::vector<std::pair<SHAMapNodeID, uint256>> ret;
std::uint32_t generation = f_.fullbelow().getGeneration();
if (!root_->isInner ())
@@ -130,13 +131,13 @@ SHAMap::getMissingNodes(std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>
clearSynching();
else if (journal_.warning) journal_.warning <<
"synching empty tree";
return;
return ret;
}
if (std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow (generation))
{
clearSynching ();
return;
return ret;
}
int const maxDefer = f_.db().getDesiredAsyncReadCount ();
@@ -144,6 +145,8 @@ SHAMap::getMissingNodes(std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>
// Track the missing hashes we have found so far
std::set <SHAMapHash> missingHashes;
// preallocate memory
ret.reserve (max);
while (1)
{
@@ -190,11 +193,12 @@ SHAMap::getMissingNodes(std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>
{
if (!pending)
{ // node is not in the database
nodeIDs.push_back (childID);
hashes.push_back (childHash.as_uint256());
ret.emplace_back (
childID,
childHash.as_uint256());
if (--max <= 0)
return;
return ret;
}
else
{
@@ -274,8 +278,10 @@ SHAMap::getMissingNodes(std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>
}
else if ((max > 0) && (missingHashes.insert (nodeHash).second))
{
nodeIDs.push_back (nodeID);
hashes.push_back (nodeHash.as_uint256());
ret.push_back (
std::make_pair (
nodeID,
nodeHash.as_uint256()));
--max;
}
@@ -290,24 +296,27 @@ SHAMap::getMissingNodes(std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>
<< elapsed.count() << " + " << process_time.count() << " ms";
if (max <= 0)
return;
return ret;
}
if (nodeIDs.empty ())
if (ret.empty ())
clearSynching ();
return ret;
}
std::vector<uint256> SHAMap::getNeededHashes (int max, SHAMapSyncFilter* filter)
{
std::vector<uint256> nodeHashes;
nodeHashes.reserve(max);
auto ret = getMissingNodes(max, filter);
std::vector<SHAMapNodeID> nodeIDs;
nodeIDs.reserve(max);
std::vector<uint256> hashes;
hashes.reserve (ret.size());
getMissingNodes(nodeIDs, nodeHashes, max, filter);
return nodeHashes;
for (auto const& n : ret)
hashes.push_back (n.second);
return hashes;
}
bool SHAMap::getNodeFat (SHAMapNodeID wanted,

View File

@@ -28,10 +28,6 @@
namespace ripple {
namespace tests {
#ifdef BEAST_DEBUG
//#define SMS_DEBUG
#endif
class sync_test : public beast::unit_test::suite
{
public:
@@ -61,29 +57,23 @@ public:
if (!map.addItem (std::move(*item), false, false))
{
log <<
"Unable to add item to map";
assert (false);
log << "Unable to add item to map";
return false;
}
}
for (std::list<uint256>::iterator it = items.begin (); it != items.end (); ++it)
for (auto const& item : items)
{
if (!map.delItem (*it))
if (!map.delItem (item))
{
log <<
"Unable to remove item from map";
assert (false);
log << "Unable to remove item from map";
return false;
}
}
if (beforeHash != map.getHash ())
{
log <<
"Hashes do not match " << beforeHash << " " << map.getHash ();
assert (false);
log << "Hashes do not match " << beforeHash << " " << map.getHash ();
return false;
}
@@ -101,119 +91,73 @@ public:
for (int i = 0; i < items; ++i)
source.addItem (std::move(*makeRandomAS ()), false, false);
unexpected (!confuseMap (source, 500), "ConfuseMap");
expect (confuseMap (source, 500), "ConfuseMap");
source.setImmutable ();
std::vector<SHAMapNodeID> nodeIDs, gotNodeIDs;
std::vector< Blob > gotNodes;
std::vector<uint256> hashes;
std::vector<SHAMapNodeID>::iterator nodeIDIterator;
std::vector< Blob >::iterator rawNodeIterator;
int passes = 0;
int nodes = 0;
destination.setSynching ();
unexpected (!source.getNodeFat (
SHAMapNodeID (), nodeIDs, gotNodes,
rand_bool(), rand_int(2)), "GetNodeFat");
{
std::vector<SHAMapNodeID> gotNodeIDs;
std::vector<Blob> gotNodes;
unexpected (gotNodes.size () < 1, "NodeSize");
expect (source.getNodeFat (
SHAMapNodeID (),
gotNodeIDs,
gotNodes,
rand_bool(),
rand_int(2)), "getNodeFat (1)");
unexpected (!destination.addRootNode (source.getHash(),
*gotNodes.begin (), snfWIRE, nullptr).isGood(), "AddRootNode");
unexpected (gotNodes.size () < 1, "NodeSize");
nodeIDs.clear ();
gotNodes.clear ();
#ifdef SMS_DEBUG
int bytes = 0;
#endif
expect (destination.addRootNode (
source.getHash(),
*gotNodes.begin (),
snfWIRE,
nullptr).isGood(), "addRootNode");
}
do
{
f.clock().advance(std::chrono::seconds(1));
++passes;
hashes.clear ();
// get the list of nodes we know we need
destination.getMissingNodes (nodeIDs, hashes, 2048, nullptr);
auto nodesMissing = destination.getMissingNodes (2048, nullptr);
if (nodeIDs.empty ()) break;
if (nodesMissing.empty ())
break;
// get as many nodes as possible based on this information
for (nodeIDIterator = nodeIDs.begin (); nodeIDIterator != nodeIDs.end (); ++nodeIDIterator)
std::vector<SHAMapNodeID> gotNodeIDs;
std::vector<Blob> gotNodes;
for (auto& it : nodesMissing)
{
if (!source.getNodeFat (*nodeIDIterator, gotNodeIDs, gotNodes,
rand_bool(), rand_int(2)))
{
fail ("GetNodeFat");
}
else
{
pass ();
}
expect (source.getNodeFat (
it.first,
gotNodeIDs,
gotNodes,
rand_bool(),
rand_int(2)), "getNodeFat (2)");
}
assert (gotNodeIDs.size () == gotNodes.size ());
nodeIDs.clear ();
hashes.clear ();
expect (gotNodeIDs.size () == gotNodes.size (), "Size mismatch");
expect (!gotNodeIDs.empty (), "Didn't get NodeID");
if (gotNodeIDs.empty ())
for (std::size_t i = 0; i < gotNodeIDs.size(); ++i)
{
fail ("Got Node ID");
expect (
destination.addKnownNode (
gotNodeIDs[i],
gotNodes[i],
nullptr).isGood (), "addKnownNode");
}
else
{
pass ();
}
for (nodeIDIterator = gotNodeIDs.begin (), rawNodeIterator = gotNodes.begin ();
nodeIDIterator != gotNodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator)
{
++nodes;
#ifdef SMS_DEBUG
bytes += rawNodeIterator->size ();
#endif
if (!destination.addKnownNode (*nodeIDIterator, *rawNodeIterator, nullptr).isGood ())
{
fail ("AddKnownNode");
}
else
{
pass ();
}
}
gotNodeIDs.clear ();
gotNodes.clear ();
}
while (true);
destination.clearSynching ();
#ifdef SMS_DEBUG
log << "SYNCHING COMPLETE " << items << " items, " << nodes << " nodes, " <<
bytes / 1024 << " KB";
#endif
if (!source.deepCompare (destination))
{
fail ("Deep Compare");
}
else
{
pass ();
}
#ifdef SMS_DEBUG
log << "SHAMapSync test passed: " << items << " items, " <<
passes << " passes, " << nodes << " nodes";
#endif
expect (source.deepCompare (destination), "Deep Compare");
}
};