add type field to config. clean up cassandra code more

This commit is contained in:
CJ Cobb
2021-03-04 10:36:42 -05:00
parent 5771b31076
commit 5606d4a7dd
2 changed files with 93 additions and 333 deletions

View File

@@ -1,5 +1,6 @@
#ifndef RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED
#define RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED
#include <boost/algorithm/string.hpp>
#include <reporting/BackendInterface.h>
#include <reporting/CassandraBackend.h>
#include <reporting/PostgresBackend.h>
@@ -9,16 +10,18 @@ makeBackend(boost::json::object const& config)
{
boost::json::object const& dbConfig = config.at("database").as_object();
if (dbConfig.contains("cassandra"))
auto type = dbConfig.at("type").as_string();
if (boost::iequals(type, "cassandra"))
{
auto backend = std::make_unique<CassandraBackend>(
dbConfig.at("cassandra").as_object());
auto backend =
std::make_unique<CassandraBackend>(dbConfig.at(type).as_object());
return std::move(backend);
}
else if (dbConfig.contains("postgres"))
else if (boost::iequals(type, "postgres"))
{
auto backend = std::make_unique<PostgresBackend>(
dbConfig.at("postgres").as_object());
auto backend =
std::make_unique<PostgresBackend>(dbConfig.at(type).as_object());
return std::move(backend);
}
return nullptr;

View File

@@ -393,8 +393,8 @@ CassandraBackend::open()
"nodestore: Missing keyspace in Cassandra config");
}
std::string tableName = getString("table_name");
if (tableName.empty())
std::string tablePrefix = getString("table_prefix");
if (tablePrefix.empty())
{
throw std::runtime_error(
"nodestore: Missing table name in Cassandra config");
@@ -424,367 +424,124 @@ CassandraBackend::open()
continue;
}
auto executeSimpleStatement = [this](std::string const& query) {
CassStatement* statement = makeStatement(query.c_str(), 0);
CassFuture* fut = cass_session_execute(session_.get(), statement);
CassError rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error executing simple statement: " << rc
<< ", " << cass_error_desc(rc) << " - " << query;
BOOST_LOG_TRIVIAL(error) << ss.str();
return false;
}
return true;
};
std::stringstream query;
query << "CREATE TABLE IF NOT EXISTS " << tableName << "flat"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects"
<< " ( key blob, sequence bigint, object blob, PRIMARY "
"KEY(key, "
"sequence)) WITH CLUSTERING ORDER BY (sequence DESC)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "flat"
query << "SELECT * FROM " << tablePrefix << "objects"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName
<< "flattransactions"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, sequence bigint, transaction "
"blob, metadata blob)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "flattransactions"
query << "SELECT * FROM " << tablePrefix << "transactions"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "keys"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
<< " ( key blob, created bigint, deleted bigint, PRIMARY KEY "
"(key, created)) with clustering order by (created "
"desc) ";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "keys"
query << "SELECT * FROM " << tablePrefix << "keys"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "books"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
<< " ( book blob, sequence bigint, key blob, deleted_at "
"bigint, PRIMARY KEY "
"(book, key)) WITH CLUSTERING ORDER BY (key ASC)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "books"
query << "SELECT * FROM " << tablePrefix << "books"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "account_tx"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
<< " ( account blob, seq_idx tuple<bigint, bigint>, "
" hash blob, "
"PRIMARY KEY "
"(account, seq_idx)) WITH "
"CLUSTERING ORDER BY (seq_idx desc)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "account_tx"
query << "SELECT * FROM " << tablePrefix << "account_tx"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledgers"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers"
<< " ( sequence bigint PRIMARY KEY, header blob )";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "ledgers"
query << "SELECT * FROM " << tablePrefix << "ledgers"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_hashes"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes"
<< " (hash blob PRIMARY KEY, sequence bigint)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "ledger_hashes"
query << "SELECT * FROM " << tablePrefix << "ledger_hashes"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_range"
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_range"
<< " (is_latest boolean PRIMARY KEY, sequence bigint)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!executeSimpleStatement(query.str()))
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "ledger_range"
query << "SELECT * FROM " << tablePrefix << "ledger_range"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
if (!executeSimpleStatement(query.str()))
continue;
setupSessionAndTable = true;
}
@@ -795,43 +552,43 @@ CassandraBackend::open()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::stringstream query;
query << "INSERT INTO " << tableName << "flat"
query << "INSERT INTO " << tablePrefix << "objects"
<< " (key, sequence, object) VALUES (?, ?, ?)";
if (!insertObject_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "flattransactions"
query << "INSERT INTO " << tablePrefix << "transactions"
<< " (hash, sequence, transaction, metadata) VALUES (?, ?, "
"?, ?)";
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "keys"
query << "INSERT INTO " << tablePrefix << "keys"
<< " (key, created, deleted) VALUES (?, ?, ?)";
if (!insertKey_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "books"
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
if (!insertBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "books"
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, key, deleted_at) VALUES (?, ?, ?)";
if (!deleteBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT created FROM " << tableName << "keys"
query << "SELECT created FROM " << tablePrefix << "keys"
<< " WHERE key = ? ORDER BY created desc LIMIT 1";
if (!getCreated_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT object, sequence FROM " << tableName << "flat"
query << "SELECT object, sequence FROM " << tablePrefix << "objects"
<< " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
"LIMIT 1";
@@ -839,14 +596,14 @@ CassandraBackend::open()
continue;
query = {};
query << "SELECT transaction,metadata FROM " << tableName
<< "flattransactions"
query << "SELECT transaction,metadata FROM " << tablePrefix
<< "transactions"
<< " WHERE hash = ?";
if (!selectTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tableName << "keys "
query << "SELECT key FROM " << tablePrefix << "keys "
<< " WHERE TOKEN(key) >= ? and created <= ?"
<< " and deleted > ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
@@ -855,9 +612,8 @@ CassandraBackend::open()
continue;
query = {};
query << "SELECT key,object FROM " << tableName << "flat "
<< " WHERE TOKEN(key) >= ? and sequence <= ? ORDER BY sequence "
"DESC "
query << "SELECT key,object FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING";
if (!selectLedgerPage_.prepareStatement(query, session_.get()))
@@ -865,8 +621,8 @@ CassandraBackend::open()
/*
query = {};
query << "SELECT filterempty(key,object) FROM " << tableName <<
"flat "
query << "SELECT filterempty(key,object) FROM " << tablePrefix <<
"objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
@@ -874,14 +630,14 @@ CassandraBackend::open()
continue;
*/
query = {};
query << "SELECT TOKEN(key) FROM " << tableName << "flat "
query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects "
<< " WHERE key = ? LIMIT 1";
if (!getToken_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tableName << "books "
query << "SELECT key FROM " << tablePrefix << "books "
<< " WHERE book = ? AND sequence <= ? AND deleted_at > ? AND"
" key > ? "
" ORDER BY key ASC LIMIT ? ALLOW FILTERING";
@@ -889,51 +645,52 @@ CassandraBackend::open()
continue;
query = {};
query << " INSERT INTO " << tableName << "account_tx"
query << " INSERT INTO " << tablePrefix << "account_tx"
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " SELECT hash,seq_idx FROM " << tableName << "account_tx"
query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
<< " WHERE account = ? "
<< " AND seq_idx < ? LIMIT ?";
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " INSERT INTO " << tableName << "ledgers "
query << " INSERT INTO " << tablePrefix << "ledgers "
<< " (sequence, header) VALUES(?,?)";
if (!insertLedgerHeader_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " INSERT INTO " << tableName << "ledger_hashes"
query << " INSERT INTO " << tablePrefix << "ledger_hashes"
<< " (hash, sequence) VALUES(?,?)";
if (!insertLedgerHash_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " update " << tableName << "ledger_range"
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence != ?";
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " select header from " << tableName
query << " select header from " << tablePrefix
<< "ledgers where sequence = ?";
if (!selectLedgerBySeq_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " select sequence from " << tableName
query << " select sequence from " << tablePrefix
<< "ledger_range where is_latest = true";
if (!selectLatestLedger_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " SELECT sequence FROM " << tableName << "ledger_range WHERE "
query << " SELECT sequence FROM " << tablePrefix
<< "ledger_range WHERE "
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;