From 09a1d1a593c2ae1897fe3cfbd730291e93c78ca9 Mon Sep 17 00:00:00 2001
From: David Schwartz
Date: Thu, 19 Jan 2017 12:24:51 -0800
Subject: [PATCH] Improve getMissingNodes:

* Clean up and refactor
* Resume parents of nodes read asynchronously
* Resume at tip of new stack if exhausted prior stack
* No need to restart at root
---
 src/ripple/shamap/SHAMap.h            |  64 ++++-
 src/ripple/shamap/impl/SHAMapSync.cpp | 369 +++++++++++++++-----------
 2 files changed, 269 insertions(+), 164 deletions(-)

diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h
index a19f6ddb82..e48b15ece2 100644
--- a/src/ripple/shamap/SHAMap.h
+++ b/src/ripple/shamap/SHAMap.h
@@ -183,10 +183,19 @@ public:
         std::function<void (std::shared_ptr<SHAMapItem const> const&)> const&) const;
 
     // comparison/sync functions
+
+    /** Check for nodes in the SHAMap that are not available locally.
+
+        Traverse the SHAMap efficiently, maximizing I/O
+        concurrency, to discover nodes referenced in the
+        SHAMap but not available locally.
+
+        @param maxNodes The maximum number of found nodes to return
+        @param filter The filter to use when retrieving nodes
+        @return The nodes known to be missing
+    */
     std::vector<std::pair<SHAMapNodeID, uint256>>
-    getMissingNodes (
-        std::size_t max,
-        SHAMapSyncFilter *filter);
+    getMissingNodes (int maxNodes, SHAMapSyncFilter *filter);
     bool getNodeFat (SHAMapNodeID node,
         std::vector<SHAMapNodeID>& nodeIDs,
@@ -200,6 +209,7 @@ public:
     SHAMapAddNode addKnownNode (SHAMapNodeID const& nodeID, Slice const& rawNode,
                                 SHAMapSyncFilter * filter);
+
     // status functions
     void setImmutable ();
     bool isSynching () const;
@@ -316,6 +326,54 @@ private:
         bool isFirstMap, Delta & differences, int & maxCount) const;
     int walkSubTree (bool doWrite, NodeObjectType t, std::uint32_t seq);
     bool isInconsistentNode(std::shared_ptr<SHAMapAbstractNode> const& node) const;
+
+    // Structure to track information about a call to
+    // getMissingNodes while it's in progress
+    struct MissingNodes
+    {
+        MissingNodes() = delete;
+        MissingNodes(const MissingNodes&) = delete;
+        MissingNodes& operator=(const MissingNodes&) = delete;
+
+        // basic parameters
+        int max_;
+        SHAMapSyncFilter* filter_;
+        int const maxDefer_;
+        std::uint32_t generation_;
+
+        // nodes we have discovered to be missing
+        std::vector<std::pair<SHAMapNodeID, uint256>> missingNodes_;
+        std::set<SHAMapHash> missingHashes_;
+
+        // nodes we are in the process of traversing
+        using StackEntry = std::tuple<
+            SHAMapInnerNode*, // pointer to the node
+            SHAMapNodeID,     // the node's ID
+            int,              // which child we check first
+            int,              // which child we check next
+            bool>;            // whether we've found any missing children yet
+        std::stack<StackEntry> stack_;
+
+        // nodes we may acquire from deferred reads
+        std::vector<std::tuple<SHAMapInnerNode*, SHAMapNodeID, int>> deferredReads_;
+
+        // nodes we need to resume after we get their children from deferred reads
+        std::map<SHAMapInnerNode*, SHAMapNodeID> resumes_;
+
+        MissingNodes (
+            int max, SHAMapSyncFilter* filter,
+            int maxDefer, std::uint32_t generation) :
+                max_(max), filter_(filter),
+                maxDefer_(maxDefer), generation_(generation)
+        {
+            missingNodes_.reserve (max);
+            deferredReads_.reserve(maxDefer);
+        }
+    };
+
+    // getMissingNodes helper functions
+    void gmn_ProcessNodes (MissingNodes&, MissingNodes::StackEntry& node);
+    void gmn_ProcessDeferredReads (MissingNodes&);
 };
 
 inline
diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp
index 313ed90578..c889a09994 100644
--- a/src/ripple/shamap/impl/SHAMapSync.cpp
+++ b/src/ripple/shamap/impl/SHAMapSync.cpp
@@ -100,205 +100,252 @@ void SHAMap::visitNodes(std::function<bool (SHAMapAbstractNode&)> const& functio
     }
 }
 
