simplify memdb, use serder, attempt to address mem leak

Author: Richard Holland
Date:   2024-11-08 19:10:40 +11:00
Parent: f8ec3f20dc
Commit: b8c57e07d8

3 changed files with 179 additions and 167 deletions
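
The core of the change is in the in-memory nodestore backend (second file below): instead of a factory-owned MemoryDB that keeps std::shared_ptr<NodeObject> values alive indefinitely, each backend now owns a table of serialized, compressed byte vectors and rebuilds NodeObjects on fetch. Below is a minimal sketch of that round trip, assembled from the helpers visible in the diff (EncodedBlob/DecodedBlob, nodeobject_compress/nodeobject_decompress); the header paths and the two wrapper functions are assumptions for illustration, not code from the commit.

    // Sketch only: serialize -> compress -> store bytes, and
    // fetch -> decompress -> decode back into a NodeObject.
    // Assumed to sit alongside the headers used in the listing below.
    #include <ripple/nodestore/impl/DecodedBlob.h>
    #include <ripple/nodestore/impl/EncodedBlob.h>
    #include <ripple/nodestore/impl/codec.h>
    #include <cstdint>
    #include <memory>
    #include <vector>

    namespace ripple {
    namespace NodeStore {

    // Serialize and compress a NodeObject; only the byte vector is retained,
    // so no shared_ptr<NodeObject> stays alive inside the backend's table.
    inline std::vector<std::uint8_t>
    toCompressedBlob(std::shared_ptr<NodeObject> const& object)
    {
        EncodedBlob encoded(object);
        nudb::detail::buffer bf;  // scratch buffer used by the codec helpers
        auto const out =
            nodeobject_compress(encoded.getData(), encoded.getSize(), bf);
        auto const* p = static_cast<std::uint8_t const*>(out.first);
        return std::vector<std::uint8_t>(p, p + out.second);
    }

    // Decompress and decode a stored blob back into a NodeObject on fetch.
    inline std::shared_ptr<NodeObject>
    fromCompressedBlob(uint256 const& hash, std::vector<std::uint8_t> const& blob)
    {
        nudb::detail::buffer bf;
        auto const in = nodeobject_decompress(blob.data(), blob.size(), bf);
        DecodedBlob decoded(hash.data(), in.first, in.second);
        return decoded.wasOk() ? decoded.createObject() : nullptr;
    }

    }  // namespace NodeStore
    }  // namespace ripple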

View File

@@ -379,7 +379,7 @@ SHAMapStoreImp::run()
try
{
validatedLedger->stateMap().snapShot(isMem)->visitNodes(
validatedLedger->stateMap().snapShot(false)->visitNodes(
std::bind(
&SHAMapStoreImp::copyNode,
this,

View File

@@ -1,154 +1,55 @@
#include <ripple/basics/contract.h>
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <ripple/nodestore/impl/codec.h>
#include <boost/beast/core/string.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/unordered/concurrent_flat_map.hpp>
#include <map>
#include <memory>
#include <mutex>
// Define the map implementation selector
#ifndef MEMORY_DB_USE_CONCURRENT_MAP
#define MEMORY_DB_USE_CONCURRENT_MAP 0 // Set to 0 to use mutex-guarded std::map
#endif
namespace ripple {
struct base_uint_hasher
{
using result_type = std::size_t;
result_type
operator()(base_uint<256> const& value) const
{
return hardened_hash<>{}(value);
}
};
namespace NodeStore {
#if MEMORY_DB_USE_CONCURRENT_MAP
using DataStore = boost::unordered::concurrent_flat_map<
uint256,
std::shared_ptr<NodeObject>,
base_uint_hasher>;
#else
using DataStore = std::map<uint256, std::shared_ptr<NodeObject>>;
#endif
struct MemoryDB
{
explicit MemoryDB() = default;
#if !MEMORY_DB_USE_CONCURRENT_MAP
mutable std::mutex mutex; // Only needed for std::map implementation
#endif
bool open = false;
DataStore table;
// Helper functions to abstract the different map implementations
Status fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) const
{
#if MEMORY_DB_USE_CONCURRENT_MAP
bool found = table.visit(hash, [&](const auto& key_value_pair) {
*pObject = key_value_pair.second;
});
return found ? ok : notFound;
#else
std::lock_guard<std::mutex> lock(mutex);
auto it = table.find(hash);
if (it == table.end())
return notFound;
*pObject = it->second;
return ok;
#endif
}
void store(uint256 const& hash, std::shared_ptr<NodeObject> const& object)
{
#if MEMORY_DB_USE_CONCURRENT_MAP
table.insert_or_assign(hash, object);
#else
std::lock_guard<std::mutex> lock(mutex);
table[hash] = object;
#endif
}
template <typename Func>
void for_each(Func&& f) const
{
#if MEMORY_DB_USE_CONCURRENT_MAP
table.visit_all([&f](const auto& entry) { f(entry.second); });
#else
std::lock_guard<std::mutex> lock(mutex);
for (const auto& entry : table)
f(entry.second);
#endif
}
size_t size() const
{
#if MEMORY_DB_USE_CONCURRENT_MAP
return table.size();
#else
std::lock_guard<std::mutex> lock(mutex);
return table.size();
#endif
}
};
class MemoryFactory : public Factory
{
private:
std::mutex mutex_;
std::map<std::string, MemoryDB, boost::beast::iless> map_;
public:
MemoryFactory();
~MemoryFactory() override;
std::string
getName() const override;
std::unique_ptr<Backend>
createInstance(
size_t keyBytes,
Section const& keyValues,
std::size_t burstSize,
Scheduler& scheduler,
beast::Journal journal) override;
MemoryDB&
open(std::string const& path)
{
std::lock_guard _(mutex_);
auto const result = map_.emplace(
std::piecewise_construct, std::make_tuple(path), std::make_tuple());
MemoryDB& db = result.first->second;
if (db.open)
Throw<std::runtime_error>("already open");
return db;
}
};
static MemoryFactory memoryFactory;
//------------------------------------------------------------------------------
class MemoryBackend : public Backend
{
private:
std::string name_;
beast::Journal const journal_;
MemoryDB* db_{nullptr};
beast::Journal journal_;
bool isOpen_{false};
struct base_uint_hasher
{
using result_type = std::size_t;
result_type
operator()(base_uint<256> const& value) const
{
return hardened_hash<>{}(value);
}
};
#if MEMORY_DB_USE_CONCURRENT_MAP
using DataStore = boost::unordered::concurrent_flat_map<
uint256,
std::vector<std::uint8_t>, // Store compressed blob data
base_uint_hasher>;
#else
using DataStore = std::map<uint256, std::vector<std::uint8_t>>; // Store compressed blob data
mutable std::recursive_mutex mutex_; // Only needed for std::map implementation
#endif
DataStore table_;
public:
MemoryBackend(
size_t keyBytes,
Section const& keyValues,
beast::Journal journal)
: name_(get(keyValues, "path")), journal_(journal)
: name_(get(keyValues, "path"))
, journal_(journal)
{
boost::ignore_unused(journal_);
if (name_.empty())
@@ -169,29 +70,74 @@ public:
void
open(bool createIfMissing) override
{
db_ = &memoryFactory.open(name_);
#if !MEMORY_DB_USE_CONCURRENT_MAP
std::lock_guard lock(mutex_);
#endif
if (isOpen_)
Throw<std::runtime_error>("already open");
isOpen_ = true;
}
bool
isOpen() override
{
return static_cast<bool>(db_);
return isOpen_;
}
void
close() override
{
db_ = nullptr;
#if MEMORY_DB_USE_CONCURRENT_MAP
table_.clear();
#else
std::lock_guard lock(mutex_);
table_.clear();
#endif
isOpen_ = false;
std::cout << "memdb " << name_ << " is closed.\n";
}
//--------------------------------------------------------------------------
Status
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
{
assert(db_);
if (!isOpen_)
return notFound;
uint256 const hash(uint256::fromVoid(key));
return db_->fetch(hash, pObject);
#if MEMORY_DB_USE_CONCURRENT_MAP
bool found = table_.visit(hash, [&](const auto& key_value_pair) {
nudb::detail::buffer bf;
auto const result = nodeobject_decompress(
key_value_pair.second.data(),
key_value_pair.second.size(),
bf);
DecodedBlob decoded(hash.data(), result.first, result.second);
if (!decoded.wasOk())
{
*pObject = nullptr;
return;
}
*pObject = decoded.createObject();
});
return found ? (*pObject ? ok : dataCorrupt) : notFound;
#else
std::lock_guard lock(mutex_);
auto it = table_.find(hash);
if (it == table_.end())
return notFound;
nudb::detail::buffer bf;
auto const result = nodeobject_decompress(
it->second.data(),
it->second.size(),
bf);
DecodedBlob decoded(hash.data(), result.first, result.second);
if (!decoded.wasOk())
return dataCorrupt;
*pObject = decoded.createObject();
return ok;
#endif
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
@@ -214,15 +160,36 @@ public:
void
store(std::shared_ptr<NodeObject> const& object) override
{
assert(db_);
if (!object)
std::cout << "mapping null object\n";
if (!isOpen_)
return;
db_->store(object->getHash(), object);
if (!object)
{
std::cout << "mapping null object\n";
return;
}
EncodedBlob encoded(object);
nudb::detail::buffer bf;
auto const result = nodeobject_compress(
encoded.getData(),
encoded.getSize(),
bf);
std::vector<std::uint8_t> compressed(
static_cast<const std::uint8_t*>(result.first),
static_cast<const std::uint8_t*>(result.first) + result.second);
#if MEMORY_DB_USE_CONCURRENT_MAP
table_.insert_or_assign(object->getHash(), std::move(compressed));
#else
std::lock_guard lock(mutex_);
table_[object->getHash()] = std::move(compressed);
#endif
static int counter = 0;
if (counter++ % 1000 == 0)
std::cout << "map size: " << db_->size() << "\n";
std::cout << "map size: " << size() << "\n";
}
void
@@ -240,8 +207,34 @@ public:
void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
{
assert(db_);
db_->for_each(f);
if (!isOpen_)
return;
#if MEMORY_DB_USE_CONCURRENT_MAP
table_.visit_all([&f](const auto& entry) {
nudb::detail::buffer bf;
auto const result = nodeobject_decompress(
entry.second.data(),
entry.second.size(),
bf);
DecodedBlob decoded(entry.first.data(), result.first, result.second);
if (decoded.wasOk())
f(decoded.createObject());
});
#else
std::lock_guard lock(mutex_);
for (const auto& entry : table_)
{
nudb::detail::buffer bf;
auto const result = nodeobject_decompress(
entry.second.data(),
entry.second.size(),
bf);
DecodedBlob decoded(entry.first.data(), result.first, result.second);
if (decoded.wasOk())
f(decoded.createObject());
}
#endif
}
int
@@ -253,6 +246,7 @@ public:
void
setDeletePath() override
{
close();
}
int
@@ -260,35 +254,51 @@ public:
{
return 0;
}
private:
size_t size() const
{
#if MEMORY_DB_USE_CONCURRENT_MAP
return table_.size();
#else
std::lock_guard lock(mutex_);
return table_.size();
#endif
}
};
// Factory implementation remains the same
MemoryFactory::MemoryFactory()
class MemoryFactory : public Factory
{
Manager::instance().insert(*this);
}
public:
MemoryFactory()
{
Manager::instance().insert(*this);
}
MemoryFactory::~MemoryFactory()
{
Manager::instance().erase(*this);
}
~MemoryFactory() override
{
Manager::instance().erase(*this);
}
std::string
MemoryFactory::getName() const
{
return "Memory";
}
std::string
getName() const override
{
return "Memory";
}
std::unique_ptr<Backend>
MemoryFactory::createInstance(
size_t keyBytes,
Section const& keyValues,
std::size_t,
Scheduler& scheduler,
beast::Journal journal)
{
return std::make_unique<MemoryBackend>(keyBytes, keyValues, journal);
}
std::unique_ptr<Backend>
createInstance(
size_t keyBytes,
Section const& keyValues,
std::size_t burstSize,
Scheduler& scheduler,
beast::Journal journal) override
{
return std::make_unique<MemoryBackend>(keyBytes, keyValues, journal);
}
};
static MemoryFactory memoryFactory;
} // namespace NodeStore
} // namespace ripple
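
The backend above keeps both storage strategies behind MEMORY_DB_USE_CONCURRENT_MAP: either an internally synchronized boost::unordered::concurrent_flat_map, or a std::map guarded by a recursive mutex (recursive because store() calls size() while already holding the lock). The concurrent map exposes no iterators, so every read goes through a visitation callback, which is why the two branches look so different. A standalone sketch of that API, assuming Boost 1.83 or newer; illustration only, not part of the commit:

    // Sketch of the boost::unordered::concurrent_flat_map access pattern
    // used by the MEMORY_DB_USE_CONCURRENT_MAP branches (Boost >= 1.83).
    #include <boost/unordered/concurrent_flat_map.hpp>
    #include <cassert>
    #include <string>

    int main()
    {
        boost::unordered::concurrent_flat_map<int, std::string> table;
        table.insert_or_assign(7, "blob");  // thread-safe upsert

        // No iterators are exposed: lookups and scans run inside callbacks.
        std::string copy;
        auto const found =
            table.visit(7, [&](auto const& kv) { copy = kv.second; });
        assert(found == 1 && copy == "blob");

        // Full scan without any external locking.
        table.visit_all([](auto const& kv) { (void)kv; });

        return 0;
    }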

View File

@@ -47,6 +47,8 @@ public:
{
entries.emplace(entry.endpoint, entry.valence);
}
std::cout << "peer finder mem map size: " << entries.size() << "\n";
}
};