From 5606d4a7ddc5226621dd8ea51e5c71d443ca0544 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 4 Mar 2021 10:36:42 -0500 Subject: [PATCH] add type field to config. clean up cassandra code more --- reporting/BackendFactory.h | 15 +- reporting/CassandraBackend.cpp | 411 +++++++-------------------------- 2 files changed, 93 insertions(+), 333 deletions(-) diff --git a/reporting/BackendFactory.h b/reporting/BackendFactory.h index 2ec0d9e5..c4630b0e 100644 --- a/reporting/BackendFactory.h +++ b/reporting/BackendFactory.h @@ -1,5 +1,6 @@ #ifndef RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED #define RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED +#include #include #include #include @@ -9,16 +10,18 @@ makeBackend(boost::json::object const& config) { boost::json::object const& dbConfig = config.at("database").as_object(); - if (dbConfig.contains("cassandra")) + auto type = dbConfig.at("type").as_string(); + + if (boost::iequals(type, "cassandra")) { - auto backend = std::make_unique( - dbConfig.at("cassandra").as_object()); + auto backend = + std::make_unique(dbConfig.at(type).as_object()); return std::move(backend); } - else if (dbConfig.contains("postgres")) + else if (boost::iequals(type, "postgres")) { - auto backend = std::make_unique( - dbConfig.at("postgres").as_object()); + auto backend = + std::make_unique(dbConfig.at(type).as_object()); return std::move(backend); } return nullptr; diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 67e1116d..2757e83c 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -393,8 +393,8 @@ CassandraBackend::open() "nodestore: Missing keyspace in Cassandra config"); } - std::string tableName = getString("table_name"); - if (tableName.empty()) + std::string tablePrefix = getString("table_prefix"); + if (tablePrefix.empty()) { throw std::runtime_error( "nodestore: Missing table name in Cassandra config"); @@ -424,367 +424,124 @@ CassandraBackend::open() continue; } + auto executeSimpleStatement = [this](std::string const& query) { + CassStatement* statement = makeStatement(query.c_str(), 0); + CassFuture* fut = cass_session_execute(session_.get(), statement); + CassError rc = cass_future_error_code(fut); + cass_future_free(fut); + cass_statement_free(statement); + if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) + { + std::stringstream ss; + ss << "nodestore: Error executing simple statement: " << rc + << ", " << cass_error_desc(rc) << " - " << query; + BOOST_LOG_TRIVIAL(error) << ss.str(); + return false; + } + return true; + }; + std::stringstream query; - query << "CREATE TABLE IF NOT EXISTS " << tableName << "flat" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects" << " ( key blob, sequence bigint, object blob, PRIMARY " "KEY(key, " "sequence)) WITH CLUSTERING ORDER BY (sequence DESC)"; - - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } query = {}; - query << "SELECT * FROM " << tableName << "flat" + query << "SELECT * FROM " << tablePrefix << "objects" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } + if (!executeSimpleStatement(query.str())) + continue; query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tableName - << "flattransactions" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" << " ( hash blob PRIMARY KEY, sequence bigint, transaction " "blob, metadata blob)"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } query = {}; - query << "SELECT * FROM " << tableName << "flattransactions" + query << "SELECT * FROM " << tablePrefix << "transactions" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } + if (!executeSimpleStatement(query.str())) + continue; query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tableName << "keys" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys" << " ( key blob, created bigint, deleted bigint, PRIMARY KEY " "(key, created)) with clustering order by (created " "desc) "; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } query = {}; - query << "SELECT * FROM " << tableName << "keys" + query << "SELECT * FROM " << tablePrefix << "keys" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } + if (!executeSimpleStatement(query.str())) + continue; query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tableName << "books" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books" << " ( book blob, sequence bigint, key blob, deleted_at " "bigint, PRIMARY KEY " "(book, key)) WITH CLUSTERING ORDER BY (key ASC)"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } - query = {}; - query << "SELECT * FROM " << tableName << "books" + query << "SELECT * FROM " << tablePrefix << "books" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } - + if (!executeSimpleStatement(query.str())) + continue; query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tableName << "account_tx" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx" << " ( account blob, seq_idx tuple, " " hash blob, " "PRIMARY KEY " "(account, seq_idx)) WITH " "CLUSTERING ORDER BY (seq_idx desc)"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } query = {}; - query << "SELECT * FROM " << tableName << "account_tx" + query << "SELECT * FROM " << tablePrefix << "account_tx" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } + if (!executeSimpleStatement(query.str())) + continue; query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledgers" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers" << " ( sequence bigint PRIMARY KEY, header blob )"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } query = {}; - query << "SELECT * FROM " << tableName << "ledgers" + query << "SELECT * FROM " << tablePrefix << "ledgers" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } + if (!executeSimpleStatement(query.str())) + continue; query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_hashes" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes" << " (hash blob PRIMARY KEY, sequence bigint)"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } query = {}; - query << "SELECT * FROM " << tableName << "ledger_hashes" + query << "SELECT * FROM " << tablePrefix << "ledger_hashes" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } + if (!executeSimpleStatement(query.str())) + continue; query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_range" + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_range" << " (is_latest boolean PRIMARY KEY, sequence bigint)"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY) - { - std::stringstream ss; - ss << "nodestore: Error creating Cassandra table: " << rc << ", " - << cass_error_desc(rc) << " - " << query.str(); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!executeSimpleStatement(query.str())) continue; - } query = {}; - query << "SELECT * FROM " << tableName << "ledger_range" + query << "SELECT * FROM " << tablePrefix << "ledger_range" << " LIMIT 1"; - statement = makeStatement(query.str().c_str(), 0); - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - cass_future_free(fut); - cass_statement_free(statement); - if (rc != CASS_OK) - { - if (rc == CASS_ERROR_SERVER_INVALID_QUERY) - { - BOOST_LOG_TRIVIAL(warning) - << "table not here yet, sleeping 1s to " - "see if table creation propagates"; - continue; - } - else - { - std::stringstream ss; - ss << "nodestore: Error checking for table: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); - continue; - } - } + if (!executeSimpleStatement(query.str())) + continue; setupSessionAndTable = true; } @@ -795,43 +552,43 @@ CassandraBackend::open() { std::this_thread::sleep_for(std::chrono::seconds(1)); std::stringstream query; - query << "INSERT INTO " << tableName << "flat" + query << "INSERT INTO " << tablePrefix << "objects" << " (key, sequence, object) VALUES (?, ?, ?)"; if (!insertObject_.prepareStatement(query, session_.get())) continue; query = {}; - query << "INSERT INTO " << tableName << "flattransactions" + query << "INSERT INTO " << tablePrefix << "transactions" << " (hash, sequence, transaction, metadata) VALUES (?, ?, " "?, ?)"; if (!insertTransaction_.prepareStatement(query, session_.get())) continue; query = {}; - query << "INSERT INTO " << tableName << "keys" + query << "INSERT INTO " << tablePrefix << "keys" << " (key, created, deleted) VALUES (?, ?, ?)"; if (!insertKey_.prepareStatement(query, session_.get())) continue; query = {}; - query << "INSERT INTO " << tableName << "books" + query << "INSERT INTO " << tablePrefix << "books" << " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)"; if (!insertBook_.prepareStatement(query, session_.get())) continue; query = {}; - query << "INSERT INTO " << tableName << "books" + query << "INSERT INTO " << tablePrefix << "books" << " (book, key, deleted_at) VALUES (?, ?, ?)"; if (!deleteBook_.prepareStatement(query, session_.get())) continue; query = {}; - query << "SELECT created FROM " << tableName << "keys" + query << "SELECT created FROM " << tablePrefix << "keys" << " WHERE key = ? ORDER BY created desc LIMIT 1"; if (!getCreated_.prepareStatement(query, session_.get())) continue; query = {}; - query << "SELECT object, sequence FROM " << tableName << "flat" + query << "SELECT object, sequence FROM " << tablePrefix << "objects" << " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC " "LIMIT 1"; @@ -839,14 +596,14 @@ CassandraBackend::open() continue; query = {}; - query << "SELECT transaction,metadata FROM " << tableName - << "flattransactions" + query << "SELECT transaction,metadata FROM " << tablePrefix + << "transactions" << " WHERE hash = ?"; if (!selectTransaction_.prepareStatement(query, session_.get())) continue; query = {}; - query << "SELECT key FROM " << tableName << "keys " + query << "SELECT key FROM " << tablePrefix << "keys " << " WHERE TOKEN(key) >= ? and created <= ?" << " and deleted > ?" << " PER PARTITION LIMIT 1 LIMIT ?" @@ -855,9 +612,8 @@ CassandraBackend::open() continue; query = {}; - query << "SELECT key,object FROM " << tableName << "flat " - << " WHERE TOKEN(key) >= ? and sequence <= ? ORDER BY sequence " - "DESC " + query << "SELECT key,object FROM " << tablePrefix << "objects " + << " WHERE TOKEN(key) >= ? and sequence <= ? " << " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING"; if (!selectLedgerPage_.prepareStatement(query, session_.get())) @@ -865,8 +621,8 @@ CassandraBackend::open() /* query = {}; - query << "SELECT filterempty(key,object) FROM " << tableName << - "flat " + query << "SELECT filterempty(key,object) FROM " << tablePrefix << + "objects " << " WHERE TOKEN(key) >= ? and sequence <= ?" << " PER PARTITION LIMIT 1 LIMIT ?" << " ALLOW FILTERING"; @@ -874,14 +630,14 @@ CassandraBackend::open() continue; */ query = {}; - query << "SELECT TOKEN(key) FROM " << tableName << "flat " + query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects " << " WHERE key = ? LIMIT 1"; if (!getToken_.prepareStatement(query, session_.get())) continue; query = {}; - query << "SELECT key FROM " << tableName << "books " + query << "SELECT key FROM " << tablePrefix << "books " << " WHERE book = ? AND sequence <= ? AND deleted_at > ? AND" " key > ? " " ORDER BY key ASC LIMIT ? ALLOW FILTERING"; @@ -889,51 +645,52 @@ CassandraBackend::open() continue; query = {}; - query << " INSERT INTO " << tableName << "account_tx" + query << " INSERT INTO " << tablePrefix << "account_tx" << " (account, seq_idx, hash) " << " VALUES (?,?,?)"; if (!insertAccountTx_.prepareStatement(query, session_.get())) continue; query = {}; - query << " SELECT hash,seq_idx FROM " << tableName << "account_tx" + query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" << " WHERE account = ? " << " AND seq_idx < ? LIMIT ?"; if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; query = {}; - query << " INSERT INTO " << tableName << "ledgers " + query << " INSERT INTO " << tablePrefix << "ledgers " << " (sequence, header) VALUES(?,?)"; if (!insertLedgerHeader_.prepareStatement(query, session_.get())) continue; query = {}; - query << " INSERT INTO " << tableName << "ledger_hashes" + query << " INSERT INTO " << tablePrefix << "ledger_hashes" << " (hash, sequence) VALUES(?,?)"; if (!insertLedgerHash_.prepareStatement(query, session_.get())) continue; query = {}; - query << " update " << tableName << "ledger_range" + query << " update " << tablePrefix << "ledger_range" << " set sequence = ? where is_latest = ? if sequence != ?"; if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; query = {}; - query << " select header from " << tableName + query << " select header from " << tablePrefix << "ledgers where sequence = ?"; if (!selectLedgerBySeq_.prepareStatement(query, session_.get())) continue; query = {}; - query << " select sequence from " << tableName + query << " select sequence from " << tablePrefix << "ledger_range where is_latest = true"; if (!selectLatestLedger_.prepareStatement(query, session_.get())) continue; query = {}; - query << " SELECT sequence FROM " << tableName << "ledger_range WHERE " + query << " SELECT sequence FROM " << tablePrefix + << "ledger_range WHERE " << " is_latest IN (true, false)"; if (!selectLedgerRange_.prepareStatement(query, session_.get())) continue;