book_offers in Cassandra

This commit is contained in:
CJ Cobb
2021-01-26 12:51:49 -05:00
parent 5d233fce1b
commit f754ff9f5a
7 changed files with 423 additions and 13 deletions

View File

@@ -45,6 +45,44 @@ writeToLedgersDB(ripple::LedgerInfo const& info, PgQuery& pgQuery)
return res;
}
/*
bool
writeBooks(std::vector<BookDirectoryData> const& bookDirData, PgQuery& pg)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Writing " << bookDirData.size() << "books to Postgres";
try
{
std::stringstream booksCopyBuffer;
for (auto const& data : bookDirData)
{
std::string directoryIndex = ripple::strHex(data.directoryIndex);
std::string bookIndex = ripple::strHex(data.bookIndex);
auto ledgerSeq = data.ledgerSequence;
booksCopyBuffer << "\\\\x" << directoryIndex << '\t'
<< std::to_string(ledgerSeq) << '\t' << "\\\\x"
<< bookIndex << '\n';
}
pg.bulkInsert("books", booksCopyBuffer.str());
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Successfully inserted books";
return true;
}
catch (std::exception& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << "Caught exception inserting books : " << e.what();
assert(false);
return false;
}
}
*/
bool
writeToPostgres(
ripple::LedgerInfo const& info,
@@ -56,8 +94,8 @@ writeToPostgres(
try
{
// Create a PgQuery object to run multiple commands over the same
// connection in a single transaction block.
// Create a PgQuery object to run multiple commands over the
// same connection in a single transaction block.
PgQuery pg(pgPool);
auto res = pg("BEGIN");
if (!res || res.status() != PGRES_COMMAND_OK)
@@ -67,9 +105,10 @@ writeToPostgres(
throw std::runtime_error(msg.str());
}
// Writing to the ledgers db fails if the ledger already exists in the
// db. In this situation, the ETL process has detected there is another
// writer, and falls back to only publishing
// Writing to the ledgers db fails if the ledger already
// exists in the db. In this situation, the ETL process has
// detected there is another writer, and falls back to only
// publishing
if (!writeToLedgersDB(info, pg))
{
BOOST_LOG_TRIVIAL(warning)

View File

@@ -48,6 +48,21 @@ struct AccountTransactionsData
}
};
inline bool
isOffer(std::string const& object)
{
short offer_bytes = (object[1] << 8) | object[2];
return offer_bytes == 0x006f;
}
inline ripple::uint256
getBook(std::string const& offer)
{
ripple::SerialIter it{offer.data(), offer.size()};
ripple::SLE sle{it, {}};
return sle.getFieldH256(ripple::sfBookDirectory);
}
/// Write new ledger and transaction data to Postgres
/// @param info Ledger Info to write
/// @param accountTxData transaction data to write

View File

@@ -19,6 +19,7 @@
*/
//==============================================================================
#include <ripple/protocol/STLedgerEntry.h>
#include <boost/asio/strand.hpp>
#include <boost/json.hpp>
#include <boost/json/src.hpp>
@@ -441,12 +442,22 @@ public:
for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects()))
{
std::optional<ripple::uint256> book;
short offer_bytes = (obj.data()[1] << 8) | obj.data()[2];
if (offer_bytes == 0x006f)
{
ripple::SerialIter it{obj.data().data(), obj.data().size()};
ripple::SLE sle{it, {}};
book = sle.getFieldH256(ripple::sfBookDirectory);
}
backend.store(
std::move(*obj.mutable_key()),
request_.ledger().sequence(),
std::move(*obj.mutable_data()),
true,
false);
false,
std::move(book));
}
return more ? CallStatus::MORE : CallStatus::DONE;

View File

@@ -38,6 +38,42 @@ flatMapWriteCallback(CassFuture* fut, void* cbData)
delete &requestParams;
}
}
void
flatMapWriteBookCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteCallbackData*>(cbData);
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeBook(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
int remaining = --requestParams.refs;
if (remaining == 0)
delete &requestParams;
}
}
void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData)

View File

