Improve I/O when fetching ledgers

* negative cache for node store
* async fetch, thread pool for node store
* read barrier logic for node store
* SHAMap getMissingNodesNB (non-blocking)
* non-blocking getMissingNodes traverse
* tune caches
This commit is contained in:
David Schwartz
2014-02-20 09:30:21 -08:00
committed by Vinnie Falco
parent 37b39ed1a1
commit 2f7ac98e34
13 changed files with 370 additions and 136 deletions

View File

@@ -157,6 +157,18 @@ public:
m_map.clear ();
}
/** Set the target number of entries the cache tries to hold.
    Thread-safe: acquires the cache mutex for the duration of the update.
*/
void setTargetSize (size_type s)
{
    lock_guard lock (m_mutex);
    m_target_size = s;
}
/** Set the target maximum age for cache entries.
    @param s Age in seconds (converted to std::chrono::seconds internally).
    Thread-safe: acquires the cache mutex for the duration of the update.
*/
void setTargetAge (size_type s)
{
    lock_guard lock (m_mutex);
    m_target_age = std::chrono::seconds (s);
}
/** Returns `true` if the key was found.
Does not update the last access time.
*/

View File

@@ -280,7 +280,7 @@ public:
, m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service
, m_nodeStore (m_nodeStoreManager->make_Database ("NodeStore.main", m_nodeStoreScheduler,
LogPartition::getJournal <NodeObject> (),
LogPartition::getJournal <NodeObject> (), 4, // four read threads for now
getConfig ().nodeDatabase, getConfig ().ephemeralNodeDatabase))
, m_sntpClient (SNTPClient::New (*this))
@@ -1356,7 +1356,7 @@ void ApplicationImp::updateTables ()
NodeStore::DummyScheduler scheduler;
std::unique_ptr <NodeStore::Database> source (
m_nodeStoreManager->make_Database ("NodeStore.import", scheduler,
LogPartition::getJournal <NodeObject> (),
LogPartition::getJournal <NodeObject> (), 0,
getConfig ().importNodeDatabase));
WriteLog (lsWARNING, NodeObject) <<

View File

@@ -26,7 +26,7 @@ enum
{
fullBelowTargetSize = 524288
,fullBelowExpirationSeconds = 240
,fullBelowExpirationSeconds = 600
};
}

View File

