diff --git a/handlers/AccountInfo.cpp b/handlers/AccountInfo.cpp index 59aec6c2..6a842d0b 100644 --- a/handlers/AccountInfo.cpp +++ b/handlers/AccountInfo.cpp @@ -43,8 +43,7 @@ boost::json::object doAccountInfo( boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& postgres) + BackendInterface const& backend) { boost::json::object response; std::string strIdent; @@ -60,7 +59,7 @@ doAccountInfo( size_t ledgerSequence = 0; if (not request.contains("ledger_index")) { - std::optional latest = getLedger({}, postgres); + auto latest = backend.fetchLatestLedgerSequence(); if (not latest) { @@ -69,7 +68,7 @@ doAccountInfo( } else { - ledgerSequence = latest->seq; + ledgerSequence = *latest; } } else diff --git a/handlers/AccountTx.cpp b/handlers/AccountTx.cpp index 67f10517..6c5256ec 100644 --- a/handlers/AccountTx.cpp +++ b/handlers/AccountTx.cpp @@ -127,10 +127,7 @@ doAccountTxStoredProcedure( // resume previous query // } boost::json::object -doAccountTx( - boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& pgPool) +doAccountTx(boost::json::object const& request, BackendInterface const& backend) { boost::json::object response; diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index e76120c0..5e629d06 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -90,15 +90,11 @@ loadBookOfferIndexes( boost::json::object doBookOffers( boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& pool) + BackendInterface const& backend) { std::cout << "enter" << std::endl; boost::json::object response; - auto sequence = ledgerSequenceFromRequest(request, pool); - - if (!sequence) - return response; + uint32_t sequence = request.at("ledger_index").as_int64(); if (!request.contains("taker_pays")) { @@ -310,7 +306,7 @@ doBookOffers( ripple::uint256 bookBase = getBookBase(book); auto start = std::chrono::system_clock::now(); auto [offers, retCursor] = - backend.fetchBookOffers(bookBase, *sequence, limit, cursor); + backend.fetchBookOffers(bookBase, sequence, limit, cursor); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(warning) << "Time loading books from Postgres: " diff --git a/handlers/Tx.cpp b/handlers/Tx.cpp index 65cc83bc..ee9304a5 100644 --- a/handlers/Tx.cpp +++ b/handlers/Tx.cpp @@ -26,10 +26,7 @@ // } boost::json::object -doTx( - boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& postgres) +doTx(boost::json::object const& request, BackendInterface const& backend) { boost::json::object response; if (!request.contains("transaction")) @@ -44,7 +41,7 @@ doTx( return response; } - auto range = getLedgerRange(postgres); + auto range = backend.fetchLedgerRange(); if (!range) { response["error"] = "Database is empty"; @@ -55,8 +52,8 @@ doTx( if (!dbResponse) { response["error"] = "Transaction not found in Cassandra"; - response["ledger_range"] = std::to_string(range->lower()) + " - " + - std::to_string(range->upper()); + response["ledger_range"] = std::to_string(range->minSequence) + " - " + + std::to_string(range->maxSequence); return response; } diff --git a/reporting/BackendFactory.h b/reporting/BackendFactory.h index b2932c26..2ec0d9e5 100644 --- a/reporting/BackendFactory.h +++ b/reporting/BackendFactory.h @@ -2,6 +2,7 @@ #define RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED #include #include +#include namespace Backend { std::unique_ptr makeBackend(boost::json::object const& config) @@ -14,6 +15,12 @@ makeBackend(boost::json::object const& config) dbConfig.at("cassandra").as_object()); return std::move(backend); } + else if (dbConfig.contains("postgres")) + { + auto backend = std::make_unique( + dbConfig.at("postgres").as_object()); + return std::move(backend); + } return nullptr; } } // namespace Backend diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index 31d7a75d..3fd78e0d 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -27,6 +27,12 @@ struct AccountTransactionsCursor uint32_t transactionIndex; }; +struct LedgerRange +{ + uint32_t minSequence; + uint32_t maxSequence; +}; + class BackendInterface { public: @@ -38,6 +44,9 @@ public: virtual std::optional fetchLedgerBySequence(uint32_t sequence) const = 0; + virtual std::optional + fetchLedgerRange() const = 0; + virtual std::optional fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0; diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index b33ecaae..b1f9481b 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -452,6 +452,74 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) } } +std::optional +CassandraBackend::fetchLedgerRange() const +{ + BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; + auto start = std::chrono::system_clock::now(); + CassStatement* statement = cass_prepared_bind(selectLedgerRange_); + cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); + CassFuture* fut; + CassError rc; + do + { + fut = cass_session_execute(session_.get(), statement); + rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Cassandra fetch error"; + ss << ", retrying"; + ss << ": " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(warning) << ss.str(); + } + } while (rc != CASS_OK); + + CassResult const* res = cass_future_get_result(fut); + cass_statement_free(statement); + cass_future_free(fut); + CassIterator* iter = cass_iterator_from_result(res); + std::optional min; + std::optional max; + if (cass_iterator_next(iter)) + { + cass_int64_t sequence; + rc = cass_value_get_int64( + cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + LedgerRange range; + range.minSequence = sequence; + if (!cass_iterator_next(iter)) + { + cass_result_free(res); + return range; + } + rc = cass_value_get_int64( + cass_row_get_column(cass_iterator_get_row(iter), 0), &sequence); + if (rc != CASS_OK) + { + cass_result_free(res); + BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc + << ", " << cass_error_desc(rc); + return {}; + } + cass_result_free(res); + range.maxSequence = sequence; + if (range.minSequence > range.maxSequence) + { + std::swap(range.minSequence, range.maxSequence); + } + return range; + } + return {}; +} + void CassandraBackend::open() { @@ -1023,217 +1091,53 @@ CassandraBackend::open() std::stringstream query; query << "INSERT INTO " << tableName << "flat" << " (key, sequence, object) VALUES (?, ?, ?)"; - CassFuture* prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing insert : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!insertObject_.prepareStatement(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - insertObject_ = cass_future_get_prepared(prepare_future); - - /* The future can be freed immediately after getting the prepared - * object - */ - cass_future_free(prepare_future); query = {}; query << "INSERT INTO " << tableName << "flattransactions" << " (hash, sequence, transaction, metadata) VALUES (?, ?, " "?, ?)"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing insert : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!insertTransaction_.prepare(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - insertTransaction_ = cass_future_get_prepared(prepare_future); - cass_future_free(prepare_future); query = {}; query << "INSERT INTO " << tableName << "keys" << " (key, created, deleted) VALUES (?, ?, ?)"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing insert : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!insertKey_.prepare(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - insertKey_ = cass_future_get_prepared(prepare_future); - cass_future_free(prepare_future); query = {}; query << "INSERT INTO " << tableName << "books" << " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing insert : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!insertBook_.prepareStatement(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - insertBook_ = cass_future_get_prepared(prepare_future); - cass_future_free(prepare_future); query = {}; - query << "INSERT INTO " << tableName << "books" << " (book, key, deleted_at) VALUES (?, ?, ?)"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing insert : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!deleteBook_.prepareStatement(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - deleteBook_ = cass_future_get_prepared(prepare_future); - cass_future_free(prepare_future); query = {}; query << "SELECT created FROM " << tableName << "keys" << " WHERE key = ? ORDER BY created desc LIMIT 1"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing insert : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!getCreated_.prepareStatement(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - getCreated_ = cass_future_get_prepared(prepare_future); - cass_future_free(prepare_future); query = {}; query << "SELECT object, sequence FROM " << tableName << "flat" << " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC " "LIMIT 1"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing select : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!selectObject_.prepareStatement(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - selectObject_ = cass_future_get_prepared(prepare_future); - - /* The future can be freed immediately after getting the prepared - * object - */ - cass_future_free(prepare_future); query = {}; query << "SELECT transaction,metadata FROM " << tableName << "flattransactions" << " WHERE hash = ?"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - /* Wait for the statement to prepare and get the result */ - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - /* Handle error */ - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing select : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!selectTransaction_.prepareStatement(query, session_.get())) continue; - } - - /* Get the prepared object from the future */ - selectTransaction_ = cass_future_get_prepared(prepare_future); - - /* The future can be freed immediately after getting the prepared - * object - */ - cass_future_free(prepare_future); query = {}; query << "SELECT key FROM " << tableName << "keys " @@ -1241,111 +1145,32 @@ CassandraBackend::open() << " and deleted > ?" << " PER PARTITION LIMIT 1 LIMIT ?" << " ALLOW FILTERING"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing upperBound : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str(); + if (!selectLedgerPage_.prepareStatement(query, session_.get())) continue; - } - - // Get the prepared object from the future - upperBound_ = cass_future_get_prepared(prepare_future); - - // The future can be freed immediately after getting the prepared - // object - // - cass_future_free(prepare_future); + /* query = {}; query << "SELECT filterempty(key,object) FROM " << tableName << "flat " << " WHERE TOKEN(key) >= ? and sequence <= ?" << " PER PARTITION LIMIT 1 LIMIT ?" << " ALLOW FILTERING"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing upperBound : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str() << " : " << query.str(); + if (!upperBound2_.prepareStatement(query, session_.get())) continue; - } - - // Get the prepared object from the future - upperBound2_ = cass_future_get_prepared(prepare_future); - - // The future can be freed immediately after getting the prepared - // object - // - cass_future_free(prepare_future); +*/ query = {}; query << "SELECT TOKEN(key) FROM " << tableName << "flat " << " WHERE key = ? LIMIT 1"; - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing getToken : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!getToken_.prepareStatement(query, session_.get())) continue; - } - - getToken_ = cass_future_get_prepared(prepare_future); query = {}; query << "SELECT key FROM " << tableName << "books " << " WHERE book = ? AND sequence <= ? AND deleted_at > ? AND" " key > ? " " ORDER BY key ASC LIMIT ? ALLOW FILTERING"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing getToken : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!getBook_.prepareStatement(query, session_.get())) continue; - } getBook_ = cass_future_get_prepared(prepare_future); @@ -1353,165 +1178,53 @@ CassandraBackend::open() query << " INSERT INTO " << tableName << "account_tx" << " (account, seq_idx, hash) " << " VALUES (?,?,?)"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing getToken : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!insertAccountTx_.prepareStatement(query, session_.get())) continue; - } - insertAccountTx_ = cass_future_get_prepared(prepare_future); query = {}; query << " SELECT hash,seq_idx FROM " << tableName << "account_tx" << " WHERE account = ? " << " AND seq_idx < ? LIMIT ?"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing getToken : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; - } - selectAccountTx_ = cass_future_get_prepared(prepare_future); query = {}; query << " INSERT INTO " << tableName << "ledgers " << " (sequence, header) VALUES(?,?)"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing getToken : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!insertLedgerHeader_.prepareStatement(query, session_.get())) continue; - } - insertLedgerHeader_ = cass_future_get_prepared(prepare_future); query = {}; query << " INSERT INTO " << tableName << "ledger_hashes" << " (hash, sequence) VALUES(?,?)"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // Wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // Handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: Error preparing getToken : " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!insertLedgerHash_.prepareStatement(query, session_.get())) continue; - } - insertLedgerHash_ = cass_future_get_prepared(prepare_future); query = {}; query << " update " << tableName << "ledger_range" << " set sequence = ? where is_latest = ? if sequence != ?"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: error preparing updateLedgerRange : " << rc - << ", " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; - } - updateLedgerRange_ = cass_future_get_prepared(prepare_future); query = {}; query << " select header from " << tableName << "ledgers where sequence = ?"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: error preparing selectLedgerBySeq : " << rc - << ", " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!selectLedgerBySeq_.prepareStatement(query, session_.get())) continue; - } - selectLedgerBySeq_ = cass_future_get_prepared(prepare_future); query = {}; query << " select sequence from " << tableName << "ledger_range where is_latest = true"; - - prepare_future = - cass_session_prepare(session_.get(), query.str().c_str()); - - // wait for the statement to prepare and get the result - rc = cass_future_error_code(prepare_future); - - if (rc != CASS_OK) - { - // handle error - cass_future_free(prepare_future); - - std::stringstream ss; - ss << "nodestore: error preparing selectLatestLedger : " << rc - << ", " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << ss.str(); + if (!selectLatestLedger_.prepareStatement(query, session_.get())) continue; - } - selectLatestLedger_ = cass_future_get_prepared(prepare_future); + query = {}; + query << " SELECT sequence FROM " << tableName << "ledger_range WHERE " + << " is_latest IN (true, false)"; + if (!selectLedgerRange_.prepareStatement(query, session_.get())) + continue; + + selectLedgerRange_ = cass_future_get_prepared(prepare_future); setupPreparedStatements = true; } diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 7297d3c1..a4c93c19 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -57,6 +57,192 @@ flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); void flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); +class CassandraPreparedStatement +{ +private: + CassPrepared const* prepared_ = nullptr; + CassStatement* statement_ = nullptr; + +public: + CassPrepared const* + get() + { + return statement_; + } + + bool + prepareStatement(std::stringstream const& query) + { + return prepareStatement(query.str().c_str()); + } + + bool + prepareStatement(std::string const& query) + { + return prepareStatement(query.c_str()); + } + + bool + prepareStatement(char const* query, CassSession* session) + { + if (!query) + throw std::runtime_error("prepareStatement: null query"); + if (!session) + throw std::runtime_error("prepareStatement: null sesssion"); + CassFuture* prepareFuture = cass_session_prepare(session, query); + /* Wait for the statement to prepare and get the result */ + CassError rc = cass_future_error_code(prepareFuture); + if (rc == CASS_OK) + { + prepared_ = cass_future_get_prepared(prepareFuture); + } + else + { + std::stringstream ss; + ss << "nodestore: Error preparing statement : " << rc << ", " + << cass_error_desc(rc) << ". query : " << query; + BOOST_LOG_TRIVIAL(error) << ss.str(); + } + cass_future_free(prepareFuture); + return rc == CASS_OK; + } + + ~CassandraPreparedStatement() + { + if (prepared_) + { + cass_prepared_free(prepared_); + prepared_ = nullptr; + } + } +}; + +class CassandraStatement +{ + CassStatement* statement_ = nullptr; + size_t curBindingIndex_ = 0; + + CassandraStatement(CassandraPreparedStatement const& prepared) + { + statement_ = cass_prepared_bind(prepared.get()); + cass_statement_set_consistency(statement_, CASS_CONSISTENCY_QUORUM); + } + + CassStatement* + get() const + { + return statement_; + } + + bool + bindBytes(const char* data, uint32_t size) + { + return bindBytes((unsigned char*)data, size); + } + + template + bool + bindBytes(ripple::base_uint const& data) + { + return bindBytes(data.data(), data.size()); + } + + bool + bindBytes(std::string const& data) + { + return bindBytes(data.data(), data.size()); + } + + bool + bindBytes(const unsigned char* data, uint32_t size) + { + if (!statement_) + throw std::runtime_error( + "CassandraStatement::bindBytes - statement_ is null"); + CassError rc = cass_statement_bind_bytes( + statement_, + curBindingIndex_, + static_cast(data), + size); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Error binding bytes to statement: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + return false; + } + curBindingIndex_++; + return true; + } + + bool + bindInt(uint32_t value) + { + return bindInt((int64_t)value); + } + + bool + bindInt(int64_t value) + { + if (!statement_) + throw std::runtime_error( + "CassandraStatement::bindInt - statement_ is null"); + CassError rc = + cass_statement_bind_int64(statement_, curBindingIndex_, value); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Error binding int to statement: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + return false; + } + curBindingIndex_++; + return true; + } + + bool + bindIntTuple(uint32_t first, uint32_t second) + { + CassTuple* tuple = cass_tuple_new(2); + rc = cass_tuple_set_int64(tuple, 0, first); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Error binding int to tuple: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + return false; + } + rc = cass_tuple_set_int64(tuple, 1, second); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Error binding int to tuple: " << rc << ", " + << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + return false; + } + rc = cass_statement_bind_tuple(statement_, curBindingIndex_, tuple); + if (rc != CASS_OK) + { + std::stringstream ss; + ss << "Binding tuple: " << rc << ", " << cass_error_desc(rc); + BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); + return false; + } + curBindingIndex_++; + return true; + } + + CassandraStatement() + { + if (statement_) + cass_statement_free(statement_); + } +}; + class CassandraBackend : public BackendInterface { private: @@ -95,26 +281,27 @@ private: // Database statements cached server side. Using these is more efficient // than making a new statement - const CassPrepared* insertObject_ = nullptr; - const CassPrepared* insertTransaction_ = nullptr; - const CassPrepared* selectTransaction_ = nullptr; - const CassPrepared* selectObject_ = nullptr; - const CassPrepared* upperBound_ = nullptr; - const CassPrepared* upperBound2_ = nullptr; - const CassPrepared* getToken_ = nullptr; - const CassPrepared* insertKey_ = nullptr; - const CassPrepared* getCreated_ = nullptr; - const CassPrepared* getBook_ = nullptr; - const CassPrepared* insertBook_ = nullptr; - const CassPrepared* deleteBook_ = nullptr; - const CassPrepared* insertAccountTx_ = nullptr; - const CassPrepared* selectAccountTx_ = nullptr; - const CassPrepared* insertLedgerHeader_ = nullptr; - const CassPrepared* insertLedgerHash_ = nullptr; - const CassPrepared* updateLedgerRange_ = nullptr; - const CassPrepared* updateLedgerHeader_ = nullptr; - const CassPrepared* selectLedgerBySeq_ = nullptr; - const CassPrepared* selectLatestLedger_ = nullptr; + CassandraPreparedStatement insertObject_; + CassandraPreparedStatement insertTransaction_; + CassandraPreparedStatement selectTransaction_; + CassandraPreparedStatement selectObject_; + CassandraPreparedStatement selectLedgerPage_; + CassandraPreparedStatement upperBound2_; + CassandraPreparedStatement getToken_; + CassandraPreparedStatement insertKey_; + CassandraPreparedStatement getCreated_; + CassandraPreparedStatement getBook_; + CassandraPreparedStatement insertBook_; + CassandraPreparedStatement deleteBook_; + CassandraPreparedStatement insertAccountTx_; + CassandraPreparedStatement selectAccountTx_; + CassandraPreparedStatement insertLedgerHeader_; + CassandraPreparedStatement insertLedgerHash_; + CassandraPreparedStatement updateLedgerRange_; + CassandraPreparedStatement updateLedgerHeader_; + CassandraPreparedStatement selectLedgerBySeq_; + CassandraPreparedStatement selectLatestLedger_; + CassandraPreparedStatement selectLedgerRange_; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -176,87 +363,6 @@ public: { { std::lock_guard lock(mutex_); - if (insertTransaction_) - { - cass_prepared_free(insertTransaction_); - insertTransaction_ = nullptr; - } - if (insertObject_) - { - cass_prepared_free(insertObject_); - insertObject_ = nullptr; - } - if (insertKey_) - { - cass_prepared_free(insertKey_); - insertKey_ = nullptr; - } - if (selectTransaction_) - { - cass_prepared_free(selectTransaction_); - selectTransaction_ = nullptr; - } - if (selectObject_) - { - cass_prepared_free(selectObject_); - selectObject_ = nullptr; - } - if (upperBound_) - { - cass_prepared_free(upperBound_); - upperBound_ = nullptr; - } - if (getToken_) - { - cass_prepared_free(getToken_); - getToken_ = nullptr; - } - if (getCreated_) - { - cass_prepared_free(getCreated_); - getCreated_ = nullptr; - } - if (getBook_) - { - cass_prepared_free(getBook_); - getBook_ = nullptr; - } - if (insertBook_) - { - cass_prepared_free(insertBook_); - insertBook_ = nullptr; - } - if (deleteBook_) - { - cass_prepared_free(deleteBook_); - deleteBook_ = nullptr; - } - if (insertAccountTx_) - { - cass_prepared_free(insertAccountTx_); - insertAccountTx_ = nullptr; - } - - if (selectAccountTx_) - { - cass_prepared_free(selectAccountTx_); - selectAccountTx_ = nullptr; - } - if (insertLedgerHeader_) - { - cass_prepared_free(insertLedgerHeader_); - insertLedgerHeader_ = nullptr; - } - if (insertLedgerHash_) - { - cass_prepared_free(insertLedgerHash_); - insertLedgerHash_ = nullptr; - } - if (updateLedgerRange_) - { - cass_prepared_free(updateLedgerRange_); - updateLedgerRange_ = nullptr; - } work_.reset(); ioThread_.join(); } @@ -389,7 +495,6 @@ public: { return {fetchTransactions(hashes), retCursor}; } - return {{}, {}}; } @@ -720,6 +825,8 @@ public: << " microseconds"; return lgrInfo; } + std::optional + fetchLedgerRange() const override; // Synchronously fetch the object with key key and store the result in // pno @@ -1434,218 +1541,42 @@ public: write(WriteCallbackData& data, bool isRetry) const { { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } - { - CassStatement* statement = cass_prepared_bind(insertObject_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - const unsigned char* keyData = (unsigned char*)data.key.data(); - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(keyData), - data.key.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_int64(statement, 1, data.sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - const unsigned char* blobData = (unsigned char*)data.blob.data(); - rc = cass_statement_bind_bytes( - statement, - 2, - static_cast(blobData), - data.blob.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); + CassandraStatement statement{insertObject_}; + statement.bindBytes(data.key); + statement.bindInt(data.sequence); + statement.bindBytes(data.blob); - cass_future_set_callback( - fut, flatMapWriteCallback, static_cast(&data)); - cass_future_free(fut); + executeAsync(statement, flatMapWriteCallback, data, isRetry); } } void writeDeletedKey(WriteCallbackData& data, bool isRetry) const { - { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } - CassStatement* statement = cass_prepared_bind(insertKey_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - const unsigned char* keyData = (unsigned char*)data.key.data(); - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(keyData), - data.key.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_int64(statement, 1, data.createdSequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_int64(statement, 2, data.sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapWriteKeyCallback, static_cast(&data)); - cass_future_free(fut); + CassandraStatement statement{insertKey_}; + statement.bindBytes(data.key); + statement.bindInt(data.createdSequence); + statement.bindInt(data.sequence); + executeAsync(statement, flatMapWriteKeyCallback, data, isRetry); } void writeKey(WriteCallbackData& data, bool isRetry) const { - { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } if (data.isCreated) { - CassStatement* statement = cass_prepared_bind(insertKey_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - const unsigned char* keyData = (unsigned char*)data.key.data(); - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(keyData), - data.key.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_int64(statement, 1, data.sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_int64(statement, 2, INT64_MAX); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); + CassandraStatement statement{insertKey_}; + statement.bindBytes(data.key); + statement.bindInt(data.sequence); + statement.bindInt(INT64_MAX); - cass_future_set_callback( - fut, flatMapWriteKeyCallback, static_cast(&data)); - cass_future_free(fut); + executeAsync(statement, flatMapWriteKeyCallback, data, isRetry); } else if (data.isDeleted) { - CassStatement* statement = cass_prepared_bind(getCreated_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - const unsigned char* keyData = (unsigned char*)data.key.data(); - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(keyData), - data.key.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); + CassandraStatement statement{getCreated_}; - cass_future_set_callback( - fut, flatMapGetCreatedCallback, static_cast(&data)); - cass_future_free(fut); + executeAsync(statement, flatMapGetCreatedCallback, data, isRetry); } } @@ -1653,88 +1584,19 @@ public: writeBook(WriteCallbackData& data, bool isRetry) const { assert(data.isCreated or data.isDeleted); - if (!data.isCreated and !data.isDeleted) - throw std::runtime_error( - "writing book that's neither created or deleted"); - { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } - CassStatement* statement = nullptr; + assert(data.book); + CassandraStatement statement{ + (data.isCreated ? insertBook_ : deleteBook_)}; + if (!statement.bindBytes(*data.book)) + throw std::runtime_error("writeBook: bind error"); + if (!statement.bindBytes(data.key)) + throw std::runtime_error("writeBook: bind error"); + if (!statement.bindInt(data.sequence)) + throw std::runtime_error("writeBook: bind error"); if (data.isCreated) - statement = cass_prepared_bind(insertBook_); - else if (data.isDeleted) - statement = cass_prepared_bind(deleteBook_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - const unsigned char* bookData = (unsigned char*)data.book->data(); - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(bookData), - data.book->size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - const unsigned char* keyData = (unsigned char*)data.key.data(); - rc = cass_statement_bind_bytes( - statement, - 1, - static_cast(keyData), - data.key.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_int64(statement, 2, data.sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - if (data.isCreated) - { - rc = cass_statement_bind_int64(statement, 3, INT64_MAX); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - - ss << "binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - } - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapWriteBookCallback, static_cast(&data)); - cass_future_free(fut); + if (!statement.bindInt(INT64_MAX)) + throw std::runtime_error("writeBook: bind error"); + executeAsync(statement, flatMapWriteBookCallback, data, isRetry); } void writeLedgerObject( @@ -1784,75 +1646,22 @@ public: void writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const { - { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(trace) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } for (auto const& account : data.data.accounts) { - CassStatement* statement = cass_prepared_bind(insertAccountTx_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - const unsigned char* accountData = (unsigned char*)account.data(); - CassError rc = cass_statement_bind_bytes( + CassandraStatement statement(insertAccountTx_); + if (!statement.bindBytes(account)) + throw std::runtime_error( + "writeAccountTx : error binding account"); + if (!statement.bindTuple( + data.data.ledgerSequence, data.data.transactionIndex)) + throw std::runtime_error( + "writeAccountTx: error binding ledger seq and txn idx"); + + executeAsync( statement, - 0, - static_cast(accountData), - account.size()); - if (rc != CASS_OK) - - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert account: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - CassTuple* idx = cass_tuple_new(2); - rc = cass_tuple_set_int64(idx, 0, data.data.ledgerSequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding ledger sequence to tuple: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_tuple_set_int64(idx, 1, data.data.transactionIndex); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding tx idx to tuple: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_tuple(statement, 1, idx); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding tuple: " << rc << ", " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapWriteAccountTxCallback, static_cast(&data)); - cass_future_free(fut); + flatMapWriteAccountTxCallback, + static_cast(&data), + isRetry); } } @@ -1887,84 +1696,20 @@ public: void writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const { - { - std::unique_lock lck(throttleMutex_); - if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) - { - BOOST_LOG_TRIVIAL(trace) - << __func__ << " : " - << "Max outstanding requests reached. " - << "Waiting for other requests to finish"; - throttleCv_.wait(lck, [this]() { - return numRequestsOutstanding_ < maxRequestsOutstanding; - }); - } - } + CassandraStatement statement{insertTransaction_}; + if (!statement.bindBytes(data.hash)) + throw std::runtime_error("writeTransaction: error binding hash"); + if (!statement.bindInt(data.sequence)) + throw std::runtime_error( + "writeTransaction: error binding sequence"); + if (!statement.bindBytes(data.transaction)) + throw std::runtime_error( + "writeTransaction: error binding transaction"); + if (!statement.bindBytes(data.metadata)) + throw std::runtime_error( + "writeTransaction: error binding transaction"); - CassStatement* statement = cass_prepared_bind(insertTransaction_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - const unsigned char* hashData = (unsigned char*)data.hash.data(); - CassError rc = cass_statement_bind_bytes( - statement, - 0, - static_cast(hashData), - data.hash.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - rc = cass_statement_bind_int64(statement, 1, data.sequence); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert object: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - const unsigned char* transactionData = - (unsigned char*)data.transaction.data(); - rc = cass_statement_bind_bytes( - statement, - 2, - static_cast(transactionData), - data.transaction.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - const unsigned char* metadata = (unsigned char*)data.metadata.data(); - rc = cass_statement_bind_bytes( - statement, - 3, - static_cast(metadata), - data.metadata.size()); - if (rc != CASS_OK) - { - cass_statement_free(statement); - std::stringstream ss; - ss << "Binding cassandra insert hash: " << rc << ", " - << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); - throw std::runtime_error(ss.str()); - } - CassFuture* fut = cass_session_execute(session_.get(), statement); - cass_statement_free(statement); - - cass_future_set_callback( - fut, flatMapWriteTransactionCallback, static_cast(&data)); - cass_future_free(fut); + executeAsync(statement, flatMapWriteTransactionCallback, data, isRetry); } void writeTransaction( @@ -2019,6 +1764,34 @@ public: flatMapReadObjectCallback(CassFuture* fut, void* cbData); friend void flatMapGetCreatedCallback(CassFuture* fut, void* cbData); + + template + void + executeAsync( + CassandraStatement const& statement, + T callback, + S& callbackData, + bool isRetry) const + { + { + std::unique_lock lck(throttleMutex_); + if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) + { + BOOST_LOG_TRIVIAL(trace) + << __func__ << " : " + << "Max outstanding requests reached. " + << "Waiting for other requests to finish"; + throttleCv_.wait(lck, [this]() { + return numRequestsOutstanding_ < maxRequestsOutstanding; + }); + } + } + CassFuture* fut = cass_session_execute(session_.get(), statement.get()); + + cass_future_set_callback( + fut, callback, static_cast(&callbackData)); + cass_future_free(fut); + } }; } // namespace Backend diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 6224779d..e3c73662 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -745,7 +745,7 @@ CREATE TABLE IF NOT EXISTS ledgers ( CREATE TABLE IF NOT EXISTS objects ( key bytea NOT NULL, ledger_seq bigint NOT NULL, - object bytea NOT NULL, + object bytea, PRIMARY KEY(key, ledger_seq) ); @@ -757,33 +757,29 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers -- cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS transactions ( hash bytea PRIMARY KEY, - ledger_seq bigint, - transaction bytea, - metadata bytea, - FOREIGN KEY (ledger_seq) - REFERENCES ledgers (ledger_seq) ON DELETE CASCADE + ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, + transaction bytea NOT NULL, + metadata bytea NOT NULL ); --- Index for lookups by transaction hash. -CREATE INDEX IF NOT EXISTS transactions_trans_id_idx ON transactions - USING hash (trans_id); - -- Table that maps accounts to transactions affecting them. Deletes from the --- ledger table by way of transactions table cascade here based on ledger_seq. +-- ledger table cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS account_transactions ( account bytea NOT NULL, - ledger_seq bigint NOT NULL, + ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, transaction_index bigint NOT NULL, - constraint account_transactions_pkey PRIMARY KEY (account, ledger_seq, - transaction_index), - constraint account_transactions_fkey FOREIGN KEY (ledger_seq, - transaction_index) REFERENCES transactions ( - ledger_seq, transaction_index) ON DELETE CASCADE + hash bytea NOT NULL, + PRIMARY KEY (account, ledger_seq, transaction_index), +); +-- Table that maps a book to a list of offers in that book. Deletes from the ledger table +-- cascade here based on ledger_seq. +CREATE TABLE IF NOT EXISTS books ( + book bytea NOT NULL, + ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, + deleted boolean NOT NULL, + offer_key bytea NOT NULL, + PRIMARY KEY(book, offer_key, deleted) ); - --- Index to allow for fast cascading deletions and referential integrity. -CREATE INDEX IF NOT EXISTS fki_account_transactions_idx ON - account_transactions USING btree (ledger_seq, transaction_index); -- Avoid inadvertent administrative tampering with committed data. CREATE OR REPLACE RULE ledgers_update_protect AS ON UPDATE TO @@ -792,44 +788,11 @@ CREATE OR REPLACE RULE transactions_update_protect AS ON UPDATE TO transactions DO INSTEAD NOTHING; CREATE OR REPLACE RULE account_transactions_update_protect AS ON UPDATE TO account_transactions DO INSTEAD NOTHING; +CREATE OR REPLACE RULE objects_update_protect AS ON UPDATE TO + objects DO INSTEAD NOTHING; +CREATE OR REPLACE RULE books_update_protect AS ON UPDATE TO + books DO INSTEAD NOTHING; --- Stored procedure to assist with the tx() RPC call. Takes transaction hash --- as input. If found, returns the ledger sequence in which it was applied. --- If not, returns the range of ledgers searched. -CREATE OR REPLACE FUNCTION tx ( - _in_trans_id bytea -) RETURNS jsonb AS $$ -DECLARE - _min_ledger bigint := min_ledger(); - _min_seq bigint := (SELECT ledger_seq - FROM ledgers - WHERE ledger_seq = _min_ledger - FOR SHARE); - _max_seq bigint := max_ledger(); - _ledger_seq bigint; - _nodestore_hash bytea; -BEGIN - - IF _min_seq IS NULL THEN - RETURN jsonb_build_object('error', 'empty database'); - END IF; - IF length(_in_trans_id) != 32 THEN - RETURN jsonb_build_object('error', '_in_trans_id size: ' - || to_char(length(_in_trans_id), '999')); - END IF; - - EXECUTE 'SELECT nodestore_hash, ledger_seq - FROM transactions - WHERE trans_id = $1 - AND ledger_seq BETWEEN $2 AND $3 - ' INTO _nodestore_hash, _ledger_seq USING _in_trans_id, _min_seq, _max_seq; - IF _nodestore_hash IS NULL THEN - RETURN jsonb_build_object('min_seq', _min_seq, 'max_seq', _max_seq); - END IF; - RETURN jsonb_build_object('nodestore_hash', _nodestore_hash, 'ledger_seq', - _ledger_seq); -END; -$$ LANGUAGE plpgsql; -- Return the earliest ledger sequence intended for range operations -- that protect the bottom of the range from deletion. Return NULL if empty. @@ -852,195 +815,6 @@ BEGIN END; $$ LANGUAGE plpgsql; --- account_tx() RPC helper. From the rippled reporting process, only the --- parameters without defaults are required. For the parameters with --- defaults, validation should be done by rippled, such as: --- _in_account_id should be a valid xrp base58 address. --- _in_forward either true or false according to the published api --- _in_limit should be validated and not simply passed through from --- client. --- --- For _in_ledger_index_min and _in_ledger_index_max, if passed in the --- request, verify that their type is int and pass through as is. --- For _ledger_hash, verify and convert from hex length 32 bytes and --- prepend with \x (\\x C++). --- --- For _in_ledger_index, if the input type is integer, then pass through --- as is. If the type is string and contents = validated, then do not --- set _in_ledger_index. Instead set _in_invalidated to TRUE. --- --- There is no need for rippled to do any type of lookup on max/min --- ledger range, lookup of hash, or the like. This functions does those --- things, including error responses if bad input. Only the above must --- be done to set the correct search range. --- --- If a marker is present in the request, verify the members 'ledger' --- and 'seq' are integers and they correspond to _in_marker_seq --- _in_marker_index. --- To reiterate: --- JSON input field 'ledger' corresponds to _in_marker_seq --- JSON input field 'seq' corresponds to _in_marker_index -CREATE OR REPLACE FUNCTION account_tx ( - _in_account_id bytea, - _in_forward bool, - _in_limit bigint, - _in_ledger_index_min bigint = NULL, - _in_ledger_index_max bigint = NULL, - _in_ledger_hash bytea = NULL, - _in_ledger_index bigint = NULL, - _in_validated bool = NULL, - _in_marker_seq bigint = NULL, - _in_marker_index bigint = NULL -) RETURNS jsonb AS $$ -DECLARE - _min bigint; - _max bigint; - _sort_order text := (SELECT CASE WHEN _in_forward IS TRUE THEN - 'ASC' ELSE 'DESC' END); - _marker bool; - _between_min bigint; - _between_max bigint; - _sql text; - _cursor refcursor; - _result jsonb; - _record record; - _tally bigint := 0; - _ret_marker jsonb; - _transactions jsonb[] := '{}'; -BEGIN - IF _in_ledger_index_min IS NOT NULL OR - _in_ledger_index_max IS NOT NULL THEN - _min := (SELECT CASE WHEN _in_ledger_index_min IS NULL - THEN min_ledger() ELSE greatest( - _in_ledger_index_min, min_ledger()) END); - _max := (SELECT CASE WHEN _in_ledger_index_max IS NULL OR - _in_ledger_index_max = -1 THEN max_ledger() ELSE - least(_in_ledger_index_max, max_ledger()) END); - - IF _max < _min THEN - RETURN jsonb_build_object('error', 'max is less than min ledger'); - END IF; - - ELSIF _in_ledger_hash IS NOT NULL OR _in_ledger_index IS NOT NULL - OR _in_validated IS TRUE THEN - IF _in_ledger_hash IS NOT NULL THEN - IF length(_in_ledger_hash) != 32 THEN - RETURN jsonb_build_object('error', '_in_ledger_hash size: ' - || to_char(length(_in_ledger_hash), '999')); - END IF; - EXECUTE 'SELECT ledger_seq - FROM ledgers - WHERE ledger_hash = $1' - INTO _min USING _in_ledger_hash::bytea; - ELSE - IF _in_ledger_index IS NOT NULL AND _in_validated IS TRUE THEN - RETURN jsonb_build_object('error', - '_in_ledger_index cannot be set and _in_validated true'); - END IF; - IF _in_validated IS TRUE THEN - _in_ledger_index := max_ledger(); - END IF; - _min := (SELECT ledger_seq - FROM ledgers - WHERE ledger_seq = _in_ledger_index); - END IF; - IF _min IS NULL THEN - RETURN jsonb_build_object('error', 'ledger not found'); - END IF; - _max := _min; - ELSE - _min := min_ledger(); - _max := max_ledger(); - END IF; - - IF _in_marker_seq IS NOT NULL OR _in_marker_index IS NOT NULL THEN - _marker := TRUE; - IF _in_marker_seq IS NULL OR _in_marker_index IS NULL THEN - -- The rippled implementation returns no transaction results - -- if either of these values are missing. - _between_min := 0; - _between_max := 0; - ELSE - IF _in_forward IS TRUE THEN - _between_min := _in_marker_seq; - _between_max := _max; - ELSE - _between_min := _min; - _between_max := _in_marker_seq; - END IF; - END IF; - ELSE - _marker := FALSE; - _between_min := _min; - _between_max := _max; - END IF; - IF _between_max < _between_min THEN - RETURN jsonb_build_object('error', 'ledger search range is ' - || to_char(_between_min, '999') || '-' - || to_char(_between_max, '999')); - END IF; - - _sql := format(' - SELECT transactions.ledger_seq, transactions.transaction_index, - transactions.trans_id, transactions.nodestore_hash - FROM transactions - INNER JOIN account_transactions - ON transactions.ledger_seq = - account_transactions.ledger_seq - AND transactions.transaction_index = - account_transactions.transaction_index - WHERE account_transactions.account = $1 - AND account_transactions.ledger_seq BETWEEN $2 AND $3 - ORDER BY transactions.ledger_seq %s, transactions.transaction_index %s - ', _sort_order, _sort_order); - - OPEN _cursor FOR EXECUTE _sql USING _in_account_id, _between_min, - _between_max; - LOOP - FETCH _cursor INTO _record; - IF _record IS NULL THEN EXIT; END IF; - IF _marker IS TRUE THEN - IF _in_marker_seq = _record.ledger_seq THEN - IF _in_forward IS TRUE THEN - IF _in_marker_index > _record.transaction_index THEN - CONTINUE; - END IF; - ELSE - IF _in_marker_index < _record.transaction_index THEN - CONTINUE; - END IF; - END IF; - END IF; - _marker := FALSE; - END IF; - - _tally := _tally + 1; - IF _tally > _in_limit THEN - _ret_marker := jsonb_build_object( - 'ledger', _record.ledger_seq, - 'seq', _record.transaction_index); - EXIT; - END IF; - - -- Is the transaction index in the tx object? - _transactions := _transactions || jsonb_build_object( - 'ledger_seq', _record.ledger_seq, - 'transaction_index', _record.transaction_index, - 'trans_id', _record.trans_id, - 'nodestore_hash', _record.nodestore_hash); - - END LOOP; - CLOSE _cursor; - - _result := jsonb_build_object('ledger_index_min', _min, - 'ledger_index_max', _max, - 'transactions', _transactions); - IF _ret_marker IS NOT NULL THEN - _result := _result || jsonb_build_object('marker', _ret_marker); - END IF; - RETURN _result; -END; -$$ LANGUAGE plpgsql; -- Trigger prior to insert on ledgers table. Validates length of hash fields. -- Verifies ancestry based on ledger_hash & prev_hash as follows: @@ -1542,36 +1316,3 @@ getLedger( return info; } -std::optional -getLedgerRange(std::shared_ptr& pgPool) -{ - auto range = PgQuery(pgPool)("SELECT complete_ledgers()"); - if (!range) - return {}; - - std::string res{range.c_str()}; - try - { - size_t minVal = 0; - size_t maxVal = 0; - if (res == "empty" || res == "error" || res.empty()) - return {}; - else if (size_t delim = res.find('-'); delim != std::string::npos) - { - minVal = std::stol(res.substr(0, delim)); - maxVal = std::stol(res.substr(delim + 1)); - } - else - { - minVal = maxVal = std::stol(res); - } - return LedgerRange{minVal, maxVal}; - } - catch (std::exception&) - { - BOOST_LOG_TRIVIAL(error) - << __func__ << " : " - << "Error parsing result of getCompleteLedgers()"; - } - return {}; -} diff --git a/reporting/Pg.h b/reporting/Pg.h index 43b35884..f7d976b7 100644 --- a/reporting/Pg.h +++ b/reporting/Pg.h @@ -524,8 +524,4 @@ getLedger( std::variant const& whichLedger, std::shared_ptr& pgPool); -using LedgerRange = boost::icl::closed_interval; -std::optional -getLedgerRange(std::shared_ptr& pgPool); - #endif // RIPPLE_CORE_PG_H_INCLUDED diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index e8b8509b..fe1d243c 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -195,6 +195,40 @@ PostgresBackend::fetchLedgerBySequence(uint32_t sequence) const return {}; } +std::optional +PostgresBackend::fetchLedgerRange() const +{ + auto range = PgQuery(pgPool_)("SELECT complete_ledgers()"); + if (!range) + return {}; + + std::string res{range.c_str()}; + try + { + size_t minVal = 0; + size_t maxVal = 0; + if (res == "empty" || res == "error" || res.empty()) + return {}; + else if (size_t delim = res.find('-'); delim != std::string::npos) + { + minVal = std::stol(res.substr(0, delim)); + maxVal = std::stol(res.substr(delim + 1)); + } + else + { + minVal = maxVal = std::stol(res); + } + return LedgerRange{minVal, maxVal}; + } + catch (std::exception&) + { + BOOST_LOG_TRIVIAL(error) + << __func__ << " : " + << "Error parsing result of getCompleteLedgers()"; + } + return {}; +} + std::optional PostgresBackend::fetchLedgerObject( ripple::uint256 const& key, @@ -449,6 +483,7 @@ PostgresBackend::fetchAccountTransactions( void PostgresBackend::open() { + initSchema(pgPool_); } void diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 240872eb..6172663e 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -23,6 +23,9 @@ public: std::optional fetchLedgerBySequence(uint32_t sequence) const override; + std::optional + fetchLedgerRange() const override; + std::optional fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const override; diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 6ec1d3ab..064fe910 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -637,8 +637,6 @@ ReportingETL::ReportingETL( : publishStrand_(ioc) , ioContext_(ioc) , flatMapBackend_(Backend::makeBackend(config)) - , pgPool_(make_PgPool( - config.at("database").as_object().at("postgres").as_object())) , loadBalancer_( config.at("etl_sources").as_array(), *flatMapBackend_, @@ -646,6 +644,5 @@ ReportingETL::ReportingETL( ioc) { flatMapBackend_->open(); - initSchema(pgPool_); } diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index 86537f1d..c1755249 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -60,7 +60,6 @@ class ReportingETL { private: std::unique_ptr flatMapBackend_; - std::shared_ptr pgPool_; std::thread worker_; boost::asio::io_context& ioContext_; @@ -327,12 +326,6 @@ public: return *flatMapBackend_; } - std::shared_ptr& - getPgPool() - { - return pgPool_; - } - private: void doWork(); diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 6cd09e52..73cc3171 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -54,18 +54,13 @@ std::unordered_map commandMap{ boost::json::object doAccountInfo( boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& postgres); + BackendInterface const& backend); boost::json::object -doTx( - boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& pgPool); +doTx(boost::json::object const& request, BackendInterface const& backend); boost::json::object doAccountTx( boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& pgPool); + BackendInterface const& backend); boost::json::object doLedgerData( boost::json::object const& request, @@ -73,14 +68,12 @@ doLedgerData( boost::json::object doBookOffers( boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& pgPool); + BackendInterface const& backend); boost::json::object buildResponse( boost::json::object const& request, - BackendInterface const& backend, - std::shared_ptr& pgPool) + BackendInterface const& backend) { std::string command = request.at("command").as_string().c_str(); BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; @@ -88,10 +81,10 @@ buildResponse( switch (commandMap[command]) { case tx: - return doTx(request, backend, pgPool); + return doTx(request, backend); break; case account_tx: - return doAccountTx(request, backend, pgPool); + return doAccountTx(request, backend); break; case ledger: break; @@ -99,10 +92,10 @@ buildResponse( return doLedgerData(request, backend); break; case account_info: - return doAccountInfo(request, backend, pgPool); + return doAccountInfo(request, backend); break; case book_offers: - return doBookOffers(request, backend, pgPool); + return doBookOffers(request, backend); break; default: BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command; @@ -123,15 +116,13 @@ class session : public std::enable_shared_from_this boost::beast::flat_buffer buffer_; std::string response_; BackendInterface const& backend_; - std::shared_ptr& pgPool_; public: // Take ownership of the socket explicit session( boost::asio::ip::tcp::socket&& socket, - BackendInterface const& backend, - std::shared_ptr& pgPool) - : ws_(std::move(socket)), backend_(backend), pgPool_(pgPool) + BackendInterface const& backend) + : ws_(std::move(socket)), backend_(backend) { } @@ -207,7 +198,7 @@ public: boost::json::value raw = boost::json::parse(msg); // BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed"; boost::json::object request = raw.as_object(); - auto response = buildResponse(request, backend_, pgPool_); + auto response = buildResponse(request, backend_); BOOST_LOG_TRIVIAL(debug) << __func__ << response; response_ = boost::json::serialize(response); @@ -243,15 +234,13 @@ class listener : public std::enable_shared_from_this boost::asio::io_context& ioc_; boost::asio::ip::tcp::acceptor acceptor_; BackendInterface const& backend_; - std::shared_ptr& pgPool_; public: listener( boost::asio::io_context& ioc, boost::asio::ip::tcp::endpoint endpoint, - BackendInterface const& backend, - std::shared_ptr& pgPool) - : ioc_(ioc), acceptor_(ioc), backend_(backend), pgPool_(pgPool) + BackendInterface const& backend) + : ioc_(ioc), acceptor_(ioc), backend_(backend) { boost::beast::error_code ec; @@ -316,8 +305,7 @@ private: else { // Create the session and run it - std::make_shared(std::move(socket), backend_, pgPool_) - ->run(); + std::make_shared(std::move(socket), backend_)->run(); } // Accept another connection @@ -423,8 +411,7 @@ main(int argc, char* argv[]) std::make_shared( ioc, boost::asio::ip::tcp::endpoint{address, port}, - etl.getFlatMapBackend(), - etl.getPgPool()) + etl.getFlatMapBackend()) ->run(); // Run the I/O service on the requested number of threads