mirror of
https://github.com/Xahau/xahaud.git
synced 2026-04-29 15:37:46 +00:00
more fixes for mysql nodedb
This commit is contained in:
@@ -45,7 +45,6 @@ private:
|
||||
};
|
||||
MySQLConfig config_;
|
||||
|
||||
// Initialize a new MySQL connection using stored config
|
||||
MYSQL*
|
||||
initializeConnection()
|
||||
{
|
||||
@@ -71,7 +70,24 @@ private:
|
||||
std::string("Failed to connect to MySQL: ") + error);
|
||||
}
|
||||
|
||||
// Select the database
|
||||
// Try to select the database first
|
||||
if (mysql_select_db(mysql, config_.name.c_str()))
|
||||
{
|
||||
// Database selection failed, try to create it
|
||||
std::string create_db_query = "CREATE DATABASE IF NOT EXISTS " +
|
||||
std::string(config_.name.c_str());
|
||||
|
||||
if (mysql_query(mysql, create_db_query.c_str()))
|
||||
{
|
||||
// Creation failed for some reason
|
||||
auto error = mysql_error(mysql);
|
||||
mysql_close(mysql);
|
||||
throw std::runtime_error(
|
||||
std::string("Failed to create database: ") + error);
|
||||
}
|
||||
}
|
||||
|
||||
// Try selecting again (either after creation or if it existed already)
|
||||
if (mysql_select_db(mysql, config_.name.c_str()))
|
||||
{
|
||||
auto error = mysql_error(mysql);
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include <ripple/nodestore/impl/EncodedBlob.h>
|
||||
#include <ripple/nodestore/impl/codec.h>
|
||||
#include <boost/beast/core/string.hpp>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <mysql/mysql.h>
|
||||
@@ -17,50 +18,109 @@
|
||||
namespace ripple {
|
||||
namespace NodeStore {
|
||||
|
||||
// SQL statements as constants
|
||||
static constexpr auto CREATE_DATABASE = R"SQL(
|
||||
CREATE DATABASE IF NOT EXISTS `%s`
|
||||
CHARACTER SET utf8mb4
|
||||
COLLATE utf8mb4_unicode_ci
|
||||
)SQL";
|
||||
|
||||
static constexpr auto CREATE_TABLE = R"SQL(
|
||||
CREATE TABLE IF NOT EXISTS `%s` (
|
||||
hash BINARY(32) PRIMARY KEY,
|
||||
data MEDIUMBLOB NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
INDEX idx_created_at (created_at)
|
||||
) ENGINE=InnoDB
|
||||
)SQL";
|
||||
|
||||
static constexpr auto INSERT_NODE = R"SQL(
|
||||
INSERT INTO %s (hash, data)
|
||||
VALUES (?, ?)
|
||||
ON DUPLICATE KEY UPDATE data = VALUES(data)
|
||||
)SQL";
|
||||
|
||||
class MySQLConnection
|
||||
{
|
||||
private:
|
||||
std::unique_ptr<MYSQL, decltype(&mysql_close)> mysql_;
|
||||
Config const& config_;
|
||||
std::string const& dbName_;
|
||||
beast::Journal journal_;
|
||||
static constexpr int MAX_RETRY_ATTEMPTS = 3;
|
||||
static constexpr auto RETRY_DELAY_MS = 1000;
|
||||
|
||||
public:
|
||||
MySQLConnection(
|
||||
Config const& config,
|
||||
std::string const& dbName,
|
||||
beast::Journal journal)
|
||||
: mysql_(mysql_init(nullptr), mysql_close)
|
||||
, config_(config)
|
||||
, dbName_(dbName)
|
||||
, journal_(journal)
|
||||
bool
|
||||
connect()
|
||||
{
|
||||
mysql_.reset(mysql_init(nullptr));
|
||||
if (!mysql_)
|
||||
Throw<std::runtime_error>("Failed to initialize MySQL");
|
||||
return false;
|
||||
|
||||
if (!config_.mysql.has_value())
|
||||
throw std::runtime_error(
|
||||
"[mysql_settings] stanza missing from config!");
|
||||
// Set connection options
|
||||
unsigned int timeout = 5;
|
||||
mysql_options(mysql_.get(), MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
|
||||
uint8_t const reconnect = 1;
|
||||
mysql_options(mysql_.get(), MYSQL_OPT_RECONNECT, &reconnect);
|
||||
|
||||
// Connect without database first
|
||||
auto* conn = mysql_real_connect(
|
||||
mysql_.get(),
|
||||
config_.mysql->host.c_str(),
|
||||
config_.mysql->user.c_str(),
|
||||
config_.mysql->pass.c_str(),
|
||||
dbName_.c_str(),
|
||||
nullptr, // No database selected yet
|
||||
config_.mysql->port,
|
||||
nullptr,
|
||||
0);
|
||||
CLIENT_MULTI_STATEMENTS);
|
||||
|
||||
if (!conn)
|
||||
return false;
|
||||
|
||||
// Create database (unconditionally)
|
||||
std::string query(1024, '\0');
|
||||
int length = snprintf(
|
||||
&query[0],
|
||||
query.size(),
|
||||
CREATE_DATABASE,
|
||||
config_.mysql->name.c_str());
|
||||
query.resize(length);
|
||||
|
||||
if (mysql_query(mysql_.get(), query.c_str()))
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Failed to create database: " << mysql_error(mysql_.get());
|
||||
return false;
|
||||
}
|
||||
|
||||
// Now select the database
|
||||
if (mysql_select_db(mysql_.get(), config_.mysql->name.c_str()))
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Failed to select database: " << mysql_error(mysql_.get());
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public:
|
||||
MySQLConnection(Config const& config, beast::Journal journal)
|
||||
: mysql_(nullptr, mysql_close), config_(config), journal_(journal)
|
||||
{
|
||||
if (!config_.mysql.has_value())
|
||||
throw std::runtime_error(
|
||||
"[mysql_settings] stanza missing from config!");
|
||||
|
||||
if (config_.mysql->name.empty())
|
||||
throw std::runtime_error(
|
||||
"Database name missing from mysql_settings!");
|
||||
|
||||
if (!connect())
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
std::string("Failed to connect to MySQL: ") +
|
||||
mysql_error(mysql_.get()));
|
||||
(mysql_ ? mysql_error(mysql_.get()) : "initialization failed"));
|
||||
}
|
||||
|
||||
uint8_t const reconnect = 1;
|
||||
mysql_options(mysql_.get(), MYSQL_OPT_RECONNECT, &reconnect);
|
||||
}
|
||||
|
||||
MYSQL*
|
||||
@@ -72,39 +132,52 @@ public:
|
||||
bool
|
||||
ensureConnection()
|
||||
{
|
||||
if (!mysql_ || !mysql_.get() || mysql_ping(mysql_.get()) != 0)
|
||||
for (int attempt = 0; attempt < MAX_RETRY_ATTEMPTS; ++attempt)
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "MySQL connection lost, attempting reconnect";
|
||||
try
|
||||
if (!mysql_ || mysql_ping(mysql_.get()) != 0)
|
||||
{
|
||||
mysql_.reset(mysql_init(nullptr));
|
||||
auto* conn = mysql_real_connect(
|
||||
mysql_.get(),
|
||||
config_.mysql->host.c_str(),
|
||||
config_.mysql->user.c_str(),
|
||||
config_.mysql->pass.c_str(),
|
||||
dbName_.c_str(),
|
||||
config_.mysql->port,
|
||||
nullptr,
|
||||
0);
|
||||
JLOG(journal_.warn())
|
||||
<< "MySQL connection lost, attempting reconnect (attempt "
|
||||
<< (attempt + 1) << "/" << MAX_RETRY_ATTEMPTS << ")";
|
||||
|
||||
if (!conn)
|
||||
return false;
|
||||
if (connect())
|
||||
return true;
|
||||
|
||||
uint8_t const reconnect = 1;
|
||||
mysql_options(mysql_.get(), MYSQL_OPT_RECONNECT, &reconnect);
|
||||
if (attempt < MAX_RETRY_ATTEMPTS - 1)
|
||||
{
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::milliseconds(RETRY_DELAY_MS));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
return true;
|
||||
}
|
||||
catch (...)
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Helper method to execute a query with retry logic
|
||||
bool
|
||||
executeQuery(std::string const& query)
|
||||
{
|
||||
for (int attempt = 0; attempt < MAX_RETRY_ATTEMPTS; ++attempt)
|
||||
{
|
||||
if (ensureConnection() && !mysql_query(mysql_.get(), query.c_str()))
|
||||
return true;
|
||||
|
||||
if (attempt < MAX_RETRY_ATTEMPTS - 1)
|
||||
{
|
||||
return false;
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::milliseconds(RETRY_DELAY_MS));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
static thread_local std::unique_ptr<MySQLConnection> threadConnection_;
|
||||
|
||||
class MySQLBackend : public Backend
|
||||
{
|
||||
private:
|
||||
@@ -112,23 +185,7 @@ private:
|
||||
beast::Journal journal_;
|
||||
bool isOpen_{false};
|
||||
Config const& config_;
|
||||
|
||||
// Thread-local MySQL connection
|
||||
static thread_local std::unique_ptr<MySQLConnection> threadConnection_;
|
||||
|
||||
static constexpr auto CREATE_DATABASE = R"SQL(
|
||||
CREATE DATABASE IF NOT EXISTS `%s`
|
||||
CHARACTER SET utf8mb4
|
||||
COLLATE utf8mb4_unicode_ci
|
||||
)SQL";
|
||||
|
||||
static constexpr auto CREATE_NODES_TABLE = R"SQL(
|
||||
CREATE TABLE IF NOT EXISTS nodes (
|
||||
hash BINARY(32) PRIMARY KEY,
|
||||
data MEDIUMBLOB NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
) ENGINE=InnoDB
|
||||
)SQL";
|
||||
static constexpr std::size_t BATCH_SIZE = 1000;
|
||||
|
||||
MySQLConnection*
|
||||
getConnection()
|
||||
@@ -136,11 +193,32 @@ private:
|
||||
if (!threadConnection_)
|
||||
{
|
||||
threadConnection_ =
|
||||
std::make_unique<MySQLConnection>(config_, name_, journal_);
|
||||
std::make_unique<MySQLConnection>(config_, journal_);
|
||||
}
|
||||
return threadConnection_.get();
|
||||
}
|
||||
|
||||
void
|
||||
createTable() // Renamed from createDatabase to better reflect its purpose
|
||||
{
|
||||
auto* conn = getConnection();
|
||||
if (!conn->ensureConnection())
|
||||
Throw<std::runtime_error>("Failed to connect to MySQL server");
|
||||
|
||||
// Create table only
|
||||
std::string query(1024, '\0');
|
||||
int length =
|
||||
snprintf(&query[0], query.size(), CREATE_TABLE, name_.c_str());
|
||||
query.resize(length);
|
||||
|
||||
if (!conn->executeQuery(query))
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Failed to create table: " << mysql_error(conn->get());
|
||||
Throw<std::runtime_error>("Failed to create table");
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
MySQLBackend(
|
||||
std::size_t keyBytes,
|
||||
@@ -150,36 +228,14 @@ public:
|
||||
, journal_(journal)
|
||||
, config_(keyValues.getParent())
|
||||
{
|
||||
// mysql names are limited to alphanumeric
|
||||
// Sanitize table name
|
||||
name_.erase(
|
||||
std::remove_if(
|
||||
name_.begin(),
|
||||
name_.end(),
|
||||
[](char c) { return !std::isalnum(c); }),
|
||||
name_.end());
|
||||
}
|
||||
|
||||
void
|
||||
createDatabase()
|
||||
{
|
||||
auto conn = std::make_unique<MySQLConnection>(config_, "", journal_);
|
||||
|
||||
std::string query(1024, '\0');
|
||||
int length =
|
||||
snprintf(&query[0], query.size(), CREATE_DATABASE, name_.c_str());
|
||||
query.resize(length);
|
||||
|
||||
if (mysql_query(conn->get(), query.c_str()))
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
std::string("Failed to create database: ") +
|
||||
mysql_error(conn->get()));
|
||||
}
|
||||
}
|
||||
|
||||
~MySQLBackend() override
|
||||
{
|
||||
close();
|
||||
name_ = "nodes_" + name_;
|
||||
}
|
||||
|
||||
std::string
|
||||
@@ -192,27 +248,14 @@ public:
|
||||
open(bool createIfMissing) override
|
||||
{
|
||||
if (isOpen_)
|
||||
Throw<std::runtime_error>("already open");
|
||||
|
||||
if (!config_.mysql.has_value())
|
||||
throw std::runtime_error(
|
||||
"[mysql_settings] stanza missing from config!");
|
||||
|
||||
createDatabase();
|
||||
Throw<std::runtime_error>("database already open");
|
||||
|
||||
auto* conn = getConnection();
|
||||
if (!conn->ensureConnection())
|
||||
Throw<std::runtime_error>("Failed to establish MySQL connection");
|
||||
|
||||
if (createIfMissing)
|
||||
{
|
||||
if (mysql_query(conn->get(), CREATE_NODES_TABLE))
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
std::string("Failed to create nodes table: ") +
|
||||
mysql_error(conn->get()));
|
||||
}
|
||||
}
|
||||
createTable(); // Only create table if requested
|
||||
|
||||
isOpen_ = true;
|
||||
}
|
||||
@@ -518,9 +561,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// Initialize the thread_local connection
|
||||
thread_local std::unique_ptr<MySQLConnection> MySQLBackend::threadConnection_;
|
||||
|
||||
class MySQLFactory : public Factory
|
||||
{
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user