@@ -899,6 +899,76 @@ SHAMapTreeNode::pointer SHAMap::fetchNodeExternal (const SHAMapNode& id, uint256
return ret;
}
// Non-blocking version
// Locate a tree node by (id, hash) without ever blocking on disk I/O.
// Lookup order: mTNByID, the tree node cache, the sync filter, and finally
// the node store via asyncFetch. If the node store would need I/O, the read
// is scheduled instead, `pending` is set true, and nullptr is returned so
// the caller can retry after the deferred reads complete.
SHAMapTreeNode* SHAMap::getNodeAsync (
const SHAMapNode& id,
uint256 const& hash,
SHAMapSyncFilter *filter,
bool& pending)
{
pending = false;
// If the node is in mTNByID, return it
SHAMapTreeNode::pointer ptr = mTNByID.retrieve (id);
if (ptr)
return ptr.get ();
// Try the tree node cache
ptr = getCache (hash, id);
if (!ptr)
{
// Try the filter
if (filter)
{
Blob nodeData;
if (filter->haveNode (id, hash, nodeData))
{
ptr = boost::make_shared <SHAMapTreeNode> (
boost::cref (id), boost::cref (nodeData), 0, snfPREFIX, boost::cref (hash), true);
filter->gotNode (true, id, hash, nodeData, ptr->getType ());
}
}
if (!ptr)
{
NodeObject::pointer obj;
// asyncFetch returns false when the answer is not already cached;
// the read has then been queued and we must not block here.
if (!getApp().getNodeStore().asyncFetch (hash, obj))
{ // We would have to block
pending = true;
assert (!obj);
return nullptr;
}
// Answered from cache: a null object means the node store
// definitely does not contain this node.
if (!obj)
return nullptr;
ptr = boost::make_shared <SHAMapTreeNode> (id, obj->getData(), 0, snfPREFIX, hash, true);
// Defend against an object stored under a mismatched node ID.
if (id != *ptr)
{
assert (false);
return nullptr;
}
}
// Put it in the tree node cache
canonicalize (hash, ptr);
}
if (id.isRoot ())
{
// It is legal to replace the root
mTNByID.replace (id, ptr);
root = ptr;
}
else
// canonicalize so all threads share one instance per node ID
mTNByID.canonicalize (id, &ptr);
return ptr.get ();
}
/** Look at the cache and back end (things external to this SHAMap) to
find a tree node. Only a read lock is required because mTNByID has its
own, internal synchronization. Every thread calling this function must
@@ -953,7 +1023,7 @@ SHAMapTreeNode::pointer SHAMap::fetchNodeExternalNT (const SHAMapNode& id, uint2
return SHAMapTreeNode::pointer ();
}
// Share this immutable tree node in thre TreeNodeCache
// Share this immutable tree node in the TreeNodeCache
canonicalize (hash, ret);
}
catch (...)

View File

@@ -293,6 +293,10 @@ private:
SHAMapTreeNode* firstBelow (SHAMapTreeNode*);
SHAMapTreeNode* lastBelow (SHAMapTreeNode*);
// Non-blocking version of getNodePointerNT
SHAMapTreeNode* getNodeAsync (
const SHAMapNode & id, uint256 const & hash, SHAMapSyncFilter * filter, bool& pending);
SHAMapItem::pointer onlyBelow (SHAMapTreeNode*);
void eraseChildren (SHAMapTreeNode::pointer);
void dropBelow (SHAMapTreeNode*);

View File

@@ -119,6 +119,7 @@ void SHAMap::getMissingNodes (std::vector<SHAMapNode>& nodeIDs, std::vector<uint
assert (root->isValid ());
assert (root->getNodeHash().isNonZero ());
if (root->isFullBelow ())
{
clearSynching ();
@@ -131,74 +132,102 @@ void SHAMap::getMissingNodes (std::vector<SHAMapNode>& nodeIDs, std::vector<uint
return;
}
std::stack < GMNEntry > stack;
// Track the missing hashes we have found so far
std::set <uint256> missingHashes;
SHAMapTreeNode* node = root.get ();
int firstChild = rand() % 256;
int currentChild = 0;
bool fullBelow = true;
do
while (1)
{
while (currentChild < 16)
std::stack <GMNEntry> stack;
int deferCount = 0;
// Traverse the map without blocking
SHAMapTreeNode *node = root.get ();
int firstChild = rand() % 256;
int currentChild = 0;
bool fullBelow = true;
do
{
int branch = (firstChild + ++currentChild) % 16;
if (!node->isEmptyBranch (branch))
while (currentChild < 16)
{
uint256 const& childHash = node->getChildHash (branch);
if (! m_fullBelowCache.touch_if_exists (childHash))
int branch = (firstChild + currentChild++) % 16;
if (!node->isEmptyBranch (branch))
{
SHAMapNode childID = node->getChildNodeID (branch);
SHAMapTreeNode* d = getNodePointerNT (childID, childHash, filter);
uint256 const& childHash = node->getChildHash (branch);
if (!d)
{ // node is not in the database
nodeIDs.push_back (childID);
hashes.push_back (childHash);
if (--max <= 0)
return;
fullBelow = false; // This node is definitely not full below
}
else if (d->isInner () && !d->isFullBelow ())
if (! m_fullBelowCache.touch_if_exists (childHash))
{
stack.push (GMNEntry( node, firstChild, currentChild, fullBelow));
SHAMapNode childID = node->getChildNodeID (branch);
bool pending = false;
SHAMapTreeNode* d = getNodeAsync (childID, childHash, filter, pending);
// Switch to processing the child node
node = d;
firstChild = rand() % 256;
currentChild = 0;
fullBelow = true;
if (!d)
{
if (!pending)
{ // node is not in the database
if (missingHashes.insert (childHash).second)
{
nodeIDs.push_back (childID);
hashes.push_back (childHash);
if (--max <= 0)
return;
}
}
else
{
// read is deferred
++deferCount;
}
fullBelow = false; // This node is not known full below
}
else if (d->isInner () && !d->isFullBelow ())
{
stack.push (GMNEntry (node, firstChild, currentChild, fullBelow));
// Switch to processing the child node
node = d;
firstChild = rand() % 256;
currentChild = 0;
fullBelow = true;
}
}
}
}
// We are done with this inner node (and thus all of its children)
if (fullBelow)
{ // No partial node encountered below this node
node->setFullBelow ();
if (mType == smtSTATE)
m_fullBelowCache.insert (node->getNodeHash ());
}
if (stack.empty ())
node = NULL; // Finished processing the last node, we are done
else
{ // Pick up where we left off (above this node)
GMNEntry& next = stack.top ();
node = next.node;
firstChild = next.firstChild;
currentChild = next.currentChild;
fullBelow = (fullBelow && next.fullBelow); // was and still is
stack.pop ();
}
}
while (node != NULL);
// We are done with this inner node (and thus all of its children)
if (fullBelow)
{ // No partial node encountered below this node
node->setFullBelow ();
if (mType == smtSTATE)
m_fullBelowCache.insert (node->getNodeHash ());
}
if (stack.empty ())
node = NULL; // Finished processing the last node, we are done
else
{ // Pick up where we left off (above this node)
GMNEntry& next = stack.top ();
node = next.node;
firstChild = next.firstChild;
currentChild = next.currentChild;
fullBelow = (fullBelow && next.fullBelow); // was and still is
stack.pop ();
}
// If we didn't defer any reads, we're done
if (deferCount == 0)
break;
getApp().getNodeStore().waitReads();
}
while (node != NULL);
if (nodeIDs.empty ())
clearSynching ();

View File

@@ -604,8 +604,8 @@ int Config::getSize (SizedItemName item)
{ siValidationsSize, { 256, 256, 512, 1024, 1024 } },
{ siValidationsAge, { 500, 500, 500, 500, 500 } },
{ siNodeCacheSize, { 8192, 16384, 32768, 131072, 0 } },
{ siNodeCacheAge, { 30, 60, 90, 120, 900 } },
{ siNodeCacheSize, { 16384, 32768, 131072, 262144, 0 } },
{ siNodeCacheAge, { 60, 90, 120, 900, 0 } },
{ siTreeCacheSize, { 8192, 65536, 131072, 131072, 0 } },
{ siTreeCacheAge, { 30, 60, 90, 120, 900 } },

View File

@@ -29,6 +29,7 @@
#include "../../ripple/common/seconds_clock.h"
#include "../../ripple/common/TaggedCache.h"
#include "../../ripple/common/KeyCache.h"
#include "impl/Tuning.h"
# include "impl/DecodedBlob.h"

View File

@@ -62,6 +62,23 @@ public:
*/
virtual NodeObject::pointer fetch (uint256 const& hash) = 0;
/** Fetch an object without waiting.
If I/O is required to determine whether or not the object is present,
`false` is returned. Otherwise, `true` is returned and `object` is set
to refer to the object, or `nullptr` if the object is not present.
If I/O is required, the I/O is scheduled.
@note This can be called concurrently.
@param hash The key of the object to retrieve
@param object The object retrieved
@return Whether the operation completed
*/
virtual bool asyncFetch (uint256 const& hash, NodeObject::pointer& object) = 0;
/** Wait for all currently pending async reads to complete.
*/
virtual void waitReads () = 0;
/** Store the object.
The caller's Blob parameter is overwritten.

View File

@@ -63,13 +63,14 @@ public:
@param name A diagnostic label for the database.
@param scheduler The scheduler to use for performing asynchronous tasks.
@param readThreads The number of async read threads to create
@param backendParameters The parameter string for the persistent backend.
@param fastBackendParameters [optional] The parameter string for the ephemeral backend.
@return The opened database.
*/
virtual std::unique_ptr <Database> make_Database (std::string const& name,
Scheduler& scheduler, Journal journal,
Scheduler& scheduler, Journal journal, int readThreads,
Parameters const& backendParameters,
Parameters fastBackendParameters = Parameters ()) = 0;
};

