From 53bf63af7b890e6d3bc5def182b8c8331d56a5fd Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Tue, 11 Feb 2025 16:47:12 +1100 Subject: [PATCH] initial mysql backend, compiling not tested --- Builds/CMake/FindMySQL.cmake | 48 + Builds/CMake/RippledCore.cmake | 1 + Builds/CMake/deps/MySQL.cmake | 54 + CMakeLists.txt | 1 + src/ripple/app/rdb/backend/MySQLDatabase.h | 1532 +++++++++++++++++ .../app/rdb/impl/RelationalDatabase.cpp | 10 + src/ripple/core/Config.h | 11 + src/ripple/core/ConfigSections.h | 1 + src/ripple/core/impl/Config.cpp | 24 + src/ripple/nodestore/backend/MySQLFactory.cpp | 420 +++++ 10 files changed, 2102 insertions(+) create mode 100644 Builds/CMake/FindMySQL.cmake create mode 100644 Builds/CMake/deps/MySQL.cmake create mode 100644 src/ripple/app/rdb/backend/MySQLDatabase.h create mode 100644 src/ripple/nodestore/backend/MySQLFactory.cpp diff --git a/Builds/CMake/FindMySQL.cmake b/Builds/CMake/FindMySQL.cmake new file mode 100644 index 000000000..7e1da1ec9 --- /dev/null +++ b/Builds/CMake/FindMySQL.cmake @@ -0,0 +1,48 @@ +# - Find MySQL +find_path(MYSQL_INCLUDE_DIR + NAMES mysql.h + PATHS + /usr/include/mysql + /usr/local/include/mysql + /opt/mysql/mysql/include + DOC "MySQL include directory" +) + +find_library(MYSQL_LIBRARY + NAMES mysqlclient + PATHS + /usr/lib + /usr/lib/x86_64-linux-gnu + /usr/lib/mysql + /usr/local/lib/mysql + /opt/mysql/mysql/lib + DOC "MySQL client library" +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(MySQL + REQUIRED_VARS + MYSQL_LIBRARY + MYSQL_INCLUDE_DIR +) + +if(MYSQL_FOUND) + set(MYSQL_INCLUDE_DIRS ${MYSQL_INCLUDE_DIR}) + set(MYSQL_LIBRARIES ${MYSQL_LIBRARY}) + + # Create an imported target + if(NOT TARGET MySQL::MySQL) + add_library(MySQL::MySQL UNKNOWN IMPORTED) + set_target_properties(MySQL::MySQL PROPERTIES + IMPORTED_LOCATION "${MYSQL_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${MYSQL_INCLUDE_DIR}" + ) + endif() + + mark_as_advanced(MYSQL_INCLUDE_DIR MYSQL_LIBRARY) +else() + message(FATAL_ERROR "Could not find MySQL development files") +endif() + +message(STATUS "Using MySQL include dir: ${MYSQL_INCLUDE_DIR}") +message(STATUS "Using MySQL library: ${MYSQL_LIBRARY}") diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 78843991f..2bdc1b050 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -540,6 +540,7 @@ target_sources (rippled PRIVATE #]===============================] src/ripple/nodestore/backend/CassandraFactory.cpp src/ripple/nodestore/backend/RWDBFactory.cpp + src/ripple/nodestore/backend/MySQLFactory.cpp src/ripple/nodestore/backend/MemoryFactory.cpp src/ripple/nodestore/backend/FlatmapFactory.cpp src/ripple/nodestore/backend/NuDBFactory.cpp diff --git a/Builds/CMake/deps/MySQL.cmake b/Builds/CMake/deps/MySQL.cmake new file mode 100644 index 000000000..64b8a02fb --- /dev/null +++ b/Builds/CMake/deps/MySQL.cmake @@ -0,0 +1,54 @@ +#[===================================================================[ + dep: MySQL + MySQL client library integration for rippled +#]===================================================================] + +# Create an IMPORTED target for MySQL +add_library(mysql_client UNKNOWN IMPORTED) + +# Find MySQL client library and headers +find_path(MYSQL_INCLUDE_DIR + NAMES mysql.h + PATHS + /usr/include/mysql + /usr/local/include/mysql + /opt/mysql/mysql/include + DOC "MySQL include directory" +) + +find_library(MYSQL_LIBRARY + NAMES mysqlclient + PATHS + /usr/lib + /usr/lib/x86_64-linux-gnu + /usr/lib/mysql + /usr/local/lib/mysql + /opt/mysql/mysql/lib + DOC "MySQL client library" +) + +# Set properties on the imported target +if(MYSQL_INCLUDE_DIR AND MYSQL_LIBRARY) + set_target_properties(mysql_client PROPERTIES + IMPORTED_LOCATION "${MYSQL_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${MYSQL_INCLUDE_DIR}" + ) + + message(STATUS "Found MySQL include dir: ${MYSQL_INCLUDE_DIR}") + message(STATUS "Found MySQL library: ${MYSQL_LIBRARY}") +else() + message(FATAL_ERROR "Could not find MySQL development files. Please install libmysqlclient-dev") +endif() + +# Add MySQL backend source to rippled sources +list(APPEND rippled_src + src/ripple/nodestore/backend/MySQLBackend.cpp) + +# Link MySQL to rippled +target_link_libraries(ripple_libs + INTERFACE + mysql_client +) + +# Create an alias target for consistency with other deps +add_library(deps::mysql ALIAS mysql_client) diff --git a/CMakeLists.txt b/CMakeLists.txt index d62541fad..8a8acc601 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,7 @@ include(deps/gRPC) include(deps/cassandra) include(deps/Postgres) include(deps/WasmEdge) +include(deps/MySQL) ### diff --git a/src/ripple/app/rdb/backend/MySQLDatabase.h b/src/ripple/app/rdb/backend/MySQLDatabase.h new file mode 100644 index 000000000..f37b7a7db --- /dev/null +++ b/src/ripple/app/rdb/backend/MySQLDatabase.h @@ -0,0 +1,1532 @@ +#ifndef RIPPLE_APP_RDB_BACKEND_MYSQLDATABASE_H_INCLUDED +#define RIPPLE_APP_RDB_BACKEND_MYSQLDATABASE_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +class MySQLDatabase : public SQLiteDatabase +{ +private: + Application& app_; + bool const useTxTables_; + std::unique_ptr mysql_; + + // Schema creation statements + static constexpr auto CREATE_LEDGERS_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS ledgers ( + ledger_seq BIGINT PRIMARY KEY, + ledger_hash VARCHAR(64) UNIQUE NOT NULL, + parent_hash VARCHAR(64) NOT NULL, + total_coins BIGINT NOT NULL, + closing_time BIGINT NOT NULL, + prev_closing_time BIGINT NOT NULL, + close_time_resolution BIGINT NOT NULL, + close_flags INT NOT NULL, + account_hash VARCHAR(64) NOT NULL, + tx_hash VARCHAR(64) NOT NULL + ) + )SQL"; + + static constexpr auto CREATE_TRANSACTIONS_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS transactions ( + tx_hash VARCHAR(64) PRIMARY KEY, + ledger_seq BIGINT NOT NULL, + tx_seq INT NOT NULL, + raw_tx MEDIUMBLOB NOT NULL, + meta_data MEDIUMBLOB NOT NULL, + FOREIGN KEY (ledger_seq) REFERENCES ledgers(ledger_seq) + ) + )SQL"; + + static constexpr auto CREATE_ACCOUNT_TRANSACTIONS_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS account_transactions ( + account_id VARCHAR(64) NOT NULL, + tx_hash VARCHAR(64) NOT NULL, + ledger_seq BIGINT NOT NULL, + tx_seq INT NOT NULL, + PRIMARY KEY (account_id, ledger_seq, tx_seq), + FOREIGN KEY (tx_hash) REFERENCES transactions(tx_hash), + FOREIGN KEY (ledger_seq) REFERENCES ledgers(ledger_seq) + ) + )SQL"; + +public: + MySQLDatabase( + Application& app, + Config const& config, + JobQueue& jobQueue) + : app_(app) + , useTxTables_(config.useTxTables()) + , mysql_(mysql_init(nullptr), mysql_close) + { + if (!mysql_) + throw std::runtime_error("Failed to initialize MySQL"); + + if (!config.mysql.has_value()) + throw std::runtime_error("[mysql_settings] stanza missing from config!"); + + // Read MySQL connection details from config + auto* conn = mysql_real_connect( + mysql_.get(), + config.mysql->host.c_str(), + config.mysql->user.c_str(), + config.mysql->pass.c_str(), + config.mysql->name.c_str(), + config.mysql->port, + nullptr, + 0); + + if (!conn) + throw std::runtime_error( + std::string("Failed to connect to MySQL: ") + + mysql_error(mysql_.get())); + + // Create schema if not exists + if (mysql_query(mysql_.get(), CREATE_LEDGERS_TABLE)) + throw std::runtime_error( + std::string("Failed to create ledgers table: ") + + mysql_error(mysql_.get())); + + if (useTxTables_) + { + if (mysql_query(mysql_.get(), CREATE_TRANSACTIONS_TABLE)) + throw std::runtime_error( + std::string("Failed to create transactions table: ") + + mysql_error(mysql_.get())); + + if (mysql_query(mysql_.get(), CREATE_ACCOUNT_TRANSACTIONS_TABLE)) + throw std::runtime_error( + std::string("Failed to create account_transactions table: ") + + mysql_error(mysql_.get())); + } + } + + bool + saveValidatedLedger( + std::shared_ptr const& ledger, + bool current) override + { + auto j = app_.journal("Ledger"); + auto seq = ledger->info().seq; + + if (!ledger->info().accountHash.isNonZero()) + { + JLOG(j.fatal()) << "AH is zero: " << getJson({*ledger, {}}); + assert(false); + return false; + } + + // Save the ledger header + std::stringstream sql; + sql << "INSERT INTO ledgers (" + << "ledger_seq, ledger_hash, parent_hash, total_coins, " + << "closing_time, prev_closing_time, close_time_resolution, " + << "close_flags, account_hash, tx_hash) VALUES (" + << seq << ", " + << "'" << strHex(ledger->info().hash) << "', " + << "'" << strHex(ledger->info().parentHash) << "', " + << ledger->info().drops.drops() << ", " + << ledger->info().closeTime.time_since_epoch().count() << ", " + << ledger->info().parentCloseTime.time_since_epoch().count() << ", " + << ledger->info().closeTimeResolution.count() << ", " + << ledger->info().closeFlags << ", " + << "'" << strHex(ledger->info().accountHash) << "', " + << "'" << strHex(ledger->info().txHash) << "') " + << "ON DUPLICATE KEY UPDATE " + << "parent_hash = VALUES(parent_hash), " + << "total_coins = VALUES(total_coins), " + << "closing_time = VALUES(closing_time), " + << "prev_closing_time = VALUES(prev_closing_time), " + << "close_time_resolution = VALUES(close_time_resolution), " + << "close_flags = VALUES(close_flags), " + << "account_hash = VALUES(account_hash), " + << "tx_hash = VALUES(tx_hash)"; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + { + JLOG(j.fatal()) << "Failed to save ledger: " << mysql_error(mysql_.get()); + return false; + } + + if (useTxTables_) + { + std::shared_ptr aLedger; + try + { + aLedger = app_.getAcceptedLedgerCache().fetch(ledger->info().hash); + if (!aLedger) + { + aLedger = std::make_shared(ledger, app_); + app_.getAcceptedLedgerCache().canonicalize_replace_client( + ledger->info().hash, aLedger); + } + } + catch (std::exception const&) + { + JLOG(j.warn()) << "An accepted ledger was missing nodes"; + return false; + } + + // Start a transaction for saving all transactions + if (mysql_query(mysql_.get(), "START TRANSACTION")) + { + JLOG(j.fatal()) << "Failed to start transaction: " + << mysql_error(mysql_.get()); + return false; + } + + try + { + for (auto const& acceptedLedgerTx : *aLedger) + { + auto const& txn = acceptedLedgerTx->getTxn(); + auto const& meta = acceptedLedgerTx->getMeta(); + auto const& id = txn->getTransactionID(); + + // Save transaction + std::stringstream txSql; + txSql << "INSERT INTO transactions (" + << "tx_hash, ledger_seq, tx_seq, raw_tx, meta_data) VALUES (" + << "'" << strHex(id) << "', " + << seq << ", " + << acceptedLedgerTx->getTxnSeq() << ", " + << "?, ?) " // Using placeholders for BLOB data + << "ON DUPLICATE KEY UPDATE " + << "ledger_seq = VALUES(ledger_seq), " + << "tx_seq = VALUES(tx_seq), " + << "raw_tx = VALUES(raw_tx), " + << "meta_data = VALUES(meta_data)"; + + MYSQL_STMT* stmt = mysql_stmt_init(mysql_.get()); + if (!stmt) + { + throw std::runtime_error("Failed to initialize statement"); + } + + if (mysql_stmt_prepare(stmt, txSql.str().c_str(), txSql.str().length())) + { + mysql_stmt_close(stmt); + throw std::runtime_error("Failed to prepare statement"); + } + + // Bind parameters for BLOB data + MYSQL_BIND bind[2]; + memset(bind, 0, sizeof(bind)); + + Serializer s; + txn->add(s); + bind[0].buffer_type = MYSQL_TYPE_BLOB; + bind[0].buffer = (void*)s.data(); + bind[0].buffer_length = s.size(); + + Serializer s2; + meta.getAsObject().addWithoutSigningFields(s2); + + bind[1].buffer_type = MYSQL_TYPE_BLOB; + bind[1].buffer = (void*)s2.data(); + bind[1].buffer_length = s2.size(); + + if (mysql_stmt_bind_param(stmt, bind)) + { + mysql_stmt_close(stmt); + throw std::runtime_error("Failed to bind parameters"); + } + + if (mysql_stmt_execute(stmt)) + { + mysql_stmt_close(stmt); + throw std::runtime_error("Failed to execute statement"); + } + + mysql_stmt_close(stmt); + + // Save account transactions + for (auto const& account : meta.getAffectedAccounts()) + { + std::stringstream accTxSql; + accTxSql << "INSERT INTO account_transactions (" + << "account_id, tx_hash, ledger_seq, tx_seq) VALUES (" + << "'" << strHex(account) << "', " + << "'" << strHex(id) << "', " + << seq << ", " + << acceptedLedgerTx->getTxnSeq() << ") " + << "ON DUPLICATE KEY UPDATE " + << "tx_hash = VALUES(tx_hash)"; + + if (mysql_query(mysql_.get(), accTxSql.str().c_str())) + { + throw std::runtime_error(mysql_error(mysql_.get())); + } + } + + app_.getMasterTransaction().inLedger( + id, seq, acceptedLedgerTx->getTxnSeq(), app_.config().NETWORK_ID); + } + + if (mysql_query(mysql_.get(), "COMMIT")) + { + throw std::runtime_error(mysql_error(mysql_.get())); + } + } + catch (std::exception const& e) + { + JLOG(j.fatal()) << "Error saving transactions: " << e.what(); + mysql_query(mysql_.get(), "ROLLBACK"); + return false; + } + } + + return true; + } + + std::optional + getMinLedgerSeq() override + { + if (mysql_query(mysql_.get(), "SELECT MIN(ledger_seq) FROM ledgers")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return seq; + } + + std::optional + getTransactionsMinLedgerSeq() override + { + if (!useTxTables_) + return {}; + + if (mysql_query(mysql_.get(), "SELECT MIN(ledger_seq) FROM transactions")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return seq; + } + + std::optional + getMaxLedgerSeq() override + { + if (mysql_query(mysql_.get(), "SELECT MAX(ledger_seq) FROM ledgers")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return seq; + } + + void + deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) override + { + if (!useTxTables_) + return; + + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq = " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + + sql.str(""); + sql << "DELETE FROM transactions WHERE ledger_seq = " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + } + + void + deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + if (useTxTables_) + { + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq < " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + + sql.str(""); + sql << "DELETE FROM transactions WHERE ledger_seq < " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + } + + std::stringstream sql; + sql << "DELETE FROM ledgers WHERE ledger_seq < " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + } + + void + deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + if (!useTxTables_) + return; + + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq < " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + + sql.str(""); + sql << "DELETE FROM transactions WHERE ledger_seq < " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + } + + void + deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + if (!useTxTables_) + return; + + std::stringstream sql; + sql << "DELETE FROM account_transactions WHERE ledger_seq < " << ledgerSeq; + mysql_query(mysql_.get(), sql.str().c_str()); + } + + std::size_t + getTransactionCount() override + { + if (!useTxTables_) + return 0; + + if (mysql_query(mysql_.get(), "SELECT COUNT(*) FROM transactions")) + return 0; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return 0; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return 0; + } + + std::size_t count = std::stoull(row[0]); + mysql_free_result(result); + return count; + } + + std::size_t + getAccountTransactionCount() override + { + if (!useTxTables_) + return 0; + + if (mysql_query(mysql_.get(), "SELECT COUNT(*) FROM account_transactions")) + return 0; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return 0; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return 0; + } + + std::size_t count = std::stoull(row[0]); + mysql_free_result(result); + return count; + } + + CountMinMax + getLedgerCountMinMax() override + { + if (mysql_query(mysql_.get(), + "SELECT COUNT(*), MIN(ledger_seq), MAX(ledger_seq) FROM ledgers")) + return {0, 0, 0}; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return {0, 0, 0}; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0] || !row[1] || !row[2]) + { + mysql_free_result(result); + return {0, 0, 0}; + } + + CountMinMax ret{ + std::stoull(row[0]), + static_cast(std::stoll(row[1])), + static_cast(std::stoll(row[2])) + }; + mysql_free_result(result); + return ret; + } + + std::optional + getLedgerInfoByIndex(LedgerIndex ledgerSeq) override + { + std::stringstream sql; + sql << "SELECT ledger_hash, parent_hash, total_coins, closing_time, " + << "prev_closing_time, close_time_resolution, close_flags, " + << "account_hash, tx_hash FROM ledgers WHERE ledger_seq = " + << ledgerSeq; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerInfo info; + info.seq = ledgerSeq; + info.hash = uint256(row[0]); + info.parentHash = uint256(row[1]); + info.drops = XRPAmount(std::stoull(row[2])); + info.closeTime = NetClock::time_point{NetClock::duration{std::stoll(row[3])}}; + info.parentCloseTime = NetClock::time_point{NetClock::duration{std::stoll(row[4])}}; + info.closeTimeResolution = NetClock::duration{std::stoll(row[5])}; + info.closeFlags = std::stoul(row[6]); + info.accountHash = uint256(row[7]); + info.txHash = uint256(row[8]); + + mysql_free_result(result); + return info; + } + + std::optional + getLimitedOldestLedgerInfo(LedgerIndex ledgerFirstIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= " + << ledgerFirstIndex << " ORDER BY ledger_seq ASC LIMIT 1"; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + std::optional + getLimitedNewestLedgerInfo(LedgerIndex ledgerFirstIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= " + << ledgerFirstIndex << " ORDER BY ledger_seq DESC LIMIT 1"; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + std::optional + getLedgerInfoByHash(uint256 const& ledgerHash) override + { + std::stringstream sql; + sql << "SELECT ledger_seq FROM ledgers WHERE ledger_hash = '" + << strHex(ledgerHash) << "'"; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = std::stoll(row[0]); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + uint256 + getHashByIndex(LedgerIndex ledgerIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_hash FROM ledgers WHERE ledger_seq = " + << ledgerIndex; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return uint256(); + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return uint256(); + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return uint256(); + } + + uint256 hash(row[0]); + mysql_free_result(result); + return hash; + } + + std::optional + getHashesByIndex(LedgerIndex ledgerIndex) override + { + std::stringstream sql; + sql << "SELECT ledger_hash, parent_hash FROM ledgers WHERE ledger_seq = " + << ledgerIndex; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0] || !row[1]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerHashPair pair{uint256(row[0]), uint256(row[1])}; + mysql_free_result(result); + return pair; + } + + std::map + getHashesByIndex(LedgerIndex minSeq, LedgerIndex maxSeq) override + { + std::map result; + std::stringstream sql; + sql << "SELECT ledger_seq, ledger_hash, parent_hash FROM ledgers " + << "WHERE ledger_seq BETWEEN " << minSeq << " AND " << maxSeq + << " ORDER BY ledger_seq"; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + LedgerIndex const seq = + static_cast(std::stoull(row[0])); + result.emplace(seq, + LedgerHashPair{uint256{row[1]}, uint256{row[2]}}); + } + + mysql_free_result(sqlResult); + return result; + } + + std::optional + getAccountTransactionsMinLedgerSeq() override + { + if (!useTxTables_) + return {}; + + if (mysql_query(mysql_.get(), + "SELECT MIN(ledger_seq) FROM account_transactions")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = static_cast(std::stoull(row[0])); + mysql_free_result(result); + return seq; + } + + + std::optional + getNewestLedgerInfo() override + { + if (mysql_query(mysql_.get(), + "SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1")) + return std::nullopt; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return std::nullopt; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return std::nullopt; + } + + LedgerIndex seq = static_cast(std::stoull(row[0])); + mysql_free_result(result); + return getLedgerInfoByIndex(seq); + } + + std::variant + getTransaction( + uint256 const& id, + std::optional> const& range, + error_code_i& ec) override + { + if (!useTxTables_) + return TxSearched::unknown; + + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM transactions t WHERE t.tx_hash = '" << strHex(id) << "'"; + + if (range) + { + sql << " AND t.ledger_seq BETWEEN " + << range->first() << " AND " << range->last(); + } + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return TxSearched::unknown; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return TxSearched::unknown; + + MYSQL_ROW row = mysql_fetch_row(result); + if (!row) + { + mysql_free_result(result); + if (range) + { + sql.str(""); + sql << "SELECT COUNT(*) FROM ledgers WHERE ledger_seq BETWEEN " + << range->first() << " AND " << range->last(); + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return TxSearched::unknown; + + result = mysql_store_result(mysql_.get()); + if (!result) + return TxSearched::unknown; + + row = mysql_fetch_row(result); + if (!row || !row[0]) + { + mysql_free_result(result); + return TxSearched::unknown; + } + + std::size_t count = std::stoull(row[0]); + mysql_free_result(result); + + return (count == (range->last() - range->first() + 1)) + ? TxSearched::all + : TxSearched::some; + } + return TxSearched::unknown; + } + + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + { + mysql_free_result(result); + return TxSearched::unknown; + } + + // Deserialize transaction and metadata + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + + auto meta = std::make_shared( + id, static_cast(std::stoull(row[2])), Blob(row[1], row[1] + lengths[1])); + + mysql_free_result(result); + + AccountTx at; + std::string reason; + at.first = std::make_shared(txn, reason, app_); + at.first->setStatus(COMMITTED); + at.first->setLedger(static_cast(std::stoull(row[2]))); + at.second = meta; + + return at; + } + catch (std::exception const&) + { + mysql_free_result(result); + return TxSearched::unknown; + } + } + + std::pair> + oldestAccountTxPage(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + static std::uint32_t const page_length(200); + auto onUnsavedLedger = + std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1); + AccountTxs ret; + Application& app = app_; + auto onTransaction = [&ret, &app]( + std::uint32_t ledger_index, + std::string const& status, + Blob&& rawTxn, + Blob&& rawMeta) { + convertBlobsToTxResult( + ret, ledger_index, status, rawTxn, rawMeta, app); + }; + + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger + << " ORDER BY at.ledger_seq, at.tx_seq" + << " LIMIT " << (options.limit + 1); + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return {}; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(result)) && + (!options.limit || count < options.limit)) + { + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + continue; + + Blob rawTxn(row[0], row[0] + lengths[0]); + Blob rawMeta(row[1], row[1] + lengths[1]); + std::uint32_t ledgerSeq = + static_cast(std::stoull(row[2])); + std::uint32_t txSeq = + static_cast(std::stoull(row[3])); + + if (count == options.limit) + { + marker = AccountTxMarker{ledgerSeq, txSeq}; + break; + } + + onTransaction(ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta)); + ++count; + } + + mysql_free_result(result); + return {ret, marker}; + } + + std::pair> + newestAccountTxPage(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + static std::uint32_t const page_length(200); + auto onUnsavedLedger = + std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1); + AccountTxs ret; + Application& app = app_; + auto onTransaction = [&ret, &app]( + std::uint32_t ledger_index, + std::string const& status, + Blob&& rawTxn, + Blob&& rawMeta) { + convertBlobsToTxResult( + ret, ledger_index, status, rawTxn, rawMeta, app); + }; + + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger + << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC" + << " LIMIT " << (options.limit + 1); + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return {}; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(result)) && + (!options.limit || count < options.limit)) + { + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + continue; + + Blob rawTxn(row[0], row[0] + lengths[0]); + Blob rawMeta(row[1], row[1] + lengths[1]); + std::uint32_t ledgerSeq = + static_cast(std::stoull(row[2])); + std::uint32_t txSeq = + static_cast(std::stoull(row[3])); + + if (count == options.limit) + { + marker = AccountTxMarker{ledgerSeq, txSeq}; + break; + } + + onTransaction(ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta)); + ++count; + } + + mysql_free_result(result); + return {ret, marker}; + } + + bool + ledgerDbHasSpace(Config const&) override + { + // MySQL manages its own space + return true; + } + + std::vector> + getTxHistory(LedgerIndex startIndex) override + { + if (!useTxTables_) + return {}; + + std::vector> result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.ledger_seq " + << "FROM transactions t " + << "ORDER BY t.ledger_seq DESC, t.tx_seq DESC " + << "LIMIT 20 OFFSET " << startIndex; + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + std::string reason; + auto tx = std::make_shared( + txn, reason, app_); + + auto const ledgerSeq = + static_cast(std::stoull(row[1])); + tx->setStatus(COMMITTED); + tx->setLedger(ledgerSeq); + result.push_back(tx); + } + catch (std::exception const&) + { + // Skip any malformed transactions + continue; + } + } + + mysql_free_result(sqlResult); + return result; + } + + bool + transactionDbHasSpace(Config const&) override + { + // MySQL manages its own space + return true; + } + + AccountTxs + getOldestAccountTxs(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + AccountTxs result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger + << " ORDER BY at.ledger_seq ASC, at.tx_seq ASC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + std::string reason; + auto tx = std::make_shared( + txn, reason, app_); + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + auto meta = std::make_shared( + txn->getTransactionID(), + ledgerSeq, + Blob(row[1], row[1] + lengths[1])); + + tx->setStatus(COMMITTED); + tx->setLedger(ledgerSeq); + + result.emplace_back(tx, meta); + } + catch (std::exception const&) + { + continue; + } + } + + mysql_free_result(sqlResult); + return result; + } + + AccountTxs + getNewestAccountTxs(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + AccountTxs result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger + << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + try + { + SerialIter sit(row[0], lengths[0]); + auto txn = std::make_shared(sit); + std::string reason; + auto tx = std::make_shared( + txn, reason, app_); + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + auto meta = std::make_shared( + txn->getTransactionID(), + ledgerSeq, + Blob(row[1], row[1] + lengths[1])); + + tx->setStatus(COMMITTED); + tx->setLedger(ledgerSeq); + + result.emplace_back(tx, meta); + } + catch (std::exception const&) + { + continue; + } + } + + mysql_free_result(sqlResult); + return result; + } + + MetaTxsList + getOldestAccountTxsB(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger + << " ORDER BY at.ledger_seq ASC, at.tx_seq ASC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + } + + mysql_free_result(sqlResult); + return result; + } + + MetaTxsList + getNewestAccountTxsB(AccountTxOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger + << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC "; + + if (!options.bUnlimited) + { + sql << "LIMIT " << options.limit; + if (options.offset) + sql << " OFFSET " << options.offset; + } + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return result; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return result; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(sqlResult))) + { + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + } + + mysql_free_result(sqlResult); + return result; + } + + std::pair> + oldestAccountTxPageB(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger; + + if (options.marker) + { + sql << " AND (at.ledger_seq > " << options.marker->ledgerSeq + << " OR (at.ledger_seq = " << options.marker->ledgerSeq + << " AND at.tx_seq > " << options.marker->txnSeq << "))"; + } + + sql << " ORDER BY at.ledger_seq ASC, at.tx_seq ASC "; + sql << "LIMIT " << (options.limit + 1); + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return {}; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(sqlResult))) + { + if (count >= options.limit) + { + marker = AccountTxMarker{ + static_cast(std::stoull(row[2])), + static_cast(std::stoull(row[3])) + }; + break; + } + + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + + ++count; + } + + mysql_free_result(sqlResult); + return {result, marker}; + } + + std::pair> + newestAccountTxPageB(AccountTxPageOptions const& options) override + { + if (!useTxTables_) + return {}; + + MetaTxsList result; + std::stringstream sql; + sql << "SELECT t.raw_tx, t.meta_data, t.ledger_seq, t.tx_seq " + << "FROM account_transactions at " + << "JOIN transactions t ON at.tx_hash = t.tx_hash " + << "WHERE at.account_id = '" << strHex(options.account) << "' " + << "AND at.ledger_seq BETWEEN " << options.minLedger + << " AND " << options.maxLedger; + + if (options.marker) + { + sql << " AND (at.ledger_seq < " << options.marker->ledgerSeq + << " OR (at.ledger_seq = " << options.marker->ledgerSeq + << " AND at.tx_seq < " << options.marker->txnSeq << "))"; + } + + sql << " ORDER BY at.ledger_seq DESC, at.tx_seq DESC "; + sql << "LIMIT " << (options.limit + 1); + + if (mysql_query(mysql_.get(), sql.str().c_str())) + return {}; + + MYSQL_RES* sqlResult = mysql_store_result(mysql_.get()); + if (!sqlResult) + return {}; + + std::optional marker; + std::size_t count = 0; + MYSQL_ROW row; + + while ((row = mysql_fetch_row(sqlResult))) + { + if (count >= options.limit) + { + marker = AccountTxMarker{ + static_cast(std::stoull(row[2])), + static_cast(std::stoull(row[3])) + }; + break; + } + + unsigned long* lengths = mysql_fetch_lengths(sqlResult); + if (!lengths) + continue; + + auto const ledgerSeq = + static_cast(std::stoull(row[2])); + + result.emplace_back( + Blob(row[0], row[0] + lengths[0]), + Blob(row[1], row[1] + lengths[1]), + ledgerSeq); + + ++count; + } + + mysql_free_result(sqlResult); + return {result, marker}; + } + + std::uint32_t + getKBUsedAll() override + { + std::uint32_t total = 0; + + // Get ledger table size + if (!mysql_query(mysql_.get(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND table_name = 'ledgers'")) + { + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total += static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + + // Get transaction tables size + if (useTxTables_) + { + if (!mysql_query(mysql_.get(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND (table_name = 'transactions' " + "OR table_name = 'account_transactions')")) + { + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total += static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + } + + return total; + } + + std::uint32_t + getKBUsedLedger() override + { + std::uint32_t total = 0; + + if (!mysql_query(mysql_.get(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND table_name = 'ledgers'")) + { + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total = static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + + return total; + } + + std::uint32_t + getKBUsedTransaction() override + { + if (!useTxTables_) + return 0; + + std::uint32_t total = 0; + + if (!mysql_query(mysql_.get(), + "SELECT ROUND(SUM(data_length + index_length) / 1024) " + "FROM information_schema.tables " + "WHERE table_schema = DATABASE() " + "AND (table_name = 'transactions' " + "OR table_name = 'account_transactions')")) + { + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (result) + { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) + total = static_cast(std::stoull(row[0])); + mysql_free_result(result); + } + } + + return total; + } + + void + closeLedgerDB() override + { + // No explicit closing needed for MySQL + // The connection will be closed when mysql_ is destroyed + } + + void + closeTransactionDB() override + { + // No explicit closing needed for MySQL + // The connection will be closed when mysql_ is destroyed + } + +}; + +// Factory function +std::unique_ptr +getMySQLDatabase(Application& app, Config const& config, JobQueue& jobQueue) +{ + return std::make_unique(app, config, jobQueue); +} +} // namespace ripple +#endif // RIPPLE_APP_RDB_BACKEND_MYSQLDATABASE_H_INCLUDED diff --git a/src/ripple/app/rdb/impl/RelationalDatabase.cpp b/src/ripple/app/rdb/impl/RelationalDatabase.cpp index 64161bd53..cc91f716b 100644 --- a/src/ripple/app/rdb/impl/RelationalDatabase.cpp +++ b/src/ripple/app/rdb/impl/RelationalDatabase.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ RelationalDatabase::init( bool use_postgres = false; bool use_rwdb = false; bool use_flatmap = false; + bool use_mysql = false; if (config.reporting()) { @@ -64,6 +66,10 @@ RelationalDatabase::init( { use_flatmap = true; } + else if (boost::iequals(get(rdb_section, "backend"), "mysql")) + { + use_mysql = true; + } else { Throw( @@ -93,6 +99,10 @@ RelationalDatabase::init( { return getFlatmapDatabase(app, config, jobQueue); } + else if (use_mysql) + { + return getMySQLDatabase(app, config, jobQueue); + } return std::unique_ptr(); } diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index 3e2c3c81a..4e5754797 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -175,6 +175,17 @@ public: // Network parameters uint32_t NETWORK_ID = 0; + struct MysqlSettings + { + std::string host; + std::string user; + std::string pass; + std::string name; + uint16_t port; + }; + + std::optional mysql; + // DEPRECATED - Fee units for a reference transction. // Only provided for backwards compatibility in a couple of places static constexpr std::uint32_t FEE_UNITS_DEPRECATED = 10; diff --git a/src/ripple/core/ConfigSections.h b/src/ripple/core/ConfigSections.h index def5b3c82..a20e16135 100644 --- a/src/ripple/core/ConfigSections.h +++ b/src/ripple/core/ConfigSections.h @@ -102,6 +102,7 @@ struct ConfigSection #define SECTION_NETWORK_ID "network_id" #define SECTION_IMPORT_VL_KEYS "import_vl_keys" #define SECTION_DATAGRAM_MONITOR "datagram_monitor" +#define SECTION_MYSQL_SETTINGS "mysql_settings" } // namespace ripple diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index 7673d16ec..076439a50 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -756,6 +756,30 @@ Config::loadFromString(std::string const& fileContents) SERVER_DOMAIN = strTemp; } + if (exists(SECTION_MYSQL_SETTINGS)) + { + auto const sec = section(SECTION_MYSQL_SETTINGS); + if (!sec.exists("host") || !sec.exists("user") || !sec.exists("pass") || + !sec.exists("port") || !sec.exists("name")) + { + Throw( + "[mysql_settings] requires host=, user=, pass=, name= and " + "port= keys."); + } + + MysqlSettings my; + + my.host = *sec.get("host"); + my.user = *sec.get("user"); + my.pass = *sec.get("pass"); + my.pass = *sec.get("name"); + + std::string portStr = *sec.get("port"); + my.port = beast::lexicalCastThrow(portStr); + + mysql = my; + } + if (exists(SECTION_OVERLAY)) { auto const sec = section(SECTION_OVERLAY); diff --git a/src/ripple/nodestore/backend/MySQLFactory.cpp b/src/ripple/nodestore/backend/MySQLFactory.cpp new file mode 100644 index 000000000..7f1ee366c --- /dev/null +++ b/src/ripple/nodestore/backend/MySQLFactory.cpp @@ -0,0 +1,420 @@ +#ifndef RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED +#define RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { +namespace NodeStore { + +class MySQLBackend : public Backend +{ +private: + std::string const name_; + beast::Journal journal_; + bool isOpen_{false}; + std::unique_ptr mysql_; + + static constexpr auto CREATE_NODES_TABLE = R"SQL( + CREATE TABLE IF NOT EXISTS nodes ( + hash BINARY(32) PRIMARY KEY, + data MEDIUMBLOB NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) ENGINE=InnoDB + )SQL"; + +public: + MySQLBackend( + std::size_t keyBytes, + Section const& keyValues, + beast::Journal journal) + : name_(get(keyValues, "path", "nodestore")) + , journal_(journal) + , mysql_(mysql_init(nullptr), mysql_close) + { + if (!mysql_) + Throw("Failed to initialize MySQL"); + + std::string const host = get(keyValues, "host", "localhost"); + std::string const user = get(keyValues, "user", "ripple"); + std::string const password = get(keyValues, "pass", ""); + std::string const database = get(keyValues, "db", "rippledb"); + uint16_t const port = + static_cast(std::stoul(get(keyValues, "port", "3306"))); + + auto* conn = mysql_real_connect( + mysql_.get(), + host.c_str(), + user.c_str(), + password.c_str(), + database.c_str(), + port, + nullptr, + 0); + + if (!conn) + { + Throw( + std::string("Failed to connect to MySQL: ") + + mysql_error(mysql_.get())); + } + + uint8_t const reconnect = 1; + mysql_options(mysql_.get(), MYSQL_OPT_RECONNECT, &reconnect); + } + + ~MySQLBackend() override + { + close(); + } + + std::string + getName() override + { + return name_; + } + + void + open(bool createIfMissing) override + { + if (isOpen_) + Throw("already open"); + + if (createIfMissing) + { + if (mysql_query(mysql_.get(), CREATE_NODES_TABLE)) + { + Throw( + std::string("Failed to create nodes table: ") + + mysql_error(mysql_.get())); + } + } + + isOpen_ = true; + } + + bool + isOpen() override + { + return isOpen_; + } + + void + close() override + { + isOpen_ = false; + } + + Status + fetch(void const* key, std::shared_ptr* pObject) override + { + if (!isOpen_) + return notFound; + + uint256 const hash(uint256::fromVoid(key)); + + MYSQL_STMT* stmt = mysql_stmt_init(mysql_.get()); + if (!stmt) + return dataCorrupt; + + std::string const sql = + "SELECT data FROM nodes WHERE hash = ?"; + + if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length())) + { + mysql_stmt_close(stmt); + return dataCorrupt; + } + + MYSQL_BIND bindParam; + std::memset(&bindParam, 0, sizeof(bindParam)); + bindParam.buffer_type = MYSQL_TYPE_BLOB; + bindParam.buffer = const_cast(static_cast(hash.data())); + bindParam.buffer_length = hash.size(); + + if (mysql_stmt_bind_param(stmt, &bindParam)) + { + mysql_stmt_close(stmt); + return dataCorrupt; + } + + if (mysql_stmt_execute(stmt)) + { + mysql_stmt_close(stmt); + return notFound; + } + + MYSQL_BIND bindResult; + std::memset(&bindResult, 0, sizeof(bindResult)); + uint64_t length = 0; + bool is_null = false; + bindResult.buffer_type = MYSQL_TYPE_BLOB; + bindResult.length = &length; + bindResult.is_null = &is_null; + + if (mysql_stmt_bind_result(stmt, &bindResult)) + { + mysql_stmt_close(stmt); + return dataCorrupt; + } + + if (mysql_stmt_store_result(stmt)) + { + mysql_stmt_close(stmt); + return dataCorrupt; + } + + if (mysql_stmt_num_rows(stmt) == 0) + { + mysql_stmt_close(stmt); + return notFound; + } + + if (mysql_stmt_fetch(stmt)) + { + mysql_stmt_close(stmt); + return dataCorrupt; + } + + std::vector buffer(length); + bindResult.buffer = buffer.data(); + bindResult.buffer_length = length; + + if (mysql_stmt_fetch_column(stmt, &bindResult, 0, 0)) + { + mysql_stmt_close(stmt); + return dataCorrupt; + } + + mysql_stmt_close(stmt); + + nudb::detail::buffer decompressed; + auto const result = + nodeobject_decompress(buffer.data(), buffer.size(), decompressed); + + DecodedBlob decoded(hash.data(), result.first, result.second); + if (!decoded.wasOk()) + return dataCorrupt; + + *pObject = decoded.createObject(); + return ok; + } + + std::pair>, Status> + fetchBatch(std::vector const& hashes) override + { + std::vector> results; + results.reserve(hashes.size()); + + if (!isOpen_) + return {results, notFound}; + + if (mysql_query(mysql_.get(), "START TRANSACTION")) + return {results, dataCorrupt}; + + try + { + for (auto const& h : hashes) + { + std::shared_ptr nObj; + Status status = fetch(h->begin(), &nObj); + results.push_back(status == ok ? nObj : nullptr); + } + + if (mysql_query(mysql_.get(), "COMMIT")) + return {results, dataCorrupt}; + + return {results, ok}; + } + catch (...) + { + mysql_query(mysql_.get(), "ROLLBACK"); + throw; + } + } + + void + store(std::shared_ptr const& object) override + { + if (!isOpen_ || !object) + return; + + EncodedBlob encoded(object); + nudb::detail::buffer compressed; + auto const result = + nodeobject_compress(encoded.getData(), encoded.getSize(), compressed); + + MYSQL_STMT* stmt = mysql_stmt_init(mysql_.get()); + if (!stmt) + return; + + std::string const sql = + "INSERT INTO nodes (hash, data) VALUES (?, ?) " + "ON DUPLICATE KEY UPDATE data = VALUES(data)"; + + if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length())) + { + mysql_stmt_close(stmt); + return; + } + + MYSQL_BIND bind[2]; + std::memset(bind, 0, sizeof(bind)); + + auto const& hash = object->getHash(); + bind[0].buffer_type = MYSQL_TYPE_BLOB; + bind[0].buffer = const_cast(static_cast(hash.data())); + bind[0].buffer_length = hash.size(); + + bind[1].buffer_type = MYSQL_TYPE_BLOB; + bind[1].buffer = const_cast(static_cast(result.first)); + bind[1].buffer_length = result.second; + + if (mysql_stmt_bind_param(stmt, bind)) + { + mysql_stmt_close(stmt); + return; + } + + if (mysql_stmt_execute(stmt)) + { + mysql_stmt_close(stmt); + return; + } + + mysql_stmt_close(stmt); + } + + void + storeBatch(Batch const& batch) override + { + if (!isOpen_) + return; + + if (mysql_query(mysql_.get(), "START TRANSACTION")) + return; + + try + { + for (auto const& e : batch) + store(e); + + if (mysql_query(mysql_.get(), "COMMIT")) + mysql_query(mysql_.get(), "ROLLBACK"); + } + catch (...) + { + mysql_query(mysql_.get(), "ROLLBACK"); + throw; + } + } + + void + sync() override + { + } + + void + for_each(std::function)> f) override + { + if (!isOpen_) + return; + + if (mysql_query(mysql_.get(), + "SELECT hash, data FROM nodes ORDER BY created_at")) + return; + + MYSQL_RES* result = mysql_store_result(mysql_.get()); + if (!result) + return; + + MYSQL_ROW row; + while ((row = mysql_fetch_row(result))) + { + unsigned long* lengths = mysql_fetch_lengths(result); + if (!lengths) + continue; + + nudb::detail::buffer decompressed; + auto const decomp_result = nodeobject_decompress( + row[1], + static_cast(lengths[1]), + decompressed); + + DecodedBlob decoded( + row[0], + decomp_result.first, + decomp_result.second); + + if (decoded.wasOk()) + f(decoded.createObject()); + } + + mysql_free_result(result); + } + + int + getWriteLoad() override + { + return 0; + } + + void + setDeletePath() override + { + close(); + } + + int + fdRequired() const override + { + return 1; + } +}; + +class MySQLFactory : public Factory +{ +public: + MySQLFactory() + { + Manager::instance().insert(*this); + } + + ~MySQLFactory() override + { + Manager::instance().erase(*this); + } + + std::string + getName() const override + { + return "MySQL"; + } + + std::unique_ptr + createInstance( + std::size_t keyBytes, + Section const& keyValues, + std::size_t burstSize, + Scheduler& scheduler, + beast::Journal journal) override + { + return std::make_unique(keyBytes, keyValues, journal); + } +}; + +static MySQLFactory mysqlFactory; + +} // namespace NodeStore +} // namespace ripple + +#endif // RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED