diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 95291efca6..f42c446f6c 100755 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2081,6 +2081,9 @@ True + + True + diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 33d7db0610..fc667a6a0a 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3027,6 +3027,9 @@ ripple\nodestore\backend + + ripple\nodestore\backend + ripple\nodestore diff --git a/src/ripple/nodestore/backend/RocksDBFactory.cpp b/src/ripple/nodestore/backend/RocksDBFactory.cpp index 78bdc95136..5f612c3bdc 100644 --- a/src/ripple/nodestore/backend/RocksDBFactory.cpp +++ b/src/ripple/nodestore/backend/RocksDBFactory.cpp @@ -26,6 +26,7 @@ #include // VFALCO Bad dependency #include #include +#include #include #include #include @@ -85,6 +86,7 @@ public: class RocksDBBackend : public Backend + , public BatchWriter::Callback { private: std::atomic m_deletePath; @@ -92,6 +94,8 @@ private: public: beast::Journal m_journal; size_t const m_keyBytes; + Scheduler& m_scheduler; + BatchWriter m_batch; std::string m_name; std::unique_ptr m_db; @@ -100,75 +104,72 @@ public: : m_deletePath (false) , m_journal (journal) , m_keyBytes (keyBytes) + , m_scheduler (scheduler) + , m_batch (*this, scheduler) , m_name (keyValues ["path"].toStdString ()) { if (m_name.empty()) throw std::runtime_error ("Missing path in RocksDBFactory backend"); - // Defaults - std::uint64_t budget = 512 * 1024 * 1024; // 512MB - std::string style("level"); - std::uint64_t threads=4; - - if (!keyValues["budget"].isEmpty()) - budget = keyValues["budget"].getIntValue(); - - if (!keyValues["style"].isEmpty()) - style = keyValues["style"].toStdString(); - - if (!keyValues["threads"].isEmpty()) - threads = keyValues["threads"].getIntValue(); - - - // Set options rocksdb::Options options; + rocksdb::BlockBasedTableOptions table_options; options.create_if_missing = true; options.env = env; - if (style == "level") - options.OptimizeLevelStyleCompaction(budget); + if (keyValues["cache_mb"].isEmpty()) + { + table_options.block_cache = rocksdb::NewLRUCache (getConfig ().getSize (siHashNodeDBCache) * 1024 * 1024); + } + else + { + table_options.block_cache = rocksdb::NewLRUCache (keyValues["cache_mb"].getIntValue() * 1024L * 1024L); + } - if (style == "universal") - options.OptimizeUniversalStyleCompaction(budget); - - if (style == "point") - options.OptimizeForPointLookup(budget / 1024 / 1024); // In MB - - options.IncreaseParallelism(threads); - - // Allows hash indexes in blocks - options.prefix_extractor.reset(rocksdb::NewNoopTransform()); - - // overrride OptimizeLevelStyleCompaction - options.min_write_buffer_number_to_merge = 1; - - rocksdb::BlockBasedTableOptions table_options; - // Use hash index - table_options.index_type = - rocksdb::BlockBasedTableOptions::kHashSearch; - table_options.filter_policy.reset( - rocksdb::NewBloomFilterPolicy(10)); - options.table_factory.reset( - NewBlockBasedTableFactory(table_options)); - - // Higher values make reads slower - // table_options.block_size = 4096; - - // No point when DatabaseImp has a cache - // table_options.block_cache = - // rocksdb::NewLRUCache(64 * 1024 * 1024); - - options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory()); - // Alternative: - // options.memtable_factory.reset( - // rocksdb::NewHashCuckooRepFactory(options.write_buffer_size)); + if (keyValues["filter_bits"].isEmpty()) + { + if (getConfig ().NODE_SIZE >= 2) + table_options.filter_policy.reset (rocksdb::NewBloomFilterPolicy (10)); + } + else if (keyValues["filter_bits"].getIntValue() != 0) + { + table_options.filter_policy.reset (rocksdb::NewBloomFilterPolicy (keyValues["filter_bits"].getIntValue())); + } if (! keyValues["open_files"].isEmpty()) { options.max_open_files = keyValues["open_files"].getIntValue(); } - - if (! keyValues["compression"].isEmpty ()) + + if (! keyValues["file_size_mb"].isEmpty()) + { + options.target_file_size_base = 1024 * 1024 * keyValues["file_size_mb"].getIntValue(); + options.max_bytes_for_level_base = 5 * options.target_file_size_base; + options.write_buffer_size = 2 * options.target_file_size_base; + } + + if (! keyValues["file_size_mult"].isEmpty()) + { + options.target_file_size_multiplier = keyValues["file_size_mult"].getIntValue(); + } + + if (! keyValues["bg_threads"].isEmpty()) + { + options.env->SetBackgroundThreads + (keyValues["bg_threads"].getIntValue(), rocksdb::Env::LOW); + } + + if (! keyValues["high_threads"].isEmpty()) + { + auto const highThreads = keyValues["high_threads"].getIntValue(); + options.env->SetBackgroundThreads (highThreads, rocksdb::Env::HIGH); + + // If we have high-priority threads, presumably we want to + // use them for background flushes + if (highThreads > 0) + options.max_background_flushes = highThreads; + } + + if (! keyValues["compression"].isEmpty ()) { if (keyValues["compression"].getIntValue () == 0) { @@ -176,8 +177,25 @@ public: } } - rocksdb::DB* db = nullptr; + if (! keyValues["block_size"].isEmpty ()) + { + table_options.block_size = keyValues["block_size"].getIntValue (); + } + if (! keyValues["universal_compaction"].isEmpty ()) + { + if (keyValues["universal_compaction"].getIntValue () != 0) + { + options.compaction_style = rocksdb:: kCompactionStyleUniversal; + options.min_write_buffer_number_to_merge = 2; + options.max_write_buffer_number = 6; + options.write_buffer_size = 6 * options.target_file_size_base; + } + } + + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + rocksdb::DB* db = nullptr; rocksdb::Status status = rocksdb::DB::Open (options, m_name, &db); if (!status.ok () || !db) throw std::runtime_error (std::string("Unable to open/create RocksDB: ") + status.ToString()); @@ -190,12 +208,6 @@ public: close(); } - std::string - getName() - { - return m_name; - } - void close() override { @@ -210,6 +222,12 @@ public: } } + std::string + getName() + { + return m_name; + } + //-------------------------------------------------------------------------- Status @@ -265,32 +283,29 @@ public: void store (NodeObject::ref object) { - storeBatch(Batch{object}); + m_batch.store (object); } void storeBatch (Batch const& batch) { rocksdb::WriteBatch wb; - + EncodedBlob encoded; for (auto const& e : batch) { encoded.prepare (e); - wb.Put( - rocksdb::Slice(reinterpret_cast(encoded.getKey()), - m_keyBytes), - rocksdb::Slice(reinterpret_cast(encoded.getData()), - encoded.getSize())); + wb.Put ( + rocksdb::Slice (reinterpret_cast ( + encoded.getKey ()), m_keyBytes), + rocksdb::Slice (reinterpret_cast ( + encoded.getData ()), encoded.getSize ())); } - rocksdb::WriteOptions options; + rocksdb::WriteOptions const options; - // Crucial to ensure good write speed and non-blocking writes to memtable - options.disableWAL = true; - auto ret = m_db->Write (options, &wb); if (!ret.ok ()) @@ -336,7 +351,7 @@ public: int getWriteLoad () { - return 0; + return m_batch.getWriteLoad (); } void @@ -366,12 +381,12 @@ class RocksDBFactory : public Factory public: RocksDBEnv m_env; - RocksDBFactory() + RocksDBFactory () { Manager::instance().insert(*this); } - ~RocksDBFactory() + ~RocksDBFactory () { Manager::instance().erase(*this); } diff --git a/src/ripple/nodestore/backend/RocksDBQuickFactory.cpp b/src/ripple/nodestore/backend/RocksDBQuickFactory.cpp new file mode 100644 index 0000000000..d60f07dc13 --- /dev/null +++ b/src/ripple/nodestore/backend/RocksDBQuickFactory.cpp @@ -0,0 +1,402 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include + +#include + +#if RIPPLE_ROCKSDB_AVAILABLE + +#include // VFALCO Bad dependency +#include +#include +#include +#include +#include +#include +#include // + +namespace ripple { +namespace NodeStore { + +class RocksDBQuickEnv : public rocksdb::EnvWrapper +{ +public: + RocksDBQuickEnv () + : EnvWrapper (rocksdb::Env::Default()) + { + } + + struct ThreadParams + { + ThreadParams (void (*f_)(void*), void* a_) + : f (f_) + , a (a_) + { + } + + void (*f)(void*); + void* a; + }; + + static + void + thread_entry (void* ptr) + { + ThreadParams* const p (reinterpret_cast (ptr)); + void (*f)(void*) = p->f; + void* a (p->a); + delete p; + + static std::atomic n; + std::size_t const id (++n); + std::stringstream ss; + ss << "rocksdb #" << id; + beast::Thread::setCurrentThreadName (ss.str()); + + (*f)(a); + } + + void + StartThread (void (*f)(void*), void* a) + { + ThreadParams* const p (new ThreadParams (f, a)); + EnvWrapper::StartThread (&RocksDBQuickEnv::thread_entry, p); + } +}; + +//------------------------------------------------------------------------------ + +class RocksDBQuickBackend + : public Backend +{ +private: + std::atomic m_deletePath; + +public: + beast::Journal m_journal; + size_t const m_keyBytes; + std::string m_name; + std::unique_ptr m_db; + + RocksDBQuickBackend (int keyBytes, Parameters const& keyValues, + Scheduler& scheduler, beast::Journal journal, RocksDBQuickEnv* env) + : m_deletePath (false) + , m_journal (journal) + , m_keyBytes (keyBytes) + , m_name (keyValues ["path"].toStdString ()) + { + if (m_name.empty()) + throw std::runtime_error ("Missing path in RocksDBQuickFactory backend"); + + // Defaults + std::uint64_t budget = 512 * 1024 * 1024; // 512MB + std::string style("level"); + std::uint64_t threads=4; + + if (!keyValues["budget"].isEmpty()) + budget = keyValues["budget"].getIntValue(); + + if (!keyValues["style"].isEmpty()) + style = keyValues["style"].toStdString(); + + if (!keyValues["threads"].isEmpty()) + threads = keyValues["threads"].getIntValue(); + + + // Set options + rocksdb::Options options; + options.create_if_missing = true; + options.env = env; + + if (style == "level") + options.OptimizeLevelStyleCompaction(budget); + + if (style == "universal") + options.OptimizeUniversalStyleCompaction(budget); + + if (style == "point") + options.OptimizeForPointLookup(budget / 1024 / 1024); // In MB + + options.IncreaseParallelism(threads); + + // Allows hash indexes in blocks + options.prefix_extractor.reset(rocksdb::NewNoopTransform()); + + // overrride OptimizeLevelStyleCompaction + options.min_write_buffer_number_to_merge = 1; + + rocksdb::BlockBasedTableOptions table_options; + // Use hash index + table_options.index_type = + rocksdb::BlockBasedTableOptions::kHashSearch; + table_options.filter_policy.reset( + rocksdb::NewBloomFilterPolicy(10)); + options.table_factory.reset( + NewBlockBasedTableFactory(table_options)); + + // Higher values make reads slower + // table_options.block_size = 4096; + + // No point when DatabaseImp has a cache + // table_options.block_cache = + // rocksdb::NewLRUCache(64 * 1024 * 1024); + + options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory()); + // Alternative: + // options.memtable_factory.reset( + // rocksdb::NewHashCuckooRepFactory(options.write_buffer_size)); + + if (! keyValues["open_files"].isEmpty()) + { + options.max_open_files = keyValues["open_files"].getIntValue(); + } + + if (! keyValues["compression"].isEmpty ()) + { + if (keyValues["compression"].getIntValue () == 0) + { + options.compression = rocksdb::kNoCompression; + } + } + + rocksdb::DB* db = nullptr; + + rocksdb::Status status = rocksdb::DB::Open (options, m_name, &db); + if (!status.ok () || !db) + throw std::runtime_error (std::string("Unable to open/create RocksDBQuick: ") + status.ToString()); + + m_db.reset (db); + } + + ~RocksDBQuickBackend () + { + close(); + } + + std::string + getName() + { + return m_name; + } + + void + close() override + { + if (m_db) + { + m_db.reset(); + if (m_deletePath) + { + boost::filesystem::path dir = m_name; + boost::filesystem::remove_all (dir); + } + } + } + + //-------------------------------------------------------------------------- + + Status + fetch (void const* key, NodeObject::Ptr* pObject) + { + pObject->reset (); + + Status status (ok); + + rocksdb::ReadOptions const options; + rocksdb::Slice const slice (static_cast (key), m_keyBytes); + + std::string string; + + rocksdb::Status getStatus = m_db->Get (options, slice, &string); + + if (getStatus.ok ()) + { + DecodedBlob decoded (key, string.data (), string.size ()); + + if (decoded.wasOk ()) + { + *pObject = decoded.createObject (); + } + else + { + // Decoding failed, probably corrupted! + // + status = dataCorrupt; + } + } + else + { + if (getStatus.IsCorruption ()) + { + status = dataCorrupt; + } + else if (getStatus.IsNotFound ()) + { + status = notFound; + } + else + { + status = Status (customCode + getStatus.code()); + + m_journal.error << getStatus.ToString (); + } + } + + return status; + } + + void + store (NodeObject::ref object) + { + storeBatch(Batch{object}); + } + + void + storeBatch (Batch const& batch) + { + rocksdb::WriteBatch wb; + + EncodedBlob encoded; + + for (auto const& e : batch) + { + encoded.prepare (e); + + wb.Put( + rocksdb::Slice(reinterpret_cast(encoded.getKey()), + m_keyBytes), + rocksdb::Slice(reinterpret_cast(encoded.getData()), + encoded.getSize())); + } + + rocksdb::WriteOptions options; + + // Crucial to ensure good write speed and non-blocking writes to memtable + options.disableWAL = true; + + auto ret = m_db->Write (options, &wb); + + if (!ret.ok ()) + throw std::runtime_error ("storeBatch failed: " + ret.ToString()); + } + + void + for_each (std::function f) + { + rocksdb::ReadOptions const options; + + std::unique_ptr it (m_db->NewIterator (options)); + + for (it->SeekToFirst (); it->Valid (); it->Next ()) + { + if (it->key ().size () == m_keyBytes) + { + DecodedBlob decoded (it->key ().data (), + it->value ().data (), + it->value ().size ()); + + if (decoded.wasOk ()) + { + f (decoded.createObject ()); + } + else + { + // Uh oh, corrupted data! + if (m_journal.fatal) m_journal.fatal << + "Corrupt NodeObject #" << uint256 (it->key ().data ()); + } + } + else + { + // VFALCO NOTE What does it mean to find an + // incorrectly sized key? Corruption? + if (m_journal.fatal) m_journal.fatal << + "Bad key size = " << it->key ().size (); + } + } + } + + int + getWriteLoad () + { + return 0; + } + + void + setDeletePath() override + { + m_deletePath = true; + } + + //-------------------------------------------------------------------------- + + void + writeBatch (Batch const& batch) + { + storeBatch (batch); + } + + void + verify() override + { + } +}; + +//------------------------------------------------------------------------------ + +class RocksDBQuickFactory : public Factory +{ +public: + RocksDBQuickEnv m_env; + + RocksDBQuickFactory() + { + Manager::instance().insert(*this); + } + + ~RocksDBQuickFactory() + { + Manager::instance().erase(*this); + } + + std::string + getName () const + { + return "RocksDBQuick"; + } + + std::unique_ptr + createInstance ( + size_t keyBytes, + Parameters const& keyValues, + Scheduler& scheduler, + beast::Journal journal) + { + return std::make_unique ( + keyBytes, keyValues, scheduler, journal, &m_env); + } +}; + +static RocksDBQuickFactory rocksDBQuickFactory; + +} +} + +#endif diff --git a/src/ripple/unity/nodestore.cpp b/src/ripple/unity/nodestore.cpp index a0d0f8817c..4faccb0732 100644 --- a/src/ripple/unity/nodestore.cpp +++ b/src/ripple/unity/nodestore.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include