Mirror of https://github.com/XRPLF/rippled.git (synced 2025-12-06 17:27:55 +00:00)
Improve getMissingNodes:

* Clean up and refactor
* Resume parents of nodes read asynchronously
* Resume at tip of new stack if exhausted prior stack
* No need to restart at root
Committed by: Scott Schurr
Parent: aebcc2115d
Commit: 09a1d1a593
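The commit message above describes a traversal that posts asynchronous reads as it walks, then resumes the parents of those reads and continues from the top of the rebuilt stack instead of restarting at the root. The following is a minimal, self-contained sketch of that control flow in plain C++; the Node struct, the work/deferred/resumes containers, and the tiny tree are illustrative stand-ins, not rippled code.

#include <cstdio>
#include <map>
#include <stack>
#include <utility>
#include <vector>

struct Node
{
    int id;
    std::vector<Node*> children;
    bool loaded = true;
};

int main()
{
    // Tiny hand-built tree: node "two" is not yet available locally and
    // must be "read asynchronously".
    Node three{3, {}};
    Node two{2, {}, false};
    Node root{1, {&two, &three}};

    std::stack<std::pair<Node*, int>> work;          // traversal stack
    std::vector<std::pair<Node*, Node*>> deferred;   // (parent, unread child)
    std::map<Node*, int> resumes;                    // parents to revisit

    work.push({&root, root.id});
    do
    {
        // 1. Drain the current stack, deferring children that are not loaded.
        while (!work.empty())
        {
            auto [node, id] = work.top();
            work.pop();
            for (Node* child : node->children)
            {
                if (!child->loaded)
                    deferred.emplace_back(node, child);  // post an async read
                else
                    work.push({child, child->id});
            }
        }

        // 2. "Wait" for the batch of reads, then mark each parent for
        //    resumption instead of restarting the traversal from the root.
        for (auto& [parent, child] : deferred)
        {
            child->loaded = true;                        // read completed
            resumes[parent] = parent->id;
        }
        deferred.clear();

        // 3. Refill the stack from the resume set and continue at its tip.
        for (auto& [parent, id] : resumes)
            work.push({parent, id});
        resumes.clear();
    } while (!work.empty());

    std::printf("traversal finished without restarting at the root\n");
}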
@@ -183,10 +183,19 @@ public:
        std::function<void(std::shared_ptr<SHAMapItem const> const&)> const&) const;

    // comparison/sync functions

    /** Check for nodes in the SHAMap not available

        Traverse the SHAMap efficiently, maximizing I/O
        concurrency, to discover nodes referenced in the
        SHAMap but not available locally.

        @param maxNodes The maximum number of found nodes to return
        @param filter The filter to use when retrieving nodes
        @return The nodes known to be missing
    */
    std::vector<std::pair<SHAMapNodeID, uint256>>
    getMissingNodes (
        std::size_t max,
        SHAMapSyncFilter *filter);
    getMissingNodes (int maxNodes, SHAMapSyncFilter *filter);

    bool getNodeFat (SHAMapNodeID node,
        std::vector<SHAMapNodeID>& nodeIDs,
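For orientation, a caller of the declaration above might drain the missing-node list in batches. The helper below is hypothetical (the function name, the batch size of 512, and the comment about queuing network requests are assumptions); only getMissingNodes, SHAMapSyncFilter, SHAMapNodeID, and uint256 come from the header.

// Hypothetical caller; `requestMissing` and the batch size are assumptions.
void
requestMissing (SHAMap& map, SHAMapSyncFilter* filter)
{
    // Ask for up to 512 of the nodes the map knows it is missing.
    auto const missing = map.getMissingNodes (512, filter);

    for (auto const& [nodeID, hash] : missing)
    {
        // A real caller would queue a network request for (nodeID, hash);
        // here the pair only identifies one node that is absent locally.
        (void) nodeID;
        (void) hash;
    }
}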
@@ -200,6 +209,7 @@ public:
    SHAMapAddNode addKnownNode (SHAMapNodeID const& nodeID, Slice const& rawNode,
                                SHAMapSyncFilter * filter);


    // status functions
    void setImmutable ();
    bool isSynching () const;
@@ -316,6 +326,54 @@ private:
        bool isFirstMap, Delta & differences, int & maxCount) const;
    int walkSubTree (bool doWrite, NodeObjectType t, std::uint32_t seq);
    bool isInconsistentNode(std::shared_ptr<SHAMapAbstractNode> const& node) const;

    // Structure to track information about call to
    // getMissingNodes while it's in progress
    struct MissingNodes
    {
        MissingNodes() = delete;
        MissingNodes(const MissingNodes&) = delete;
        MissingNodes& operator=(const MissingNodes&) = delete;

        // basic parameters
        int max_;
        SHAMapSyncFilter* filter_;
        int const maxDefer_;
        std::uint32_t generation_;

        // nodes we have discovered to be missing
        std::vector<std::pair<SHAMapNodeID, uint256>> missingNodes_;
        std::set <SHAMapHash> missingHashes_;

        // nodes we are in the process of traversing
        using StackEntry = std::tuple<
            SHAMapInnerNode*, // pointer to the node
            SHAMapNodeID,     // the node's ID
            int,              // which child we check first
            int,              // which child we check next
            bool>;            // whether we've found any missing children yet
        std::stack <StackEntry> stack_;

        // nodes we may acquire from deferred reads
        std::vector <std::tuple <SHAMapInnerNode*, SHAMapNodeID, int>> deferredReads_;

        // nodes we need to resume after we get their children from deferred reads
        std::map<SHAMapInnerNode*, SHAMapNodeID> resumes_;

        MissingNodes (
            int max, SHAMapSyncFilter* filter,
            int maxDefer, std::uint32_t generation) :
                max_(max), filter_(filter),
                maxDefer_(maxDefer), generation_(generation)
        {
            missingNodes_.reserve (max);
            deferredReads_.reserve(maxDefer);
        }
    };

    // getMissingNodes helper functions
    void gmn_ProcessNodes (MissingNodes&, MissingNodes::StackEntry& node);
    void gmn_ProcessDeferredReads (MissingNodes&);
};
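The StackEntry alias above is a positional tuple, and the sync code reads and writes it through std::get references so the fields behave like named members. A small stand-alone illustration of that pattern, with simplified element types in place of SHAMapInnerNode* and SHAMapNodeID:

#include <cstdio>
#include <tuple>

int main()
{
    // (node pointer, node ID, first child, current child, full below)
    using StackEntry = std::tuple<void*, int, int, int, bool>;
    StackEntry se{nullptr, 7, 200, 0, true};

    int& firstChild = std::get<2>(se);
    int& currentChild = std::get<3>(se);
    bool& fullBelow = std::get<4>(se);

    // Writing through the references updates the tuple itself, so the entry
    // can be pushed onto a stack and later resumed with its state intact.
    currentChild = 5;
    fullBelow = false;

    std::printf("first=%d current=%d fullBelow=%d\n",
        firstChild, std::get<3>(se), static_cast<int>(std::get<4>(se)));
}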
inline
@@ -100,205 +100,252 @@ void SHAMap::visitNodes(std::function<bool (SHAMapAbstractNode&)> const& functio
    }
}

// Starting at the position referred to by the specified
// StackEntry, process that node and its first resident
// children, descending the SHAMap until we complete the
// processing of a node.
void SHAMap::gmn_ProcessNodes (MissingNodes& mn, MissingNodes::StackEntry& se)
{
    SHAMapInnerNode*& node = std::get<0>(se);
    SHAMapNodeID& nodeID = std::get<1>(se);
    int& firstChild = std::get<2>(se);
    int& currentChild = std::get<3>(se);
    bool& fullBelow = std::get<4>(se);

    while (currentChild < 16)
    {
        int branch = (firstChild + currentChild++) % 16;
        if (node->isEmptyBranch (branch))
            continue;

        auto const& childHash = node->getChildHash (branch);

        if (mn.missingHashes_.count (childHash) != 0)
        {
            // we already know this child node is missing
            fullBelow = false;
        }
        else if (! backed_ || ! f_.fullbelow().touch_if_exists (childHash.as_uint256()))
        {
            SHAMapNodeID childID = nodeID.getChildNodeID (branch);
            bool pending = false;
            auto d = descendAsync (node, branch, mn.filter_, pending);

            if (!d)
            {
                fullBelow = false; // for now, not known full below

                if (! pending)
                { // node is not in the database
                    mn.missingHashes_.insert (childHash);
                    mn.missingNodes_.emplace_back (
                        childID, childHash.as_uint256());

                    if (--mn.max_ <= 0)
                        return;
                }
                else
                    mn.deferredReads_.emplace_back (node, nodeID, branch);
            }
            else if (d->isInner() &&
                ! static_cast<SHAMapInnerNode*>(d)->isFullBelow(mn.generation_))
            {
                mn.stack_.push (se);

                // Switch to processing the child node
                node = static_cast<SHAMapInnerNode*>(d);
                if (auto v2Node = dynamic_cast<SHAMapInnerNodeV2*>(node))
                    nodeID = SHAMapNodeID{v2Node->depth(), v2Node->key()};
                else
                    nodeID = childID;
                firstChild = rand_int(255);
                currentChild = 0;
                fullBelow = true;
            }
        }
    }

    // We have finished processing an inner node
    // and thus (for now) all its children

    if (fullBelow)
    { // No partial node encountered below this node
        node->setFullBelowGen (mn.generation_);
        if (backed_)
            f_.fullbelow().insert (node->getNodeHash ().as_uint256());
    }

    node = nullptr;
}
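The expression (firstChild + currentChild++) % 16 used above walks all 16 branches exactly once, just starting from a random offset, which is what lets concurrent traversals request different nodes first. A quick stand-alone check of that arithmetic, with <random> standing in for rand_int:

#include <array>
#include <cstdio>
#include <random>

int main()
{
    std::mt19937 gen{std::random_device{}()};
    // rand_int(255) stand-in: pick a random starting offset.
    int const firstChild = std::uniform_int_distribution<int>{0, 255}(gen);

    std::array<bool, 16> seen{};
    for (int currentChild = 0; currentChild < 16; ++currentChild)
        seen[(firstChild + currentChild) % 16] = true;

    bool coveredAll = true;
    for (bool b : seen)
        coveredAll = coveredAll && b;

    std::printf("firstChild=%d covers all 16 branches: %s\n",
        firstChild, coveredAll ? "yes" : "no");
}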
// Wait for deferred reads to finish and
// process their results
void SHAMap::gmn_ProcessDeferredReads (MissingNodes& mn)
{
    // Wait for our deferred reads to finish
    auto const before = std::chrono::steady_clock::now();
    f_.db().waitReads();
    auto const after = std::chrono::steady_clock::now();

    auto const elapsed = std::chrono::duration_cast
        <std::chrono::milliseconds> (after - before);
    auto const count = mn.deferredReads_.size ();

    // Process all deferred reads
    int hits = 0;
    for (auto const& deferredNode : mn.deferredReads_)
    {
        auto parent = std::get<0>(deferredNode);
        auto const& parentID = std::get<1>(deferredNode);
        auto branch = std::get<2>(deferredNode);
        auto const& nodeHash = parent->getChildHash (branch);

        auto nodePtr = fetchNodeNT(nodeHash, mn.filter_);
        if (nodePtr)
        { // Got the node
            ++hits;
            if (backed_)
                canonicalize (nodeHash, nodePtr);
            nodePtr = parent->canonicalizeChild (branch, std::move(nodePtr));

            // When we finish this stack, we need to restart
            // with the parent of this node
            mn.resumes_[parent] = parentID;
        }
        else if ((mn.max_ > 0) &&
            (mn.missingHashes_.insert (nodeHash).second))
        {
            mn.missingNodes_.emplace_back (
                parentID.getChildNodeID (branch),
                nodeHash.as_uint256());

            --mn.max_;
        }
    }
    mn.deferredReads_.clear();

    auto const process_time = std::chrono::duration_cast
        <std::chrono::milliseconds> (std::chrono::steady_clock::now() - after);

    if ((count > 50) || (elapsed.count() > 50))
    {
        JLOG(journal_.debug()) << "getMissingNodes reads " <<
            count << " nodes (" << hits << " hits) in "
            << elapsed.count() << " + " << process_time.count() << " ms";
    }
}
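gmn_ProcessDeferredReads waits once for an entire batch of reads and only then processes the results. The sketch below shows the same defer-then-wait shape using std::async as a stand-in for the node store's asynchronous reads; none of the names in it are rippled APIs.

#include <chrono>
#include <cstdio>
#include <future>
#include <thread>
#include <vector>

int main()
{
    using namespace std::chrono;

    // "Post" a batch of reads without blocking the traversal.
    std::vector<std::future<int>> deferredReads;
    for (int i = 0; i < 4; ++i)
        deferredReads.push_back(std::async(std::launch::async, [i] {
            std::this_thread::sleep_for(milliseconds(10));
            return i;                       // pretend this is a fetched node
        }));

    // Wait for the whole batch at once, then process the results.
    auto const before = steady_clock::now();
    int hits = 0;
    for (auto& f : deferredReads)
        if (f.get() >= 0)                   // "got the node"
            ++hits;
    auto const elapsed =
        duration_cast<milliseconds>(steady_clock::now() - before);

    std::printf("%d hits in %lld ms\n",
        hits, static_cast<long long>(elapsed.count()));
}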
/** Get a list of node IDs and hashes for nodes that are part of this SHAMap
    but not available locally. The filter can hold alternate sources of
    nodes that are not permanently stored locally
*/
std::vector<std::pair<SHAMapNodeID, uint256>>
SHAMap::getMissingNodes(std::size_t max, SHAMapSyncFilter* filter)
SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
{
    assert (root_->isValid ());
    assert (root_->getNodeHash().isNonZero ());
    assert (max > 0);

    std::vector<std::pair<SHAMapNodeID, uint256>> ret;
    MissingNodes mn (max, filter,
        f_.db().getDesiredAsyncReadCount(),
        f_.fullbelow().getGeneration());

    std::uint32_t generation = f_.fullbelow().getGeneration();

    if (!root_->isInner ())
    {
        if (generation == 0)
            clearSynching();
        else
            JLOG(journal_.warn()) << "synching empty tree";
        return ret;
    }

    if (std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow (generation))
    if (! root_->isInner () ||
        std::static_pointer_cast<SHAMapInnerNode>(root_)->
            isFullBelow (mn.generation_))
    {
        clearSynching ();
        return ret;
        return std::move (mn.missingNodes_);
    }

    int const maxDefer = f_.db().getDesiredAsyncReadCount ();
    // Start at the root.
    // The firstChild value is selected randomly so if multiple threads
    // are traversing the map, each thread will start at a different
    // (randomly selected) inner node. This increases the likelihood
    // that the two threads will produce different request sets (which is
    // more efficient than sending identical requests).
    MissingNodes::StackEntry pos {
        static_cast<SHAMapInnerNode*>(root_.get()), SHAMapNodeID(),
        rand_int(255), 0, true };
    auto& node = std::get<0>(pos);
    auto& nextChild = std::get<3>(pos);
    auto& fullBelow = std::get<4>(pos);

    // Track the missing hashes we have found so far
    std::set <SHAMapHash> missingHashes;

    // preallocate memory
    ret.reserve (max);

    while (1)
    // Traverse the map without blocking
    do
    {
        std::vector <std::tuple <SHAMapInnerNode*, int, SHAMapNodeID>> deferredReads;
        deferredReads.reserve (maxDefer + 16);

        using StackEntry = std::tuple<SHAMapInnerNode*, SHAMapNodeID, int, int, bool>;
        std::stack <StackEntry, std::vector<StackEntry>> stack;

        // Traverse the map without blocking

        auto node = static_cast<SHAMapInnerNode*>(root_.get());
        SHAMapNodeID nodeID;

        // The firstChild value is selected randomly so if multiple threads
        // are traversing the map, each thread will start at a different
        // (randomly selected) inner node. This increases the likelihood
        // that the two threads will produce different request sets (which is
        // more efficient than sending identical requests).
        int firstChild = rand_int(255);
        int currentChild = 0;
        bool fullBelow = true;

        do
        while ((node != nullptr) &&
               (mn.deferredReads_.size() <= mn.maxDefer_))
        {
            while (currentChild < 16)
            gmn_ProcessNodes (mn, pos);

            if (mn.max_ <= 0)
                return std::move(mn.missingNodes_);

            if ((node == nullptr) && ! mn.stack_.empty ())
            {
                int branch = (firstChild + currentChild++) % 16;
                if (!node->isEmptyBranch (branch))
                // Pick up where we left off with this node's parent
                bool was = fullBelow; // was full below

                pos = mn.stack_.top();
                mn.stack_.pop ();
                if (nextChild == 0)
                {
                    auto const& childHash = node->getChildHash (branch);

                    if (missingHashes.count (childHash) != 0)
                    {
                        fullBelow = false;
                    }
                    else if (! backed_ || ! f_.fullbelow().touch_if_exists (childHash.as_uint256()))
                    {
                        SHAMapNodeID childID = nodeID.getChildNodeID (branch);
                        bool pending = false;
                        auto d = descendAsync (node, branch, filter, pending);

                        if (!d)
                        {
                            if (!pending)
                            { // node is not in the database
                                missingHashes.insert (childHash);
                                ret.emplace_back (
                                    childID,
                                    childHash.as_uint256());

                                if (--max <= 0)
                                    return ret;
                            }
                            else
                            {
                                // read is deferred
                                deferredReads.emplace_back (node, branch, childID);
                            }

                            fullBelow = false; // This node is not known full below
                        }
                        else if (d->isInner() &&
                            !static_cast<SHAMapInnerNode*>(d)->isFullBelow(generation))
                        {
                            stack.push (std::make_tuple (node, nodeID,
                                firstChild, currentChild, fullBelow));

                            // Switch to processing the child node
                            node = static_cast<SHAMapInnerNode*>(d);
                            if (auto v2Node = dynamic_cast<SHAMapInnerNodeV2*>(node))
                                nodeID = SHAMapNodeID{v2Node->depth(), v2Node->key()};
                            else
                                nodeID = childID;
                            firstChild = rand_int(255);
                            currentChild = 0;
                            fullBelow = true;
                        }
                    }
                    // This is a node we are processing for the first time
                    fullBelow = true;
                }
                else
                {
                    // This is a node we are continuing to process
                    fullBelow = fullBelow && was; // was and still is
                }
                assert (node);
            }

            // We are done with this inner node (and thus all of its children)

            if (fullBelow)
            { // No partial node encountered below this node
                node->setFullBelowGen (generation);
                if (backed_)
                    f_.fullbelow().insert (node->getNodeHash ().as_uint256());
            }

            if (stack.empty ())
                node = nullptr; // Finished processing the last node, we are done
            else
            { // Pick up where we left off (above this node)
                bool was;
                std::tie(node, nodeID, firstChild, currentChild, was) = stack.top ();
                fullBelow = was && fullBelow; // was and still is
                stack.pop ();
            }

        }
        while ((node != nullptr) && (deferredReads.size () <= maxDefer));

        // If we didn't defer any reads, we're done
        if (deferredReads.empty ())
            break;
        // We have either emptied the stack or
        // posted as many deferred reads as we can

        auto const before = std::chrono::steady_clock::now();
        f_.db().waitReads();
        auto const after = std::chrono::steady_clock::now();
        if (! mn.deferredReads_.empty ())
            gmn_ProcessDeferredReads(mn);

        auto const elapsed = std::chrono::duration_cast
            <std::chrono::milliseconds> (after - before);
        auto const count = deferredReads.size ();
        if (mn.max_ <= 0)
            return std::move(mn.missingNodes_);

        // Process all deferred reads
        int hits = 0;
        for (auto const& deferredNode : deferredReads)
        {
            auto parent = std::get<0>(deferredNode);
            auto branch = std::get<1>(deferredNode);
            auto const& deferredNodeID = std::get<2>(deferredNode);
            auto const& nodeHash = parent->getChildHash (branch);
        if (node == nullptr)
        { // We weren't in the middle of processing a node

            auto nodePtr = fetchNodeNT(nodeHash, filter);
            if (nodePtr)
            if (mn.stack_.empty() && ! mn.resumes_.empty())
            {
                ++hits;
                if (backed_)
                    canonicalize (nodeHash, nodePtr);
                nodePtr = parent->canonicalizeChild (branch, std::move(nodePtr));
                // Recheck nodes we could not finish before
                for (auto& r : mn.resumes_)
                    if (! r.first->isFullBelow (mn.generation_))
                        mn.stack_.push (std::make_tuple (
                            r.first, r.second, rand_int(255), 0, true));

                mn.resumes_.clear();
            }
            else if ((max > 0) && (missingHashes.insert (nodeHash).second))

            if (! mn.stack_.empty())
            {
                ret.push_back (
                    std::make_pair (
                        deferredNodeID,
                        nodeHash.as_uint256()));

                --max;
                // Resume at the top of the stack
                pos = mn.stack_.top();
                mn.stack_.pop();
                assert (node != nullptr);
            }
        }

        auto const process_time = std::chrono::duration_cast
            <std::chrono::milliseconds> (std::chrono::steady_clock::now() - after);
        // node will only still be nullptr if
        // we finished the current node, the stack is empty
        // and we have no nodes to resume

        if ((count > 50) || (elapsed.count() > 50))
        {
            JLOG(journal_.debug()) << "getMissingNodes reads " <<
                count << " nodes (" << hits << " hits) in "
                << elapsed.count() << " + " << process_time.count() << " ms";
        }
    } while (node != nullptr);

    if (max <= 0)
        return ret;

    }

    if (ret.empty ())
    if (mn.missingNodes_.empty ())
        clearSynching ();

    return ret;
    return std::move(mn.missingNodes_);
}
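Both the old and the new implementations prune any subtree whose root is already marked full below for the current generation, and they invalidate all of those marks at once by advancing the generation. The class below is a hypothetical, minimal illustration of that idea; it is not the rippled full-below cache.

#include <cstdint>
#include <cstdio>

class FullBelowMark
{
    std::uint32_t gen_ = 0;   // 0 means "never marked"

public:
    void setFullBelowGen(std::uint32_t generation) { gen_ = generation; }
    bool isFullBelow(std::uint32_t generation) const { return gen_ == generation; }
};

int main()
{
    std::uint32_t generation = 1;
    FullBelowMark node;

    node.setFullBelowGen(generation);
    std::printf("pruned this pass: %d\n", node.isFullBelow(generation) ? 1 : 0);

    // Advancing the generation cheaply invalidates every mark at once, so a
    // later sync pass re-examines the subtree instead of trusting stale state.
    ++generation;
    std::printf("pruned next pass: %d\n", node.isFullBelow(generation) ? 1 : 0);
}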
std::vector<uint256> SHAMap::getNeededHashes (int max, SHAMapSyncFilter* filter)