Make rocksdbquick settings default:

This removes the old default configuration for the "rocksdb" backend and
replaces it with the configuration that was formerly available using
the experimental backend "rocksdbquick".

The new configuration improves the performance of the key/value database
by changing the compaction style and tuning the size parameters for the
typical rippled workload. Testing shows a decrease in I/O spikes for both
reading and writing.
This commit is contained in:
Vinnie Falco
2015-01-05 13:37:58 -08:00
parent e2f9f5d7e5
commit f634666dc6
7 changed files with 70 additions and 479 deletions

View File

@@ -2379,9 +2379,6 @@
<ClCompile Include="..\..\src\ripple\nodestore\backend\RocksDBFactory.cpp"> <ClCompile Include="..\..\src\ripple\nodestore\backend\RocksDBFactory.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild> <ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile> </ClCompile>
<ClCompile Include="..\..\src\ripple\nodestore\backend\RocksDBQuickFactory.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\nodestore\Database.h"> <ClInclude Include="..\..\src\ripple\nodestore\Database.h">
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\ripple\nodestore\DatabaseRotating.h"> <ClInclude Include="..\..\src\ripple\nodestore\DatabaseRotating.h">

View File

@@ -3378,9 +3378,6 @@
<ClCompile Include="..\..\src\ripple\nodestore\backend\RocksDBFactory.cpp"> <ClCompile Include="..\..\src\ripple\nodestore\backend\RocksDBFactory.cpp">
<Filter>ripple\nodestore\backend</Filter> <Filter>ripple\nodestore\backend</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="..\..\src\ripple\nodestore\backend\RocksDBQuickFactory.cpp">
<Filter>ripple\nodestore\backend</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\nodestore\Database.h"> <ClInclude Include="..\..\src\ripple\nodestore\Database.h">
<Filter>ripple\nodestore</Filter> <Filter>ripple\nodestore</Filter>
</ClInclude> </ClInclude>

View File