View File

@@ -20,6 +20,9 @@
#ifndef RIPPLE_NODESTORE_DATABASEIMP_H_INCLUDED
#define RIPPLE_NODESTORE_DATABASEIMP_H_INCLUDED
#include <thread>
#include <condition_variable>
namespace ripple {
namespace NodeStore {
@@ -34,10 +37,25 @@ public:
std::unique_ptr <Backend> m_backend;
// Larger key/value storage, but not necessarily persistent.
std::unique_ptr <Backend> m_fastBackend;
// Positive cache
TaggedCache <uint256, NodeObject> m_cache;
// Negative cache
KeyCache <uint256> m_negCache;
std::mutex m_readLock;
std::condition_variable m_readCondVar;
std::condition_variable m_readGenCondVar;
std::set <uint256> m_readSet; // set of reads to do
uint256 m_readLast; // last hash read
std::vector <std::thread> m_readThreads;
bool m_readShut;
uint64_t m_readGen; // current read generation
DatabaseImp (std::string const& name,
Scheduler& scheduler,
int readThreads,
std::unique_ptr <Backend> backend,
std::unique_ptr <Backend> fastBackend,
Journal journal)
@@ -47,11 +65,26 @@ public:
, m_fastBackend (std::move (fastBackend))
, m_cache ("NodeStore", cacheTargetSize, cacheTargetSeconds,
get_seconds_clock (), LogPartition::getJournal <TaggedCacheLog> ())
, m_negCache ("NodeStore", get_seconds_clock (),
cacheTargetSize, cacheTargetSeconds)
, m_readShut (false)
, m_readGen (0)
{
for (int i = 0; i < readThreads; ++i)
m_readThreads.push_back (std::thread (&DatabaseImp::threadEntry, this));
}
~DatabaseImp ()
{
    // Signal shutdown under the lock so read threads blocked on either
    // condition variable observe m_readShut, then wake them all.
    {
        std::unique_lock <std::mutex> lock (m_readLock);
        m_readShut = true;
        m_readCondVar.notify_all ();
        m_readGenCondVar.notify_all ();
    }

    // Join every read thread before the members they touch are destroyed.
    // (Range-for replaces BOOST_FOREACH; this file already requires C++11.)
    for (std::thread& th : m_readThreads)
        th.join ();
}
String getName () const
@@ -59,76 +92,105 @@ public:
return m_backend->getName ();
}
//------------------------------------------------------------------------------
/** Try to satisfy a fetch from cache; otherwise schedule a background read.
    @param hash The key of the object to retrieve.
    @param object Set to the object if it is cached, else left null.
    @return `true` if the answer is known now (object present, or known
            absent via the negative cache); `false` if a read was queued.
*/
bool asyncFetch (uint256 const& hash, NodeObject::pointer& object)
{
    // Positive cache hit, or negative cache says it's definitely absent?
    object = m_cache.fetch (hash);
    if (object || m_negCache.touch_if_exists (hash))
        return true;

    {
        // Unknown: queue the key for the read threads. Only wake a
        // reader when the key was not already queued.
        std::lock_guard <std::mutex> lock (m_readLock);
        bool const queued = m_readSet.insert (hash).second;
        if (queued)
            m_readCondVar.notify_one ();
    }

    return false;
}
/** Block until pending asynchronous reads have had a chance to complete.
    Returns when the read queue drains or after two read "generations"
    pass, whichever comes first, so the caller makes forward progress even
    if other threads keep queueing new reads.
*/
void waitReads ()
{
{
std::unique_lock <std::mutex> lock (m_readLock);
// Wait at most two generations; a generation ends each time the
// read threads wrap around the key set (see threadEntry).
uint64 const wakeGeneration = m_readGen + 2;
while (!m_readShut && !m_readSet.empty () && (m_readGen < wakeGeneration))
m_readGenCondVar.wait (lock);
}
}
NodeObject::Ptr fetch (uint256 const& hash)
{
// See if the object already exists in the cache
//
NodeObject::Ptr obj = m_cache.fetch (hash);
if (obj != nullptr)
return obj;
if (m_negCache.touch_if_exists (hash))
return obj;
// Check the database(s).
bool foundInFastBackend = false;
// Check the fast backend database if we have one
//
if (m_fastBackend != nullptr)
{
obj = fetchInternal (*m_fastBackend, hash);
// If we found the object, avoid storing it again later.
if (obj != nullptr)
foundInFastBackend = true;
}
// Are we still without an object?
//
if (obj == nullptr)
{
// There's still a chance it could be in one of the databases.
bool foundInFastBackend = false;
// Check the fast backend database if we have one
// Yes so at last we will try the main database.
//
if (m_fastBackend != nullptr)
{
obj = fetchInternal (*m_fastBackend, hash);
obj = fetchInternal (*m_backend, hash);
}
// If we found the object, avoid storing it again later.
if (obj != nullptr)
foundInFastBackend = true;
}
if (obj == nullptr)
{
// Just in case a write occurred
obj = m_cache.fetch (hash);
// Are we still without an object?
//
if (obj == nullptr)
{
// Yes so at last we will try the main database.
//
{
// Monitor this operation's load since it is expensive.
//
// VFALCO TODO Why is this an autoptr? Why can't it just be a plain old object?
//
// VFALCO NOTE Commented this out because it breaks the unit test!
//
//LoadEvent::autoptr event (getApp().getJobQueue ().getLoadEventAP (jtHO_READ, "HOS::retrieve"));
obj = fetchInternal (*m_backend, hash);
}
}
// Did we finally get something?
//
if (obj != nullptr)
{
// Yes, so canonicalize it. This solves the problem where
// more than one thread has its own copy of the same object.
//
m_cache.canonicalize (hash, obj);
if (! foundInFastBackend)
{
// If we have a fast back end, store it there for later.
//
if (m_fastBackend != nullptr)
m_fastBackend->store (obj);
// Since this was a 'hard' fetch, we will log it.
//
WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: in db";
}
// We give up
m_negCache.insert (hash);
}
}
else
{
// found it!
// Ensure all threads get the same object
//
m_cache.canonicalize (hash, obj);
if (! foundInFastBackend)
{
// If we have a fast back end, store it there for later.
//
if (m_fastBackend != nullptr)
m_fastBackend->store (obj);
// Since this was a 'hard' fetch, we will log it.
//
WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: in db";
}
}
return obj;
@@ -168,30 +230,20 @@ public:
Blob& data,
uint256 const& hash)
{
bool const keyFoundAndObjectCached = m_cache.refreshIfPresent (hash);
NodeObject::Ptr object = NodeObject::createObject (type, index, data, hash);
// VFALCO NOTE What happens if the key is found, but the object
// fell out of the cache? We will end up passing it
// to the backend anyway.
//
if (! keyFoundAndObjectCached)
{
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
assert (hash == Serializer::getSHA512Half (data));
assert (hash == Serializer::getSHA512Half (data));
#endif
NodeObject::Ptr object = NodeObject::createObject (
type, index, data, hash);
m_cache.canonicalize (hash, object, true);
if (!m_cache.canonicalize (hash, object))
{
m_backend->store (object);
m_backend->store (object);
if (m_fastBackend)
m_fastBackend->store (object);
}
m_negCache.erase (hash);
}
if (m_fastBackend)
m_fastBackend->store (object);
}
//------------------------------------------------------------------------------
@@ -205,11 +257,14 @@ public:
{
m_cache.setTargetSize (size);
m_cache.setTargetAge (age);
m_negCache.setTargetSize (size);
m_negCache.setTargetAge (age);
}
/** Periodic maintenance: evict stale entries from both the positive
    (object) cache and the negative (known-absent) cache.
*/
void sweep ()
{
m_cache.sweep ();
m_negCache.sweep ();
}
int getWriteLoad ()
@@ -219,6 +274,51 @@ public:
//------------------------------------------------------------------------------
// Entry point for async read threads
// Each thread loops: take a key from m_readSet and fetch it so the result
// lands in the cache (or the negative cache), which is how callers of
// asyncFetch/waitReads eventually observe it.
void threadEntry ()
{
Thread::setCurrentThreadName ("prefetch");
while (1)
{
uint256 hash;
{
std::unique_lock <std::mutex> lock (m_readLock);
while (!m_readShut && m_readSet.empty ())
{
// all work is done
m_readGenCondVar.notify_all ();
m_readCondVar.wait (lock);
}
if (m_readShut)
break;
// Read in key order to make the back end more efficient
std::set <uint256>::iterator it = m_readSet.lower_bound (m_readLast);
if (it == m_readSet.end ())
{
it = m_readSet.begin ();
// A generation has completed
++m_readGen;
m_readGenCondVar.notify_all ();
}
hash = *it;
m_readSet.erase (it);
m_readLast = hash;
}
// Perform the read
// (outside the lock: fetch may do blocking backend I/O)
fetch (hash);
}
}
//------------------------------------------------------------------------------
void visitAll (VisitCallback& callback)
{
m_backend->visitAll (callback);

View File

@@ -109,7 +109,7 @@ public:
}
std::unique_ptr <Database> make_Database (std::string const& name,
Scheduler& scheduler, Journal journal,
Scheduler& scheduler, Journal journal, int readThreads,
Parameters const& backendParameters,
Parameters fastBackendParameters)
{
@@ -121,7 +121,7 @@ public:
? make_Backend (fastBackendParameters, scheduler, journal)
: nullptr);
return std::make_unique <DatabaseImp> (name, scheduler,
return std::make_unique <DatabaseImp> (name, scheduler, readThreads,
std::move (backend), std::move (fastBackend), journal);
}
};

