partial refactor of CassandraBackend

This commit is contained in:
CJ Cobb
2021-03-02 16:19:14 -05:00
parent 8691cff8df
commit 308c585801
15 changed files with 490 additions and 1247 deletions

View File

@@ -43,8 +43,7 @@
boost::json::object
doAccountInfo(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& postgres)
BackendInterface const& backend)
{
boost::json::object response;
std::string strIdent;
@@ -60,7 +59,7 @@ doAccountInfo(
size_t ledgerSequence = 0;
if (not request.contains("ledger_index"))
{
std::optional<ripple::LedgerInfo> latest = getLedger({}, postgres);
auto latest = backend.fetchLatestLedgerSequence();
if (not latest)
{
@@ -69,7 +68,7 @@ doAccountInfo(
}
else
{
ledgerSequence = latest->seq;
ledgerSequence = *latest;
}
}
else

View File

@@ -127,10 +127,7 @@ doAccountTxStoredProcedure(
// resume previous query
// }
boost::json::object
doAccountTx(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
doAccountTx(boost::json::object const& request, BackendInterface const& backend)
{
boost::json::object response;

View File

@@ -90,15 +90,11 @@ loadBookOfferIndexes(
boost::json::object
doBookOffers(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pool)
BackendInterface const& backend)
{
std::cout << "enter" << std::endl;
boost::json::object response;
auto sequence = ledgerSequenceFromRequest(request, pool);
if (!sequence)
return response;
uint32_t sequence = request.at("ledger_index").as_int64();
if (!request.contains("taker_pays"))
{
@@ -310,7 +306,7 @@ doBookOffers(
ripple::uint256 bookBase = getBookBase(book);
auto start = std::chrono::system_clock::now();
auto [offers, retCursor] =
backend.fetchBookOffers(bookBase, *sequence, limit, cursor);
backend.fetchBookOffers(bookBase, sequence, limit, cursor);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(warning) << "Time loading books from Postgres: "

View File

@@ -26,10 +26,7 @@
// }
boost::json::object
doTx(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& postgres)
doTx(boost::json::object const& request, BackendInterface const& backend)
{
boost::json::object response;
if (!request.contains("transaction"))
@@ -44,7 +41,7 @@ doTx(
return response;
}
auto range = getLedgerRange(postgres);
auto range = backend.fetchLedgerRange();
if (!range)
{
response["error"] = "Database is empty";
@@ -55,8 +52,8 @@ doTx(
if (!dbResponse)
{
response["error"] = "Transaction not found in Cassandra";
response["ledger_range"] = std::to_string(range->lower()) + " - " +
std::to_string(range->upper());
response["ledger_range"] = std::to_string(range->minSequence) + " - " +
std::to_string(range->maxSequence);
return response;
}

View File

@@ -2,6 +2,7 @@
#define RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED
#include <reporting/BackendInterface.h>
#include <reporting/CassandraBackend.h>
#include <reporting/PostgresBackend.h>
namespace Backend {
std::unique_ptr<BackendInterface>
makeBackend(boost::json::object const& config)
@@ -14,6 +15,12 @@ makeBackend(boost::json::object const& config)
dbConfig.at("cassandra").as_object());
return std::move(backend);
}
else if (dbConfig.contains("postgres"))
{
auto backend = std::make_unique<PostgresBackend>(
dbConfig.at("postgres").as_object());
return std::move(backend);
}
return nullptr;
}
} // namespace Backend

View File

@@ -27,6 +27,12 @@ struct AccountTransactionsCursor
uint32_t transactionIndex;
};
struct LedgerRange
{
uint32_t minSequence;
uint32_t maxSequence;
};
class BackendInterface
{
public:
@@ -38,6 +44,9 @@ public:
virtual std::optional<ripple::LedgerInfo>
fetchLedgerBySequence(uint32_t sequence) const = 0;
virtual std::optional<LedgerRange>
fetchLedgerRange() const = 0;
virtual std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;

View File

@@ -452,6 +452,74 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData)
}
}
std::optional<LedgerRange>
CassandraBackend::fetchLedgerRange() const
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now();
CassStatement* statement = cass_prepared_bind(selectLedgerRange_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassFuture* fut;
CassError rc;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
CassIterator* iter = cass_iterator_from_result(res);
std::optional<uint32_t> min;
std::optional<uint32_t> max;
if (cass_iterator_next(iter))
{
cass_int64_t sequence;
rc = cass_value_get_int64(
cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
LedgerRange range;
range.minSequence = sequence;
if (!cass_iterator_next(iter))
{
cass_result_free(res);
return range;
}
rc = cass_value_get_int64(
cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
cass_result_free(res);
range.maxSequence = sequence;
if (range.minSequence > range.maxSequence)
{
std::swap(range.minSequence, range.maxSequence);
}
return range;
}
return {};
}
void
CassandraBackend::open()
{
@@ -1023,217 +1091,53 @@ CassandraBackend::open()
std::stringstream query;
query << "INSERT INTO " << tableName << "flat"
<< " (key, sequence, object) VALUES (?, ?, ?)";
CassFuture* prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!insertObject_.prepareStatement(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
insertObject_ = cass_future_get_prepared(prepare_future);
/* The future can be freed immediately after getting the prepared
* object
*/
cass_future_free(prepare_future);
query = {};
query << "INSERT INTO " << tableName << "flattransactions"
<< " (hash, sequence, transaction, metadata) VALUES (?, ?, "
"?, ?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!insertTransaction_.prepare(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
insertTransaction_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "INSERT INTO " << tableName << "keys"
<< " (key, created, deleted) VALUES (?, ?, ?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!insertKey_.prepare(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
insertKey_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "INSERT INTO " << tableName << "books"
<< " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!insertBook_.prepareStatement(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
insertBook_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "INSERT INTO " << tableName << "books"
<< " (book, key, deleted_at) VALUES (?, ?, ?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!deleteBook_.prepareStatement(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
deleteBook_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "SELECT created FROM " << tableName << "keys"
<< " WHERE key = ? ORDER BY created desc LIMIT 1";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!getCreated_.prepareStatement(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
getCreated_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "SELECT object, sequence FROM " << tableName << "flat"
<< " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
"LIMIT 1";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing select : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!selectObject_.prepareStatement(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
selectObject_ = cass_future_get_prepared(prepare_future);
/* The future can be freed immediately after getting the prepared
* object
*/
cass_future_free(prepare_future);
query = {};
query << "SELECT transaction,metadata FROM " << tableName
<< "flattransactions"
<< " WHERE hash = ?";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing select : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!selectTransaction_.prepareStatement(query, session_.get()))
continue;
}
/* Get the prepared object from the future */
selectTransaction_ = cass_future_get_prepared(prepare_future);
/* The future can be freed immediately after getting the prepared
* object
*/
cass_future_free(prepare_future);
query = {};
query << "SELECT key FROM " << tableName << "keys "
@@ -1241,111 +1145,32 @@ CassandraBackend::open()
<< " and deleted > ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing upperBound : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str();
if (!selectLedgerPage_.prepareStatement(query, session_.get()))
continue;
}
// Get the prepared object from the future
upperBound_ = cass_future_get_prepared(prepare_future);
// The future can be freed immediately after getting the prepared
// object
//
cass_future_free(prepare_future);
/*
query = {};
query << "SELECT filterempty(key,object) FROM " << tableName << "flat "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing upperBound : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str();
if (!upperBound2_.prepareStatement(query, session_.get()))
continue;
}
// Get the prepared object from the future
upperBound2_ = cass_future_get_prepared(prepare_future);
// The future can be freed immediately after getting the prepared
// object
//
cass_future_free(prepare_future);
*/
query = {};
query << "SELECT TOKEN(key) FROM " << tableName << "flat "
<< " WHERE key = ? LIMIT 1";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!getToken_.prepareStatement(query, session_.get()))
continue;
}
getToken_ = cass_future_get_prepared(prepare_future);
query = {};
query << "SELECT key FROM " << tableName << "books "
<< " WHERE book = ? AND sequence <= ? AND deleted_at > ? AND"
" key > ? "
" ORDER BY key ASC LIMIT ? ALLOW FILTERING";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!getBook_.prepareStatement(query, session_.get()))
continue;
}
getBook_ = cass_future_get_prepared(prepare_future);
@@ -1353,165 +1178,53 @@ CassandraBackend::open()
query << " INSERT INTO " << tableName << "account_tx"
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
}
insertAccountTx_ = cass_future_get_prepared(prepare_future);
query = {};
query << " SELECT hash,seq_idx FROM " << tableName << "account_tx"
<< " WHERE account = ? "
<< " AND seq_idx < ? LIMIT ?";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;
}
selectAccountTx_ = cass_future_get_prepared(prepare_future);
query = {};
query << " INSERT INTO " << tableName << "ledgers "
<< " (sequence, header) VALUES(?,?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!insertLedgerHeader_.prepareStatement(query, session_.get()))
continue;
}
insertLedgerHeader_ = cass_future_get_prepared(prepare_future);
query = {};
query << " INSERT INTO " << tableName << "ledger_hashes"
<< " (hash, sequence) VALUES(?,?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!insertLedgerHash_.prepareStatement(query, session_.get()))
continue;
}
insertLedgerHash_ = cass_future_get_prepared(prepare_future);
query = {};
query << " update " << tableName << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence != ?";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: error preparing updateLedgerRange : " << rc
<< ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
}
updateLedgerRange_ = cass_future_get_prepared(prepare_future);
query = {};
query << " select header from " << tableName
<< "ledgers where sequence = ?";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: error preparing selectLedgerBySeq : " << rc
<< ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!selectLedgerBySeq_.prepareStatement(query, session_.get()))
continue;
}
selectLedgerBySeq_ = cass_future_get_prepared(prepare_future);
query = {};
query << " select sequence from " << tableName
<< "ledger_range where is_latest = true";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: error preparing selectLatestLedger : " << rc
<< ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
if (!selectLatestLedger_.prepareStatement(query, session_.get()))
continue;
}
selectLatestLedger_ = cass_future_get_prepared(prepare_future);
query = {};
query << " SELECT sequence FROM " << tableName << "ledger_range WHERE "
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
selectLedgerRange_ = cass_future_get_prepared(prepare_future);
setupPreparedStatements = true;
}

