Add RocksDB NodeStore backend

Vinnie Falco
2013-11-21 18:11:00 -08:00
parent 8b72f2ad79
commit 306811d2a7
8 changed files with 410 additions and 2 deletions

View File

@@ -657,6 +657,7 @@
#   LevelDB         Use Google's LevelDB database (deprecated)
#   MDB             Use MDB
#   none            Use no backend
#   RocksDB         Use Facebook's RocksDB database
#   SQLite          Use SQLite
#
# Required keys:
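
For reference, a minimal stanza selecting the new backend could look like the sketch below. It assumes the usual type/path keys of the [node_db] stanza; the optional keys mirror the parameters read by BackendImp later in this commit, and all values are illustrative rather than recommended settings.

[node_db]
type=RocksDB
path=db/rocksdb
cache_mb=256
filter_bits=10
open_files=2000
file_size_mb=8
file_size_mult=2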

View File

@@ -23,9 +23,9 @@
#include "../ripple_leveldb/ripple_leveldb.h"
#include "../ripple_mdb/ripple_mdb.h"
#include "../ripple/sophia/ripple_sophia.h"
#include "../ripple/rocksdb/ripple_rocksdb.h"
-namespace ripple
-{
+namespace ripple {
# include "impl/DecodedBlob.h"
# include "impl/EncodedBlob.h"
@@ -44,6 +44,8 @@ namespace ripple
#include "backend/MdbFactory.cpp"
# include "backend/SophiaFactory.h"
#include "backend/SophiaFactory.cpp"
# include "backend/RocksDBFactory.h"
#include "backend/RocksDBFactory.cpp"
#include "impl/BatchWriter.cpp"
# include "impl/Factories.h"

View File

@@ -0,0 +1,344 @@
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#if RIPPLE_ROCKSDB_AVAILABLE

namespace NodeStore {

//------------------------------------------------------------------------------

class RocksDBEnv : public rocksdb::EnvWrapper
{
public:
    static RocksDBEnv* get ()
    {
        static RocksDBEnv instance;
        return &instance;
    }

    RocksDBEnv ()
        : EnvWrapper (rocksdb::Env::Default())
    {
    }

    struct ThreadParams
    {
        ThreadParams (void (*f_)(void*), void* a_)
            : f (f_)
            , a (a_)
        {
        }

        void (*f)(void*);
        void* a;
    };

    static void thread_entry (void* ptr)
    {
        ThreadParams* const p (reinterpret_cast <ThreadParams*> (ptr));
        void (*f)(void*) = p->f;
        void* a (p->a);
        delete p;

        static Atomic <int> n;
        int const id (++n);
        std::stringstream ss;
        ss << "rocksdb #" << id;
        Thread::setCurrentThreadName (ss.str());

        (*f)(a);
    }

    void StartThread (void (*f)(void*), void* a)
    {
        ThreadParams* const p (new ThreadParams (f, a));
        EnvWrapper::StartThread (&RocksDBEnv::thread_entry, p);
    }
};

//------------------------------------------------------------------------------

class RocksDBFactory::BackendImp
    : public Backend
    , public BatchWriter::Callback
    , public LeakChecked <RocksDBFactory::BackendImp>
{
public:
    typedef RecycledObjectPool <std::string> StringPool;

    //--------------------------------------------------------------------------

    BackendImp (int keyBytes,
                Parameters const& keyValues,
                Scheduler& scheduler)
        : m_keyBytes (keyBytes)
        , m_scheduler (scheduler)
        , m_batch (*this, scheduler)
        , m_name (keyValues ["path"].toStdString ())
    {
        if (m_name.empty())
            Throw (std::runtime_error ("Missing path in RocksDBFactory backend"));

        rocksdb::Options options;
        options.create_if_missing = true;

        if (keyValues["cache_mb"].isEmpty())
        {
            options.block_cache = rocksdb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
        }
        else
        {
            options.block_cache = rocksdb::NewLRUCache (keyValues["cache_mb"].getIntValue() * 1024L * 1024L);
        }

        if (keyValues["filter_bits"].isEmpty())
        {
            if (getConfig ().NODE_SIZE >= 2)
                options.filter_policy = rocksdb::NewBloomFilterPolicy (10);
        }
        else if (keyValues["filter_bits"].getIntValue() != 0)
        {
            options.filter_policy = rocksdb::NewBloomFilterPolicy (keyValues["filter_bits"].getIntValue());
        }

        if (! keyValues["open_files"].isEmpty())
        {
            options.max_open_files = keyValues["open_files"].getIntValue();
        }

        if (! keyValues["file_size_mb"].isEmpty())
        {
            options.target_file_size_base = 1024 * 1024 * keyValues["file_size_mb"].getIntValue();
        }

        if (! keyValues["file_size_mult"].isEmpty())
        {
            options.target_file_size_multiplier = keyValues["file_size_mult"].getIntValue();
        }

        options.env = RocksDBEnv::get();

        rocksdb::DB* db = nullptr;
        rocksdb::Status status = rocksdb::DB::Open (options, m_name, &db);
        if (!status.ok () || !db)
            Throw (std::runtime_error (std::string("Unable to open/create RocksDB: ") + status.ToString()));

        m_db = db;
    }

    ~BackendImp ()
    {
    }

    std::string getName()
    {
        return m_name;
    }

    //--------------------------------------------------------------------------

    Status fetch (void const* key, NodeObject::Ptr* pObject)
    {
        pObject->reset ();

        Status status (ok);

        rocksdb::ReadOptions const options;
        rocksdb::Slice const slice (static_cast <char const*> (key), m_keyBytes);

        {
            // These are reused std::string objects,
            // required for RocksDB's funky interface.
            //
            StringPool::ScopedItem item (m_stringPool);
            std::string& string = item.getObject ();

            rocksdb::Status getStatus = m_db->Get (options, slice, &string);

            if (getStatus.ok ())
            {
                DecodedBlob decoded (key, string.data (), string.size ());

                if (decoded.wasOk ())
                {
                    *pObject = decoded.createObject ();
                }
                else
                {
                    // Decoding failed, probably corrupted!
                    //
                    status = dataCorrupt;
                }
            }
            else
            {
                if (getStatus.IsCorruption ())
                {
                    status = dataCorrupt;
                }
                else if (getStatus.IsNotFound ())
                {
                    status = notFound;
                }
                else
                {
                    status = unknown;
                }
            }
        }

        return status;
    }

    void store (NodeObject::ref object)
    {
        m_batch.store (object);
    }

    void storeBatch (Batch const& batch)
    {
        rocksdb::WriteBatch wb;

        {
            EncodedBlob::Pool::ScopedItem item (m_blobPool);

            BOOST_FOREACH (NodeObject::ref object, batch)
            {
                item.getObject ().prepare (object);

                wb.Put (
                    rocksdb::Slice (reinterpret_cast <char const*> (item.getObject ().getKey ()),
                                    m_keyBytes),
                    rocksdb::Slice (reinterpret_cast <char const*> (item.getObject ().getData ()),
                                    item.getObject ().getSize ()));
            }
        }

        rocksdb::WriteOptions const options;

        m_db->Write (options, &wb).ok ();
    }

    void visitAll (VisitCallback& callback)
    {
        rocksdb::ReadOptions const options;

        ScopedPointer <rocksdb::Iterator> it (m_db->NewIterator (options));

        for (it->SeekToFirst (); it->Valid (); it->Next ())
        {
            if (it->key ().size () == m_keyBytes)
            {
                DecodedBlob decoded (it->key ().data (),
                                     it->value ().data (),
                                     it->value ().size ());

                if (decoded.wasOk ())
                {
                    NodeObject::Ptr object (decoded.createObject ());

                    callback.visitObject (object);
                }
                else
                {
                    // Uh oh, corrupted data!
                    WriteLog (lsFATAL, NodeObject) << "Corrupt NodeObject #" << uint256 (it->key ().data ());
                }
            }
            else
            {
                // VFALCO NOTE What does it mean to find an
                //             incorrectly sized key? Corruption?
                WriteLog (lsFATAL, NodeObject) << "Bad key size = " << it->key ().size ();
            }
        }
    }

    int getWriteLoad ()
    {
        return m_batch.getWriteLoad ();
    }

    //--------------------------------------------------------------------------

    void writeBatch (Batch const& batch)
    {
        storeBatch (batch);
    }

private:
    size_t const m_keyBytes;
    Scheduler& m_scheduler;
    BatchWriter m_batch;
    StringPool m_stringPool;
    EncodedBlob::Pool m_blobPool;
    std::string m_name;
    ScopedPointer <rocksdb::DB> m_db;
};

//------------------------------------------------------------------------------

class RocksDBFactoryImp : public RocksDBFactory
{
public:
    std::shared_ptr <rocksdb::Cache> m_lruCache;

    RocksDBFactoryImp ()
    {
        rocksdb::Options options;
        options.create_if_missing = true;
        options.block_cache = rocksdb::NewLRUCache (
            getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);

        m_lruCache = options.block_cache;
    }

    ~RocksDBFactoryImp ()
    {
    }

    String getName () const
    {
        return "RocksDB";
    }

    Backend* createInstance (
        size_t keyBytes, Parameters const& keyValues,
        Scheduler& scheduler)
    {
        return new RocksDBFactory::BackendImp (
            keyBytes, keyValues, scheduler);
    }
};

//------------------------------------------------------------------------------

RocksDBFactory::~RocksDBFactory ()
{
}

RocksDBFactory* RocksDBFactory::New ()
{
    return new RocksDBFactoryImp;
}

}

#endif
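
The backend above is essentially a thin adapter over RocksDB's public API: open the database through rocksdb::Options and DB::Open, do point reads with Get, and push batched writes through WriteBatch and Write. A minimal standalone sketch of that call pattern, using the same-era RocksDB interface this commit builds against (the path and key/value strings are illustrative, not part of the commit):

#include <iostream>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

int main ()
{
    // Open (or create) a database, as BackendImp's constructor does.
    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::DB* db = nullptr;
    rocksdb::Status status = rocksdb::DB::Open (options, "/tmp/rocksdb_sketch", &db);
    if (! status.ok ())
    {
        std::cerr << "open failed: " << status.ToString () << std::endl;
        return 1;
    }

    // Batched writes, mirroring BackendImp::storeBatch.
    rocksdb::WriteBatch wb;
    wb.Put (rocksdb::Slice ("key1"), rocksdb::Slice ("value1"));
    wb.Put (rocksdb::Slice ("key2"), rocksdb::Slice ("value2"));
    db->Write (rocksdb::WriteOptions (), &wb);

    // Point lookup, mirroring BackendImp::fetch.
    std::string value;
    status = db->Get (rocksdb::ReadOptions (), rocksdb::Slice ("key1"), &value);

    if (status.ok ())
        std::cout << "key1 -> " << value << std::endl;
    else if (status.IsNotFound ())
        std::cout << "key1 not found" << std::endl;
    else
        std::cout << "read error: " << status.ToString () << std::endl;

    delete db;
    return 0;
}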

View File

@@ -0,0 +1,39 @@
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_NODESTORE_ROCKSDBFACTORY_H_INCLUDED
#define RIPPLE_NODESTORE_ROCKSDBFACTORY_H_INCLUDED

namespace NodeStore {

/** Factory to produce RocksDB backends for the NodeStore.

    @see Database
*/
class RocksDBFactory : public Factory
{
public:
    class BackendImp;

    static RocksDBFactory* New ();

    virtual ~RocksDBFactory () = 0;
};

}

#endif

View File

@@ -344,6 +344,10 @@ void Database::addAvailableBackends ()
addFactory (MdbFactory::getInstance ());
#endif
#if RIPPLE_ROCKSDB_AVAILABLE
addFactory (RocksDBFactory::New ());
#endif
#if RIPPLE_SOPHIA_AVAILABLE
addFactory (SophiaFactory::getInstance ());
#endif

View File

@@ -95,6 +95,10 @@ public:
testBackend ("mdb", seedValue, 200);
#endif
#if RIPPLE_ROCKSDB_AVAILABLE
testBackend ("rocksdb", seedValue);
#endif
#if RIPPLE_SOPHIA_AVAILABLE
testBackend ("sophia", seedValue);
#endif

View File

@@ -188,6 +188,10 @@ public:
testNodeStore ("mdb", useEphemeralDatabase, true, seedValue, 200);
#endif
#if RIPPLE_ROCKSDB_AVAILABLE
testNodeStore ("rocksdb", useEphemeralDatabase, true, seedValue);
#endif
#if RIPPLE_SOPHIA_AVAILABLE
testNodeStore ("sophia", useEphemeralDatabase, true, seedValue);
#endif
@@ -201,6 +205,10 @@ public:
{
testImport ("leveldb", "leveldb", seedValue);
#if RIPPLE_ROCKSDB_AVAILABLE
testImport ("rocksdb", "rocksdb", seedValue);
#endif
#if RIPPLE_HYPERLEVELDB_AVAILABLE
testImport ("hyperleveldb", "hyperleveldb", seedValue);
#endif

View File

@@ -118,15 +118,21 @@ public:
testBackend ("hyperleveldb", seedValue);
#endif
#if RIPPLE_ROCKSDB_AVAILABLE
testBackend ("rocksdb", seedValue);
#endif
/*
#if RIPPLE_MDB_AVAILABLE
testBackend ("mdb", seedValue);
#endif
*/
/*
#if RIPPLE_SOPHIA_AVAILABLE
testBackend ("sophia", seedValue);
#endif
*/
/*
testBackend ("sqlite", seedValue);