+// Starting at the position referred to by the specified
+// StackEntry, process that node and its first resident
+// children, descending the SHAMap until we complete the
+// processing of a node.
+void SHAMap::gmn_ProcessNodes (MissingNodes& mn, MissingNodes::StackEntry& se)
+{
+    SHAMapInnerNode*& node = std::get<0>(se);
+    SHAMapNodeID& nodeID = std::get<1>(se);
+    int& firstChild = std::get<2>(se);
+    int& currentChild = std::get<3>(se);
+    bool& fullBelow = std::get<4>(se);
+
+    while (currentChild < 16)
+    {
+        int branch = (firstChild + currentChild++) % 16;
+        if (node->isEmptyBranch (branch))
+            continue;
+
+        auto const& childHash = node->getChildHash (branch);
+
+        if (mn.missingHashes_.count (childHash) != 0)
+        {
+            // we already know this child node is missing
+            fullBelow = false;
+        }
+        else if (! backed_ || ! f_.fullbelow().touch_if_exists (childHash.as_uint256()))
+        {
+            SHAMapNodeID childID = nodeID.getChildNodeID (branch);
+            bool pending = false;
+            auto d = descendAsync (node, branch, mn.filter_, pending);
+
+            if (!d)
+            {
+                fullBelow = false; // for now, not known full below
+
+                if (! pending)
+                { // node is not in the database
+                    mn.missingHashes_.insert (childHash);
+                    mn.missingNodes_.emplace_back (
+                        childID, childHash.as_uint256());
+
+                    if (--mn.max_ <= 0)
+                        return;
+                }
+                else
+                    mn.deferredReads_.emplace_back (node, nodeID, branch);
+            }
+            else if (d->isInner() &&
+                ! static_cast<SHAMapInnerNode*>(d)->isFullBelow(mn.generation_))
+            {
+                mn.stack_.push (se);
+
+                // Switch to processing the child node
+                node = static_cast<SHAMapInnerNode*>(d);
+                if (auto v2Node = dynamic_cast<SHAMapInnerNodeV2*>(node))
+                    nodeID = SHAMapNodeID{v2Node->depth(), v2Node->key()};
+                else
+                    nodeID = childID;
+                firstChild = rand_int(255);
+                currentChild = 0;
+                fullBelow = true;
+            }
+        }
+    }
+
+    // We have finished processing an inner node
+    // and thus (for now) all its children
+
+    if (fullBelow)
+    { // No partial node encountered below this node
+        node->setFullBelowGen (mn.generation_);
+        if (backed_)
+            f_.fullbelow().insert (node->getNodeHash ().as_uint256());
+    }
+
+    node = nullptr;
+}
+
+// Wait for deferred reads to finish and
+// process their results
+void SHAMap::gmn_ProcessDeferredReads (MissingNodes& mn)
+{
+    // Wait for our deferred reads to finish
+    auto const before = std::chrono::steady_clock::now();
+    f_.db().waitReads();
+    auto const after = std::chrono::steady_clock::now();
+
+    auto const elapsed = std::chrono::duration_cast<std::chrono::milliseconds>
+        (after - before);
+    auto const count = mn.deferredReads_.size ();
+
+    // Process all deferred reads
+    int hits = 0;
+    for (auto const& deferredNode : mn.deferredReads_)
+    {
+        auto parent = std::get<0>(deferredNode);
+        auto const& parentID = std::get<1>(deferredNode);
+        auto branch = std::get<2>(deferredNode);
+        auto const& nodeHash = parent->getChildHash (branch);
+
+        auto nodePtr = fetchNodeNT(nodeHash, mn.filter_);
+        if (nodePtr)
+        { // Got the node
+            ++hits;
+            if (backed_)
+                canonicalize (nodeHash, nodePtr);
+            nodePtr = parent->canonicalizeChild (branch, std::move(nodePtr));
+
+            // When we finish this stack, we need to restart
+            // with the parent of this node
+            mn.resumes_[parent] = parentID;
+        }
+        else if ((mn.max_ > 0) &&
+            (mn.missingHashes_.insert (nodeHash).second))
+        {
+            mn.missingNodes_.emplace_back (
+                parentID.getChildNodeID (branch),
+                nodeHash.as_uint256());
+
+            --mn.max_;
+        }
+    }
+    mn.deferredReads_.clear();
+
+    auto const process_time = std::chrono::duration_cast<std::chrono::milliseconds>
+        (std::chrono::steady_clock::now() - after);
+
+    if ((count > 50) || (elapsed.count() > 50))
+    {
+        JLOG(journal_.debug()) << "getMissingNodes reads " <<
+            count << " nodes (" << hits << " hits) in "
+            << elapsed.count() << " + " << process_time.count() << " ms";
+    }
+}
+
 /** Get a list of node IDs and hashes for nodes that are part of this SHAMap
     but not available locally. The filter can hold alternate sources of
     nodes that are not permanently stored locally
 */
 std::vector<std::pair<SHAMapNodeID, uint256>>