@@ -26,7 +26,6 @@
#include <ripple/core/Config.h> // VFALCO Bad dependency #include <ripple/core/Config.h> // VFALCO Bad dependency
#include <ripple/nodestore/Factory.h> #include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h> #include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/BatchWriter.h>
#include <ripple/nodestore/impl/DecodedBlob.h> #include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h> #include <ripple/nodestore/impl/EncodedBlob.h>
#include <beast/threads/Thread.h> #include <beast/threads/Thread.h>
@@ -86,7 +85,6 @@ public:
class RocksDBBackend class RocksDBBackend
: public Backend : public Backend
, public BatchWriter::Callback
{ {
private: private:
std::atomic <bool> m_deletePath; std::atomic <bool> m_deletePath;
@@ -94,8 +92,6 @@ private:
public: public:
beast::Journal m_journal; beast::Journal m_journal;
size_t const m_keyBytes; size_t const m_keyBytes;
Scheduler& m_scheduler;
BatchWriter m_batch;
std::string m_name; std::string m_name;
std::unique_ptr <rocksdb::DB> m_db; std::unique_ptr <rocksdb::DB> m_db;
@@ -104,98 +100,71 @@ public:
: m_deletePath (false) : m_deletePath (false)
, m_journal (journal) , m_journal (journal)
, m_keyBytes (keyBytes) , m_keyBytes (keyBytes)
, m_scheduler (scheduler)
, m_batch (*this, scheduler)
, m_name (keyValues ["path"].toStdString ()) , m_name (keyValues ["path"].toStdString ())
{ {
if (m_name.empty()) if (m_name.empty())
throw std::runtime_error ("Missing path in RocksDBFactory backend"); throw std::runtime_error ("Missing path in RocksDBFactory backend");
// Defaults
std::uint64_t budget = 512 * 1024 * 1024; // 512MB
std::string style("level");
std::uint64_t threads=4;
if (!keyValues["budget"].isEmpty())
budget = keyValues["budget"].getIntValue();
if (!keyValues["style"].isEmpty())
style = keyValues["style"].toStdString();
if (!keyValues["threads"].isEmpty())
threads = keyValues["threads"].getIntValue();
// Set options
rocksdb::Options options; rocksdb::Options options;
rocksdb::BlockBasedTableOptions table_options;
options.create_if_missing = true; options.create_if_missing = true;
options.env = env; options.env = env;
if (keyValues["cache_mb"].isEmpty()) if (style == "level")
{ options.OptimizeLevelStyleCompaction(budget);
table_options.block_cache = rocksdb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024);
}
else
{
table_options.block_cache = rocksdb::NewLRUCache (keyValues["cache_mb"].getIntValue() * 1024L * 1024L);
}
if (keyValues["filter_bits"].isEmpty()) if (style == "universal")
{ options.OptimizeUniversalStyleCompaction(budget);
if (getConfig ().NODE_SIZE >= 2)
table_options.filter_policy.reset (rocksdb::NewBloomFilterPolicy (10));
}
else if (keyValues["filter_bits"].getIntValue() != 0)
{
table_options.filter_policy.reset (rocksdb::NewBloomFilterPolicy (keyValues["filter_bits"].getIntValue()));
}
if (! keyValues["open_files"].isEmpty()) if (style == "point")
{ options.OptimizeForPointLookup(budget / 1024 / 1024); // In MB
options.max_open_files = keyValues["open_files"].getIntValue();
}
if (! keyValues["file_size_mb"].isEmpty()) options.IncreaseParallelism(threads);
{
options.target_file_size_base = 1024 * 1024 * keyValues["file_size_mb"].getIntValue();
options.max_bytes_for_level_base = 5 * options.target_file_size_base;
options.write_buffer_size = 2 * options.target_file_size_base;
}
if (! keyValues["file_size_mult"].isEmpty()) // Allows hash indexes in blocks
{ options.prefix_extractor.reset(rocksdb::NewNoopTransform());
options.target_file_size_multiplier = keyValues["file_size_mult"].getIntValue();
}
if (! keyValues["bg_threads"].isEmpty()) // overrride OptimizeLevelStyleCompaction
{ options.min_write_buffer_number_to_merge = 1;
options.env->SetBackgroundThreads
(keyValues["bg_threads"].getIntValue(), rocksdb::Env::LOW); rocksdb::BlockBasedTableOptions table_options;
} // Use hash index
table_options.index_type =
rocksdb::BlockBasedTableOptions::kHashSearch;
table_options.filter_policy.reset(
rocksdb::NewBloomFilterPolicy(10));
options.table_factory.reset(
NewBlockBasedTableFactory(table_options));
// Higher values make reads slower
// table_options.block_size = 4096;
if (! keyValues["high_threads"].isEmpty()) // No point when DatabaseImp has a cache
{ // table_options.block_cache =
auto const highThreads = keyValues["high_threads"].getIntValue(); // rocksdb::NewLRUCache(64 * 1024 * 1024);
options.env->SetBackgroundThreads (highThreads, rocksdb::Env::HIGH);
// If we have high-priority threads, presumably we want to options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory());
// use them for background flushes // Alternative:
if (highThreads > 0) // options.memtable_factory.reset(
options.max_background_flushes = highThreads; // rocksdb::NewHashCuckooRepFactory(options.write_buffer_size));
}
if (! keyValues["compression"].isEmpty ())
{
if (keyValues["compression"].getIntValue () == 0)
{
options.compression = rocksdb::kNoCompression;
}
}
if (! keyValues["block_size"].isEmpty ())
{
table_options.block_size = keyValues["block_size"].getIntValue ();
}
if (! keyValues["universal_compaction"].isEmpty ())
{
if (keyValues["universal_compaction"].getIntValue () != 0)
{
options.compaction_style = rocksdb:: kCompactionStyleUniversal;
options.min_write_buffer_number_to_merge = 2;
options.max_write_buffer_number = 6;
options.write_buffer_size = 6 * options.target_file_size_base;
}
}
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
rocksdb::DB* db = nullptr; rocksdb::DB* db = nullptr;
rocksdb::Status status = rocksdb::DB::Open (options, m_name, &db); rocksdb::Status status = rocksdb::DB::Open (options, m_name, &db);
if (!status.ok () || !db) if (!status.ok () || !db)
throw std::runtime_error (std::string("Unable to open/create RocksDB: ") + status.ToString()); throw std::runtime_error (std::string("Unable to open/create RocksDB: ") + status.ToString());
@@ -274,29 +243,32 @@ public:
void void
store (NodeObject::ref object) store (NodeObject::ref object)
{ {
m_batch.store (object); storeBatch(Batch{object});
} }
void void
storeBatch (Batch const& batch) storeBatch (Batch const& batch)
{ {
rocksdb::WriteBatch wb; rocksdb::WriteBatch wb;
EncodedBlob encoded; EncodedBlob encoded;
for (auto const& e : batch) for (auto const& e : batch)
{ {
encoded.prepare (e); encoded.prepare (e);
wb.Put ( wb.Put(
rocksdb::Slice (reinterpret_cast <char const*> ( rocksdb::Slice(reinterpret_cast<char const*>(encoded.getKey()),
encoded.getKey ()), m_keyBytes), m_keyBytes),
rocksdb::Slice (reinterpret_cast <char const*> ( rocksdb::Slice(reinterpret_cast<char const*>(encoded.getData()),
encoded.getData ()), encoded.getSize ())); encoded.getSize()));
} }
rocksdb::WriteOptions const options; rocksdb::WriteOptions options;
// Crucial to ensure good write speed and non-blocking writes to memtable
options.disableWAL = true;
auto ret = m_db->Write (options, &wb); auto ret = m_db->Write (options, &wb);
if (!ret.ok ()) if (!ret.ok ())
@@ -342,7 +314,7 @@ public:
int int
getWriteLoad () getWriteLoad ()
{ {
return m_batch.getWriteLoad (); return 0;
} }
void void
@@ -367,12 +339,12 @@ class RocksDBFactory : public Factory
public: public:
RocksDBEnv m_env; RocksDBEnv m_env;
RocksDBFactory () RocksDBFactory()
{ {
Manager::instance().insert(*this); Manager::instance().insert(*this);
} }
~RocksDBFactory () ~RocksDBFactory()
{ {
Manager::instance().erase(*this); Manager::instance().erase(*this);
} }

View File

@@ -1,374 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/unity/rocksdb.h>
#if RIPPLE_ROCKSDB_AVAILABLE
#include <ripple/core/Config.h> // VFALCO Bad dependency
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <beast/threads/Thread.h>
#include <atomic>
#include <beast/cxx14/memory.h> // <memory>
namespace ripple {
namespace NodeStore {
/** RocksDB environment wrapper that names every thread it starts.

    Threads launched through StartThread run thread_entry first, which
    labels the thread "rocksdb #N" and then invokes the requested function.
*/
class RockDBQuickEnv : public rocksdb::EnvWrapper
{
public:
    RockDBQuickEnv ()
        : EnvWrapper (rocksdb::Env::Default())
    {
    }

    /** The function and argument originally handed to StartThread. */
    struct ThreadParams
    {
        ThreadParams (void (*f_)(void*), void* a_)
            : f (f_)
            , a (a_)
        {
        }

        void (*f)(void*);
        void* a;
    };

    /** Trampoline executed on the newly started thread.

        @param ptr A heap-allocated ThreadParams; it is deleted here.
    */
    static
    void
    thread_entry (void* ptr)
    {
        // Copy the request out and free it before doing anything else.
        ThreadParams* const request = reinterpret_cast <ThreadParams*> (ptr);
        void (* const work)(void*) = request->f;
        void* const arg = request->a;
        delete request;

        // Shared across all threads; pre-increment makes the first thread #1.
        static std::atomic <std::size_t> counter;
        std::size_t const ordinal = ++counter;

        std::stringstream label;
        label << "rocksdb #" << ordinal;
        beast::Thread::setCurrentThreadName (label.str());

        work (arg);
    }

    /** Start a thread that runs f(a) after the thread has been named. */
    void
    StartThread (void (*f)(void*), void* a)
    {
        // Ownership of the ThreadParams passes to thread_entry.
        EnvWrapper::StartThread (
            &RockDBQuickEnv::thread_entry, new ThreadParams (f, a));
    }
};
//------------------------------------------------------------------------------
class RocksDBQuickBackend
: public Backend
{
private:
std::atomic <bool> m_deletePath;
public:
beast::Journal m_journal;
size_t const m_keyBytes;
std::string m_name;
std::unique_ptr <rocksdb::DB> m_db;
RocksDBQuickBackend (int keyBytes, Parameters const& keyValues,
Scheduler& scheduler, beast::Journal journal, RockDBQuickEnv* env)
: m_journal (journal)
, m_keyBytes (keyBytes)
, m_name (keyValues ["path"].toStdString ())
{
if (m_name.empty())
throw std::runtime_error ("Missing path in RocksDBFactory backend");
// Defaults
std::uint64_t budget = 512 * 1024 * 1024; // 512MB
std::string style("level");
std::uint64_t threads=4;
if (!keyValues["budget"].isEmpty())
budget = keyValues["budget"].getIntValue();
if (!keyValues["style"].isEmpty())
style = keyValues["style"].toStdString();
if (!keyValues["threads"].isEmpty())
threads = keyValues["threads"].getIntValue();
// Set options
rocksdb::Options options;
options.create_if_missing = true;
options.env = env;
if (style == "level")
options.OptimizeLevelStyleCompaction(budget);
if (style == "universal")
options.OptimizeUniversalStyleCompaction(budget);
if (style == "point")
options.OptimizeForPointLookup(budget / 1024 / 1024); // In MB
options.IncreaseParallelism(threads);
// Allows hash indexes in blocks
options.prefix_extractor.reset(rocksdb::NewNoopTransform());
// overrride OptimizeLevelStyleCompaction
options.min_write_buffer_number_to_merge = 1;
rocksdb::BlockBasedTableOptions table_options;
// Use hash index
table_options.index_type =
rocksdb::BlockBasedTableOptions::kHashSearch;
table_options.filter_policy.reset(
rocksdb::NewBloomFilterPolicy(10));
options.table_factory.reset(
NewBlockBasedTableFactory(table_options));
// Higher values make reads slower
// table_options.block_size = 4096;
// No point when DatabaseImp has a cache
// table_options.block_cache =
// rocksdb::NewLRUCache(64 * 1024 * 1024);
options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory());
// Alternative:
// options.memtable_factory.reset(
// rocksdb::NewHashCuckooRepFactory(options.write_buffer_size));
rocksdb::DB* db = nullptr;
rocksdb::Status status = rocksdb::DB::Open (options, m_name, &db);
if (!status.ok () || !db)
throw std::runtime_error (std::string("Unable to open/create RocksDB: ") + status.ToString());
m_db.reset (db);
}
~RocksDBQuickBackend ()
{
if (m_deletePath)
{
m_db.reset();
boost::filesystem::path dir = m_name;
boost::filesystem::remove_all (dir);
}
}
std::string
getName()
{
return m_name;
}
//--------------------------------------------------------------------------
Status
fetch (void const* key, NodeObject::Ptr* pObject)
{
pObject->reset ();
Status status (ok);
rocksdb::ReadOptions const options;
rocksdb::Slice const slice (static_cast <char const*> (key), m_keyBytes);
std::string string;
rocksdb::Status getStatus = m_db->Get (options, slice, &string);
if (getStatus.ok ())
{
DecodedBlob decoded (key, string.data (), string.size ());
if (decoded.wasOk ())
{
*pObject = decoded.createObject ();
}
else
{
// Decoding failed, probably corrupted!
//
status = dataCorrupt;
}
}
else
{
if (getStatus.IsCorruption ())
{
status = dataCorrupt;
}
else if (getStatus.IsNotFound ())
{
status = notFound;
}
else
{
status = Status (customCode + getStatus.code());
m_journal.error << getStatus.ToString ();
}
}
return status;
}
void
store (NodeObject::ref object)
{
storeBatch(Batch{object});
}
void
storeBatch (Batch const& batch)
{
rocksdb::WriteBatch wb;
EncodedBlob encoded;
for (auto const& e : batch)
{
encoded.prepare (e);
wb.Put(
rocksdb::Slice(reinterpret_cast<char const*>(encoded.getKey()),
m_keyBytes),
rocksdb::Slice(reinterpret_cast<char const*>(encoded.getData()),
encoded.getSize()));
}
rocksdb::WriteOptions options;
// Crucial to ensure good write speed and non-blocking writes to memtable
options.disableWAL = true;
auto ret = m_db->Write (options, &wb);
if (!ret.ok ())
throw std::runtime_error ("storeBatch failed: " + ret.ToString());
}
void
for_each (std::function <void(NodeObject::Ptr)> f)
{
rocksdb::ReadOptions const options;
std::unique_ptr <rocksdb::Iterator> it (m_db->NewIterator (options));
for (it->SeekToFirst (); it->Valid (); it->Next ())
{
if (it->key ().size () == m_keyBytes)
{
DecodedBlob decoded (it->key ().data (),
it->value ().data (),
it->value ().size ());
if (decoded.wasOk ())
{
f (decoded.createObject ());
}
else
{
// Uh oh, corrupted data!
if (m_journal.fatal) m_journal.fatal <<
"Corrupt NodeObject #" << uint256 (it->key ().data ());
}
}
else
{
// VFALCO NOTE What does it mean to find an
// incorrectly sized key? Corruption?
if (m_journal.fatal) m_journal.fatal <<
"Bad key size = " << it->key ().size ();
}
}
}
int
getWriteLoad ()
{
return 0;
}
void
setDeletePath() override
{
m_deletePath = true;
}
//--------------------------------------------------------------------------
void
writeBatch (Batch const& batch)
{
storeBatch (batch);
}
};
//------------------------------------------------------------------------------
/** Factory that creates RocksDBQuickBackend instances.

    The factory adds itself to the NodeStore Manager on construction and
    removes itself on destruction. All backends it creates share m_env.
*/
class RocksDBQuickFactory : public Factory
{
public:
    RockDBQuickEnv m_env;

    RocksDBQuickFactory()
    {
        auto& manager = Manager::instance();
        manager.insert(*this);
    }

    ~RocksDBQuickFactory()
    {
        auto& manager = Manager::instance();
        manager.erase(*this);
    }

    /** Returns the name this factory is known by. */
    std::string
    getName () const
    {
        return std::string ("RocksDBQuick");
    }

    /** Build a backend from the given parameters, using this factory's
        RocksDB environment.
    */
    std::unique_ptr <Backend>
    createInstance (
        size_t keyBytes,
        Parameters const& keyValues,
        Scheduler& scheduler,
        beast::Journal journal)
    {
        std::unique_ptr <Backend> backend =
            std::make_unique <RocksDBQuickBackend> (
                keyBytes, keyValues, scheduler, journal, &m_env);
        return backend;
    }
};

// Constructing this file-local instance performs the Manager registration.
static RocksDBQuickFactory rocksDBQuickFactory;
}
}
#endif

View File

@@ -96,17 +96,17 @@ public:
testBackend ("leveldb", seedValue); testBackend ("leveldb", seedValue);
#ifdef RIPPLE_ENABLE_SQLITE_BACKEND_TESTS
testBackend ("sqlite", seedValue);
#endif
#if RIPPLE_HYPERLEVELDB_AVAILABLE #if RIPPLE_HYPERLEVELDB_AVAILABLE
testBackend ("hyperleveldb", seedValue); testBackend ("hyperleveldb", seedValue);
#endif #endif
#if RIPPLE_ROCKSDB_AVAILABLE #if RIPPLE_ROCKSDB_AVAILABLE
testBackend ("rocksdb", seedValue); testBackend ("rocksdb", seedValue);
#endif #endif
#ifdef RIPPLE_ENABLE_SQLITE_BACKEND_TESTS
testBackend ("sqlite", seedValue);
#endif
} }
}; };