View File

@@ -54,7 +54,7 @@ public:
// Write to source db
{
std::unique_ptr <Database> src (manager->make_Database (
"test", scheduler, j, srcParams));
"test", scheduler, j, 2, srcParams));
storeBatch (*src, batch);
}
@@ -63,7 +63,7 @@ public:
{
// Re-open the db
std::unique_ptr <Database> src (manager->make_Database (
"test", scheduler, j, srcParams));
"test", scheduler, j, 2, srcParams));
// Set up the destination database
File const dest_db (File::createTempFile ("dest_db"));
@@ -72,7 +72,7 @@ public:
destParams.set ("path", dest_db.getFullPathName ());
std::unique_ptr <Database> dest (manager->make_Database (
"test", scheduler, j, destParams));
"test", scheduler, j, 2, destParams));
beginTestCase (String ("import into '") + destBackendType + "' from '" + srcBackendType + "'");
@@ -130,7 +130,7 @@ public:
{
// Open the database
std::unique_ptr <Database> db (manager->make_Database ("test", scheduler,
j, nodeParams, tempParams));
j, 2, nodeParams, tempParams));
// Write the batch
storeBatch (*db, batch);
@@ -156,7 +156,7 @@ public:
{
// Re-open the database without the ephemeral DB
std::unique_ptr <Database> db (manager->make_Database (
"test", scheduler, j, nodeParams));
"test", scheduler, j, 2, nodeParams));
// Read it back in
Batch copy;
@@ -172,7 +172,7 @@ public:
{
// Verify the ephemeral db
std::unique_ptr <Database> db (manager->make_Database ("test",
scheduler, j, tempParams, StringPairArray ()));
scheduler, j, 2, tempParams, StringPairArray ()));
// Read it back in
Batch copy;