-SHAMap::getMissingNodes(std::size_t max, SHAMapSyncFilter* filter)
+SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
 {
     assert (root_->isValid ());
     assert (root_->getNodeHash().isNonZero ());
+    assert (max > 0);
 
-    std::vector<std::pair<SHAMapNodeID, uint256>> ret;
+    MissingNodes mn (max, filter,
+        f_.db().getDesiredAsyncReadCount(),
+        f_.fullbelow().getGeneration());
 
-    std::uint32_t generation = f_.fullbelow().getGeneration();
-
-    if (!root_->isInner ())
-    {
-        if (generation == 0)
-            clearSynching();
-        else
-            JLOG(journal_.warn()) << "synching empty tree";
-        return ret;
-    }
-
-    if (std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow (generation))
+    if (! root_->isInner () ||
+        std::static_pointer_cast<SHAMapInnerNode>(root_)->
+            isFullBelow (mn.generation_))
     {
         clearSynching ();
-        return ret;
+        return std::move (mn.missingNodes_);
     }
 
-    int const maxDefer = f_.db().getDesiredAsyncReadCount ();
+    // Start at the root.
+    // The firstChild value is selected randomly so if multiple threads
+    // are traversing the map, each thread will start at a different
+    // (randomly selected) inner node. This increases the likelihood
+    // that the two threads will produce different request sets (which is
+    // more efficient than sending identical requests).
+    MissingNodes::StackEntry pos {
+        static_cast<SHAMapInnerNode*>(root_.get()), SHAMapNodeID(),
+        rand_int(255), 0, true };
+    auto& node = std::get<0>(pos);
+    auto& nextChild = std::get<3>(pos);
+    auto& fullBelow = std::get<4>(pos);
 
-    // Track the missing hashes we have found so far
-    std::set<SHAMapHash> missingHashes;
-
-    // preallocate memory
-    ret.reserve (max);
-
-    while (1)
+    // Traverse the map without blocking
+    do
     {
-        std::vector<std::tuple<SHAMapInnerNode*, int, SHAMapNodeID>> deferredReads;
-        deferredReads.reserve (maxDefer + 16);
-        using StackEntry = std::tuple<SHAMapInnerNode*, SHAMapNodeID, int, int, bool>;
-        std::stack<StackEntry, std::vector<StackEntry>> stack;
-
-        // Traverse the map without blocking
-
-        auto node = static_cast<SHAMapInnerNode*>(root_.get());
-        SHAMapNodeID nodeID;
-
-        // The firstChild value is selected randomly so if multiple threads
-        // are traversing the map, each thread will start at a different
-        // (randomly selected) inner node. This increases the likelihood
-        // that the two threads will produce different request sets (which is
-        // more efficient than sending identical requests).
-        int firstChild = rand_int(255);
-        int currentChild = 0;
-        bool fullBelow = true;
-
-        do
+        while ((node != nullptr) &&
+            (mn.deferredReads_.size() <= mn.maxDefer_))
         {
-            while (currentChild < 16)
+            gmn_ProcessNodes (mn, pos);
+
+            if (mn.max_ <= 0)
+                return std::move(mn.missingNodes_);
+
+            if ((node == nullptr) && ! mn.stack_.empty ())
             {
-                int branch = (firstChild + currentChild++) % 16;
-                if (!node->isEmptyBranch (branch))
+                // Pick up where we left off with this node's parent
+                bool was = fullBelow; // was full below
+
+                pos = mn.stack_.top();
+                mn.stack_.pop ();
+                if (nextChild == 0)
                 {
-                    auto const& childHash = node->getChildHash (branch);
-
-                    if (missingHashes.count (childHash) != 0)
-                    {
-                        fullBelow = false;
-                    }
-                    else if (! backed_ || !
-                        f_.fullbelow().touch_if_exists (childHash.as_uint256()))
-                    {
-                        SHAMapNodeID childID = nodeID.getChildNodeID (branch);
-                        bool pending = false;
-                        auto d = descendAsync (node, branch, filter, pending);
-
-                        if (!d)
-                        {
-                            if (!pending)
-                            { // node is not in the database
-                                missingHashes.insert (childHash);
-                                ret.emplace_back (
-                                    childID,
-                                    childHash.as_uint256());
-
-                                if (--max <= 0)
-                                    return ret;
-                            }
-                            else
-                            {
-                                // read is deferred
-                                deferredReads.emplace_back (node, branch, childID);
-                            }
-
-                            fullBelow = false; // This node is not known full below
-                        }
-                        else if (d->isInner() &&
-                            !static_cast<SHAMapInnerNode*>(d)->isFullBelow(generation))
-                        {
-                            stack.push (std::make_tuple (node, nodeID,
-                                firstChild, currentChild, fullBelow));
-
-                            // Switch to processing the child node
-                            node = static_cast<SHAMapInnerNode*>(d);
-                            if (auto v2Node = dynamic_cast<SHAMapInnerNodeV2*>(node))
-                                nodeID = SHAMapNodeID{v2Node->depth(), v2Node->key()};
-                            else
-                                nodeID = childID;
-                            firstChild = rand_int(255);
-                            currentChild = 0;
-                            fullBelow = true;
-                        }
-                    }
+                    // This is a node we are processing for the first time
+                    fullBelow = true;
                 }
+                else
+                {
+                    // This is a node we are continuing to process
+                    fullBelow = fullBelow && was; // was and still is
+                }
+                assert (node);
             }
-
-            // We are done with this inner node (and thus all of its children)
-
-            if (fullBelow)
-            { // No partial node encountered below this node
-                node->setFullBelowGen (generation);
-                if (backed_)
-                    f_.fullbelow().insert (node->getNodeHash ().as_uint256());
-            }
-
-            if (stack.empty ())
-                node = nullptr; // Finished processing the last node, we are done
-            else
-            { // Pick up where we left off (above this node)
-                bool was;
-                std::tie(node, nodeID, firstChild, currentChild, was) = stack.top ();
-                fullBelow = was && fullBelow; // was and still is
-                stack.pop ();
-            }
-        }
-        while ((node != nullptr) && (deferredReads.size () <= maxDefer));
+        // We have either emptied the stack or
+        // posted as many deferred reads as we can
 
-        // If we didn't defer any reads, we're done
-        if (deferredReads.empty ())
-            break;
+        if (! mn.deferredReads_.empty ())
+            gmn_ProcessDeferredReads(mn);
 
-        auto const before = std::chrono::steady_clock::now();
-        f_.db().waitReads();
-        auto const after = std::chrono::steady_clock::now();
+        if (mn.max_ <= 0)
+            return std::move(mn.missingNodes_);
 
-        auto const elapsed = std::chrono::duration_cast<std::chrono::milliseconds>
-            (after - before);
-        auto const count = deferredReads.size ();
+        if (node == nullptr)
+        { // We weren't in the middle of processing a node
 
-        // Process all deferred reads
-        int hits = 0;
-        for (auto const& deferredNode : deferredReads)
-        {
-            auto parent = std::get<0>(deferredNode);
-            auto branch = std::get<1>(deferredNode);
-            auto const& deferredNodeID = std::get<2>(deferredNode);
-            auto const& nodeHash = parent->getChildHash (branch);
-
-            auto nodePtr = fetchNodeNT(nodeHash, filter);
-            if (nodePtr)
+            if (mn.stack_.empty() && ! mn.resumes_.empty())
             {
-                ++hits;
-                if (backed_)
-                    canonicalize (nodeHash, nodePtr);
-                nodePtr = parent->canonicalizeChild (branch, std::move(nodePtr));
+                // Recheck nodes we could not finish before
+                for (auto& r : mn.resumes_)
+                    if (! r.first->isFullBelow (mn.generation_))
+                        mn.stack_.push (std::make_tuple (
+                            r.first, r.second, rand_int(255), 0, true));
+
+                mn.resumes_.clear();
             }
-            else if ((max > 0) && (missingHashes.insert (nodeHash).second))
+
+            if (! mn.stack_.empty())
             {
-                ret.push_back (
-                    std::make_pair (
-                        deferredNodeID,
-                        nodeHash.as_uint256()));
-
-                --max;
+                // Resume at the top of the stack
+                pos = mn.stack_.top();
+                mn.stack_.pop();
+                assert (node != nullptr);
             }
         }
 
-        auto const process_time = std::chrono::duration_cast<std::chrono::milliseconds>
-            (std::chrono::steady_clock::now() - after);
+        // node will only still be nullptr if
+        // we finished the current node, the stack is empty
+        // and we have no nodes to resume
 
-        if ((count > 50) || (elapsed.count() > 50))
-        {
-            JLOG(journal_.debug()) << "getMissingNodes reads " <<
-                count << " nodes (" << hits << " hits) in "
-                << elapsed.count() << " + " << process_time.count() << " ms";
-        }
+    } while (node != nullptr);
 
-        if (max <= 0)
-            return ret;
-
-    }
-
-    if (ret.empty ())
+    if (mn.missingNodes_.empty ())
         clearSynching ();
-    return ret;
+    return std::move(mn.missingNodes_);
 }
 
 std::vector<uint256> SHAMap::getNeededHashes (int max, SHAMapSyncFilter* filter)