mysql debugging

This commit is contained in:
Richard Holland
2025-02-13 17:31:39 +11:00
parent 53bf63af7b
commit c98d9b58de
5 changed files with 283 additions and 164 deletions

View File

@@ -13,13 +13,13 @@
namespace ripple {
class MySQLDatabase : public SQLiteDatabase
class MySQLDatabase : public SQLiteDatabase
{
private:
Application& app_;
bool const useTxTables_;
std::unique_ptr<MYSQL, decltype(&mysql_close)> mysql_;
// Schema creation statements
static constexpr auto CREATE_LEDGERS_TABLE = R"SQL(
CREATE TABLE IF NOT EXISTS ledgers (
@@ -60,10 +60,22 @@ private:
)SQL";
public:
MySQLDatabase(
Application& app,
Config const& config,
JobQueue& jobQueue)
// In the MySQLDatabase constructor, after the mysql_real_connect call:
// Add this to the private section with other table definitions
static constexpr auto CREATE_NODES_TABLE = R"SQL(
CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
public_key VARCHAR(64) NOT NULL,
ledger_hash VARCHAR(64) NOT NULL,
type VARCHAR(32) NOT NULL,
data MEDIUMBLOB NOT NULL,
UNIQUE INDEX idx_key_hash (public_key, ledger_hash)
)
)SQL";
// Then modify the constructor:
MySQLDatabase(Application& app, Config const& config, JobQueue& jobQueue)
: app_(app)
, useTxTables_(config.useTxTables())
, mysql_(mysql_init(nullptr), mysql_close)
@@ -72,7 +84,8 @@ public:
throw std::runtime_error("Failed to initialize MySQL");
if (!config.mysql.has_value())
throw std::runtime_error("[mysql_settings] stanza missing from config!");
throw std::runtime_error(
"[mysql_settings] stanza missing from config!");
// Read MySQL connection details from config
auto* conn = mysql_real_connect(
@@ -80,7 +93,7 @@ public:
config.mysql->host.c_str(),
config.mysql->user.c_str(),
config.mysql->pass.c_str(),
config.mysql->name.c_str(),
nullptr, // Don't select database in connection
config.mysql->port,
nullptr,
0);
@@ -90,6 +103,28 @@ public:
std::string("Failed to connect to MySQL: ") +
mysql_error(mysql_.get()));
// Create database if it doesn't exist
std::string create_db =
"CREATE DATABASE IF NOT EXISTS " + config.mysql->name;
std::cout << "create_db: `" << create_db << "`\n";
if (mysql_query(mysql_.get(), create_db.c_str()))
throw std::runtime_error(
std::string("Failed to create database (2): ") +
mysql_error(mysql_.get()));
// Select the database
if (mysql_select_db(mysql_.get(), config.mysql->name.c_str()))
throw std::runtime_error(
std::string("Failed to select database: ") +
mysql_error(mysql_.get()));
// Create nodes table first
if (mysql_query(mysql_.get(), CREATE_NODES_TABLE))
throw std::runtime_error(
std::string("Failed to create nodes table: ") +
mysql_error(mysql_.get()));
// Create schema if not exists
if (mysql_query(mysql_.get(), CREATE_LEDGERS_TABLE))
throw std::runtime_error(
@@ -105,7 +140,8 @@ public:
if (mysql_query(mysql_.get(), CREATE_ACCOUNT_TRANSACTIONS_TABLE))
throw std::runtime_error(
std::string("Failed to create account_transactions table: ") +
std::string(
"Failed to create account_transactions table: ") +
mysql_error(mysql_.get()));
}
}
@@ -130,8 +166,7 @@ public:
sql << "INSERT INTO ledgers ("
<< "ledger_seq, ledger_hash, parent_hash, total_coins, "
<< "closing_time, prev_closing_time, close_time_resolution, "
<< "close_flags, account_hash, tx_hash) VALUES ("
<< seq << ", "
<< "close_flags, account_hash, tx_hash) VALUES (" << seq << ", "
<< "'" << strHex(ledger->info().hash) << "', "
<< "'" << strHex(ledger->info().parentHash) << "', "
<< ledger->info().drops.drops() << ", "
@@ -153,7 +188,8 @@ public:
if (mysql_query(mysql_.get(), sql.str().c_str()))
{
JLOG(j.fatal()) << "Failed to save ledger: " << mysql_error(mysql_.get());
JLOG(j.fatal())
<< "Failed to save ledger: " << mysql_error(mysql_.get());
return false;
}
@@ -162,7 +198,8 @@ public:
std::shared_ptr<AcceptedLedger> aLedger;
try
{
aLedger = app_.getAcceptedLedgerCache().fetch(ledger->info().hash);
aLedger =
app_.getAcceptedLedgerCache().fetch(ledger->info().hash);
if (!aLedger)
{
aLedger = std::make_shared<AcceptedLedger>(ledger, app_);
@@ -179,8 +216,8 @@ public:
// Start a transaction for saving all transactions
if (mysql_query(mysql_.get(), "START TRANSACTION"))
{
JLOG(j.fatal()) << "Failed to start transaction: "
<< mysql_error(mysql_.get());
JLOG(j.fatal()) << "Failed to start transaction: "
<< mysql_error(mysql_.get());
return false;
}
@@ -195,9 +232,9 @@ public:
// Save transaction
std::stringstream txSql;
txSql << "INSERT INTO transactions ("
<< "tx_hash, ledger_seq, tx_seq, raw_tx, meta_data) VALUES ("
<< "'" << strHex(id) << "', "
<< seq << ", "
<< "tx_hash, ledger_seq, tx_seq, raw_tx, meta_data) "
"VALUES ("
<< "'" << strHex(id) << "', " << seq << ", "
<< acceptedLedgerTx->getTxnSeq() << ", "
<< "?, ?) " // Using placeholders for BLOB data
<< "ON DUPLICATE KEY UPDATE "
@@ -209,10 +246,12 @@ public:
MYSQL_STMT* stmt = mysql_stmt_init(mysql_.get());
if (!stmt)
{
throw std::runtime_error("Failed to initialize statement");
throw std::runtime_error(
"Failed to initialize statement");
}
if (mysql_stmt_prepare(stmt, txSql.str().c_str(), txSql.str().length()))
if (mysql_stmt_prepare(
stmt, txSql.str().c_str(), txSql.str().length()))
{
mysql_stmt_close(stmt);
throw std::runtime_error("Failed to prepare statement");
@@ -230,7 +269,7 @@ public:
Serializer s2;
meta.getAsObject().addWithoutSigningFields(s2);
bind[1].buffer_type = MYSQL_TYPE_BLOB;
bind[1].buffer = (void*)s2.data();
bind[1].buffer_length = s2.size();
@@ -254,13 +293,13 @@ public:
{
std::stringstream accTxSql;
accTxSql << "INSERT INTO account_transactions ("
<< "account_id, tx_hash, ledger_seq, tx_seq) VALUES ("
<< "'" << strHex(account) << "', "
<< "'" << strHex(id) << "', "
<< seq << ", "
<< acceptedLedgerTx->getTxnSeq() << ") "
<< "ON DUPLICATE KEY UPDATE "
<< "tx_hash = VALUES(tx_hash)";
<< "account_id, tx_hash, ledger_seq, tx_seq) "
"VALUES ("
<< "'" << strHex(account) << "', "
<< "'" << strHex(id) << "', " << seq << ", "
<< acceptedLedgerTx->getTxnSeq() << ") "
<< "ON DUPLICATE KEY UPDATE "
<< "tx_hash = VALUES(tx_hash)";
if (mysql_query(mysql_.get(), accTxSql.str().c_str()))
{
@@ -269,7 +308,10 @@ public:
}
app_.getMasterTransaction().inLedger(
id, seq, acceptedLedgerTx->getTxnSeq(), app_.config().NETWORK_ID);
id,
seq,
acceptedLedgerTx->getTxnSeq(),
app_.config().NETWORK_ID);
}
if (mysql_query(mysql_.get(), "COMMIT"))
@@ -316,7 +358,8 @@ public:
if (!useTxTables_)
return {};
if (mysql_query(mysql_.get(), "SELECT MIN(ledger_seq) FROM transactions"))
if (mysql_query(
mysql_.get(), "SELECT MIN(ledger_seq) FROM transactions"))
return std::nullopt;
MYSQL_RES* result = mysql_store_result(mysql_.get());
@@ -364,7 +407,8 @@ public:
return;
std::stringstream sql;
sql << "DELETE FROM account_transactions WHERE ledger_seq = " << ledgerSeq;
sql << "DELETE FROM account_transactions WHERE ledger_seq = "
<< ledgerSeq;
mysql_query(mysql_.get(), sql.str().c_str());
sql.str("");
@@ -378,7 +422,8 @@ public:
if (useTxTables_)
{
std::stringstream sql;
sql << "DELETE FROM account_transactions WHERE ledger_seq < " << ledgerSeq;
sql << "DELETE FROM account_transactions WHERE ledger_seq < "
<< ledgerSeq;
mysql_query(mysql_.get(), sql.str().c_str());
sql.str("");
@@ -398,7 +443,8 @@ public:
return;
std::stringstream sql;
sql << "DELETE FROM account_transactions WHERE ledger_seq < " << ledgerSeq;
sql << "DELETE FROM account_transactions WHERE ledger_seq < "
<< ledgerSeq;
mysql_query(mysql_.get(), sql.str().c_str());
sql.str("");
@@ -413,7 +459,8 @@ public:
return;
std::stringstream sql;
sql << "DELETE FROM account_transactions WHERE ledger_seq < " << ledgerSeq;
sql << "DELETE FROM account_transactions WHERE ledger_seq < "
<< ledgerSeq;
mysql_query(mysql_.get(), sql.str().c_str());
}
@@ -448,7 +495,8 @@ public:
if (!useTxTables_)
return 0;
if (mysql_query(mysql_.get(), "SELECT COUNT(*) FROM account_transactions"))
if (mysql_query(
mysql_.get(), "SELECT COUNT(*) FROM account_transactions"))
return 0;
MYSQL_RES* result = mysql_store_result(mysql_.get());
@@ -470,8 +518,10 @@ public:
CountMinMax
getLedgerCountMinMax() override
{
if (mysql_query(mysql_.get(),
"SELECT COUNT(*), MIN(ledger_seq), MAX(ledger_seq) FROM ledgers"))
if (mysql_query(
mysql_.get(),
"SELECT COUNT(*), MIN(ledger_seq), MAX(ledger_seq) FROM "
"ledgers"))
return {0, 0, 0};
MYSQL_RES* result = mysql_store_result(mysql_.get());
@@ -488,8 +538,7 @@ public:
CountMinMax ret{
std::stoull(row[0]),
static_cast<LedgerIndex>(std::stoll(row[1])),
static_cast<LedgerIndex>(std::stoll(row[2]))
};
static_cast<LedgerIndex>(std::stoll(row[2]))};
mysql_free_result(result);
return ret;
}
@@ -500,7 +549,7 @@ public:
std::stringstream sql;
sql << "SELECT ledger_hash, parent_hash, total_coins, closing_time, "
<< "prev_closing_time, close_time_resolution, close_flags, "
<< "account_hash, tx_hash FROM ledgers WHERE ledger_seq = "
<< "account_hash, tx_hash FROM ledgers WHERE ledger_seq = "
<< ledgerSeq;
if (mysql_query(mysql_.get(), sql.str().c_str()))
@@ -522,8 +571,10 @@ public:
info.hash = uint256(row[0]);
info.parentHash = uint256(row[1]);
info.drops = XRPAmount(std::stoull(row[2]));
info.closeTime = NetClock::time_point{NetClock::duration{std::stoll(row[3])}};
info.parentCloseTime = NetClock::time_point{NetClock::duration{std::stoll(row[4])}};
info.closeTime =
NetClock::time_point{NetClock::duration{std::stoll(row[3])}};
info.parentCloseTime =
NetClock::time_point{NetClock::duration{std::stoll(row[4])}};
info.closeTimeResolution = NetClock::duration{std::stoll(row[5])};
info.closeFlags = std::stoul(row[6]);
info.accountHash = uint256(row[7]);
@@ -537,7 +588,7 @@ public:
getLimitedOldestLedgerInfo(LedgerIndex ledgerFirstIndex) override
{
std::stringstream sql;
sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= "
sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= "
<< ledgerFirstIndex << " ORDER BY ledger_seq ASC LIMIT 1";
if (mysql_query(mysql_.get(), sql.str().c_str()))
@@ -563,7 +614,7 @@ public:
getLimitedNewestLedgerInfo(LedgerIndex ledgerFirstIndex) override
{
std::stringstream sql;
sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= "
sql << "SELECT ledger_seq FROM ledgers WHERE ledger_seq >= "
<< ledgerFirstIndex << " ORDER BY ledger_seq DESC LIMIT 1";
if (mysql_query(mysql_.get(), sql.str().c_str()))
@@ -615,7 +666,7 @@ public:
getHashByIndex(LedgerIndex ledgerIndex) override
{
std::stringstream sql;
sql << "SELECT ledger_hash FROM ledgers WHERE ledger_seq = "
sql << "SELECT ledger_hash FROM ledgers WHERE ledger_seq = "
<< ledgerIndex;
if (mysql_query(mysql_.get(), sql.str().c_str()))
@@ -641,7 +692,8 @@ public:
getHashesByIndex(LedgerIndex ledgerIndex) override
{
std::stringstream sql;
sql << "SELECT ledger_hash, parent_hash FROM ledgers WHERE ledger_seq = "
sql << "SELECT ledger_hash, parent_hash FROM ledgers WHERE ledger_seq "
"= "
<< ledgerIndex;
if (mysql_query(mysql_.get(), sql.str().c_str()))
@@ -682,10 +734,10 @@ public:
MYSQL_ROW row;
while ((row = mysql_fetch_row(sqlResult)))
{
LedgerIndex const seq =
LedgerIndex const seq =
static_cast<LedgerIndex>(std::stoull(row[0]));
result.emplace(seq,
LedgerHashPair{uint256{row[1]}, uint256{row[2]}});
result.emplace(
seq, LedgerHashPair{uint256{row[1]}, uint256{row[2]}});
}
mysql_free_result(sqlResult);
@@ -698,8 +750,9 @@ public:
if (!useTxTables_)
return {};
if (mysql_query(mysql_.get(),
"SELECT MIN(ledger_seq) FROM account_transactions"))
if (mysql_query(
mysql_.get(),
"SELECT MIN(ledger_seq) FROM account_transactions"))
return std::nullopt;
MYSQL_RES* result = mysql_store_result(mysql_.get());
@@ -718,12 +771,13 @@ public:
return seq;
}
std::optional<LedgerInfo>
getNewestLedgerInfo() override
{
if (mysql_query(mysql_.get(),
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1"))
if (mysql_query(
mysql_.get(),
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT "
"1"))
return std::nullopt;
MYSQL_RES* result = mysql_store_result(mysql_.get());
@@ -757,8 +811,8 @@ public:
if (range)
{
sql << " AND t.ledger_seq BETWEEN "
<< range->first() << " AND " << range->last();
sql << " AND t.ledger_seq BETWEEN " << range->first() << " AND "
<< range->last();
}
if (mysql_query(mysql_.get(), sql.str().c_str()))
@@ -816,7 +870,9 @@ public:
auto txn = std::make_shared<STTx const>(sit);
auto meta = std::make_shared<TxMeta>(
id, static_cast<uint32_t>(std::stoull(row[2])), Blob(row[1], row[1] + lengths[1]));
id,
static_cast<uint32_t>(std::stoull(row[2])),
Blob(row[1], row[1] + lengths[1]));
mysql_free_result(result);
@@ -861,9 +917,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger
<< " ORDER BY at.ledger_seq, at.tx_seq"
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger << " ORDER BY at.ledger_seq, at.tx_seq"
<< " LIMIT " << (options.limit + 1);
if (mysql_query(mysql_.get(), sql.str().c_str()))
@@ -877,7 +932,7 @@ public:
std::size_t count = 0;
MYSQL_ROW row;
while ((row = mysql_fetch_row(result)) &&
while ((row = mysql_fetch_row(result)) &&
(!options.limit || count < options.limit))
{
unsigned long* lengths = mysql_fetch_lengths(result);
@@ -886,9 +941,9 @@ public:
Blob rawTxn(row[0], row[0] + lengths[0]);
Blob rawMeta(row[1], row[1] + lengths[1]);
std::uint32_t ledgerSeq =
std::uint32_t ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
std::uint32_t txSeq =
std::uint32_t txSeq =
static_cast<std::uint32_t>(std::stoull(row[3]));
if (count == options.limit)
@@ -897,7 +952,8 @@ public:
break;
}
onTransaction(ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta));
onTransaction(
ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta));
++count;
}
@@ -930,8 +986,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger
<< " ORDER BY at.ledger_seq DESC, at.tx_seq DESC"
<< " LIMIT " << (options.limit + 1);
@@ -946,7 +1002,7 @@ public:
std::size_t count = 0;
MYSQL_ROW row;
while ((row = mysql_fetch_row(result)) &&
while ((row = mysql_fetch_row(result)) &&
(!options.limit || count < options.limit))
{
unsigned long* lengths = mysql_fetch_lengths(result);
@@ -955,9 +1011,9 @@ public:
Blob rawTxn(row[0], row[0] + lengths[0]);
Blob rawMeta(row[1], row[1] + lengths[1]);
std::uint32_t ledgerSeq =
std::uint32_t ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
std::uint32_t txSeq =
std::uint32_t txSeq =
static_cast<std::uint32_t>(std::stoull(row[3]));
if (count == options.limit)
@@ -966,7 +1022,8 @@ public:
break;
}
onTransaction(ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta));
onTransaction(
ledgerSeq, "COMMITTED", std::move(rawTxn), std::move(rawMeta));
++count;
}
@@ -981,7 +1038,7 @@ public:
return true;
}
std::vector<std::shared_ptr<Transaction>>
std::vector<std::shared_ptr<Transaction>>
getTxHistory(LedgerIndex startIndex) override
{
if (!useTxTables_)
@@ -1013,10 +1070,9 @@ public:
SerialIter sit(row[0], lengths[0]);
auto txn = std::make_shared<STTx const>(sit);
std::string reason;
auto tx = std::make_shared<Transaction>(
txn, reason, app_);
auto tx = std::make_shared<Transaction>(txn, reason, app_);
auto const ledgerSeq =
auto const ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[1]));
tx->setStatus(COMMITTED);
tx->setLedger(ledgerSeq);
@@ -1052,8 +1108,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger
<< " ORDER BY at.ledger_seq ASC, at.tx_seq ASC ";
if (!options.bUnlimited)
@@ -1082,10 +1138,9 @@ public:
SerialIter sit(row[0], lengths[0]);
auto txn = std::make_shared<STTx const>(sit);
std::string reason;
auto tx = std::make_shared<Transaction>(
txn, reason, app_);
auto tx = std::make_shared<Transaction>(txn, reason, app_);
auto const ledgerSeq =
auto const ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
auto meta = std::make_shared<TxMeta>(
@@ -1120,8 +1175,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger
<< " ORDER BY at.ledger_seq DESC, at.tx_seq DESC ";
if (!options.bUnlimited)
@@ -1150,10 +1205,9 @@ public:
SerialIter sit(row[0], lengths[0]);
auto txn = std::make_shared<STTx const>(sit);
std::string reason;
auto tx = std::make_shared<Transaction>(
txn, reason, app_);
auto tx = std::make_shared<Transaction>(txn, reason, app_);
auto const ledgerSeq =
auto const ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
auto meta = std::make_shared<TxMeta>(
@@ -1188,8 +1242,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger
<< " ORDER BY at.ledger_seq ASC, at.tx_seq ASC ";
if (!options.bUnlimited)
@@ -1213,7 +1267,7 @@ public:
if (!lengths)
continue;
auto const ledgerSeq =
auto const ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
result.emplace_back(
@@ -1238,8 +1292,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger
<< " ORDER BY at.ledger_seq DESC, at.tx_seq DESC ";
if (!options.bUnlimited)
@@ -1263,7 +1317,7 @@ public:
if (!lengths)
continue;
auto const ledgerSeq =
auto const ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
result.emplace_back(
@@ -1288,8 +1342,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger;
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger;
if (options.marker)
{
@@ -1318,8 +1372,7 @@ public:
{
marker = AccountTxMarker{
static_cast<std::uint32_t>(std::stoull(row[2])),
static_cast<std::uint32_t>(std::stoull(row[3]))
};
static_cast<std::uint32_t>(std::stoull(row[3]))};
break;
}
@@ -1327,7 +1380,7 @@ public:
if (!lengths)
continue;
auto const ledgerSeq =
auto const ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
result.emplace_back(
@@ -1354,8 +1407,8 @@ public:
<< "FROM account_transactions at "
<< "JOIN transactions t ON at.tx_hash = t.tx_hash "
<< "WHERE at.account_id = '" << strHex(options.account) << "' "
<< "AND at.ledger_seq BETWEEN " << options.minLedger
<< " AND " << options.maxLedger;
<< "AND at.ledger_seq BETWEEN " << options.minLedger << " AND "
<< options.maxLedger;
if (options.marker)
{
@@ -1384,8 +1437,7 @@ public:
{
marker = AccountTxMarker{
static_cast<std::uint32_t>(std::stoull(row[2])),
static_cast<std::uint32_t>(std::stoull(row[3]))
};
static_cast<std::uint32_t>(std::stoull(row[3]))};
break;
}
@@ -1393,7 +1445,7 @@ public:
if (!lengths)
continue;
auto const ledgerSeq =
auto const ledgerSeq =
static_cast<std::uint32_t>(std::stoull(row[2]));
result.emplace_back(
@@ -1414,11 +1466,12 @@ public:
std::uint32_t total = 0;
// Get ledger table size
if (!mysql_query(mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND table_name = 'ledgers'"))
if (!mysql_query(
mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND table_name = 'ledgers'"))
{
MYSQL_RES* result = mysql_store_result(mysql_.get());
if (result)
@@ -1433,19 +1486,21 @@ public:
// Get transaction tables size
if (useTxTables_)
{
if (!mysql_query(mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND (table_name = 'transactions' "
"OR table_name = 'account_transactions')"))
if (!mysql_query(
mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND (table_name = 'transactions' "
"OR table_name = 'account_transactions')"))
{
MYSQL_RES* result = mysql_store_result(mysql_.get());
if (result)
{
MYSQL_ROW row = mysql_fetch_row(result);
if (row && row[0])
total += static_cast<std::uint32_t>(std::stoull(row[0]));
total +=
static_cast<std::uint32_t>(std::stoull(row[0]));
mysql_free_result(result);
}
}
@@ -1459,11 +1514,12 @@ public:
{
std::uint32_t total = 0;
if (!mysql_query(mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND table_name = 'ledgers'"))
if (!mysql_query(
mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND table_name = 'ledgers'"))
{
MYSQL_RES* result = mysql_store_result(mysql_.get());
if (result)
@@ -1486,12 +1542,13 @@ public:
std::uint32_t total = 0;
if (!mysql_query(mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND (table_name = 'transactions' "
"OR table_name = 'account_transactions')"))
if (!mysql_query(
mysql_.get(),
"SELECT ROUND(SUM(data_length + index_length) / 1024) "
"FROM information_schema.tables "
"WHERE table_schema = DATABASE() "
"AND (table_name = 'transactions' "
"OR table_name = 'account_transactions')"))
{
MYSQL_RES* result = mysql_store_result(mysql_.get());
if (result)
@@ -1519,7 +1576,6 @@ public:
// No explicit closing needed for MySQL
// The connection will be closed when mysql_ is destroyed
}
};
// Factory function

View File

@@ -36,6 +36,8 @@ using IniFileSections = std::map<std::string, std::vector<std::string>>;
//------------------------------------------------------------------------------
class Config;
/** Holds a collection of configuration values.
A configuration file contains zero or more sections.
*/
@@ -48,11 +50,22 @@ private:
std::vector<std::string> values_;
bool had_trailing_comments_ = false;
Config const* parent_;
using const_iterator = decltype(lookup_)::const_iterator;
public:
// throws if no parent for this section
Config const&
getParent() const
{
if (!parent_)
Throw<std::runtime_error>("No parent_ for config section");
return *parent_;
}
/** Create an empty section. */
explicit Section(std::string const& name = "");
explicit Section(std::string const& name = "", Config* parent = nullptr);
/** Returns the name of this section. */
std::string const&
@@ -218,6 +231,8 @@ private:
std::map<std::string, Section, boost::beast::iless> map_;
public:
virtual ~BasicConfig() = default;
/** Returns `true` if a section with the given name exists. */
bool
exists(std::string const& name) const;

View File

@@ -24,7 +24,10 @@
namespace ripple {
Section::Section(std::string const& name) : name_(name)
class Config;
Section::Section(std::string const& name, Config* parent)
: name_(name), parent_(parent)
{
}
@@ -175,12 +178,14 @@ BasicConfig::legacy(std::string const& sectionName) const
void
BasicConfig::build(IniFileSections const& ifs)
{
Config* config_this = dynamic_cast<Config*>(this);
for (auto const& entry : ifs)
{
auto const result = map_.emplace(
std::piecewise_construct,
std::make_tuple(entry.first),
std::make_tuple(entry.first));
std::make_tuple(
entry.first, config_this)); // Will be nullptr if cast failed
result.first->second.append(entry.second);
}
}

View File

@@ -772,7 +772,7 @@ Config::loadFromString(std::string const& fileContents)
my.host = *sec.get("host");
my.user = *sec.get("user");
my.pass = *sec.get("pass");
my.pass = *sec.get("name");
my.name = *sec.get("name");
std::string portStr = *sec.get("port");
my.port = beast::lexicalCastThrow<int>(portStr);

View File

@@ -8,9 +8,9 @@
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <ripple/nodestore/impl/codec.h>
#include <boost/beast/core/string.hpp>
#include <mysql/mysql.h>
#include <memory>
#include <cstdint>
#include <memory>
#include <mysql/mysql.h>
#include <sstream>
namespace ripple {
@@ -19,11 +19,19 @@ namespace NodeStore {
class MySQLBackend : public Backend
{
private:
std::string const name_;
std::string name_;
beast::Journal journal_;
bool isOpen_{false};
std::unique_ptr<MYSQL, decltype(&mysql_close)> mysql_;
Config const& config_;
static constexpr auto CREATE_DATABASE = R"SQL(
CREATE DATABASE IF NOT EXISTS `%s`
CHARACTER SET utf8mb4
COLLATE utf8mb4_unicode_ci
)SQL";
static constexpr auto CREATE_NODES_TABLE = R"SQL(
CREATE TABLE IF NOT EXISTS nodes (
hash BINARY(32) PRIMARY KEY,
@@ -40,24 +48,30 @@ public:
: name_(get(keyValues, "path", "nodestore"))
, journal_(journal)
, mysql_(mysql_init(nullptr), mysql_close)
, config_(keyValues.getParent())
{
// mysql names are limited to alphanumeric
name_.erase(
std::remove_if(
name_.begin(),
name_.end(),
[](char c) { return !std::isalnum(c); }),
name_.end());
if (!mysql_)
Throw<std::runtime_error>("Failed to initialize MySQL");
std::string const host = get(keyValues, "host", "localhost");
std::string const user = get(keyValues, "user", "ripple");
std::string const password = get(keyValues, "pass", "");
std::string const database = get(keyValues, "db", "rippledb");
uint16_t const port =
static_cast<uint16_t>(std::stoul(get(keyValues, "port", "3306")));
if (!config_.mysql.has_value())
throw std::runtime_error(
"[mysql_settings] stanza missing from config!");
auto* conn = mysql_real_connect(
mysql_.get(),
host.c_str(),
user.c_str(),
password.c_str(),
database.c_str(),
port,
config_.mysql->host.c_str(),
config_.mysql->user.c_str(),
config_.mysql->pass.c_str(),
nullptr,
config_.mysql->port,
nullptr,
0);
@@ -72,6 +86,22 @@ public:
mysql_options(mysql_.get(), MYSQL_OPT_RECONNECT, &reconnect);
}
void
createDatabase()
{
std::string query(1024, '\0');
int length =
snprintf(&query[0], query.size(), CREATE_DATABASE, name_.c_str());
query.resize(length);
if (mysql_query(mysql_.get(), query.c_str()))
{
Throw<std::runtime_error>(
std::string("Failed to create database: ") +
mysql_error(mysql_.get()) + " (1)");
}
}
~MySQLBackend() override
{
close();
@@ -89,6 +119,20 @@ public:
if (isOpen_)
Throw<std::runtime_error>("already open");
// Ensure database is selected
if (!config_.mysql.has_value())
throw std::runtime_error(
"[mysql_settings] stanza missing from config!");
createDatabase();
if (mysql_select_db(mysql_.get(), name_.c_str()))
{
Throw<std::runtime_error>(
std::string("Failed to select database: ") +
mysql_error(mysql_.get()));
}
if (createIfMissing)
{
if (mysql_query(mysql_.get(), CREATE_NODES_TABLE))
@@ -126,9 +170,8 @@ public:
if (!stmt)
return dataCorrupt;
std::string const sql =
"SELECT data FROM nodes WHERE hash = ?";
std::string const sql = "SELECT data FROM nodes WHERE hash = ?";
if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length()))
{
mysql_stmt_close(stmt);
@@ -138,7 +181,8 @@ public:
MYSQL_BIND bindParam;
std::memset(&bindParam, 0, sizeof(bindParam));
bindParam.buffer_type = MYSQL_TYPE_BLOB;
bindParam.buffer = const_cast<void*>(static_cast<void const*>(hash.data()));
bindParam.buffer =
const_cast<void*>(static_cast<void const*>(hash.data()));
bindParam.buffer_length = hash.size();
if (mysql_stmt_bind_param(stmt, &bindParam))
@@ -198,13 +242,13 @@ public:
mysql_stmt_close(stmt);
nudb::detail::buffer decompressed;
auto const result =
auto const result =
nodeobject_decompress(buffer.data(), buffer.size(), decompressed);
DecodedBlob decoded(hash.data(), result.first, result.second);
if (!decoded.wasOk())
return dataCorrupt;
*pObject = decoded.createObject();
return ok;
}
@@ -250,14 +294,14 @@ public:
EncodedBlob encoded(object);
nudb::detail::buffer compressed;
auto const result =
nodeobject_compress(encoded.getData(), encoded.getSize(), compressed);
auto const result = nodeobject_compress(
encoded.getData(), encoded.getSize(), compressed);
MYSQL_STMT* stmt = mysql_stmt_init(mysql_.get());
if (!stmt)
return;
std::string const sql =
std::string const sql =
"INSERT INTO nodes (hash, data) VALUES (?, ?) "
"ON DUPLICATE KEY UPDATE data = VALUES(data)";
@@ -272,11 +316,13 @@ public:
auto const& hash = object->getHash();
bind[0].buffer_type = MYSQL_TYPE_BLOB;
bind[0].buffer = const_cast<void*>(static_cast<void const*>(hash.data()));
bind[0].buffer =
const_cast<void*>(static_cast<void const*>(hash.data()));
bind[0].buffer_length = hash.size();
bind[1].buffer_type = MYSQL_TYPE_BLOB;
bind[1].buffer = const_cast<void*>(static_cast<void const*>(result.first));
bind[1].buffer =
const_cast<void*>(static_cast<void const*>(result.first));
bind[1].buffer_length = result.second;
if (mysql_stmt_bind_param(stmt, bind))
@@ -329,8 +375,9 @@ public:
if (!isOpen_)
return;
if (mysql_query(mysql_.get(),
"SELECT hash, data FROM nodes ORDER BY created_at"))
if (mysql_query(
mysql_.get(),
"SELECT hash, data FROM nodes ORDER BY created_at"))
return;
MYSQL_RES* result = mysql_store_result(mysql_.get());
@@ -346,14 +393,10 @@ public:
nudb::detail::buffer decompressed;
auto const decomp_result = nodeobject_decompress(
row[1],
static_cast<std::size_t>(lengths[1]),
decompressed);
row[1], static_cast<std::size_t>(lengths[1]), decompressed);
DecodedBlob decoded(
row[0],
decomp_result.first,
decomp_result.second);
row[0], decomp_result.first, decomp_result.second);
if (decoded.wasOk())
f(decoded.createObject());
@@ -417,4 +460,4 @@ static MySQLFactory mysqlFactory;
} // namespace NodeStore
} // namespace ripple
#endif // RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED
#endif // RIPPLE_NODESTORE_MYSQLBACKEND_H_INCLUDED