From 306811d2a78d0285a6247176d4ae47e636f1790d Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Thu, 21 Nov 2013 18:11:00 -0800 Subject: [PATCH] Add RocksDB NodeStore backend --- doc/rippled-example.cfg | 1 + src/ripple_core/nodestore/NodeStore.cpp | 6 +- .../nodestore/backend/RocksDBFactory.cpp | 344 ++++++++++++++++++ .../nodestore/backend/RocksDBFactory.h | 39 ++ src/ripple_core/nodestore/impl/DatabaseImp.h | 4 + .../nodestore/tests/BackendTests.cpp | 4 + .../nodestore/tests/DatabaseTests.cpp | 8 + .../nodestore/tests/TimingTests.cpp | 6 + 8 files changed, 410 insertions(+), 2 deletions(-) create mode 100644 src/ripple_core/nodestore/backend/RocksDBFactory.cpp create mode 100644 src/ripple_core/nodestore/backend/RocksDBFactory.h diff --git a/doc/rippled-example.cfg b/doc/rippled-example.cfg index 33ae49ed8..06eef7329 100644 --- a/doc/rippled-example.cfg +++ b/doc/rippled-example.cfg @@ -657,6 +657,7 @@ # LevelDB Use Google's LevelDB database (deprecated) # MDB Use MDB # none Use no backend +# RocksDB Use Facebook's RocksDB database # SQLite Use SQLite # # Required keys: diff --git a/src/ripple_core/nodestore/NodeStore.cpp b/src/ripple_core/nodestore/NodeStore.cpp index e6701d5bc..9bfcd3f17 100644 --- a/src/ripple_core/nodestore/NodeStore.cpp +++ b/src/ripple_core/nodestore/NodeStore.cpp @@ -23,9 +23,9 @@ #include "../ripple_leveldb/ripple_leveldb.h" #include "../ripple_mdb/ripple_mdb.h" #include "../ripple/sophia/ripple_sophia.h" +#include "../ripple/rocksdb/ripple_rocksdb.h" -namespace ripple -{ +namespace ripple { # include "impl/DecodedBlob.h" # include "impl/EncodedBlob.h" @@ -44,6 +44,8 @@ namespace ripple #include "backend/MdbFactory.cpp" # include "backend/SophiaFactory.h" #include "backend/SophiaFactory.cpp" +# include "backend/RocksDBFactory.h" +#include "backend/RocksDBFactory.cpp" #include "impl/BatchWriter.cpp" # include "impl/Factories.h" diff --git a/src/ripple_core/nodestore/backend/RocksDBFactory.cpp 
b/src/ripple_core/nodestore/backend/RocksDBFactory.cpp new file mode 100644 index 000000000..f8670b626 --- /dev/null +++ b/src/ripple_core/nodestore/backend/RocksDBFactory.cpp @@ -0,0 +1,344 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+*/
+//==============================================================================
+
+#if RIPPLE_ROCKSDB_AVAILABLE
+
+namespace NodeStore {
+
+//------------------------------------------------------------------------------
+
+/** RocksDB environment that names the threads RocksDB starts.
+
+    Wraps rocksdb::Env::Default() and replaces StartThread so that every
+    thread started through it is labelled "rocksdb #N" before the function
+    supplied by the caller runs.
+*/
+class RocksDBEnv : public rocksdb::EnvWrapper
+{
+public:
+    /** Returns the shared instance, a function-local static. */
+    static RocksDBEnv* get ()
+    {
+        static RocksDBEnv instance;
+        return &instance;
+    }
+
+    RocksDBEnv ()
+        : EnvWrapper (rocksdb::Env::Default())
+    {
+    }
+
+    /** The caller's entry point and argument, carried through StartThread. */
+    struct ThreadParams
+    {
+        ThreadParams (void (*f_)(void*), void* a_)
+            : f (f_)
+            , a (a_)
+        {
+        }
+
+        void (*f)(void*);
+        void* a;
+    };
+
+    /** Trampoline that runs on the newly started thread.
+
+        @param ptr A ThreadParams allocated by StartThread. It is deleted
+                   here, before the wrapped function is called.
+    */
+    static void thread_entry (void* ptr)
+    {
+        ThreadParams* const p (reinterpret_cast <ThreadParams*> (ptr));
+        void (*f)(void*) = p->f;
+        void* a (p->a);
+        delete p;
+
+        // One counter shared by all threads gives each a distinct number.
+        static Atomic <int> n;
+        int const id (++n);
+        std::stringstream ss;
+        ss << "rocksdb #" << id;
+        Thread::setCurrentThreadName (ss.str());
+
+        (*f)(a);
+    }
+
+    /** Starts a thread through the wrapped environment.
+
+        The function and argument are boxed in a ThreadParams so that
+        thread_entry can name the thread before calling them.
+    */
+    void StartThread(void (*f)(void*), void* a)
+    {
+        ThreadParams* const p (new ThreadParams (f, a));
+        EnvWrapper::StartThread (&RocksDBEnv::thread_entry, p);
+    }
+};
+
+//------------------------------------------------------------------------------
+
+/** NodeStore backend that keeps NodeObjects in a RocksDB database.
+
+    Single stores are handed to a BatchWriter, which calls back into
+    writeBatch; batches are written with one rocksdb::WriteBatch.
+*/
+class RocksDBFactory::BackendImp
+    : public Backend
+    , public BatchWriter::Callback
+    , public LeakChecked <RocksDBFactory::BackendImp>
+{
+public:
+    // Pool of std::string objects reused as the output buffer of DB::Get.
+    typedef RecycledObjectPool <std::string> StringPool;
+
+    //--------------------------------------------------------------------------
+
+    /** Opens the database, creating it if it does not exist.
+
+        Recognized keys in keyValues:
+          - "path"           database location (required)
+          - "cache_mb"       block cache size in megabytes; when empty the
+                             configured siHashNodeDBCache size is used
+          - "filter_bits"    bloom filter bits per key; 0 disables the
+                             filter; when empty, 10 bits are used if
+                             NODE_SIZE >= 2
+          - "open_files"     value for max_open_files
+          - "file_size_mb"   value for target_file_size_base, in megabytes
+          - "file_size_mult" value for target_file_size_multiplier
+
+        @param keyBytes  Size in bytes of every key.
+        @param keyValues Backend configuration, see above.
+        @param scheduler Passed to the BatchWriter.
+        @throws std::runtime_error if "path" is missing or the database
+                cannot be opened or created.
+    */
+    BackendImp (size_t keyBytes,
+                Parameters const& keyValues,
+                Scheduler& scheduler)
+        : m_keyBytes (keyBytes)
+        , m_scheduler (scheduler)
+        , m_batch (*this, scheduler)
+        , m_name (keyValues ["path"].toStdString ())
+    {
+        if (m_name.empty())
+            Throw (std::runtime_error ("Missing path in RocksDBFactory backend"));
+
+        rocksdb::Options options;
+        options.create_if_missing = true;
+
+        // The megabyte counts are widened to size_t before multiplying so
+        // that sizes of 2048 MB or more do not overflow int arithmetic.
+        if (keyValues["cache_mb"].isEmpty())
+        {
+            options.block_cache = rocksdb::NewLRUCache (
+                static_cast <size_t> (getConfig ().getSize (siHashNodeDBCache)) * 1024 * 1024);
+        }
+        else
+        {
+            options.block_cache = rocksdb::NewLRUCache (
+                static_cast <size_t> (keyValues["cache_mb"].getIntValue()) * 1024 * 1024);
+        }
+
+        // NOTE(review): the filter policy object created here is never
+        // released by this class; confirm whether RocksDB takes ownership.
+        if (keyValues["filter_bits"].isEmpty())
+        {
+            if (getConfig ().NODE_SIZE >= 2)
+                options.filter_policy = rocksdb::NewBloomFilterPolicy (10);
+        }
+        else if (keyValues["filter_bits"].getIntValue() != 0)
+        {
+            options.filter_policy = rocksdb::NewBloomFilterPolicy (keyValues["filter_bits"].getIntValue());
+        }
+
+        if (! keyValues["open_files"].isEmpty())
+        {
+            options.max_open_files = keyValues["open_files"].getIntValue();
+        }
+
+        if (! keyValues["file_size_mb"].isEmpty())
+        {
+            // NOTE(review): this product is computed in int; confirm the
+            // type of target_file_size_base before accepting large values.
+            options.target_file_size_base = 1024 * 1024 * keyValues["file_size_mb"].getIntValue();
+        }
+
+        if (! keyValues["file_size_mult"].isEmpty())
+        {
+            options.target_file_size_multiplier = keyValues["file_size_mult"].getIntValue();
+        }
+
+        // Route RocksDB's threads through RocksDBEnv so they get names.
+        options.env = RocksDBEnv::get();
+
+        rocksdb::DB* db = nullptr;
+        rocksdb::Status status = rocksdb::DB::Open (options, m_name, &db);
+        if (!status.ok () || !db)
+            Throw (std::runtime_error (std::string("Unable to open/create RocksDB: ") + status.ToString()));
+
+        m_db = db;
+    }
+
+    // NOTE(review): members are destroyed in reverse declaration order, so
+    // m_db is released before m_batch; confirm that the BatchWriter cannot
+    // still have a write in flight at that point.
+    ~BackendImp ()
+    {
+    }
+
+    /** Returns the "path" value this backend was opened with. */
+    std::string getName()
+    {
+        return m_name;
+    }
+
+    //--------------------------------------------------------------------------
+
+    /** Looks up one object by key.
+
+        @param key     Pointer to m_keyBytes bytes of key data.
+        @param pObject Reset on entry; set to the decoded object on success.
+        @return ok when the object was found and decoded, notFound when the
+                key is absent, dataCorrupt when RocksDB reports corruption
+                or the stored blob fails to decode, unknown otherwise.
+    */
+    Status fetch (void const* key, NodeObject::Ptr* pObject)
+    {
+        pObject->reset ();
+
+        Status status (ok);
+
+        rocksdb::ReadOptions const options;
+        rocksdb::Slice const slice (static_cast <char const*> (key), m_keyBytes);
+
+        {
+            // DB::Get writes the value into a std::string; borrow one from
+            // the pool instead of constructing a new one on every fetch.
+            //
+            StringPool::ScopedItem item (m_stringPool);
+            std::string& string = item.getObject ();
+
+            rocksdb::Status getStatus = m_db->Get (options, slice, &string);
+
+            if (getStatus.ok ())
+            {
+                DecodedBlob decoded (key, string.data (), string.size ());
+
+                if (decoded.wasOk ())
+                {
+                    *pObject = decoded.createObject ();
+                }
+                else
+                {
+                    // Decoding failed, probably corrupted!
+                    //
+                    status = dataCorrupt;
+                }
+            }
+            else
+            {
+                if (getStatus.IsCorruption ())
+                {
+                    status = dataCorrupt;
+                }
+                else if (getStatus.IsNotFound ())
+                {
+                    status = notFound;
+                }
+                else
+                {
+                    status = unknown;
+                }
+            }
+        }
+
+        return status;
+    }
+
+    /** Queues one object with the BatchWriter. */
+    void store (NodeObject::ref object)
+    {
+        m_batch.store (object);
+    }
+
+    /** Writes every object in the batch with a single rocksdb::WriteBatch.
+
+        A failed write is logged at lsFATAL; nothing is thrown.
+    */
+    void storeBatch (Batch const& batch)
+    {
+        rocksdb::WriteBatch wb;
+
+        {
+            // One pooled EncodedBlob is re-prepared for each object.
+            EncodedBlob::Pool::ScopedItem item (m_blobPool);
+
+            BOOST_FOREACH (NodeObject::ref object, batch)
+            {
+                item.getObject ().prepare (object);
+
+                wb.Put (
+                    rocksdb::Slice (reinterpret_cast <char const*> (item.getObject ().getKey ()),
+                                    m_keyBytes),
+                    rocksdb::Slice (reinterpret_cast <char const*> (item.getObject ().getData ()),
+                                    item.getObject ().getSize ()));
+            }
+        }
+
+        rocksdb::WriteOptions const options;
+
+        // A failed write would otherwise be lost silently, so report it.
+        rocksdb::Status const writeStatus (m_db->Write (options, &wb));
+
+        if (! writeStatus.ok ())
+        {
+            WriteLog (lsFATAL, NodeObject) << "RocksDB write failed: " << writeStatus.ToString ();
+        }
+    }
+
+    /** Calls callback.visitObject for every decodable entry in the database.
+
+        Entries whose key is not m_keyBytes long, or whose value fails to
+        decode, are logged at lsFATAL and skipped.
+    */
+    void visitAll (VisitCallback& callback)
+    {
+        rocksdb::ReadOptions const options;
+
+        ScopedPointer <rocksdb::Iterator> it (m_db->NewIterator (options));
+
+        for (it->SeekToFirst (); it->Valid (); it->Next ())
+        {
+            if (it->key ().size () == m_keyBytes)
+            {
+                DecodedBlob decoded (it->key ().data (),
+                                     it->value ().data (),
+                                     it->value ().size ());
+
+                if (decoded.wasOk ())
+                {
+                    NodeObject::Ptr object (decoded.createObject ());
+
+                    callback.visitObject (object);
+                }
+                else
+                {
+                    // Uh oh, corrupted data!
+                    WriteLog (lsFATAL, NodeObject) << "Corrupt NodeObject #" << uint256 (it->key ().data ());
+                }
+            }
+            else
+            {
+                // VFALCO NOTE What does it mean to find an
+                //             incorrectly sized key? Corruption?
+                WriteLog (lsFATAL, NodeObject) << "Bad key size = " << it->key ().size ();
+            }
+        }
+    }
+
+    /** Returns the write load reported by the BatchWriter. */
+    int getWriteLoad ()
+    {
+        return m_batch.getWriteLoad ();
+    }
+
+    //--------------------------------------------------------------------------
+
+    /** BatchWriter::Callback entry point; forwards to storeBatch. */
+    void writeBatch (Batch const& batch)
+    {
+        storeBatch (batch);
+    }
+
+private:
+    size_t const m_keyBytes;
+    Scheduler& m_scheduler;
+    BatchWriter m_batch;
+    StringPool m_stringPool;
+    EncodedBlob::Pool m_blobPool;
+    std::string m_name;
+    ScopedPointer <rocksdb::DB> m_db;
+};
+
+//------------------------------------------------------------------------------
+
+/** Concrete factory returned by RocksDBFactory::New. */
+class RocksDBFactoryImp : public RocksDBFactory
+{
+public:
+    // NOTE(review): nothing in this file passes this cache to a backend;
+    // each BackendImp creates its own block cache. Confirm it is needed.
+    std::shared_ptr <rocksdb::Cache> m_lruCache;
+
+    RocksDBFactoryImp ()
+        : m_lruCache (rocksdb::NewLRUCache (
+            static_cast <size_t> (getConfig ().getSize (siHashNodeDBCache)) * 1024 * 1024))
+    {
+    }
+
+    ~RocksDBFactoryImp ()
+    {
+    }
+
+    /** Returns the backend type name, "RocksDB". */
+    String getName () const
+    {
+        return "RocksDB";
+    }
+
+    /** Creates a new backend; the caller receives a raw owning pointer. */
+    Backend* createInstance (
+        size_t keyBytes, Parameters const& keyValues,
+            Scheduler& scheduler)
+    {
+        return new RocksDBFactory::BackendImp (
+            keyBytes, keyValues, scheduler);
+    }
+};
+
+//------------------------------------------------------------------------------
+
+// The destructor is declared pure virtual in the header and defined here.
+RocksDBFactory::~RocksDBFactory ()
+{
+}
+
+/** Returns a new heap-allocated RocksDBFactoryImp. */
+RocksDBFactory* RocksDBFactory::New ()
+{
+    return new RocksDBFactoryImp;
+}
+
+}
+
+#endif
diff --git a/src/ripple_core/nodestore/backend/RocksDBFactory.h b/src/ripple_core/nodestore/backend/RocksDBFactory.h
new file mode 100644
index 000000000..49f43f56b
--- /dev/null
+++ b/src/ripple_core/nodestore/backend/RocksDBFactory.h
@@ -0,0 +1,39 @@
+//------------------------------------------------------------------------------
+/*
+    This file is part of rippled: https://github.com/ripple/rippled
+    Copyright (c) 2012, 2013 Ripple Labs Inc.
+ + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_NODESTORE_ROCKSDBFACTORY_H_INCLUDED +#define RIPPLE_NODESTORE_ROCKSDBFACTORY_H_INCLUDED + +namespace NodeStore { + +/** Factory to produce RocksDB backends for the NodeStore. + + New() returns a heap-allocated instance of the concrete factory + implemented in RocksDBFactory.cpp. The destructor is declared pure + virtual here and defined in RocksDBFactory.cpp. BackendImp is only + declared here; its definition is also in RocksDBFactory.cpp. + + NOTE(review): New() returns a raw pointer; confirm which caller is + responsible for deleting it. + + @see Database +*/ +class RocksDBFactory : public Factory +{ +public: + class BackendImp; + + static RocksDBFactory* New (); + virtual ~RocksDBFactory () = 0; +}; + +} + +#endif diff --git a/src/ripple_core/nodestore/impl/DatabaseImp.h b/src/ripple_core/nodestore/impl/DatabaseImp.h index 71211e50a..79cde4b38 100644 --- a/src/ripple_core/nodestore/impl/DatabaseImp.h +++ b/src/ripple_core/nodestore/impl/DatabaseImp.h @@ -344,6 +344,10 @@ void Database::addAvailableBackends () addFactory (MdbFactory::getInstance ()); #endif +#if RIPPLE_ROCKSDB_AVAILABLE + addFactory (RocksDBFactory::New ()); +#endif + #if RIPPLE_SOPHIA_AVAILABLE addFactory (SophiaFactory::getInstance ()); #endif diff --git a/src/ripple_core/nodestore/tests/BackendTests.cpp b/src/ripple_core/nodestore/tests/BackendTests.cpp index c7a16d8a0..af2357014 100644 --- a/src/ripple_core/nodestore/tests/BackendTests.cpp +++ b/src/ripple_core/nodestore/tests/BackendTests.cpp @@ -95,6 +95,10 @@ public: testBackend ("mdb", 
seedValue, 200); #endif + #if RIPPLE_ROCKSDB_AVAILABLE + testBackend ("rocksdb", seedValue); + #endif + #if RIPPLE_SOPHIA_AVAILABLE testBackend ("sophia", seedValue); #endif diff --git a/src/ripple_core/nodestore/tests/DatabaseTests.cpp b/src/ripple_core/nodestore/tests/DatabaseTests.cpp index 663fa3e12..ef8846ef5 100644 --- a/src/ripple_core/nodestore/tests/DatabaseTests.cpp +++ b/src/ripple_core/nodestore/tests/DatabaseTests.cpp @@ -188,6 +188,10 @@ public: testNodeStore ("mdb", useEphemeralDatabase, true, seedValue, 200); #endif + #if RIPPLE_ROCKSDB_AVAILABLE + testNodeStore ("rocksdb", useEphemeralDatabase, true, seedValue); + #endif + #if RIPPLE_SOPHIA_AVAILABLE testNodeStore ("sophia", useEphemeralDatabase, true, seedValue); #endif @@ -201,6 +205,10 @@ public: { testImport ("leveldb", "leveldb", seedValue); + #if RIPPLE_ROCKSDB_AVAILABLE + testImport ("rocksdb", "rocksdb", seedValue); + #endif + #if RIPPLE_HYPERLEVELDB_AVAILABLE testImport ("hyperleveldb", "hyperleveldb", seedValue); #endif diff --git a/src/ripple_core/nodestore/tests/TimingTests.cpp b/src/ripple_core/nodestore/tests/TimingTests.cpp index 570e76811..f468c16aa 100644 --- a/src/ripple_core/nodestore/tests/TimingTests.cpp +++ b/src/ripple_core/nodestore/tests/TimingTests.cpp @@ -118,15 +118,21 @@ public: testBackend ("hyperleveldb", seedValue); #endif + #if RIPPLE_ROCKSDB_AVAILABLE + testBackend ("rocksdb", seedValue); + #endif + /* #if RIPPLE_MDB_AVAILABLE testBackend ("mdb", seedValue); #endif */ + /* #if RIPPLE_SOPHIA_AVAILABLE testBackend ("sophia", seedValue); #endif + */ /* testBackend ("sqlite", seedValue);