attempt to buffer mysql nodestore with memory cache

This commit is contained in:
Richard Holland
2025-02-15 10:41:18 +11:00
parent 8aedbba785
commit 693579e01e

View File

@@ -10,8 +10,11 @@
#include <boost/beast/core/string.hpp>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <mysql/mysql.h>
#include <queue>
#include <sstream>
#include <thread>
@@ -198,6 +201,37 @@ private:
bool isOpen_{false};
Config const& config_;
static constexpr std::size_t BATCH_SIZE = 1000;
static constexpr std::size_t MAX_CACHE_SIZE =
100000; // Maximum number of entries
static constexpr std::size_t CACHE_CLEANUP_THRESHOLD =
120000; // When to trigger cleanup
using DataStore = std::map<uint256, std::vector<std::uint8_t>>;
DataStore cache_;
std::mutex cacheMutex_;
// LRU tracking for cache management
struct CacheEntry
{
std::chrono::steady_clock::time_point last_access;
size_t size;
};
std::map<uint256, CacheEntry> cacheMetadata_;
std::mutex metadataMutex_;
std::atomic<size_t> currentCacheSize_{0};
// Background write queue
struct WriteOp
{
uint256 hash;
std::vector<std::uint8_t> data;
};
std::queue<WriteOp> writeQueue_;
std::mutex queueMutex_;
std::condition_variable queueCV_;
std::atomic<bool> shouldStop_{false};
std::thread writeThread_;
MySQLConnection*
getConnection()
@@ -210,25 +244,324 @@ private:
return threadConnection_.get();
}
std::string
sanitizeTableName(std::string name)
{
name.erase(
std::unique(
name.begin(),
std::transform(
name.begin(),
name.end(),
name.begin(),
[](char c) { return std::isalnum(c) ? c : '_'; })),
name.end());
return "nodes_" + name;
}
void
createTable() // Renamed from createDatabase to better reflect its purpose
cleanupCache()
{
if (currentCacheSize_.load() < CACHE_CLEANUP_THRESHOLD)
return;
// Collect entries sorted by last access time
std::vector<std::pair<uint256, std::chrono::steady_clock::time_point>>
entries;
{
std::lock_guard<std::mutex> metadataLock(metadataMutex_);
for (const auto& [hash, metadata] : cacheMetadata_)
{
entries.emplace_back(hash, metadata.last_access);
}
}
// Sort by access time, oldest first
std::sort(
entries.begin(), entries.end(), [](const auto& a, const auto& b) {
return a.second < b.second;
});
// Remove oldest entries until we're below target size
size_t removedSize = 0;
for (const auto& entry : entries)
{
if (currentCacheSize_.load() <= MAX_CACHE_SIZE)
break;
{
std::lock_guard<std::mutex> metadataLock(metadataMutex_);
auto metaIt = cacheMetadata_.find(entry.first);
if (metaIt != cacheMetadata_.end())
{
removedSize += metaIt->second.size;
cacheMetadata_.erase(metaIt);
}
}
{
std::lock_guard<std::mutex> cacheLock(cacheMutex_);
cache_.erase(entry.first);
}
currentCacheSize_--;
}
JLOG(journal_.debug())
<< "Cache cleanup removed " << removedSize
<< " bytes, current size: " << currentCacheSize_.load();
}
void
updateCacheMetadata(const uint256& hash, size_t size)
{
CacheEntry entry{std::chrono::steady_clock::now(), size};
{
std::lock_guard<std::mutex> metadataLock(metadataMutex_);
cacheMetadata_[hash] = entry;
}
if (++currentCacheSize_ >= CACHE_CLEANUP_THRESHOLD)
{
cleanupCache();
}
}
Status
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
{
if (!isOpen_)
return notFound;
uint256 const hash(uint256::fromVoid(key));
// Check cache first
{
std::lock_guard<std::mutex> cacheLock(cacheMutex_);
auto it = cache_.find(hash);
if (it != cache_.end())
{
// Update access time
{
std::lock_guard<std::mutex> metadataLock(metadataMutex_);
auto metaIt = cacheMetadata_.find(hash);
if (metaIt != cacheMetadata_.end())
{
metaIt->second.last_access =
std::chrono::steady_clock::now();
}
}
nudb::detail::buffer decompressed;
auto const result = nodeobject_decompress(
it->second.data(), it->second.size(), decompressed);
DecodedBlob decoded(hash.data(), result.first, result.second);
if (decoded.wasOk())
{
*pObject = decoded.createObject();
return ok;
}
}
}
// If not in cache, fetch from MySQL
return fetchFromMySQL(key, pObject);
}
void
startWriteThread()
{
writeThread_ = std::thread([this]() {
while (!shouldStop_)
{
std::vector<WriteOp> batch;
{
std::unique_lock<std::mutex> lock(queueMutex_);
queueCV_.wait_for(
lock, std::chrono::milliseconds(100), [this]() {
return !writeQueue_.empty() || shouldStop_;
});
// Grab up to BATCH_SIZE operations
while (!writeQueue_.empty() && batch.size() < BATCH_SIZE)
{
batch.push_back(std::move(writeQueue_.front()));
writeQueue_.pop();
}
}
if (!batch.empty())
{
auto* conn = getConnection();
if (!conn->ensureConnection())
continue;
if (mysql_query(conn->get(), "START TRANSACTION"))
continue;
bool success = true;
for (auto const& op : batch)
{
MYSQL_STMT* stmt = mysql_stmt_init(conn->get());
if (!stmt)
{
success = false;
break;
}
std::string const sql = "INSERT INTO " + name_ +
" (hash, data) VALUES (?, ?) " +
"ON DUPLICATE KEY UPDATE data = VALUES(data)";
if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length()))
{
mysql_stmt_close(stmt);
success = false;
break;
}
MYSQL_BIND bind[2];
std::memset(bind, 0, sizeof(bind));
bind[0].buffer_type = MYSQL_TYPE_BLOB;
bind[0].buffer = const_cast<void*>(
static_cast<void const*>(op.hash.data()));
bind[0].buffer_length = op.hash.size();
bind[1].buffer_type = MYSQL_TYPE_BLOB;
bind[1].buffer = const_cast<uint8_t*>(op.data.data());
bind[1].buffer_length = op.data.size();
if (mysql_stmt_bind_param(stmt, bind))
{
mysql_stmt_close(stmt);
success = false;
break;
}
if (mysql_stmt_execute(stmt))
{
mysql_stmt_close(stmt);
success = false;
break;
}
mysql_stmt_close(stmt);
}
if (success)
mysql_query(conn->get(), "COMMIT");
else
mysql_query(conn->get(), "ROLLBACK");
}
}
});
}
void
queueWrite(uint256 const& hash, std::vector<std::uint8_t> const& data)
{
std::lock_guard<std::mutex> lock(queueMutex_);
writeQueue_.push({hash, data});
queueCV_.notify_one();
}
Status
fetchFromMySQL(void const* key, std::shared_ptr<NodeObject>* pObject)
{
auto* conn = getConnection();
if (!conn->ensureConnection())
Throw<std::runtime_error>("Failed to connect to MySQL server");
// Create table only
std::string query(1024, '\0');
int length =
snprintf(&query[0], query.size(), CREATE_TABLE, name_.c_str());
query.resize(length);
if (!conn->executeQuery(query))
{
JLOG(journal_.error())
<< "Failed to create table: " << mysql_error(conn->get());
Throw<std::runtime_error>("Failed to create table");
JLOG(journal_.warn()) << "fetch: Failed to ensure connection";
return dataCorrupt;
}
uint256 const hash(uint256::fromVoid(key));
MYSQL_STMT* stmt = mysql_stmt_init(conn->get());
if (!stmt)
return dataCorrupt;
std::string const sql = "SELECT data FROM " + name_ + " WHERE hash = ?";
if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length()))
{
mysql_stmt_close(stmt);
return dataCorrupt;
}
MYSQL_BIND bindParam;
std::memset(&bindParam, 0, sizeof(bindParam));
bindParam.buffer_type = MYSQL_TYPE_BLOB;
bindParam.buffer =
const_cast<void*>(static_cast<void const*>(hash.data()));
bindParam.buffer_length = hash.size();
if (mysql_stmt_bind_param(stmt, &bindParam))
{
mysql_stmt_close(stmt);
return dataCorrupt;
}
if (mysql_stmt_execute(stmt))
{
mysql_stmt_close(stmt);
return notFound;
}
MYSQL_BIND bindResult;
std::memset(&bindResult, 0, sizeof(bindResult));
uint64_t length = 0;
bool is_null = false;
bindResult.buffer_type = MYSQL_TYPE_BLOB;
bindResult.length = &length;
bindResult.is_null = &is_null;
if (mysql_stmt_bind_result(stmt, &bindResult))
{
mysql_stmt_close(stmt);
return dataCorrupt;
}
if (mysql_stmt_store_result(stmt))
{
mysql_stmt_close(stmt);
return dataCorrupt;
}
if (mysql_stmt_num_rows(stmt) == 0)
{
mysql_stmt_close(stmt);
return notFound;
}
std::vector<uint8_t> buffer(16 * 1024 * 1024); // 16MB buffer
bindResult.buffer = buffer.data();
bindResult.buffer_length = buffer.size();
if (mysql_stmt_fetch(stmt))
{
mysql_stmt_close(stmt);
return dataCorrupt;
}
mysql_stmt_close(stmt);
// Add to cache
std::vector<uint8_t> cached_data(
buffer.begin(), buffer.begin() + length);
cache_.insert_or_assign(hash, cached_data);
updateCacheMetadata(hash, length);
nudb::detail::buffer decompressed;
auto const result = nodeobject_decompress(
cached_data.data(), cached_data.size(), decompressed);
DecodedBlob decoded(hash.data(), result.first, result.second);
if (!decoded.wasOk())
return dataCorrupt;
*pObject = decoded.createObject();
return ok;
}
public:
@@ -236,22 +569,19 @@ public:
std::size_t keyBytes,
Section const& keyValues,
beast::Journal journal)
: name_(get(keyValues, "path", "nodestore"))
: name_(sanitizeTableName(get(keyValues, "path", "nodestore")))
, journal_(journal)
, config_(keyValues.getParent())
{
// Sanitize table name
name_.erase(
std::unique(
name_.begin(),
std::transform(
name_.begin(),
name_.end(),
name_.begin(),
[](char c) { return std::isalnum(c) ? c : '_'; })),
name_.end());
startWriteThread();
}
name_ = "nodes_" + name_;
~MySQLBackend()
{
shouldStop_ = true;
queueCV_.notify_all();
if (writeThread_.joinable())
writeThread_.join();
}
std::string
@@ -271,7 +601,7 @@ public:
Throw<std::runtime_error>("Failed to establish MySQL connection");
if (createIfMissing)
createTable(); // Only create table if requested
createTable();
isOpen_ = true;
}
@@ -285,186 +615,66 @@ public:
void
close() override
{
// Wait for write queue to empty
{
std::unique_lock<std::mutex> lock(queueMutex_);
while (!writeQueue_.empty())
{
queueCV_.wait(lock);
}
}
threadConnection_.reset();
cache_.clear();
cacheMetadata_.clear();
currentCacheSize_ = 0;
isOpen_ = false;
}
Status
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
{
if (!isOpen_)
{
JLOG(journal_.warn()) << "fetch: Database not open";
return notFound;
}
auto* conn = getConnection();
if (!conn->ensureConnection())
{
JLOG(journal_.warn()) << "fetch: Failed to ensure connection";
return dataCorrupt;
}
uint256 const hash(uint256::fromVoid(key));
MYSQL_STMT* stmt = mysql_stmt_init(conn->get());
if (!stmt)
{
JLOG(journal_.warn())
<< "fetch: Failed to initialize prepared statement";
return dataCorrupt;
}
std::string const sql = "SELECT data FROM " + name_ + " WHERE hash = ?";
if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length()))
{
JLOG(journal_.warn())
<< "fetch: Failed to prepare statement. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
MYSQL_BIND bindParam;
std::memset(&bindParam, 0, sizeof(bindParam));
bindParam.buffer_type = MYSQL_TYPE_BLOB;
bindParam.buffer =
const_cast<void*>(static_cast<void const*>(hash.data()));
bindParam.buffer_length = hash.size();
if (mysql_stmt_bind_param(stmt, &bindParam))
{
JLOG(journal_.warn()) << "fetch: Failed to bind parameter. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
if (mysql_stmt_execute(stmt))
{
JLOG(journal_.warn())
<< "fetch: Failed to execute statement. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return notFound;
}
MYSQL_BIND bindResult;
std::memset(&bindResult, 0, sizeof(bindResult));
uint64_t length = 0;
bool is_null = false;
bindResult.buffer_type = MYSQL_TYPE_BLOB;
bindResult.length = &length;
bindResult.is_null = &is_null;
if (mysql_stmt_bind_result(stmt, &bindResult))
{
JLOG(journal_.warn()) << "fetch: Failed to bind result. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
// After binding the initial result...
if (mysql_stmt_store_result(stmt))
{
JLOG(journal_.warn()) << "fetch: Failed to store result. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
if (mysql_stmt_num_rows(stmt) == 0)
{
mysql_stmt_close(stmt);
return notFound;
}
// Allocate buffer - MEDIUMBLOB can store up to 16MB
constexpr size_t MEDIUM_BLOB_MAX = 16 * 1024 * 1024;
std::vector<uint8_t> buffer(MEDIUM_BLOB_MAX);
bindResult.buffer = buffer.data();
bindResult.buffer_length = MEDIUM_BLOB_MAX;
// Re-bind with the allocated buffer
if (mysql_stmt_bind_result(stmt, &bindResult))
{
JLOG(journal_.warn()) << "fetch: Failed to re-bind result. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
// Single fetch for the row
if (mysql_stmt_fetch(stmt))
{
JLOG(journal_.warn()) << "fetch: Failed to fetch row. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
// Use the actual length for subsequent operations
size_t actual_length = *bindResult.length;
// Re-bind with properly sized buffer
if (mysql_stmt_bind_result(stmt, &bindResult))
{
JLOG(journal_.warn()) << "fetch: Failed to re-bind result. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
// Seek back to start of results
mysql_stmt_data_seek(stmt, 0);
// Fetch again with proper buffer
if (mysql_stmt_fetch(stmt))
{
JLOG(journal_.warn()) << "fetch: Failed to fetch row. Error: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return dataCorrupt;
}
// After successful fetch
JLOG(journal_.trace()) << "fetch: Successfully fetched row. "
<< "Length: " << *bindResult.length
<< ", is_null: " << *bindResult.is_null
<< ", buffer size: " << buffer.size();
mysql_stmt_close(stmt);
nudb::detail::buffer decompressed;
auto const result =
nodeobject_decompress(buffer.data(), actual_length, decompressed);
JLOG(journal_.trace())
<< "fetch: Decompression result - size: " << result.second
<< ", success: " << (result.first != nullptr);
DecodedBlob decoded(hash.data(), result.first, result.second);
if (!decoded.wasOk())
{
JLOG(journal_.warn()) << "fetch: Blob decoding failed";
return dataCorrupt;
}
*pObject = decoded.createObject();
return ok;
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256 const*> const& hashes) override
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
if (!isOpen_)
return {results, notFound};
std::vector<uint256 const*> mysqlFetch;
mysqlFetch.reserve(hashes.size());
// First try cache
for (auto const& h : hashes)
{
auto it = cache_.find(*h);
if (it != cache_.end())
{
// Update access time
auto metaIt = cacheMetadata_.find(*h);
if (metaIt != cacheMetadata_.end())
{
metaIt->second.last_access =
std::chrono::steady_clock::now();
}
nudb::detail::buffer decompressed;
auto const result = nodeobject_decompress(
it->second.data(), it->second.size(), decompressed);
DecodedBlob decoded(h->data(), result.first, result.second);
if (decoded.wasOk())
{
results.push_back(decoded.createObject());
continue;
}
}
mysqlFetch.push_back(h);
results.push_back(nullptr); // Placeholder for MySQL fetch
}
// If everything was in cache, return early
if (mysqlFetch.empty())
return {results, ok};
// Fetch remaining from MySQL
auto* conn = getConnection();
if (!conn->ensureConnection())
return {results, dataCorrupt};
@@ -474,11 +684,17 @@ public:
try
{
for (auto const& h : hashes)
for (size_t i = 0; i < mysqlFetch.size(); ++i)
{
std::shared_ptr<NodeObject> nObj;
Status status = fetch(h->begin(), &nObj);
results.push_back(status == ok ? nObj : nullptr);
Status status = fetchFromMySQL(mysqlFetch[i]->data(), &nObj);
// Find the original position in results
auto originalPos = std::distance(
hashes.begin(),
std::find(hashes.begin(), hashes.end(), mysqlFetch[i]));
results[originalPos] = (status == ok ? nObj : nullptr);
}
if (mysql_query(conn->get(), "COMMIT"))
@@ -499,86 +715,58 @@ public:
if (!isOpen_ || !object)
return;
auto* conn = getConnection();
if (!conn->ensureConnection())
return;
EncodedBlob encoded(object);
nudb::detail::buffer compressed;
auto const result = nodeobject_compress(
encoded.getData(), encoded.getSize(), compressed);
MYSQL_STMT* stmt = mysql_stmt_init(conn->get());
if (!stmt)
return;
std::vector<std::uint8_t> data(
static_cast<const std::uint8_t*>(result.first),
static_cast<const std::uint8_t*>(result.first) + result.second);
std::string const sql = "INSERT INTO " + name_ +
" (hash, data) VALUES (?, ?) "
"ON DUPLICATE KEY UPDATE data = VALUES(data)";
// Update cache immediately
cache_.insert_or_assign(object->getHash(), data);
updateCacheMetadata(object->getHash(), data.size());
if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length()))
{
JLOG(journal_.error()) << "Failed to prepare MySQL statement: "
<< mysql_stmt_error(stmt);
mysql_stmt_close(stmt);
return;
}
MYSQL_BIND bind[2];
std::memset(bind, 0, sizeof(bind));
auto const& hash = object->getHash();
bind[0].buffer_type = MYSQL_TYPE_BLOB;
bind[0].buffer =
const_cast<void*>(static_cast<void const*>(hash.data()));
bind[0].buffer_length = hash.size();
bind[1].buffer_type = MYSQL_TYPE_BLOB;
bind[1].buffer =
const_cast<void*>(static_cast<void const*>(result.first));
bind[1].buffer_length = result.second;
if (mysql_stmt_bind_param(stmt, bind))
{
mysql_stmt_close(stmt);
return;
}
mysql_stmt_execute(stmt);
mysql_stmt_close(stmt);
// Queue async write to MySQL
queueWrite(object->getHash(), data);
}
void
storeBatch(Batch const& batch) override
{
if (!isOpen_)
return;
auto* conn = getConnection();
if (!conn->ensureConnection())
return;
// if (mysql_query(conn->get(), "START TRANSACTION"))
// return;
try
for (auto const& e : batch)
{
for (auto const& e : batch)
store(e);
if (!e)
continue;
// if (mysql_query(conn->get(), "COMMIT"))
// mysql_query(conn->get(), "ROLLBACK");
}
catch (...)
{
// mysql_query(conn->get(), "ROLLBACK");
throw;
EncodedBlob encoded(e);
nudb::detail::buffer compressed;
auto const result = nodeobject_compress(
encoded.getData(), encoded.getSize(), compressed);
std::vector<std::uint8_t> data(
static_cast<const std::uint8_t*>(result.first),
static_cast<const std::uint8_t*>(result.first) + result.second);
// Update cache immediately
cache_.insert_or_assign(e->getHash(), data);
updateCacheMetadata(e->getHash(), data.size());
// Queue async write to MySQL
queueWrite(e->getHash(), data);
}
}
void
sync() override
{
// Wait for write queue to empty
std::unique_lock<std::mutex> lock(queueMutex_);
while (!writeQueue_.empty())
{
queueCV_.wait(lock);
}
}
void
@@ -587,6 +775,27 @@ public:
if (!isOpen_)
return;
// First, process all cached entries
std::vector<std::pair<uint256, std::vector<std::uint8_t>>>
cached_entries;
for (const auto& entry : cache_)
{
cached_entries.push_back(entry);
}
for (const auto& entry : cached_entries)
{
nudb::detail::buffer decompressed;
auto const result = nodeobject_decompress(
entry.second.data(), entry.second.size(), decompressed);
DecodedBlob decoded(
entry.first.data(), result.first, result.second);
if (decoded.wasOk())
f(decoded.createObject());
}
// Then fetch any remaining entries from MySQL
auto* conn = getConnection();
if (!conn->ensureConnection())
return;
@@ -608,15 +817,32 @@ public:
if (!lengths)
continue;
uint256 hash;
std::memcpy(hash.data(), row[0], hash.size());
// Skip if already processed from cache
if (cache_.find(hash) != cache_.end())
continue;
nudb::detail::buffer decompressed;
auto const decomp_result = nodeobject_decompress(
row[1], static_cast<std::size_t>(lengths[1]), decompressed);
DecodedBlob decoded(
row[0], decomp_result.first, decomp_result.second);
hash.data(), decomp_result.first, decomp_result.second);
if (decoded.wasOk())
f(decoded.createObject());
{
auto obj = decoded.createObject();
f(obj);
// Add to cache for future use
std::vector<std::uint8_t> data(
reinterpret_cast<const std::uint8_t*>(row[1]),
reinterpret_cast<const std::uint8_t*>(row[1]) + lengths[1]);
cache_.insert_or_assign(hash, std::move(data));
updateCacheMetadata(hash, lengths[1]);
}
}
mysql_free_result(result);
@@ -625,7 +851,8 @@ public:
int
getWriteLoad() override
{
return 0;
std::lock_guard<std::mutex> lock(queueMutex_);
return static_cast<int>(writeQueue_.size());
}
void
@@ -639,6 +866,27 @@ public:
{
return 1;
}
private:
void
createTable()
{
auto* conn = getConnection();
if (!conn->ensureConnection())
Throw<std::runtime_error>("Failed to connect to MySQL server");
std::string query(1024, '\0');
int length =
snprintf(&query[0], query.size(), CREATE_TABLE, name_.c_str());
query.resize(length);
if (!conn->executeQuery(query))
{
JLOG(journal_.error())
<< "Failed to create table: " << mysql_error(conn->get());
Throw<std::runtime_error>("Failed to create table");
}
}
};
class MySQLFactory : public Factory