From 3906c7e1611281b901286f373c651a7635dd1438 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 15 Jul 2013 19:42:01 -0700 Subject: [PATCH] Refactor some NodeStore, TaggedCache --- modules/ripple_app/node/ripple_NodeStore.cpp | 234 +++++++++++------- modules/ripple_app/node/ripple_NodeStore.h | 101 ++++++-- .../containers/ripple_TaggedCache.h | 87 ++++--- 3 files changed, 268 insertions(+), 154 deletions(-) diff --git a/modules/ripple_app/node/ripple_NodeStore.cpp b/modules/ripple_app/node/ripple_NodeStore.cpp index 960e0d805..b575a9a8c 100644 --- a/modules/ripple_app/node/ripple_NodeStore.cpp +++ b/modules/ripple_app/node/ripple_NodeStore.cpp @@ -4,15 +4,109 @@ */ //============================================================================== +// +// NodeStore::Backend +// + +NodeStore::Backend::Backend () + : mWriteGeneration(0) + , mWriteLoad(0) + , mWritePending(false) +{ + mWriteSet.reserve (bulkWriteBatchSize); +} + +bool NodeStore::Backend::store (NodeObject::ref object) +{ + boost::mutex::scoped_lock sl (mWriteMutex); + mWriteSet.push_back (object); + + if (!mWritePending) + { + mWritePending = true; + + // VFALCO TODO Eliminate this dependency on the Application object. + getApp().getJobQueue ().addJob ( + jtWRITE, + "NodeObject::store", + BIND_TYPE (&NodeStore::Backend::bulkWrite, this, P_1)); + } + return true; +} + +void NodeStore::Backend::bulkWrite (Job &) +{ + int setSize = 0; + + // VFALCO NOTE Use the canonical for(;;) instead. + // Or better, provide a proper terminating condition. + while (1) + { + std::vector< boost::shared_ptr > set; + set.reserve (bulkWriteBatchSize); + + { + boost::mutex::scoped_lock sl (mWriteMutex); + + mWriteSet.swap (set); + assert (mWriteSet.empty ()); + ++mWriteGeneration; + mWriteCondition.notify_all (); + + if (set.empty ()) + { + mWritePending = false; + mWriteLoad = 0; + + // VFALCO NOTE Fix this function to not return from the middle + return; + } + + mWriteLoad = std::max (setSize, static_cast (mWriteSet.size ())); + setSize = set.size (); + } + + bulkStore (set); + } +} + +// VFALCO TODO This function should not be needed. Instead, the +// destructor should handle flushing of the bulk write buffer. +// +void NodeStore::Backend::waitWrite () +{ + boost::mutex::scoped_lock sl (mWriteMutex); + int gen = mWriteGeneration; + + while (mWritePending && (mWriteGeneration == gen)) + mWriteCondition.wait (sl); +} + +int NodeStore::Backend::getWriteLoad () +{ + boost::mutex::scoped_lock sl (mWriteMutex); + + return std::max (mWriteLoad, static_cast (mWriteSet.size ())); +} + +//------------------------------------------------------------------------------ + +// +// NodeStore +// + Array NodeStore::s_factories; -NodeStore::NodeStore (String backendParameters, String fastBackendParameters, int cacheSize, int cacheAge) +NodeStore::NodeStore (String backendParameters, + String fastBackendParameters, + int cacheSize, + int cacheAge) : m_backend (createBackend (backendParameters)) - , mCache ("NodeStore", cacheSize, cacheAge) - , mNegativeCache ("HashedObjectNegativeCache", 0, 120) + , m_fastBackend (fastBackendParameters.isNotEmpty () ? createBackend (fastBackendParameters) + : nullptr) + , m_cache ("NodeStore", cacheSize, cacheAge) + , m_negativeCache ("NoteStoreNegativeCache", 0, 120) { - if (fastBackendParameters.isNotEmpty ()) - m_fastBackend = createBackend (fastBackendParameters); } void NodeStore::addBackendFactory (BackendFactory& factory) @@ -22,19 +116,19 @@ void NodeStore::addBackendFactory (BackendFactory& factory) float NodeStore::getCacheHitRate () { - return mCache.getHitRate (); + return m_cache.getHitRate (); } void NodeStore::tune (int size, int age) { - mCache.setTargetSize (size); - mCache.setTargetAge (age); + m_cache.setTargetSize (size); + m_cache.setTargetAge (age); } void NodeStore::sweep () { - mCache.sweep (); - mNegativeCache.sweep (); + m_cache.sweep (); + m_negativeCache.sweep (); } void NodeStore::waitWrite () @@ -52,32 +146,49 @@ int NodeStore::getWriteLoad () bool NodeStore::store (NodeObjectType type, uint32 index, Blob const& data, uint256 const& hash) { - // return: false = already in cache, true = added to cache - if (mCache.touch (hash)) - return false; + bool wasStored = false; + bool const keyFoundAndObjectCached = m_cache.refreshIfPresent (hash); + + // VFALCO NOTE What happens if the key is found, but the object + // fell out of the cache? We will end up passing it + // to the backend anyway. + // + if (! keyFoundAndObjectCached) + { + +// VFALCO TODO Rename this to RIPPLE_NODESTORE_VERIFY_HASHES and make +// it be 1 or 0 instead of merely defined or undefined. +// #ifdef PARANOID - assert (hash == Serializer::getSHA512Half (data)); + assert (hash == Serializer::getSHA512Half (data)); #endif - NodeObject::pointer object = boost::make_shared (type, index, data, hash); + NodeObject::pointer object = boost::make_shared (type, index, data, hash); - if (!mCache.canonicalize (hash, object)) - { - m_backend->store (object); - if (m_fastBackend) - m_fastBackend->store (object); + // VFALCO NOTE What does it mean to canonicalize an object? + // + if (!m_cache.canonicalize (hash, object)) + { + m_backend->store (object); + + if (m_fastBackend) + m_fastBackend->store (object); + } + + m_negativeCache.del (hash); + + wasStored = true; } - mNegativeCache.del (hash); - return true; + return wasStored; } NodeObject::pointer NodeStore::retrieve (uint256 const& hash) { - NodeObject::pointer obj = mCache.fetch (hash); + NodeObject::pointer obj = m_cache.fetch (hash); - if (obj || mNegativeCache.isPresent (hash)) + if (obj || m_negativeCache.isPresent (hash)) return obj; if (m_fastBackend) @@ -86,7 +197,7 @@ NodeObject::pointer NodeStore::retrieve (uint256 const& hash) if (obj) { - mCache.canonicalize (hash, obj); + m_cache.canonicalize (hash, obj); return obj; } } @@ -97,17 +208,18 @@ NodeObject::pointer NodeStore::retrieve (uint256 const& hash) if (!obj) { - mNegativeCache.add (hash); + m_negativeCache.add (hash); return obj; } } - mCache.canonicalize (hash, obj); + m_cache.canonicalize (hash, obj); if (m_fastBackend) m_fastBackend->store(obj); WriteLog (lsTRACE, NodeObject) << "HOS: " << hash << " fetch: in db"; + return obj; } @@ -115,12 +227,12 @@ void NodeStore::importVisitor ( std::vector & objects, NodeObject::pointer object) { - if (objects.size() >= 128) + if (objects.size() >= bulkWriteBatchSize) { m_backend->bulkStore (objects); objects.clear (); - objects.reserve (128); + objects.reserve (bulkWriteBatchSize); } objects.push_back (object); @@ -136,7 +248,7 @@ int NodeStore::import (String sourceBackendParameters) std::vector objects; - objects.reserve (128); + objects.reserve (bulkWriteBatchSize); srcBackend->visitAll (BIND_TYPE (&NodeStore::importVisitor, this, boost::ref (objects), P_1)); @@ -183,65 +295,3 @@ NodeStore::Backend* NodeStore::createBackend (String const& parameters) return backend; } - -bool NodeStore::Backend::store (NodeObject::ref object) -{ - boost::mutex::scoped_lock sl (mWriteMutex); - mWriteSet.push_back (object); - - if (!mWritePending) - { - mWritePending = true; - getApp().getJobQueue ().addJob (jtWRITE, "NodeObject::store", - BIND_TYPE (&NodeStore::Backend::bulkWrite, this, P_1)); - } - return true; -} - -void NodeStore::Backend::bulkWrite (Job &) -{ - int setSize = 0; - - while (1) - { - std::vector< boost::shared_ptr > set; - set.reserve (128); - - { - boost::mutex::scoped_lock sl (mWriteMutex); - - mWriteSet.swap (set); - assert (mWriteSet.empty ()); - ++mWriteGeneration; - mWriteCondition.notify_all (); - - if (set.empty ()) - { - mWritePending = false; - mWriteLoad = 0; - return; - } - - mWriteLoad = std::max (setSize, static_cast (mWriteSet.size ())); - setSize = set.size (); - } - - bulkStore (set); - } -} - -void NodeStore::Backend::waitWrite () -{ - boost::mutex::scoped_lock sl (mWriteMutex); - int gen = mWriteGeneration; - - while (mWritePending && (mWriteGeneration == gen)) - mWriteCondition.wait (sl); -} - -int NodeStore::Backend::getWriteLoad () -{ - boost::mutex::scoped_lock sl (mWriteMutex); - - return std::max (mWriteLoad, static_cast (mWriteSet.size ())); -} diff --git a/modules/ripple_app/node/ripple_NodeStore.h b/modules/ripple_app/node/ripple_NodeStore.h index dc21f4c2f..df611cbc8 100644 --- a/modules/ripple_app/node/ripple_NodeStore.h +++ b/modules/ripple_app/node/ripple_NodeStore.h @@ -12,32 +12,41 @@ class NodeStore : LeakChecked { public: + enum + { + /** This is the largest number of key/value pairs we + will write during a bulk write. + */ + // VFALCO TODO Make this a tunable parameter in the key value pairs + bulkWriteBatchSize = 128 + }; + + /** Interface to inform callers of cetain activities. + */ + class Hooks + { + virtual void on + }; + /** Back end used for the store. */ class Backend { public: - // VFALCO TODO Move the function definition to the .cpp - Backend () - : mWriteGeneration(0) - , mWriteLoad(0) - , mWritePending(false) - { - mWriteSet.reserve(128); - } + Backend (); virtual ~Backend () { } - virtual std::string getDataBaseName() = 0; - - // Store/retrieve a single object - // These functions must be thread safe + /** Store a single object. + */ + // VFALCO TODO Why should the Backend know or care about NodeObject? + // It should just deal with a fixed key and raw data. + // virtual bool store (NodeObject::ref); - virtual NodeObject::pointer retrieve (uint256 const &hash) = 0; - // Store a group of objects - // This function will only be called from a single thread - virtual bool bulkStore (const std::vector< NodeObject::pointer >&) = 0; + /** Retrieve an individual object. + */ + virtual NodeObject::pointer retrieve (uint256 const &hash) = 0; // Visit every object in the database // This function will only be called during an import operation @@ -46,19 +55,38 @@ public: // virtual void visitAll (FUNCTION_TYPE ) = 0; + private: + friend class NodeStore; + // VFALCO TODO Put this bulk writing logic into a separate class. - virtual void bulkWrite (Job &); - virtual void waitWrite (); - virtual int getWriteLoad (); + // NOTE Why are these virtual? + void bulkWrite (Job &); + void waitWrite (); + int getWriteLoad (); + + private: + virtual std::string getDataBaseName() = 0; + + // Store a group of objects + // This function will only be called from a single thread + // VFALCO NOTE It looks like NodeStore throws this into the job queue? + virtual bool bulkStore (const std::vector< NodeObject::pointer >&) = 0; protected: // VFALCO TODO Put this bulk writing logic into a separate class. - boost::mutex mWriteMutex; - boost::condition_variable mWriteCondition; - int mWriteGeneration; - int mWriteLoad; - bool mWritePending; - std::vector > mWriteSet; + boost::mutex mWriteMutex; + boost::condition_variable mWriteCondition; + int mWriteGeneration; + int mWriteLoad; + bool mWritePending; + std::vector > mWriteSet; + }; + +public: + // Helper functions for the backend + class BackendHelper + { + public: }; public: @@ -90,6 +118,7 @@ public: */ // VFALCO NOTE Is cacheSize in bytes? objects? KB? // Is cacheAge in minutes? seconds? + // These should be in the parameters. // NodeStore (String backendParameters, String fastBackendParameters, @@ -103,18 +132,32 @@ public: */ static void addBackendFactory (BackendFactory& factory); + // VFALCO TODO Document this. float getCacheHitRate (); + // VFALCO TODO Document this. bool store (NodeObjectType type, uint32 index, Blob const& data, uint256 const& hash); + // VFALCO TODO Document this. NodeObject::pointer retrieve (uint256 const& hash); + // VFALCO TODO Document this. void waitWrite (); + + // VFALCO TODO Document this. + // TODO Document the parameter meanings. void tune (int size, int age); + + // VFALCO TODO Document this. void sweep (); + + // VFALCO TODO Document this. + // What are the units of the return value? int getWriteLoad (); + // VFALCO TODO Document this. + // NOTE What's the return value? int import (String sourceBackendParameters); private: @@ -125,11 +168,15 @@ private: static Array s_factories; private: + // Persistent key/value storage. ScopedPointer m_backend; + + // Larger key/value storage, but not necessarily persistent. ScopedPointer m_fastBackend; - TaggedCache mCache; - KeyCache mNegativeCache; + // VFALCO NOTE What are these things for? We need comments. + TaggedCache m_cache; + KeyCache m_negativeCache; }; #endif diff --git a/modules/ripple_basics/containers/ripple_TaggedCache.h b/modules/ripple_basics/containers/ripple_TaggedCache.h index 1551ebda0..5fa75bf15 100644 --- a/modules/ripple_basics/containers/ripple_TaggedCache.h +++ b/modules/ripple_basics/containers/ripple_TaggedCache.h @@ -62,7 +62,58 @@ public: void sweep (); void clear (); - bool touch (const key_type& key); + /** Refresh the expiration time on a key. + + @param key The key to refresh. + @return `true` if the key was found and the object is cached. + */ + bool refreshIfPresent (const key_type& key) + { + bool found = false; + + // If present, make current in cache + boost::recursive_mutex::scoped_lock sl (mLock); + + cache_iterator cit = mCache.find (key); + + if (cit != mCache.end ()) + { + cache_entry& entry = cit->second; + + if (! entry.isCached ()) + { + // Convert weak to strong. + entry.ptr = entry.lock (); + + if (entry.isCached ()) + { + // We just put the object back in cache + ++mCacheCount; + entry.touch (); + found = true; + } + else + { + // Couldn't get strong pointer, + // object fell out of the cache so remove the entry. + mCache.erase (cit); + } + } + else + { + // It's cached so update the timer + entry.touch (); + found = true; + } + } + else + { + // not present + } + + return found; + } + bool del (const key_type& key, bool valid); bool canonicalize (const key_type& key, boost::shared_ptr& data, bool replace = false); bool store (const key_type& key, const c_Data& data); @@ -264,40 +315,6 @@ void TaggedCache::sweep () } } -template -bool TaggedCache::touch (const key_type& key) -{ - // If present, make current in cache - boost::recursive_mutex::scoped_lock sl (mLock); - - cache_iterator cit = mCache.find (key); - - if (cit == mCache.end ()) // Don't have the object - return false; - - cache_entry& entry = cit->second; - - if (entry.isCached ()) - { - entry.touch (); - return true; - } - - entry.ptr = entry.lock (); - - if (entry.isCached ()) - { - // We just put the object back in cache - ++mCacheCount; - entry.touch (); - return true; - } - - // Object fell out - mCache.erase (cit); - return false; -} - template bool TaggedCache::del (const key_type& key, bool valid) {