From d9956845bb56026d30c3a13aec479404c2c31419 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 13 Jul 2013 15:15:09 -0700 Subject: [PATCH] Slight refactor of the boundary between the node db and its backend(s). --- .../node/ripple_LevelDBBackendFactory.cpp | 8 -- .../node/ripple_MdbBackendFactory.cpp | 29 ---- modules/ripple_app/node/ripple_NodeStore.cpp | 125 ++++++++++-------- modules/ripple_app/node/ripple_NodeStore.h | 28 ++-- .../node/ripple_SqliteBackendFactory.cpp | 12 -- 5 files changed, 87 insertions(+), 115 deletions(-) diff --git a/modules/ripple_app/node/ripple_LevelDBBackendFactory.cpp b/modules/ripple_app/node/ripple_LevelDBBackendFactory.cpp index ef27c96ec7..b936e7349d 100644 --- a/modules/ripple_app/node/ripple_LevelDBBackendFactory.cpp +++ b/modules/ripple_app/node/ripple_LevelDBBackendFactory.cpp @@ -33,14 +33,6 @@ public: return mName; } - bool store (NodeObject::ref obj) - { - Blob blob (toBlob (obj)); - return mDB->Put (leveldb::WriteOptions (), - leveldb::Slice (reinterpret_cast(obj->getHash ().begin ()), 256 / 8), - leveldb::Slice (reinterpret_cast(&blob.front ()), blob.size ())).ok (); - } - bool bulkStore (const std::vector< NodeObject::pointer >& objs) { leveldb::WriteBatch batch; diff --git a/modules/ripple_app/node/ripple_MdbBackendFactory.cpp b/modules/ripple_app/node/ripple_MdbBackendFactory.cpp index cf6e02be7e..0b74349ab3 100644 --- a/modules/ripple_app/node/ripple_MdbBackendFactory.cpp +++ b/modules/ripple_app/node/ripple_MdbBackendFactory.cpp @@ -61,35 +61,6 @@ public: return m_name; } - bool store (NodeObject::ref obj) - { - MDB_txn *txn = nullptr; - int rc = 0; - - rc = mdb_txn_begin(m_env, NULL, 0, &txn); - - if (rc == 0) - { - MDB_val key, data; - Blob blob (toBlob (obj)); - - key.mv_size = (256 / 8); - key.mv_data = const_cast(obj->getHash().begin()); - - data.mv_size = blob.size(); - data.mv_data = &blob.front(); - - rc = mdb_put(txn, m_dbi, &key, &data, 0); - } - - if (rc == 0) - rc = mdb_txn_commit(txn); - else if (txn) - mdb_txn_abort(txn); - - return rc == 0; - } - bool bulkStore (std::vector const& objs) { MDB_txn *txn = nullptr; diff --git a/modules/ripple_app/node/ripple_NodeStore.cpp b/modules/ripple_app/node/ripple_NodeStore.cpp index 49034e1fdf..960e0d805f 100644 --- a/modules/ripple_app/node/ripple_NodeStore.cpp +++ b/modules/ripple_app/node/ripple_NodeStore.cpp @@ -10,14 +10,9 @@ NodeStore::NodeStore (String backendParameters, String fastBackendParameters, in : m_backend (createBackend (backendParameters)) , mCache ("NodeStore", cacheSize, cacheAge) , mNegativeCache ("HashedObjectNegativeCache", 0, 120) - , mWriteGeneration (0) - , mWriteLoad (0) - , mWritePending (false) { if (fastBackendParameters.isNotEmpty ()) m_fastBackend = createBackend (fastBackendParameters); - - mWriteSet.reserve (128); } void NodeStore::addBackendFactory (BackendFactory& factory) @@ -44,17 +39,14 @@ void NodeStore::sweep () void NodeStore::waitWrite () { - boost::mutex::scoped_lock sl (mWriteMutex); - int gen = mWriteGeneration; - - while (mWritePending && (mWriteGeneration == gen)) - mWriteCondition.wait (sl); + m_backend->waitWrite (); + if (m_fastBackend) + m_fastBackend->waitWrite (); } int NodeStore::getWriteLoad () { - boost::mutex::scoped_lock sl (mWriteMutex); - return std::max (mWriteLoad, static_cast (mWriteSet.size ())); + return m_backend->getWriteLoad (); } bool NodeStore::store (NodeObjectType type, uint32 index, @@ -72,56 +64,15 @@ bool NodeStore::store (NodeObjectType type, uint32 index, if (!mCache.canonicalize (hash, object)) { - boost::mutex::scoped_lock sl (mWriteMutex); - mWriteSet.push_back (object); - - if (!mWritePending) - { - mWritePending = true; - getApp().getJobQueue ().addJob (jtWRITE, "NodeObject::store", - BIND_TYPE (&NodeStore::bulkWrite, this, P_1)); - } + m_backend->store (object); + if (m_fastBackend) + m_fastBackend->store (object); } mNegativeCache.del (hash); return true; } -void NodeStore::bulkWrite (Job&) -{ - int setSize = 0; - - while (1) - { - std::vector< boost::shared_ptr > set; - set.reserve (128); - - { - boost::mutex::scoped_lock sl (mWriteMutex); - - mWriteSet.swap (set); - assert (mWriteSet.empty ()); - ++mWriteGeneration; - mWriteCondition.notify_all (); - - if (set.empty ()) - { - mWritePending = false; - mWriteLoad = 0; - return; - } - - mWriteLoad = std::max (setSize, static_cast (mWriteSet.size ())); - setSize = set.size (); - } - - m_backend->bulkStore (set); - - if (m_fastBackend) - m_fastBackend->bulkStore (set); - } -} - NodeObject::pointer NodeStore::retrieve (uint256 const& hash) { NodeObject::pointer obj = mCache.fetch (hash); @@ -232,3 +183,65 @@ NodeStore::Backend* NodeStore::createBackend (String const& parameters) return backend; } + +bool NodeStore::Backend::store (NodeObject::ref object) +{ + boost::mutex::scoped_lock sl (mWriteMutex); + mWriteSet.push_back (object); + + if (!mWritePending) + { + mWritePending = true; + getApp().getJobQueue ().addJob (jtWRITE, "NodeObject::store", + BIND_TYPE (&NodeStore::Backend::bulkWrite, this, P_1)); + } + return true; +} + +void NodeStore::Backend::bulkWrite (Job &) +{ + int setSize = 0; + + while (1) + { + std::vector< boost::shared_ptr > set; + set.reserve (128); + + { + boost::mutex::scoped_lock sl (mWriteMutex); + + mWriteSet.swap (set); + assert (mWriteSet.empty ()); + ++mWriteGeneration; + mWriteCondition.notify_all (); + + if (set.empty ()) + { + mWritePending = false; + mWriteLoad = 0; + return; + } + + mWriteLoad = std::max (setSize, static_cast (mWriteSet.size ())); + setSize = set.size (); + } + + bulkStore (set); + } +} + +void NodeStore::Backend::waitWrite () +{ + boost::mutex::scoped_lock sl (mWriteMutex); + int gen = mWriteGeneration; + + while (mWritePending && (mWriteGeneration == gen)) + mWriteCondition.wait (sl); +} + +int NodeStore::Backend::getWriteLoad () +{ + boost::mutex::scoped_lock sl (mWriteMutex); + + return std::max (mWriteLoad, static_cast (mWriteSet.size ())); +} diff --git a/modules/ripple_app/node/ripple_NodeStore.h b/modules/ripple_app/node/ripple_NodeStore.h index 78adb51726..564a85d046 100644 --- a/modules/ripple_app/node/ripple_NodeStore.h +++ b/modules/ripple_app/node/ripple_NodeStore.h @@ -23,13 +23,18 @@ public: // typedef boost::shared_ptr pointer; + Backend () : mWriteGeneration(0), mWriteLoad(0), mWritePending(false) + { + mWriteSet.reserve(128); + } + virtual ~Backend () { } virtual std::string getDataBaseName() = 0; // Store/retrieve a single object // These functions must be thread safe - virtual bool store (NodeObject::ref) = 0; + virtual bool store (NodeObject::ref); virtual NodeObject::pointer retrieve (uint256 const &hash) = 0; // Store a group of objects @@ -39,6 +44,18 @@ public: // Visit every object in the database // This function will only be called during an import operation virtual void visitAll (FUNCTION_TYPE ) = 0; + + virtual void bulkWrite (Job &); + virtual void waitWrite (); + virtual int getWriteLoad (); + + protected: + boost::mutex mWriteMutex; + boost::condition_variable mWriteCondition; + int mWriteGeneration; + int mWriteLoad; + bool mWritePending; + std::vector > mWriteSet; }; public: @@ -90,7 +107,6 @@ public: NodeObject::pointer retrieve (uint256 const& hash); - void bulkWrite (Job&); void waitWrite (); void tune (int size, int age); void sweep (); @@ -111,14 +127,6 @@ private: TaggedCache mCache; KeyCache mNegativeCache; - - boost::mutex mWriteMutex; - boost::condition_variable mWriteCondition; - int mWriteGeneration; - int mWriteLoad; - - std::vector< boost::shared_ptr > mWriteSet; - bool mWritePending; }; #endif diff --git a/modules/ripple_app/node/ripple_SqliteBackendFactory.cpp b/modules/ripple_app/node/ripple_SqliteBackendFactory.cpp index f1e74d3bba..0b421ac5be 100644 --- a/modules/ripple_app/node/ripple_SqliteBackendFactory.cpp +++ b/modules/ripple_app/node/ripple_SqliteBackendFactory.cpp @@ -24,18 +24,6 @@ public: return mName; } - bool store(NodeObject::ref object) - { - ScopedLock sl(mDb->getDBLock()); - static SqliteStatement pSt(mDb->getDB()->getSqliteDB(), - "INSERT OR IGNORE INTO CommittedObjects " - "(Hash,ObjType,LedgerIndex,Object) VALUES (?, ?, ?, ?);"); - bind(pSt, object); - pSt.step(); - pSt.reset(); - return true; - } - bool bulkStore(const std::vector< NodeObject::pointer >& objects) { ScopedLock sl(mDb->getDBLock());