View File

@@ -203,14 +203,14 @@ public:
{ {
testImport ("leveldb", "leveldb", seedValue); testImport ("leveldb", "leveldb", seedValue);
#if RIPPLE_ROCKSDB_AVAILABLE
testImport ("rocksdb", "rocksdb", seedValue);
#endif
#if RIPPLE_HYPERLEVELDB_AVAILABLE #if RIPPLE_HYPERLEVELDB_AVAILABLE
testImport ("hyperleveldb", "hyperleveldb", seedValue); testImport ("hyperleveldb", "hyperleveldb", seedValue);
#endif #endif
#if RIPPLE_ROCKSDB_AVAILABLE
testImport ("rocksdb", "rocksdb", seedValue);
#endif
#if RIPPLE_ENABLE_SQLITE_BACKEND_TESTS #if RIPPLE_ENABLE_SQLITE_BACKEND_TESTS
testImport ("sqlite", "sqlite", seedValue); testImport ("sqlite", "sqlite", seedValue);
#endif #endif

View File

@@ -24,7 +24,6 @@
#include <ripple/nodestore/backend/MemoryFactory.cpp> #include <ripple/nodestore/backend/MemoryFactory.cpp>
#include <ripple/nodestore/backend/NullFactory.cpp> #include <ripple/nodestore/backend/NullFactory.cpp>
#include <ripple/nodestore/backend/RocksDBFactory.cpp> #include <ripple/nodestore/backend/RocksDBFactory.cpp>
#include <ripple/nodestore/backend/RocksDBQuickFactory.cpp>
#include <ripple/nodestore/impl/BatchWriter.cpp> #include <ripple/nodestore/impl/BatchWriter.cpp>
#include <ripple/nodestore/impl/DatabaseImp.h> #include <ripple/nodestore/impl/DatabaseImp.h>