View File

@@ -57,6 +57,192 @@ flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
class CassandraPreparedStatement
{
private:
CassPrepared const* prepared_ = nullptr;
CassStatement* statement_ = nullptr;
public:
CassPrepared const*
get()
{
return statement_;
}
bool
prepareStatement(std::stringstream const& query)
{
return prepareStatement(query.str().c_str());
}
bool
prepareStatement(std::string const& query)
{
return prepareStatement(query.c_str());
}
bool
prepareStatement(char const* query, CassSession* session)
{
if (!query)
throw std::runtime_error("prepareStatement: null query");
if (!session)
throw std::runtime_error("prepareStatement: null sesssion");
CassFuture* prepareFuture = cass_session_prepare(session, query);
/* Wait for the statement to prepare and get the result */
CassError rc = cass_future_error_code(prepareFuture);
if (rc == CASS_OK)
{
prepared_ = cass_future_get_prepared(prepareFuture);
}
else
{
std::stringstream ss;
ss << "nodestore: Error preparing statement : " << rc << ", "
<< cass_error_desc(rc) << ". query : " << query;
BOOST_LOG_TRIVIAL(error) << ss.str();
}
cass_future_free(prepareFuture);
return rc == CASS_OK;
}
~CassandraPreparedStatement()
{
if (prepared_)
{
cass_prepared_free(prepared_);
prepared_ = nullptr;
}
}
};
class CassandraStatement
{
CassStatement* statement_ = nullptr;
size_t curBindingIndex_ = 0;
CassandraStatement(CassandraPreparedStatement const& prepared)
{
statement_ = cass_prepared_bind(prepared.get());
cass_statement_set_consistency(statement_, CASS_CONSISTENCY_QUORUM);
}
CassStatement*
get() const
{
return statement_;
}
bool
bindBytes(const char* data, uint32_t size)
{
return bindBytes((unsigned char*)data, size);
}
template <std::size_t Bits>
bool
bindBytes(ripple::base_uint<Bits> const& data)
{
return bindBytes(data.data(), data.size());
}
bool
bindBytes(std::string const& data)
{
return bindBytes(data.data(), data.size());
}
bool
bindBytes(const unsigned char* data, uint32_t size)
{
if (!statement_)
throw std::runtime_error(
"CassandraStatement::bindBytes - statement_ is null");
CassError rc = cass_statement_bind_bytes(
statement_,
curBindingIndex_,
static_cast<cass_byte_t const*>(data),
size);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Error binding bytes to statement: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
return false;
}
curBindingIndex_++;
return true;
}
bool
bindInt(uint32_t value)
{
return bindInt((int64_t)value);
}
bool
bindInt(int64_t value)
{
if (!statement_)
throw std::runtime_error(
"CassandraStatement::bindInt - statement_ is null");
CassError rc =
cass_statement_bind_int64(statement_, curBindingIndex_, value);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Error binding int to statement: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
return false;
}
curBindingIndex_++;
return true;
}
bool
bindIntTuple(uint32_t first, uint32_t second)
{
CassTuple* tuple = cass_tuple_new(2);
rc = cass_tuple_set_int64(tuple, 0, first);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Error binding int to tuple: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
return false;
}
rc = cass_tuple_set_int64(tuple, 1, second);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Error binding int to tuple: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
return false;
}
rc = cass_statement_bind_tuple(statement_, curBindingIndex_, tuple);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Binding tuple: " << rc << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
return false;
}
curBindingIndex_++;
return true;
}
CassandraStatement()
{
if (statement_)
cass_statement_free(statement_);
}
};
class CassandraBackend : public BackendInterface
{
private:
@@ -95,26 +281,27 @@ private:
// Database statements cached server side. Using these is more efficient
// than making a new statement
const CassPrepared* insertObject_ = nullptr;
const CassPrepared* insertTransaction_ = nullptr;
const CassPrepared* selectTransaction_ = nullptr;
const CassPrepared* selectObject_ = nullptr;
const CassPrepared* upperBound_ = nullptr;
const CassPrepared* upperBound2_ = nullptr;
const CassPrepared* getToken_ = nullptr;
const CassPrepared* insertKey_ = nullptr;
const CassPrepared* getCreated_ = nullptr;
const CassPrepared* getBook_ = nullptr;
const CassPrepared* insertBook_ = nullptr;
const CassPrepared* deleteBook_ = nullptr;
const CassPrepared* insertAccountTx_ = nullptr;
const CassPrepared* selectAccountTx_ = nullptr;
const CassPrepared* insertLedgerHeader_ = nullptr;
const CassPrepared* insertLedgerHash_ = nullptr;
const CassPrepared* updateLedgerRange_ = nullptr;
const CassPrepared* updateLedgerHeader_ = nullptr;
const CassPrepared* selectLedgerBySeq_ = nullptr;
const CassPrepared* selectLatestLedger_ = nullptr;
CassandraPreparedStatement insertObject_;
CassandraPreparedStatement insertTransaction_;
CassandraPreparedStatement selectTransaction_;
CassandraPreparedStatement selectObject_;
CassandraPreparedStatement selectLedgerPage_;
CassandraPreparedStatement upperBound2_;
CassandraPreparedStatement getToken_;
CassandraPreparedStatement insertKey_;
CassandraPreparedStatement getCreated_;
CassandraPreparedStatement getBook_;
CassandraPreparedStatement insertBook_;
CassandraPreparedStatement deleteBook_;
CassandraPreparedStatement insertAccountTx_;
CassandraPreparedStatement selectAccountTx_;
CassandraPreparedStatement insertLedgerHeader_;
CassandraPreparedStatement insertLedgerHash_;
CassandraPreparedStatement updateLedgerRange_;
CassandraPreparedStatement updateLedgerHeader_;
CassandraPreparedStatement selectLedgerBySeq_;
CassandraPreparedStatement selectLatestLedger_;
CassandraPreparedStatement selectLedgerRange_;
// io_context used for exponential backoff for write retries
mutable boost::asio::io_context ioContext_;
@@ -176,87 +363,6 @@ public:
{
{
std::lock_guard<std::mutex> lock(mutex_);
if (insertTransaction_)
{
cass_prepared_free(insertTransaction_);
insertTransaction_ = nullptr;
}
if (insertObject_)
{
cass_prepared_free(insertObject_);
insertObject_ = nullptr;
}
if (insertKey_)
{
cass_prepared_free(insertKey_);
insertKey_ = nullptr;
}
if (selectTransaction_)
{
cass_prepared_free(selectTransaction_);
selectTransaction_ = nullptr;
}
if (selectObject_)
{
cass_prepared_free(selectObject_);
selectObject_ = nullptr;
}
if (upperBound_)
{
cass_prepared_free(upperBound_);
upperBound_ = nullptr;
}
if (getToken_)
{
cass_prepared_free(getToken_);
getToken_ = nullptr;
}
if (getCreated_)
{
cass_prepared_free(getCreated_);
getCreated_ = nullptr;
}
if (getBook_)
{
cass_prepared_free(getBook_);
getBook_ = nullptr;
}
if (insertBook_)
{
cass_prepared_free(insertBook_);
insertBook_ = nullptr;
}
if (deleteBook_)
{
cass_prepared_free(deleteBook_);
deleteBook_ = nullptr;
}
if (insertAccountTx_)
{
cass_prepared_free(insertAccountTx_);
insertAccountTx_ = nullptr;
}
if (selectAccountTx_)
{
cass_prepared_free(selectAccountTx_);
selectAccountTx_ = nullptr;
}
if (insertLedgerHeader_)
{
cass_prepared_free(insertLedgerHeader_);
insertLedgerHeader_ = nullptr;
}
if (insertLedgerHash_)
{
cass_prepared_free(insertLedgerHash_);
insertLedgerHash_ = nullptr;
}
if (updateLedgerRange_)
{
cass_prepared_free(updateLedgerRange_);
updateLedgerRange_ = nullptr;
}
work_.reset();
ioThread_.join();
}
@@ -389,7 +495,6 @@ public:
{
return {fetchTransactions(hashes), retCursor};
}
return {{}, {}};
}
@@ -720,6 +825,8 @@ public:
<< " microseconds";
return lgrInfo;
}
std::optional<LedgerRange>
fetchLedgerRange() const override;
// Synchronously fetch the object with key key and store the result in
// pno
@@ -1434,218 +1541,42 @@ public:
write(WriteCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
{
CassStatement* statement = cass_prepared_bind(insertObject_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 1, data.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
const unsigned char* blobData = (unsigned char*)data.blob.data();
rc = cass_statement_bind_bytes(
statement,
2,
static_cast<cass_byte_t const*>(blobData),
data.blob.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
CassandraStatement statement{insertObject_};
statement.bindBytes(data.key);
statement.bindInt(data.sequence);
statement.bindBytes(data.blob);
cass_future_set_callback(
fut, flatMapWriteCallback, static_cast<void*>(&data));
cass_future_free(fut);
executeAsync(statement, flatMapWriteCallback, data, isRetry);
}
}
void
writeDeletedKey(WriteCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
CassStatement* statement = cass_prepared_bind(insertKey_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 1, data.createdSequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 2, data.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapWriteKeyCallback, static_cast<void*>(&data));
cass_future_free(fut);
CassandraStatement statement{insertKey_};
statement.bindBytes(data.key);
statement.bindInt(data.createdSequence);
statement.bindInt(data.sequence);
executeAsync(statement, flatMapWriteKeyCallback, data, isRetry);
}
void
writeKey(WriteCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
if (data.isCreated)
{
CassStatement* statement = cass_prepared_bind(insertKey_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 1, data.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 2, INT64_MAX);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
CassandraStatement statement{insertKey_};
statement.bindBytes(data.key);
statement.bindInt(data.sequence);
statement.bindInt(INT64_MAX);
cass_future_set_callback(
fut, flatMapWriteKeyCallback, static_cast<void*>(&data));
cass_future_free(fut);
executeAsync(statement, flatMapWriteKeyCallback, data, isRetry);
}
else if (data.isDeleted)
{
CassStatement* statement = cass_prepared_bind(getCreated_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* keyData = (unsigned char*)data.key.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
CassandraStatement statement{getCreated_};
cass_future_set_callback(
fut, flatMapGetCreatedCallback, static_cast<void*>(&data));
cass_future_free(fut);
executeAsync(statement, flatMapGetCreatedCallback, data, isRetry);
}
}
@@ -1653,88 +1584,19 @@ public:
writeBook(WriteCallbackData& data, bool isRetry) const
{
assert(data.isCreated or data.isDeleted);
if (!data.isCreated and !data.isDeleted)
throw std::runtime_error(
"writing book that's neither created or deleted");
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
CassStatement* statement = nullptr;
assert(data.book);
CassandraStatement statement{
(data.isCreated ? insertBook_ : deleteBook_)};
if (!statement.bindBytes(*data.book))
throw std::runtime_error("writeBook: bind error");
if (!statement.bindBytes(data.key))
throw std::runtime_error("writeBook: bind error");
if (!statement.bindInt(data.sequence))
throw std::runtime_error("writeBook: bind error");
if (data.isCreated)
statement = cass_prepared_bind(insertBook_);
else if (data.isDeleted)
statement = cass_prepared_bind(deleteBook_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* bookData = (unsigned char*)data.book->data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(bookData),
data.book->size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
const unsigned char* keyData = (unsigned char*)data.key.data();
rc = cass_statement_bind_bytes(
statement,
1,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 2, data.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
if (data.isCreated)
{
rc = cass_statement_bind_int64(statement, 3, INT64_MAX);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapWriteBookCallback, static_cast<void*>(&data));
cass_future_free(fut);
if (!statement.bindInt(INT64_MAX))
throw std::runtime_error("writeBook: bind error");
executeAsync(statement, flatMapWriteBookCallback, data, isRetry);
}
void
writeLedgerObject(
@@ -1784,75 +1646,22 @@ public:
void
writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
for (auto const& account : data.data.accounts)
{
CassStatement* statement = cass_prepared_bind(insertAccountTx_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* accountData = (unsigned char*)account.data();
CassError rc = cass_statement_bind_bytes(
CassandraStatement statement(insertAccountTx_);
if (!statement.bindBytes(account))
throw std::runtime_error(
"writeAccountTx : error binding account");
if (!statement.bindTuple(
data.data.ledgerSequence, data.data.transactionIndex))
throw std::runtime_error(
"writeAccountTx: error binding ledger seq and txn idx");
executeAsync(
statement,
0,
static_cast<cass_byte_t const*>(accountData),
account.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert account: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassTuple* idx = cass_tuple_new(2);
rc = cass_tuple_set_int64(idx, 0, data.data.ledgerSequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding ledger sequence to tuple: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_tuple_set_int64(idx, 1, data.data.transactionIndex);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding tx idx to tuple: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_tuple(statement, 1, idx);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding tuple: " << rc << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapWriteAccountTxCallback, static_cast<void*>(&data));
cass_future_free(fut);
flatMapWriteAccountTxCallback,
static_cast<void*>(&data),
isRetry);
}
}
@@ -1887,84 +1696,20 @@ public:
void
writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
CassandraStatement statement{insertTransaction_};
if (!statement.bindBytes(data.hash))
throw std::runtime_error("writeTransaction: error binding hash");
if (!statement.bindInt(data.sequence))
throw std::runtime_error(
"writeTransaction: error binding sequence");
if (!statement.bindBytes(data.transaction))
throw std::runtime_error(
"writeTransaction: error binding transaction");
if (!statement.bindBytes(data.metadata))
throw std::runtime_error(
"writeTransaction: error binding transaction");
CassStatement* statement = cass_prepared_bind(insertTransaction_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* hashData = (unsigned char*)data.hash.data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(hashData),
data.hash.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(statement, 1, data.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
const unsigned char* transactionData =
(unsigned char*)data.transaction.data();
rc = cass_statement_bind_bytes(
statement,
2,
static_cast<cass_byte_t const*>(transactionData),
data.transaction.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
const unsigned char* metadata = (unsigned char*)data.metadata.data();
rc = cass_statement_bind_bytes(
statement,
3,
static_cast<cass_byte_t const*>(metadata),
data.metadata.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapWriteTransactionCallback, static_cast<void*>(&data));
cass_future_free(fut);
executeAsync(statement, flatMapWriteTransactionCallback, data, isRetry);
}
void
writeTransaction(
@@ -2019,6 +1764,34 @@ public:
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
friend void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
template <class T, class S>
void
executeAsync(
CassandraStatement const& statement,
T callback,
S& callbackData,
bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
CassFuture* fut = cass_session_execute(session_.get(), statement.get());
cass_future_set_callback(
fut, callback, static_cast<void*>(&callbackData));
cass_future_free(fut);
}
};
} // namespace Backend

View File

@@ -745,7 +745,7 @@ CREATE TABLE IF NOT EXISTS ledgers (
CREATE TABLE IF NOT EXISTS objects (
key bytea NOT NULL,
ledger_seq bigint NOT NULL,
object bytea NOT NULL,
object bytea,
PRIMARY KEY(key, ledger_seq)
);
@@ -757,33 +757,29 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
-- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS transactions (
hash bytea PRIMARY KEY,
ledger_seq bigint,
transaction bytea,
metadata bytea,
FOREIGN KEY (ledger_seq)
REFERENCES ledgers (ledger_seq) ON DELETE CASCADE
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
transaction bytea NOT NULL,
metadata bytea NOT NULL
);
-- Index for lookups by transaction hash.
CREATE INDEX IF NOT EXISTS transactions_trans_id_idx ON transactions
USING hash (trans_id);
-- Table that maps accounts to transactions affecting them. Deletes from the
-- ledger table by way of transactions table cascade here based on ledger_seq.
-- ledger table cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS account_transactions (
account bytea NOT NULL,
ledger_seq bigint NOT NULL,
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
transaction_index bigint NOT NULL,
constraint account_transactions_pkey PRIMARY KEY (account, ledger_seq,
transaction_index),
constraint account_transactions_fkey FOREIGN KEY (ledger_seq,
transaction_index) REFERENCES transactions (
ledger_seq, transaction_index) ON DELETE CASCADE
hash bytea NOT NULL,
PRIMARY KEY (account, ledger_seq, transaction_index),
);
-- Table that maps a book to a list of offers in that book. Deletes from the ledger table
-- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS books (
book bytea NOT NULL,
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
deleted boolean NOT NULL,
offer_key bytea NOT NULL,
PRIMARY KEY(book, offer_key, deleted)
);
-- Index to allow for fast cascading deletions and referential integrity.
CREATE INDEX IF NOT EXISTS fki_account_transactions_idx ON
account_transactions USING btree (ledger_seq, transaction_index);
-- Avoid inadvertent administrative tampering with committed data.
CREATE OR REPLACE RULE ledgers_update_protect AS ON UPDATE TO
@@ -792,44 +788,11 @@ CREATE OR REPLACE RULE transactions_update_protect AS ON UPDATE TO
transactions DO INSTEAD NOTHING;
CREATE OR REPLACE RULE account_transactions_update_protect AS ON UPDATE TO
account_transactions DO INSTEAD NOTHING;
CREATE OR REPLACE RULE objects_update_protect AS ON UPDATE TO
objects DO INSTEAD NOTHING;
CREATE OR REPLACE RULE books_update_protect AS ON UPDATE TO
books DO INSTEAD NOTHING;
-- Stored procedure to assist with the tx() RPC call. Takes transaction hash
-- as input. If found, returns the ledger sequence in which it was applied.
-- If not, returns the range of ledgers searched.
CREATE OR REPLACE FUNCTION tx (
_in_trans_id bytea
) RETURNS jsonb AS $$
DECLARE
_min_ledger bigint := min_ledger();
_min_seq bigint := (SELECT ledger_seq
FROM ledgers
WHERE ledger_seq = _min_ledger
FOR SHARE);
_max_seq bigint := max_ledger();
_ledger_seq bigint;
_nodestore_hash bytea;
BEGIN
IF _min_seq IS NULL THEN
RETURN jsonb_build_object('error', 'empty database');
END IF;
IF length(_in_trans_id) != 32 THEN
RETURN jsonb_build_object('error', '_in_trans_id size: '
|| to_char(length(_in_trans_id), '999'));
END IF;
EXECUTE 'SELECT nodestore_hash, ledger_seq
FROM transactions
WHERE trans_id = $1
AND ledger_seq BETWEEN $2 AND $3
' INTO _nodestore_hash, _ledger_seq USING _in_trans_id, _min_seq, _max_seq;
IF _nodestore_hash IS NULL THEN
RETURN jsonb_build_object('min_seq', _min_seq, 'max_seq', _max_seq);
END IF;
RETURN jsonb_build_object('nodestore_hash', _nodestore_hash, 'ledger_seq',
_ledger_seq);
END;
$$ LANGUAGE plpgsql;
-- Return the earliest ledger sequence intended for range operations
-- that protect the bottom of the range from deletion. Return NULL if empty.
@@ -852,195 +815,6 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- account_tx() RPC helper. From the rippled reporting process, only the
-- parameters without defaults are required. For the parameters with
-- defaults, validation should be done by rippled, such as:
-- _in_account_id should be a valid xrp base58 address.
-- _in_forward either true or false according to the published api
-- _in_limit should be validated and not simply passed through from
-- client.
--
-- For _in_ledger_index_min and _in_ledger_index_max, if passed in the
-- request, verify that their type is int and pass through as is.
-- For _ledger_hash, verify and convert from hex length 32 bytes and
-- prepend with \x (\\x C++).
--
-- For _in_ledger_index, if the input type is integer, then pass through
-- as is. If the type is string and contents = validated, then do not
-- set _in_ledger_index. Instead set _in_invalidated to TRUE.
--
-- There is no need for rippled to do any type of lookup on max/min
-- ledger range, lookup of hash, or the like. This functions does those
-- things, including error responses if bad input. Only the above must
-- be done to set the correct search range.
--
-- If a marker is present in the request, verify the members 'ledger'
-- and 'seq' are integers and they correspond to _in_marker_seq
-- _in_marker_index.
-- To reiterate:
-- JSON input field 'ledger' corresponds to _in_marker_seq
-- JSON input field 'seq' corresponds to _in_marker_index
CREATE OR REPLACE FUNCTION account_tx (
_in_account_id bytea,
_in_forward bool,
_in_limit bigint,
_in_ledger_index_min bigint = NULL,
_in_ledger_index_max bigint = NULL,
_in_ledger_hash bytea = NULL,
_in_ledger_index bigint = NULL,
_in_validated bool = NULL,
_in_marker_seq bigint = NULL,
_in_marker_index bigint = NULL
) RETURNS jsonb AS $$
DECLARE
_min bigint;
_max bigint;
_sort_order text := (SELECT CASE WHEN _in_forward IS TRUE THEN
'ASC' ELSE 'DESC' END);
_marker bool;
_between_min bigint;
_between_max bigint;
_sql text;
_cursor refcursor;
_result jsonb;
_record record;
_tally bigint := 0;
_ret_marker jsonb;
_transactions jsonb[] := '{}';
BEGIN
IF _in_ledger_index_min IS NOT NULL OR
_in_ledger_index_max IS NOT NULL THEN
_min := (SELECT CASE WHEN _in_ledger_index_min IS NULL
THEN min_ledger() ELSE greatest(
_in_ledger_index_min, min_ledger()) END);
_max := (SELECT CASE WHEN _in_ledger_index_max IS NULL OR
_in_ledger_index_max = -1 THEN max_ledger() ELSE
least(_in_ledger_index_max, max_ledger()) END);
IF _max < _min THEN
RETURN jsonb_build_object('error', 'max is less than min ledger');
END IF;
ELSIF _in_ledger_hash IS NOT NULL OR _in_ledger_index IS NOT NULL
OR _in_validated IS TRUE THEN
IF _in_ledger_hash IS NOT NULL THEN
IF length(_in_ledger_hash) != 32 THEN
RETURN jsonb_build_object('error', '_in_ledger_hash size: '
|| to_char(length(_in_ledger_hash), '999'));
END IF;
EXECUTE 'SELECT ledger_seq
FROM ledgers
WHERE ledger_hash = $1'
INTO _min USING _in_ledger_hash::bytea;
ELSE
IF _in_ledger_index IS NOT NULL AND _in_validated IS TRUE THEN
RETURN jsonb_build_object('error',
'_in_ledger_index cannot be set and _in_validated true');
END IF;
IF _in_validated IS TRUE THEN
_in_ledger_index := max_ledger();
END IF;
_min := (SELECT ledger_seq
FROM ledgers
WHERE ledger_seq = _in_ledger_index);
END IF;
IF _min IS NULL THEN
RETURN jsonb_build_object('error', 'ledger not found');
END IF;
_max := _min;
ELSE
_min := min_ledger();
_max := max_ledger();
END IF;
IF _in_marker_seq IS NOT NULL OR _in_marker_index IS NOT NULL THEN
_marker := TRUE;
IF _in_marker_seq IS NULL OR _in_marker_index IS NULL THEN
-- The rippled implementation returns no transaction results
-- if either of these values are missing.
_between_min := 0;
_between_max := 0;
ELSE
IF _in_forward IS TRUE THEN
_between_min := _in_marker_seq;
_between_max := _max;
ELSE
_between_min := _min;
_between_max := _in_marker_seq;
END IF;
END IF;
ELSE
_marker := FALSE;
_between_min := _min;
_between_max := _max;
END IF;
IF _between_max < _between_min THEN
RETURN jsonb_build_object('error', 'ledger search range is '
|| to_char(_between_min, '999') || '-'
|| to_char(_between_max, '999'));
END IF;
_sql := format('
SELECT transactions.ledger_seq, transactions.transaction_index,
transactions.trans_id, transactions.nodestore_hash
FROM transactions
INNER JOIN account_transactions
ON transactions.ledger_seq =
account_transactions.ledger_seq
AND transactions.transaction_index =
account_transactions.transaction_index
WHERE account_transactions.account = $1
AND account_transactions.ledger_seq BETWEEN $2 AND $3
ORDER BY transactions.ledger_seq %s, transactions.transaction_index %s
', _sort_order, _sort_order);
OPEN _cursor FOR EXECUTE _sql USING _in_account_id, _between_min,
_between_max;
LOOP
FETCH _cursor INTO _record;
IF _record IS NULL THEN EXIT; END IF;
IF _marker IS TRUE THEN
IF _in_marker_seq = _record.ledger_seq THEN
IF _in_forward IS TRUE THEN
IF _in_marker_index > _record.transaction_index THEN
CONTINUE;
END IF;
ELSE
IF _in_marker_index < _record.transaction_index THEN
CONTINUE;
END IF;
END IF;
END IF;
_marker := FALSE;
END IF;
_tally := _tally + 1;
IF _tally > _in_limit THEN
_ret_marker := jsonb_build_object(
'ledger', _record.ledger_seq,
'seq', _record.transaction_index);
EXIT;
END IF;
-- Is the transaction index in the tx object?
_transactions := _transactions || jsonb_build_object(
'ledger_seq', _record.ledger_seq,
'transaction_index', _record.transaction_index,
'trans_id', _record.trans_id,
'nodestore_hash', _record.nodestore_hash);
END LOOP;
CLOSE _cursor;
_result := jsonb_build_object('ledger_index_min', _min,
'ledger_index_max', _max,
'transactions', _transactions);
IF _ret_marker IS NOT NULL THEN
_result := _result || jsonb_build_object('marker', _ret_marker);
END IF;
RETURN _result;
END;
$$ LANGUAGE plpgsql;
-- Trigger prior to insert on ledgers table. Validates length of hash fields.
-- Verifies ancestry based on ledger_hash & prev_hash as follows:
@@ -1542,36 +1316,3 @@ getLedger(
return info;
}
std::optional<LedgerRange>
getLedgerRange(std::shared_ptr<PgPool>& pgPool)
{
auto range = PgQuery(pgPool)("SELECT complete_ledgers()");
if (!range)
return {};
std::string res{range.c_str()};
try
{
size_t minVal = 0;
size_t maxVal = 0;
if (res == "empty" || res == "error" || res.empty())
return {};
else if (size_t delim = res.find('-'); delim != std::string::npos)
{
minVal = std::stol(res.substr(0, delim));
maxVal = std::stol(res.substr(delim + 1));
}
else
{
minVal = maxVal = std::stol(res);
}
return LedgerRange{minVal, maxVal};
}
catch (std::exception&)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Error parsing result of getCompleteLedgers()";
}
return {};
}

View File

@@ -524,8 +524,4 @@ getLedger(
std::variant<std::monostate, ripple::uint256, uint32_t> const& whichLedger,
std::shared_ptr<PgPool>& pgPool);
using LedgerRange = boost::icl::closed_interval<uint32_t>;
std::optional<LedgerRange>
getLedgerRange(std::shared_ptr<PgPool>& pgPool);
#endif // RIPPLE_CORE_PG_H_INCLUDED

View File

@@ -195,6 +195,40 @@ PostgresBackend::fetchLedgerBySequence(uint32_t sequence) const
return {};
}
std::optional<LedgerRange>
PostgresBackend::fetchLedgerRange() const
{
auto range = PgQuery(pgPool_)("SELECT complete_ledgers()");
if (!range)
return {};
std::string res{range.c_str()};
try
{
size_t minVal = 0;
size_t maxVal = 0;
if (res == "empty" || res == "error" || res.empty())
return {};
else if (size_t delim = res.find('-'); delim != std::string::npos)
{
minVal = std::stol(res.substr(0, delim));
maxVal = std::stol(res.substr(delim + 1));
}
else
{
minVal = maxVal = std::stol(res);
}
return LedgerRange{minVal, maxVal};
}
catch (std::exception&)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Error parsing result of getCompleteLedgers()";
}
return {};
}
std::optional<Blob>
PostgresBackend::fetchLedgerObject(
ripple::uint256 const& key,
@@ -449,6 +483,7 @@ PostgresBackend::fetchAccountTransactions(
void
PostgresBackend::open()
{
initSchema(pgPool_);
}
void

View File

@@ -23,6 +23,9 @@ public:
std::optional<ripple::LedgerInfo>
fetchLedgerBySequence(uint32_t sequence) const override;
std::optional<LedgerRange>
fetchLedgerRange() const override;
std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
const override;

View File

@@ -637,8 +637,6 @@ ReportingETL::ReportingETL(
: publishStrand_(ioc)
, ioContext_(ioc)
, flatMapBackend_(Backend::makeBackend(config))
, pgPool_(make_PgPool(
config.at("database").as_object().at("postgres").as_object()))
, loadBalancer_(
config.at("etl_sources").as_array(),
*flatMapBackend_,
@@ -646,6 +644,5 @@ ReportingETL::ReportingETL(
ioc)
{
flatMapBackend_->open();
initSchema(pgPool_);
}

View File

@@ -60,7 +60,6 @@ class ReportingETL
{
private:
std::unique_ptr<BackendInterface> flatMapBackend_;
std::shared_ptr<PgPool> pgPool_;
std::thread worker_;
boost::asio::io_context& ioContext_;
@@ -327,12 +326,6 @@ public:
return *flatMapBackend_;
}
std::shared_ptr<PgPool>&
getPgPool()
{
return pgPool_;
}
private:
void
doWork();

View File

@@ -54,18 +54,13 @@ std::unordered_map<std::string, RPCCommand> commandMap{
boost::json::object
doAccountInfo(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& postgres);
BackendInterface const& backend);
boost::json::object
doTx(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool);
doTx(boost::json::object const& request, BackendInterface const& backend);
boost::json::object
doAccountTx(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool);
BackendInterface const& backend);
boost::json::object
doLedgerData(
boost::json::object const& request,
@@ -73,14 +68,12 @@ doLedgerData(
boost::json::object
doBookOffers(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool);
BackendInterface const& backend);
boost::json::object
buildResponse(
boost::json::object const& request,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
BackendInterface const& backend)
{
std::string command = request.at("command").as_string().c_str();
BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request;
@@ -88,10 +81,10 @@ buildResponse(
switch (commandMap[command])
{
case tx:
return doTx(request, backend, pgPool);
return doTx(request, backend);
break;
case account_tx:
return doAccountTx(request, backend, pgPool);
return doAccountTx(request, backend);
break;
case ledger:
break;
@@ -99,10 +92,10 @@ buildResponse(
return doLedgerData(request, backend);
break;
case account_info:
return doAccountInfo(request, backend, pgPool);
return doAccountInfo(request, backend);
break;
case book_offers:
return doBookOffers(request, backend, pgPool);
return doBookOffers(request, backend);
break;
default:
BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command;
@@ -123,15 +116,13 @@ class session : public std::enable_shared_from_this<session>
boost::beast::flat_buffer buffer_;
std::string response_;
BackendInterface const& backend_;
std::shared_ptr<PgPool>& pgPool_;
public:
// Take ownership of the socket
explicit session(
boost::asio::ip::tcp::socket&& socket,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
: ws_(std::move(socket)), backend_(backend), pgPool_(pgPool)
BackendInterface const& backend)
: ws_(std::move(socket)), backend_(backend)
{
}
@@ -207,7 +198,7 @@ public:
boost::json::value raw = boost::json::parse(msg);
// BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed";
boost::json::object request = raw.as_object();
auto response = buildResponse(request, backend_, pgPool_);
auto response = buildResponse(request, backend_);
BOOST_LOG_TRIVIAL(debug) << __func__ << response;
response_ = boost::json::serialize(response);
@@ -243,15 +234,13 @@ class listener : public std::enable_shared_from_this<listener>
boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_;
BackendInterface const& backend_;
std::shared_ptr<PgPool>& pgPool_;
public:
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
: ioc_(ioc), acceptor_(ioc), backend_(backend), pgPool_(pgPool)
BackendInterface const& backend)
: ioc_(ioc), acceptor_(ioc), backend_(backend)
{
boost::beast::error_code ec;
@@ -316,8 +305,7 @@ private:
else
{
// Create the session and run it
std::make_shared<session>(std::move(socket), backend_, pgPool_)
->run();
std::make_shared<session>(std::move(socket), backend_)->run();
}
// Accept another connection
@@ -423,8 +411,7 @@ main(int argc, char* argv[])
std::make_shared<listener>(
ioc,
boost::asio::ip::tcp::endpoint{address, port},
etl.getFlatMapBackend(),
etl.getPgPool())
etl.getFlatMapBackend())
->run();
// Run the I/O service on the requested number of threads