Performed ledger input output writing inside a transaction. (#290)

This commit is contained in:
Ravin Perera
2021-04-13 12:34:38 +05:30
committed by GitHub
parent 7a837ede7c
commit 2283d2bf89
5 changed files with 63 additions and 28 deletions

View File

@@ -9,19 +9,23 @@
#include "ledger_common.hpp"
#include "ledger_serve.hpp"
#define RAW_DATA_RETURN(ret) \
{ \
if (users_stmt != NULL) \
sqlite3_finalize(users_stmt); \
if (outputs_stmt != NULL) \
sqlite3_finalize(outputs_stmt); \
if (inputs_stmt != NULL) \
sqlite3_finalize(inputs_stmt); \
if (in_fd != -1) \
close(in_fd); \
if (out_fd != -1) \
close(out_fd); \
return ret; \
#define RAW_DATA_RETURN(ret) \
{ \
if (ret == -1) \
sqlite::rollback_transaction(db); \
else \
sqlite::commit_transaction(db); \
if (users_stmt != NULL) \
sqlite3_finalize(users_stmt); \
if (outputs_stmt != NULL) \
sqlite3_finalize(outputs_stmt); \
if (inputs_stmt != NULL) \
sqlite3_finalize(inputs_stmt); \
if (in_fd != -1) \
close(in_fd); \
if (out_fd != -1) \
close(out_fd); \
return ret; \
}
namespace ledger
@@ -299,6 +303,10 @@ namespace ledger
size_t in_pos = 0; // Current writing position offset of the inputs file.
size_t out_pos = 0; // Current writing position offset of the outputs file.
// Group all row insertions within a transaction for consistency.
if (sqlite::begin_transaction(db) == -1)
RAW_DATA_RETURN(-1);
for (const auto &[pubkey, cu] : consensed_users)
{
if (sqlite::insert_user_record(users_stmt, lcl_id.seq_no, pubkey) == -1)
@@ -416,11 +424,11 @@ namespace ledger
* Creates or open a db connection to the shard based on the params. This is used to create primary and raw shards.
* @param db Database connection to be opened.
* @param ledger_seq_no Ledger sequence number.
* @param open_db Whether a connection to the sql db must be opened or not.
* @param keep_db_connection Whether the sqlite db connection must be kept open or not.
* @return 0 if shard already exists. 1 if new shard got created. -1 on failure.
*/
int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no, const uint64_t shard_size,
const char *shard_dir, const char *db_name, const bool open_db)
const char *shard_dir, const char *db_name, const bool keep_db_connection)
{
// Construct shard path.
shard_seq_no = (ledger_seq_no - 1) / shard_size;
@@ -439,7 +447,7 @@ namespace ledger
}
// Creating ledger database and open a database connection.
if (sqlite::open_db(db_path, db) == -1)
if (sqlite::open_db(db_path, db, true) == -1)
{
LOG_ERROR << errno << ": Error creating the database " << db_name;
return -1;
@@ -460,7 +468,7 @@ namespace ledger
}
// Close the connection if it doesn't need to be retained.
if (!open_db)
if (!keep_db_connection)
sqlite::close_db(db);
util::h32 prev_shard_hash;
@@ -511,7 +519,7 @@ namespace ledger
}
else
{
if (open_db && sqlite::open_db(db_path, db) == -1)
if (keep_db_connection && sqlite::open_db(db_path, db, true) == -1)
{
LOG_ERROR << errno << ": Error openning the shard database " << db_path;
return -1;
@@ -779,7 +787,7 @@ namespace ledger
ledger::ledger_record ledger;
if (sqlite::get_ledger_by_seq_no(db, seq_no, ledger) == -1)
{
LOG_ERROR << "Error getting ledger by sequence number: " << std::to_string(seq_no);
LOG_ERROR << "Error getting ledger by sequence number: " << seq_no;
sqlite::close_db(&db);
ledger_fs.stop_ro_session(session_name);
return -1;

View File

@@ -85,7 +85,7 @@ namespace ledger
int create_raw_data_blob_file(const std::string &shard_path, const char *file_name, size_t &file_size);
int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no, const uint64_t shard_size,
const char *shard_dir, const char *db_name, const bool open_db);
const char *shard_dir, const char *db_name, const bool keep_db_connection);
void remove_old_shards(const uint64_t lcl_seq_no, const uint64_t shard_size, const uint64_t max_shards, std::string_view shard_parent_dir);

View File

@@ -92,7 +92,7 @@ namespace ledger::query
return 0; // Not found.
sqlite3 *db = NULL;
if (sqlite::open_db(db_path, &db, true) == -1)
if (sqlite::open_db(db_path, &db) == -1)
return -1;
const int sql_res = sqlite::get_ledger_by_seq_no(db, q.seq_no, ledger);
@@ -121,7 +121,7 @@ namespace ledger::query
return 0; // Not found.
sqlite3 *db = NULL;
if (sqlite::open_db(db_path, &db, true) == -1)
if (sqlite::open_db(db_path, &db) == -1)
return -1;
if ((ledger.inputs && get_ledger_inputs(db, *ledger.inputs, ledger.seq_no, shard_path, user_pubkey, fs_sess_name) == -1) ||

View File

@@ -15,6 +15,9 @@ namespace ledger::sqlite
constexpr const char *CREATE_INDEX = "CREATE INDEX ";
constexpr const char *CREATE_UNIQUE_INDEX = "CREATE UNIQUE INDEX ";
constexpr const char *JOURNAL_MODE_OFF = "PRAGMA journal_mode=OFF";
constexpr const char *BEGIN_TRANSACTION = "BEGIN TRANSACTION;";
constexpr const char *COMMIT_TRANSACTION = "COMMIT;";
constexpr const char *ROLLBACK_TRANSACTION = "ROLLBACK;";
constexpr const char *INSERT_INTO = "INSERT INTO ";
constexpr const char *PRIMARY_KEY = "PRIMARY KEY";
constexpr const char *NOT_NULL = "NOT NULL";
@@ -49,12 +52,14 @@ namespace ledger::sqlite
* Opens a connection to a given databse and give the db pointer.
* @param db_name Database name to be connected.
* @param db Pointer to the db pointer which is to be connected and pointed.
* @param writable Whether the database must be opened in a writable mode or not.
* @param journal Whether to enable db journaling or not.
* @returns returns 0 on success, or -1 on error.
*/
int open_db(std::string_view db_name, sqlite3 **db, const bool read_only)
int open_db(std::string_view db_name, sqlite3 **db, const bool writable, const bool journal)
{
int ret;
const int flags = read_only ? SQLITE_OPEN_READONLY : (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
const int flags = writable ? (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE) : SQLITE_OPEN_READONLY;
if ((ret = sqlite3_open_v2(db_name.data(), db, flags, 0)) != SQLITE_OK)
{
LOG_ERROR << ret << ": Sqlite error when opening database " << db_name;
@@ -62,9 +67,10 @@ namespace ledger::sqlite
return -1;
}
// We turn off journaling for the db because we don't need transacion support.
// Journaling mode introduces lot of extra underyling file system operations which causes lot of overhead.
if (exec_sql(*db, JOURNAL_MODE_OFF) == -1)
// We can turn off journaling for the db if we don't need transacion support.
// Journaling mode can introduce lot of extra underyling file system operations which may cause
// lot of overhead if used on a low-performance filesystem like hpfs.
if (writable && !journal && exec_sql(*db, JOURNAL_MODE_OFF) == -1)
return -1;
return 0;
@@ -90,6 +96,21 @@ namespace ledger::sqlite
return 0;
}
int begin_transaction(sqlite3 *db)
{
return sqlite::exec_sql(db, BEGIN_TRANSACTION);
}
int commit_transaction(sqlite3 *db)
{
return sqlite::exec_sql(db, COMMIT_TRANSACTION);
}
int rollback_transaction(sqlite3 *db)
{
return sqlite::exec_sql(db, ROLLBACK_TRANSACTION);
}
/**
* Create a table with given table info.
* @param db Pointer to the db.

View File

@@ -40,10 +40,16 @@ namespace ledger::sqlite
};
// Generic methods.
int open_db(std::string_view db_name, sqlite3 **db, const bool read_only = false);
int open_db(std::string_view db_name, sqlite3 **db, const bool writable = false, const bool journal = true);
int exec_sql(sqlite3 *db, std::string_view sql, int (*callback)(void *, int, char **, char **) = NULL, void *callback_first_arg = NULL);
int begin_transaction(sqlite3 *db);
int commit_transaction(sqlite3 *db);
int rollback_transaction(sqlite3 *db);
int create_table(sqlite3 *db, std::string_view table_name, const std::vector<table_column_info> &column_info);
int create_index(sqlite3 *db, std::string_view table_name, std::string_view column_names, const bool is_unique);