@@ -39,6 +39,8 @@ flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
void
flatMapWriteBookCallback(CassFuture* fut, void* cbData);
void
flatMapReadCallback(CassFuture* fut, void* cbData);
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
@@ -90,6 +92,8 @@ private:
const CassPrepared* getToken_ = nullptr;
const CassPrepared* insertKey_ = nullptr;
const CassPrepared* getCreated_ = nullptr;
const CassPrepared* getBook_ = nullptr;
const CassPrepared* insertBook_ = nullptr;
// io_context used for exponential backoff for write retries
mutable boost::asio::io_context ioContext_;
@@ -478,6 +482,51 @@ public:
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "books"
<< " ( book blob, sequence bigint, key blob, deleted_at "
"bigint static, PRIMARY KEY "
"(book, sequence, key))";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc
<< ", " << cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "books"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
setupSessionAndTable = true;
}
@@ -567,6 +616,31 @@ public:
insertKey_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "INSERT INTO " << tableName << "books"
<< " (book, sequence, key, deleted_at) VALUES (?, ?, ?, ?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
/* Get the prepared object from the future */
insertBook_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {};
query << "SELECT created FROM " << tableName << "keys"
<< " WHERE key = ? ORDER BY created desc LIMIT 1";
@@ -708,6 +782,31 @@ public:
getToken_ = cass_future_get_prepared(prepare_future);
query = {};
query << "SELECT key FROM " << tableName << "books "
<< " WHERE book = ? AND sequence <= ? AND deleted_at > ? "
"ALLOW FILTERING";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
getBook_ = cass_future_get_prepared(prepare_future);
setupPreparedStatements = true;
}
@@ -769,6 +868,11 @@ public:
cass_prepared_free(getCreated_);
getCreated_ = nullptr;
}
if (getBook_)
{
cass_prepared_free(getBook_);
getBook_ = nullptr;
}
work_.reset();
ioThread_.join();
}
@@ -1101,6 +1205,102 @@ public:
return {{}, {}};
}
std::vector<LedgerObject>
doBookOffers(std::vector<unsigned char> const& book, uint32_t sequence)
const
{
BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers";
CassStatement* statement = cass_prepared_bind(upperBound_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_bytes(
statement, 0, static_cast<cass_byte_t const*>(book.data()), 32);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra book to doBookOffers query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
rc = cass_statement_bind_int64(statement, 1, sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra sequence to doBookOffers query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
rc = cass_statement_bind_int64(statement, 2, sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra deleted_at to doBookOffers query: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
CassFuture* fut;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys";
std::vector<ripple::uint256> keys;
CassIterator* iter = cass_iterator_from_result(res);
while (cass_iterator_next(iter))
{
CassRow const* row = cass_iterator_get_row(iter);
cass_byte_t const* outData;
std::size_t outSize;
CassValue const* hash = cass_row_get_column(row, 0);
rc = cass_value_get_bytes(hash, &outData, &outSize);
if (rc != CASS_OK)
{
cass_iterator_free(iter);
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
keys.push_back(ripple::uint256::fromVoid(outData));
}
BOOST_LOG_TRIVIAL(debug)
<< "doUpperBound - populated keys. num keys = " << keys.size();
if (keys.size())
{
std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchObjectsBatch(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i)
{
results.push_back({keys[i], objs[i]});
}
return results;
}
return {};
}
bool
canFetchBatch()
{
@@ -1317,6 +1517,7 @@ public:
std::string blob;
bool isCreated;
bool isDeleted;
std::optional<ripple::uint256> book;
uint32_t currentRetries = 0;
std::atomic<int> refs = 1;
@@ -1327,16 +1528,20 @@ public:
uint32_t sequence,
std::string&& blob,
bool isCreated,
bool isDeleted)
bool isDeleted,
std::optional<ripple::uint256>&& book)
: backend(f)
, key(std::move(key))
, sequence(sequence)
, blob(std::move(blob))
, isCreated(isCreated)
, isDeleted(isDeleted)
, book(std::move(book))
{
if (isCreated or isDeleted)
refs = 2;
++refs;
if (book)
++refs;
}
};
@@ -1559,17 +1764,102 @@ public:
}
}
void
writeBook(WriteCallbackData& data, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
CassStatement* statement = cass_prepared_bind(insertBook_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* bookData = (unsigned char*)data.book->data();
CassError rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(bookData),
data.book->size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(
statement, 1, (data.isCreated ? data.sequence : 0));
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
const unsigned char* keyData = (unsigned char*)data.key.data();
rc = cass_statement_bind_bytes(
statement,
2,
static_cast<cass_byte_t const*>(keyData),
data.key.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "Binding cassandra insert hash: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
rc = cass_statement_bind_int64(
statement, 3, (data.isDeleted ? data.sequence : INT64_MAX));
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapWriteBookCallback, static_cast<void*>(&data));
cass_future_free(fut);
}
void
store(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated = false,
bool isDeleted = false) const
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const
{
BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
WriteCallbackData* data = new WriteCallbackData(
this, std::move(key), seq, std::move(blob), isCreated, isDeleted);
this,
std::move(key),
seq,
std::move(blob),
isCreated,
isDeleted,
std::move(book));
++numRequestsOutstanding_;
if (isCreated || isDeleted)
@@ -1577,6 +1867,9 @@ public:
write(*data, false);
if (isCreated || isDeleted)
writeKey(*data, false);
if (book)
writeBook(*data, false);
// handle book
}
struct WriteTransactionCallbackData
@@ -1722,6 +2015,8 @@ public:
flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteBookCallback(CassFuture* fut, void* cbData);
friend void
flatMapReadCallback(CassFuture* fut, void* cbData);

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <ripple/basics/StringUtilities.h>
#include <reporting/DBHelpers.h>
#include <reporting/ReportingETL.h>
@@ -293,13 +294,26 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
obj.mod_type() == org ::xrpl::rpc::v1::RawLedgerObject::DELETED)
isDeleted = true;
std::optional<ripple::uint256> bookDir;
if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::DELETED)
{
if (isOffer(obj.data()))
bookDir = getBook(obj.data());
}
else if (obj.book_of_deleted_offer().size())
{
bookDir =
ripple::uint256::fromVoid(obj.book_of_deleted_offer().data());
}
assert(not(isCreated and isDeleted));
flatMapBackend_.store(
std::move(*obj.mutable_key()),
lgrInfo.seq,
std::move(*obj.mutable_data()),
isCreated,
isDeleted);
isDeleted,
std::move(bookDir));
}
flatMapBackend_.sync();
BOOST_LOG_TRIVIAL(debug)

Submodule rippled updated: 063363ffae...e8b8ff3717