diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index d8fccbb43..472d28b56 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -379,7 +379,7 @@ SHAMapStoreImp::run() try { - validatedLedger->stateMap().snapShot(isMem)->visitNodes( + validatedLedger->stateMap().snapShot(false)->visitNodes( std::bind( &SHAMapStoreImp::copyNode, this, diff --git a/src/ripple/nodestore/backend/MemoryFactory.cpp b/src/ripple/nodestore/backend/MemoryFactory.cpp index ef056b176..9acb1fd88 100644 --- a/src/ripple/nodestore/backend/MemoryFactory.cpp +++ b/src/ripple/nodestore/backend/MemoryFactory.cpp @@ -1,154 +1,55 @@ #include #include #include +#include +#include +#include #include #include #include -#include #include #include -// Define the map implementation selector -#ifndef MEMORY_DB_USE_CONCURRENT_MAP -#define MEMORY_DB_USE_CONCURRENT_MAP 0 // Set to 0 to use mutex-guarded std::map -#endif - namespace ripple { - -struct base_uint_hasher -{ - using result_type = std::size_t; - - result_type - operator()(base_uint<256> const& value) const - { - return hardened_hash<>{}(value); - } -}; - namespace NodeStore { -#if MEMORY_DB_USE_CONCURRENT_MAP - using DataStore = boost::unordered::concurrent_flat_map< - uint256, - std::shared_ptr, - base_uint_hasher>; -#else - using DataStore = std::map>; -#endif - -struct MemoryDB -{ - explicit MemoryDB() = default; - -#if !MEMORY_DB_USE_CONCURRENT_MAP - mutable std::mutex mutex; // Only needed for std::map implementation -#endif - bool open = false; - DataStore table; - - // Helper functions to abstract the different map implementations - Status fetch(uint256 const& hash, std::shared_ptr* pObject) const - { -#if MEMORY_DB_USE_CONCURRENT_MAP - bool found = table.visit(hash, [&](const auto& key_value_pair) { - *pObject = key_value_pair.second; - }); - return found ? ok : notFound; -#else - std::lock_guard lock(mutex); - auto it = table.find(hash); - if (it == table.end()) - return notFound; - *pObject = it->second; - return ok; -#endif - } - - void store(uint256 const& hash, std::shared_ptr const& object) - { -#if MEMORY_DB_USE_CONCURRENT_MAP - table.insert_or_assign(hash, object); -#else - std::lock_guard lock(mutex); - table[hash] = object; -#endif - } - - template - void for_each(Func&& f) const - { -#if MEMORY_DB_USE_CONCURRENT_MAP - table.visit_all([&f](const auto& entry) { f(entry.second); }); -#else - std::lock_guard lock(mutex); - for (const auto& entry : table) - f(entry.second); -#endif - } - - size_t size() const - { -#if MEMORY_DB_USE_CONCURRENT_MAP - return table.size(); -#else - std::lock_guard lock(mutex); - return table.size(); -#endif - } -}; - -class MemoryFactory : public Factory -{ -private: - std::mutex mutex_; - std::map map_; - -public: - MemoryFactory(); - ~MemoryFactory() override; - - std::string - getName() const override; - - std::unique_ptr - createInstance( - size_t keyBytes, - Section const& keyValues, - std::size_t burstSize, - Scheduler& scheduler, - beast::Journal journal) override; - - MemoryDB& - open(std::string const& path) - { - std::lock_guard _(mutex_); - auto const result = map_.emplace( - std::piecewise_construct, std::make_tuple(path), std::make_tuple()); - MemoryDB& db = result.first->second; - if (db.open) - Throw("already open"); - return db; - } -}; - -static MemoryFactory memoryFactory; - -//------------------------------------------------------------------------------ - class MemoryBackend : public Backend { private: std::string name_; - beast::Journal const journal_; - MemoryDB* db_{nullptr}; + beast::Journal journal_; + bool isOpen_{false}; + + struct base_uint_hasher + { + using result_type = std::size_t; + + result_type + operator()(base_uint<256> const& value) const + { + return hardened_hash<>{}(value); + } + }; + +#if MEMORY_DB_USE_CONCURRENT_MAP + using DataStore = boost::unordered::concurrent_flat_map< + uint256, + std::vector, // Store compressed blob data + base_uint_hasher>; +#else + using DataStore = std::map>; // Store compressed blob data + mutable std::recursive_mutex mutex_; // Only needed for std::map implementation +#endif + + DataStore table_; public: MemoryBackend( size_t keyBytes, Section const& keyValues, beast::Journal journal) - : name_(get(keyValues, "path")), journal_(journal) + : name_(get(keyValues, "path")) + , journal_(journal) { boost::ignore_unused(journal_); if (name_.empty()) @@ -169,29 +70,74 @@ public: void open(bool createIfMissing) override { - db_ = &memoryFactory.open(name_); +#if !MEMORY_DB_USE_CONCURRENT_MAP + std::lock_guard lock(mutex_); +#endif + if (isOpen_) + Throw("already open"); + isOpen_ = true; } bool isOpen() override { - return static_cast(db_); + return isOpen_; } void close() override { - db_ = nullptr; +#if MEMORY_DB_USE_CONCURRENT_MAP + table_.clear(); +#else + std::lock_guard lock(mutex_); + table_.clear(); +#endif + isOpen_ = false; + std::cout << "memdb " << name_ << " is closed.\n"; } - //-------------------------------------------------------------------------- - Status fetch(void const* key, std::shared_ptr* pObject) override { - assert(db_); + if (!isOpen_) + return notFound; + uint256 const hash(uint256::fromVoid(key)); - return db_->fetch(hash, pObject); + +#if MEMORY_DB_USE_CONCURRENT_MAP + bool found = table_.visit(hash, [&](const auto& key_value_pair) { + nudb::detail::buffer bf; + auto const result = nodeobject_decompress( + key_value_pair.second.data(), + key_value_pair.second.size(), + bf); + DecodedBlob decoded(hash.data(), result.first, result.second); + if (!decoded.wasOk()) + { + *pObject = nullptr; + return; + } + *pObject = decoded.createObject(); + }); + return found ? (*pObject ? ok : dataCorrupt) : notFound; +#else + std::lock_guard lock(mutex_); + auto it = table_.find(hash); + if (it == table_.end()) + return notFound; + + nudb::detail::buffer bf; + auto const result = nodeobject_decompress( + it->second.data(), + it->second.size(), + bf); + DecodedBlob decoded(hash.data(), result.first, result.second); + if (!decoded.wasOk()) + return dataCorrupt; + *pObject = decoded.createObject(); + return ok; +#endif } std::pair>, Status> @@ -214,15 +160,36 @@ public: void store(std::shared_ptr const& object) override { - assert(db_); - if (!object) - std::cout << "mapping null object\n"; + if (!isOpen_) + return; - db_->store(object->getHash(), object); + if (!object) + { + std::cout << "mapping null object\n"; + return; + } + + EncodedBlob encoded(object); + nudb::detail::buffer bf; + auto const result = nodeobject_compress( + encoded.getData(), + encoded.getSize(), + bf); + + std::vector compressed( + static_cast(result.first), + static_cast(result.first) + result.second); + +#if MEMORY_DB_USE_CONCURRENT_MAP + table_.insert_or_assign(object->getHash(), std::move(compressed)); +#else + std::lock_guard lock(mutex_); + table_[object->getHash()] = std::move(compressed); +#endif static int counter = 0; if (counter++ % 1000 == 0) - std::cout << "map size: " << db_->size() << "\n"; + std::cout << "map size: " << size() << "\n"; } void @@ -240,8 +207,34 @@ public: void for_each(std::function)> f) override { - assert(db_); - db_->for_each(f); + if (!isOpen_) + return; + +#if MEMORY_DB_USE_CONCURRENT_MAP + table_.visit_all([&f](const auto& entry) { + nudb::detail::buffer bf; + auto const result = nodeobject_decompress( + entry.second.data(), + entry.second.size(), + bf); + DecodedBlob decoded(entry.first.data(), result.first, result.second); + if (decoded.wasOk()) + f(decoded.createObject()); + }); +#else + std::lock_guard lock(mutex_); + for (const auto& entry : table_) + { + nudb::detail::buffer bf; + auto const result = nodeobject_decompress( + entry.second.data(), + entry.second.size(), + bf); + DecodedBlob decoded(entry.first.data(), result.first, result.second); + if (decoded.wasOk()) + f(decoded.createObject()); + } +#endif } int @@ -253,6 +246,7 @@ public: void setDeletePath() override { + close(); } int @@ -260,35 +254,51 @@ public: { return 0; } + +private: + size_t size() const + { +#if MEMORY_DB_USE_CONCURRENT_MAP + return table_.size(); +#else + std::lock_guard lock(mutex_); + return table_.size(); +#endif + } }; -// Factory implementation remains the same -MemoryFactory::MemoryFactory() +class MemoryFactory : public Factory { - Manager::instance().insert(*this); -} +public: + MemoryFactory() + { + Manager::instance().insert(*this); + } -MemoryFactory::~MemoryFactory() -{ - Manager::instance().erase(*this); -} + ~MemoryFactory() override + { + Manager::instance().erase(*this); + } -std::string -MemoryFactory::getName() const -{ - return "Memory"; -} + std::string + getName() const override + { + return "Memory"; + } -std::unique_ptr -MemoryFactory::createInstance( - size_t keyBytes, - Section const& keyValues, - std::size_t, - Scheduler& scheduler, - beast::Journal journal) -{ - return std::make_unique(keyBytes, keyValues, journal); -} + std::unique_ptr + createInstance( + size_t keyBytes, + Section const& keyValues, + std::size_t burstSize, + Scheduler& scheduler, + beast::Journal journal) override + { + return std::make_unique(keyBytes, keyValues, journal); + } +}; + +static MemoryFactory memoryFactory; } // namespace NodeStore } // namespace ripple diff --git a/src/ripple/peerfinder/impl/InMemoryStore.h b/src/ripple/peerfinder/impl/InMemoryStore.h index 62892232d..ea14d8b72 100644 --- a/src/ripple/peerfinder/impl/InMemoryStore.h +++ b/src/ripple/peerfinder/impl/InMemoryStore.h @@ -47,6 +47,8 @@ public: { entries.emplace(entry.endpoint, entry.valence); } + + std::cout << "peer finder mem map size: " << entries.size() << "\n"; } };