Merge branch 'backend_interface'

CJ Cobb
2021-03-03 16:58:53 -05:00
27 changed files with 4013 additions and 1951 deletions

View File

@@ -56,7 +56,8 @@ include(Postgres)
target_sources(reporting PRIVATE
reporting/ETLSource.cpp
reporting/ReportingBackend.cpp
reporting/CassandraBackend.cpp
reporting/PostgresBackend.cpp
reporting/Pg.cpp
reporting/DBHelpers.cpp
reporting/ReportingETL.cpp
@@ -64,7 +65,8 @@ target_sources(reporting PRIVATE
handlers/Tx.cpp
handlers/RPCHelpers.cpp
handlers/AccountTx.cpp
handlers/LedgerData.cpp)
handlers/LedgerData.cpp
handlers/BookOffers.cpp)
message(${Boost_LIBRARIES})

View File

@@ -21,8 +21,8 @@
#include <ripple/protocol/STLedgerEntry.h>
#include <boost/json.hpp>
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
// {
// account: <ident>,
@@ -43,8 +43,7 @@
boost::json::object
doAccountInfo(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& postgres)
BackendInterface const& backend)
{
boost::json::object response;
std::string strIdent;
@@ -60,7 +59,7 @@ doAccountInfo(
size_t ledgerSequence = 0;
if (not request.contains("ledger_index"))
{
std::optional<ripple::LedgerInfo> latest = getLedger({}, postgres);
auto latest = backend.fetchLatestLedgerSequence();
if (not latest)
{
@@ -69,7 +68,7 @@ doAccountInfo(
}
else
{
ledgerSequence = latest->seq;
ledgerSequence = *latest;
}
}
else
@@ -93,7 +92,7 @@ doAccountInfo(
auto start = std::chrono::system_clock::now();
std::optional<std::vector<unsigned char>> dbResponse =
backend.fetch(key.key.data(), ledgerSequence);
backend.fetchLedgerObject(key.key, ledgerSequence);
auto end = std::chrono::system_clock::now();
auto time =
std::chrono::duration_cast<std::chrono::microseconds>(end - start)

View File

@@ -18,8 +18,8 @@
//==============================================================================
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
std::vector<std::pair<
std::shared_ptr<ripple::STTx const>,
@@ -27,7 +27,7 @@ std::vector<std::pair<
doAccountTxStoredProcedure(
ripple::AccountID const& account,
std::shared_ptr<PgPool>& pgPool,
CassandraFlatMapBackend const& backend)
BackendInterface const& backend)
{
pg_params dbParams;
@@ -105,10 +105,10 @@ doAccountTxStoredProcedure(
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>>
results;
auto dbResults = backend.fetchBatch(nodestoreHashes);
auto dbResults = backend.fetchTransactions(nodestoreHashes);
for (auto const& res : dbResults)
{
if (res.first.size() && res.second.size())
if (res.transaction.size() && res.metadata.size())
results.push_back(deserializeTxPlusMeta(res));
}
return results;
@@ -127,10 +127,7 @@ doAccountTxStoredProcedure(
// resume previous query
// }
boost::json::object
doAccountTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool)
doAccountTx(boost::json::object const& request, BackendInterface const& backend)
{
boost::json::object response;
@@ -148,16 +145,44 @@ doAccountTx(
return response;
}
std::optional<Backend::AccountTransactionsCursor> cursor;
if (request.contains("cursor"))
{
auto const& obj = request.at("cursor").as_object();
std::optional<uint32_t> ledgerSequence;
if (obj.contains("ledger_sequence"))
{
ledgerSequence = (uint32_t)obj.at("ledger_sequence").as_int64();
}
std::optional<uint32_t> transactionIndex;
if (obj.contains("transaction_index"))
{
transactionIndex = (uint32_t)obj.at("transaction_index").as_int64();
}
if (!ledgerSequence || !transactionIndex)
{
response["error"] =
"malformed cursor. include transaction_index and "
"ledger_sequence in an object named \"cursor\"";
return response;
}
cursor = {*ledgerSequence, *transactionIndex};
}
constexpr uint32_t limit = 500;
boost::json::array txns;
auto res = doAccountTxStoredProcedure(*account, pgPool, backend);
for (auto const& [sttx, meta] : res)
auto [blobs, retCursor] =
backend.fetchAccountTransactions(*account, limit, cursor);
for (auto const& txnPlusMeta : blobs)
{
boost::json::object obj;
obj["transaction"] = getJson(*sttx);
auto [txn, meta] = deserializeTxPlusMeta(txnPlusMeta);
obj["transaction"] = getJson(*txn);
obj["metadata"] = getJson(*meta);
txns.push_back(obj);
}
response["transactions"] = txns;
response["cursor"] = {};
return response;
}
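A minimal sketch of how a client of this handler pages through results (the field names come from the parsing code above; the account value and loop are illustrative, not from the commit):

// Hypothetical paging loop over doAccountTx.
boost::json::object req;
req["account"] = "r..."; // placeholder account
while (true)
{
    boost::json::object res = doAccountTx(req, backend);
    // ... consume res["transactions"] ...
    if (!res.contains("cursor") || res.at("cursor").is_null())
        break; // no more pages
    // feed the returned ledger_sequence/transaction_index back in
    req["cursor"] = res.at("cursor");
}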

handlers/BookOffers.cpp Normal file
View File

@@ -0,0 +1,345 @@
#include <ripple/app/ledger/Ledger.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/Indexes.h>
#include <ripple/protocol/STLedgerEntry.h>
#include <ripple/protocol/jss.h>
#include <boost/json.hpp>
#include <algorithm>
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/DBHelpers.h>
#include <reporting/Pg.h>
std::optional<std::uint32_t>
ledgerSequenceFromRequest(
boost::json::object const& request,
std::shared_ptr<PgPool> const& pool)
{
std::stringstream sql;
sql << "SELECT ledger_seq FROM ledgers WHERE ";
if (request.contains("ledger_index"))
{
sql << "ledger_seq = "
<< std::to_string(request.at("ledger_index").as_int64());
}
else if (request.contains("ledger_hash"))
{
sql << "ledger_hash = \\\\x" << request.at("ledger_hash").as_string();
}
else
{
sql.str("");
sql << "SELECT max_ledger()";
}
sql << ";";
auto index = PgQuery(pool)(sql.str().c_str());
if (!index || index.isNull())
return {};
return std::optional<std::uint32_t>{index.asInt()};
}
std::vector<ripple::uint256>
loadBookOfferIndexes(
ripple::Book const& book,
std::uint32_t seq,
std::uint32_t limit,
std::shared_ptr<PgPool> const& pool)
{
std::vector<ripple::uint256> hashes = {};
ripple::uint256 bookBase = getBookBase(book);
ripple::uint256 bookEnd = getQualityNext(bookBase);
pg_params dbParams;
char const*& command = dbParams.first;
std::vector<std::optional<std::string>>& values = dbParams.second;
command =
"SELECT offer_indexes FROM books "
"WHERE book_directory >= $1::bytea "
"AND book_directory < $2::bytea "
"AND ledger_index <= $3::bigint "
"LIMIT $4::bigint";
values.resize(4);
values[0] = "\\x" + ripple::strHex(bookBase);
values[1] = "\\x" + ripple::strHex(bookEnd);
values[2] = std::to_string(seq);
values[3] = std::to_string(limit);
auto indexes = PgQuery(pool)(dbParams);
if (!indexes || indexes.isNull())
return {};
for (auto i = 0; i < indexes.ntuples(); ++i)
{
auto unHexed = ripple::strUnHex(indexes.c_str(i) + 2);
if (unHexed)
hashes.push_back(ripple::uint256::fromVoid(unHexed->data()));
}
return hashes;
}
boost::json::object
doBookOffers(
boost::json::object const& request,
BackendInterface const& backend)
{
boost::json::object response;
uint32_t sequence = request.at("ledger_index").as_int64();
if (!request.contains("taker_pays"))
{
response["error"] = "Missing field taker_pays";
return response;
}
if (!request.contains("taker_gets"))
{
response["error"] = "Missing field taker_gets";
return response;
}
boost::json::object taker_pays;
if (request.at("taker_pays").kind() == boost::json::kind::object)
{
taker_pays = request.at("taker_pays").as_object();
}
else
{
response["error"] = "Invalid field taker_pays";
return response;
}
boost::json::object taker_gets;
if (request.at("taker_gets").kind() == boost::json::kind::object)
{
taker_gets = request.at("taker_gets").as_object();
}
else
{
response["error"] = "Invalid field taker_gets";
return response;
}
if (!taker_pays.contains("currency"))
{
response["error"] = "Missing field taker_pays.currency";
return response;
}
if (!taker_pays.at("currency").is_string())
{
response["error"] = "taker_pays.currency should be string";
return response;
}
if (!taker_gets.contains("currency"))
{
response["error"] = "Missing field taker_gets.currency";
return response;
}
if (!taker_gets.at("currency").is_string())
{
response["error"] = "taker_gets.currency should be string";
return response;
}
ripple::Currency pay_currency;
if (!ripple::to_currency(
pay_currency, taker_pays.at("currency").as_string().c_str()))
{
response["error"] =
"Invalid field 'taker_pays.currency', bad currency.";
return response;
}
ripple::Currency get_currency;
if (!ripple::to_currency(
get_currency, taker_gets["currency"].as_string().c_str()))
{
response["error"] =
"Invalid field 'taker_gets.currency', bad currency.";
return response;
}
ripple::AccountID pay_issuer;
if (taker_pays.contains("issuer"))
{
if (!taker_pays.at("issuer").is_string())
{
response["error"] = "taker_pays.issuer should be string";
return response;
}
if (!ripple::to_issuer(
pay_issuer, taker_pays.at("issuer").as_string().c_str()))
{
response["error"] =
"Invalid field 'taker_pays.issuer', bad issuer.";
return response;
}
if (pay_issuer == ripple::noAccount())
{
response["error"] =
"Invalid field 'taker_pays.issuer', bad issuer account one.";
return response;
}
}
else
{
pay_issuer = ripple::xrpAccount();
}
if (isXRP(pay_currency) && !isXRP(pay_issuer))
{
response["error"] =
"Unneeded field 'taker_pays.issuer' for XRP currency "
"specification.";
return response;
}
if (!isXRP(pay_currency) && isXRP(pay_issuer))
{
response["error"] =
"Invalid field 'taker_pays.issuer', expected non-XRP issuer.";
return response;
}
ripple::AccountID get_issuer;
if (taker_gets.contains("issuer"))
{
if (!taker_gets["issuer"].is_string())
{
response["error"] = "taker_gets.issuer should be string";
return response;
}
if (!ripple::to_issuer(
get_issuer, taker_gets.at("issuer").as_string().c_str()))
{
response["error"] =
"Invalid field 'taker_gets.issuer', bad issuer.";
return response;
}
if (get_issuer == ripple::noAccount())
{
response["error"] =
"Invalid field 'taker_gets.issuer', bad issuer account one.";
return response;
}
}
else
{
get_issuer = ripple::xrpAccount();
}
if (ripple::isXRP(get_currency) && !ripple::isXRP(get_issuer))
{
response["error"] =
"Unneeded field 'taker_gets.issuer' for XRP currency "
"specification.";
return response;
}
if (!ripple::isXRP(get_currency) && ripple::isXRP(get_issuer))
{
response["error"] =
"Invalid field 'taker_gets.issuer', expected non-XRP issuer.";
return response;
}
boost::optional<ripple::AccountID> takerID;
if (request.contains("taker"))
{
if (!request.at("taker").is_string())
{
response["error"] = "taker should be string";
return response;
}
takerID = ripple::parseBase58<ripple::AccountID>(
request.at("taker").as_string().c_str());
if (!takerID)
{
response["error"] = "Invalid taker";
return response;
}
}
if (pay_currency == get_currency && pay_issuer == get_issuer)
{
response["error"] = "Bad market";
return response;
}
std::uint32_t limit = 500;
if (request.contains("limit") and
request.at("limit").kind() == boost::json::kind::int64)
limit = request.at("limit").as_int64();
std::optional<ripple::uint256> cursor;
if (request.contains("cursor"))
{
// assigning {} would leave the optional empty and make the
// dereference below undefined; construct the value first
cursor = ripple::uint256{};
cursor->parseHex(request.at("cursor").as_string().c_str());
}
ripple::Book book = {
{pay_currency, pay_issuer}, {get_currency, get_issuer}};
ripple::uint256 bookBase = getBookBase(book);
auto start = std::chrono::system_clock::now();
auto [offers, retCursor] =
backend.fetchBookOffers(bookBase, sequence, limit, cursor);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(warning) << "Time loading books: "
<< ((end - start).count() / 1000000000.0);
response["offers"] = boost::json::value(boost::json::array_kind);
boost::json::array& jsonOffers = response.at("offers").as_array();
start = std::chrono::system_clock::now();
std::transform(
std::move_iterator(offers.begin()),
std::move_iterator(offers.end()),
std::back_inserter(jsonOffers),
[](auto obj) {
try
{
ripple::SerialIter it{obj.blob.data(), obj.blob.size()};
ripple::SLE offer{it, obj.key};
return getJson(offer);
}
catch (std::exception const& e)
{
boost::json::object empty;
empty["missing_key"] = ripple::strHex(obj.key);
empty["data"] = ripple::strHex(obj.blob);
return empty;
}
});
end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(warning) << "Time transforming to json: "
<< ((end - start).count() / 1000000000.0);
return response;
}
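For reference, a request that exercises this handler could be assembled as below (the ledger index, issuer, and limit are placeholders, not values from the commit):

boost::json::object req;
req["ledger_index"] = 60000000; // hypothetical ledger
boost::json::object takerPays;
takerPays["currency"] = "XRP";
boost::json::object takerGets;
takerGets["currency"] = "USD";
takerGets["issuer"] = "r..."; // placeholder issuer
req["taker_pays"] = takerPays;
req["taker_gets"] = takerGets;
req["limit"] = 200;
boost::json::object res = doBookOffers(req, backend);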

View File

@@ -21,7 +21,7 @@
#include <ripple/protocol/STLedgerEntry.h>
#include <boost/json.hpp>
#include <handlers/RPCHelpers.h>
#include <reporting/ReportingBackend.h>
#include <reporting/BackendInterface.h>
// Get state nodes from a ledger
// Inputs:
// limit: integer, maximum number of entries
@@ -38,21 +38,23 @@
boost::json::object
doLedgerData(
boost::json::object const& request,
CassandraFlatMapBackend const& backend)
BackendInterface const& backend)
{
boost::json::object response;
uint32_t ledger = request.at("ledger_index").as_int64();
std::optional<int64_t> marker = request.contains("marker")
? request.at("marker").as_int64()
: std::optional<int64_t>{};
ripple::uint256 cursor;
if (request.contains("cursor"))
{
cursor.parseHex(request.at("cursor").as_string().c_str());
}
bool binary =
request.contains("binary") ? request.at("binary").as_bool() : false;
size_t limit = request.contains("limit") ? request.at("limit").as_int64()
: (binary ? 2048 : 256);
Backend::LedgerPage page;
auto start = std::chrono::system_clock::now();
auto [results, returnedMarker] =
backend.doUpperBound(marker, ledger, limit);
page = backend.fetchLedgerPage(cursor, ledger, limit);
auto end = std::chrono::system_clock::now();
@@ -60,6 +62,10 @@ doLedgerData(
std::chrono::duration_cast<std::chrono::microseconds>(end - start)
.count();
boost::json::array objects;
std::vector<Backend::LedgerObject>& results = page.objects;
std::optional<ripple::uint256> const& returnedCursor = page.cursor;
BOOST_LOG_TRIVIAL(debug)
<< "fetchLedgerPage returned " << results.size() << " results";
for (auto const& [key, object] : results)
{
ripple::STLedgerEntry sle{
@@ -75,12 +81,12 @@ doLedgerData(
objects.push_back(getJson(sle));
}
response["objects"] = objects;
if (returnedMarker)
response["marker"] = returnedMarker.value();
if (returnedCursor)
response["marker"] = ripple::strHex(*returnedCursor);
response["num_results"] = results.size();
response["db_time"] = time;
response["time_per_result"] = time / results.size();
return response;
}
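A sketch of walking a full ledger with this handler by feeding each returned marker back in as the next cursor (the loop and sequence number are illustrative):

boost::json::object req;
req["ledger_index"] = 60000000; // hypothetical ledger
req["binary"] = true;
while (true)
{
    boost::json::object res = doLedgerData(req, backend);
    // ... consume res["objects"] ...
    if (!res.contains("marker"))
        break; // last page reached
    req["cursor"] = res.at("marker"); // hex string; parsed by parseHex above
}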

View File

@@ -1,4 +1,5 @@
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
std::optional<ripple::AccountID>
accountFromStringStrict(std::string const& account)
@@ -21,20 +22,19 @@ accountFromStringStrict(std::string const& account)
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>
deserializeTxPlusMeta(
std::pair<std::vector<unsigned char>, std::vector<unsigned char>> const&
blobs)
deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs)
{
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>
result;
{
ripple::SerialIter s{blobs.first.data(), blobs.first.size()};
ripple::SerialIter s{
blobs.transaction.data(), blobs.transaction.size()};
result.first = std::make_shared<ripple::STTx const>(s);
}
{
ripple::SerialIter s{blobs.second.data(), blobs.second.size()};
ripple::SerialIter s{blobs.metadata.data(), blobs.metadata.size()};
result.second =
std::make_shared<ripple::STObject const>(s, ripple::sfMetadata);
}

View File

@@ -5,15 +5,14 @@
#include <ripple/protocol/STLedgerEntry.h>
#include <ripple/protocol/STTx.h>
#include <boost/json.hpp>
#include <reporting/BackendInterface.h>
std::optional<ripple::AccountID>
accountFromStringStrict(std::string const& account);
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::STObject const>>
deserializeTxPlusMeta(
std::pair<std::vector<unsigned char>, std::vector<unsigned char>> const&
blobs);
deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs);
boost::json::object
getJson(ripple::STBase const& obj);

View File

@@ -18,18 +18,15 @@
//==============================================================================
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
// {
// transaction: <hex>
// }
boost::json::object
doTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& postgres)
doTx(boost::json::object const& request, BackendInterface const& backend)
{
boost::json::object response;
if (!request.contains("transaction"))
@@ -44,19 +41,19 @@ doTx(
return response;
}
auto range = getLedgerRange(postgres);
auto range = backend.fetchLedgerRange();
if (!range)
{
response["error"] = "Database is empty";
return response;
}
auto dbResponse = backend.fetchTransaction(hash.data());
auto dbResponse = backend.fetchTransaction(hash);
if (!dbResponse)
{
response["error"] = "Transaction not found in Cassandra";
response["ledger_range"] = std::to_string(range->lower()) + " - " +
std::to_string(range->upper());
response["ledger_range"] = std::to_string(range->minSequence) + " - " +
std::to_string(range->maxSequence);
return response;
}

View File

@@ -0,0 +1,27 @@
#ifndef RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED
#define RIPPLE_APP_REPORTING_BACKENDFACTORY_H_INCLUDED
#include <reporting/BackendInterface.h>
#include <reporting/CassandraBackend.h>
#include <reporting/PostgresBackend.h>
namespace Backend {
std::unique_ptr<BackendInterface>
makeBackend(boost::json::object const& config)
{
boost::json::object const& dbConfig = config.at("database").as_object();
if (dbConfig.contains("cassandra"))
{
auto backend = std::make_unique<CassandraBackend>(
dbConfig.at("cassandra").as_object());
return std::move(backend);
}
else if (dbConfig.contains("postgres"))
{
auto backend = std::make_unique<PostgresBackend>(
dbConfig.at("postgres").as_object());
return std::move(backend);
}
return nullptr;
}
} // namespace Backend
#endif
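A usage sketch for the factory (the nesting of the "database" section mirrors the checks above; the connection values are assumptions):

boost::json::object config = boost::json::parse(R"({
    "database": {
        "cassandra": {
            "contact_points": "127.0.0.1",
            "keyspace": "reporting",
            "table_name": "example"
        }
    }
})").as_object();
auto backend = Backend::makeBackend(config);
if (!backend)
    throw std::runtime_error("config has no recognized database section");
backend->open();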

View File

@@ -0,0 +1,138 @@
#ifndef RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
#define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED
#include <ripple/ledger/ReadView.h>
#include <reporting/DBHelpers.h>
namespace Backend {
using Blob = std::vector<unsigned char>;
struct LedgerObject
{
ripple::uint256 key;
Blob blob;
};
struct LedgerPage
{
std::vector<LedgerObject> objects;
std::optional<ripple::uint256> cursor;
};
struct TransactionAndMetadata
{
Blob transaction;
Blob metadata;
};
struct AccountTransactionsCursor
{
uint32_t ledgerSequence;
uint32_t transactionIndex;
};
struct LedgerRange
{
uint32_t minSequence;
uint32_t maxSequence;
};
class BackendInterface
{
public:
// read methods
virtual std::optional<uint32_t>
fetchLatestLedgerSequence() const = 0;
virtual std::optional<ripple::LedgerInfo>
fetchLedgerBySequence(uint32_t sequence) const = 0;
virtual std::optional<LedgerRange>
fetchLedgerRange() const = 0;
virtual std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;
// returns a transaction, metadata pair
virtual std::optional<TransactionAndMetadata>
fetchTransaction(ripple::uint256 const& hash) const = 0;
virtual LedgerPage
fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const = 0;
virtual std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor = {}) const = 0;
virtual std::vector<TransactionAndMetadata>
fetchTransactions(std::vector<ripple::uint256> const& hashes) const = 0;
virtual std::vector<Blob>
fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const = 0;
virtual std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
fetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursor = {}) const = 0;
// write methods
virtual void
writeLedger(
ripple::LedgerInfo const& ledgerInfo,
std::string&& ledgerHeader,
bool isFirst = false) const = 0;
virtual void
writeLedgerObject(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const = 0;
virtual void
writeTransaction(
std::string&& hash,
uint32_t seq,
std::string&& transaction,
std::string&& metadata) const = 0;
virtual void
writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const = 0;
// other database methods
// Open the database. Set up all of the necessary objects and
// datastructures. After this call completes, the database is ready for use.
virtual void
open() = 0;
// Close the database, releasing any resources
virtual void
close() = 0;
virtual void
startWrites() const = 0;
virtual bool
finishWrites() const = 0;
virtual ~BackendInterface()
{
}
};
} // namespace Backend
using BackendInterface = Backend::BackendInterface;
#endif
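The write methods are bracketed by startWrites()/finishWrites(); a minimal sketch of how an ETL writer might drive the interface (the helper is hypothetical, and the meaning of a false finishWrites() is my reading of the Postgres comments later in this diff):

void
storeLedgerSketch(
    BackendInterface const& backend,
    ripple::LedgerInfo const& info,
    std::string&& header)
{
    backend.startWrites();
    backend.writeLedger(info, std::move(header));
    // ... writeLedgerObject / writeTransaction / writeAccountTransactions ...
    if (!backend.finishWrites())
    {
        // presumably another writer already committed this ledger;
        // the ETL process would fall back to publish-only mode
    }
}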

View File

@@ -0,0 +1,954 @@
#include <reporting/CassandraBackend.h>
namespace Backend {
template <class T, class F>
void
processAsyncWriteResponse(T&& requestParams, CassFuture* fut, F func)
{
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
// func must be captured by value: the enclosing call returns before
// the timer fires, so capturing it by reference would dangle
timer->async_wait([timer, &requestParams, func](
const boost::system::error_code& error) {
func(requestParams, true);
});
}
else
{
backend.finishAsyncWrite();
int remaining = --requestParams.refs;
if (remaining == 0)
delete &requestParams;
}
}
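The retry delay above works out to 2^min(10, retries) milliseconds; the same schedule as a standalone helper:

#include <algorithm>
#include <chrono>
#include <cmath>

// 1ms, 2ms, 4ms, ... capped at 1024ms (~1s) from the tenth retry on,
// matching the computation in processAsyncWriteResponse.
std::chrono::milliseconds
backoff(unsigned currentRetries)
{
    return std::chrono::milliseconds(
        std::lround(std::pow(2, std::min(10u, currentRetries))));
}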
// Process the result of an asynchronous write. Retry on error
// @param fut cassandra future associated with the write
// @param cbData struct that holds the request parameters
void
flatMapWriteCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->write(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
flatMapWriteBookCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeBook(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry)
{
auto const& backend = *requestParams.backend;
if (requestParams.isDeleted)
backend.writeDeletedKey(requestParams, true);
else
backend.writeKey(requestParams, true);
}
void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
processAsyncWriteResponse(std::move(requestParams), fut, retryWriteKey);
}
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeKey(requestParams, true);
});
}
else
{
auto finish = [&backend]() {
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
};
CassandraResult result{cass_future_get_result(fut)};
if (!result)
{
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
requestParams.createdSequence = result.getUInt32();
backend.writeDeletedKey(requestParams, false);
}
}
void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteTransactionCallbackData& requestParams =
*static_cast<CassandraBackend::WriteTransactionCallbackData*>(cbData);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeTransaction(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteAccountTxCallbackData& requestParams =
*static_cast<CassandraBackend::WriteAccountTxCallbackData*>(cbData);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeAccountTx(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHeaderCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHeaderCallbackData*>(cbData);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeLedgerHeader(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHashCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHashCallbackData*>(cbData);
auto func = [&requestParams](auto& params, bool retry) {
requestParams.backend->writeLedgerHash(params, retry);
};
processAsyncWriteResponse(std::move(requestParams), fut, func);
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadCallbackData& requestParams =
*static_cast<CassandraBackend::ReadCallbackData*>(cbData);
auto func = [&requestParams](auto& params) {
requestParams.backend.read(params);
};
CassandraAsyncResult asyncResult{requestParams, fut, func};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
requestParams.result = {result.getBytes(), result.getBytes()};
}
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::ReadObjectCallbackData& requestParams =
*static_cast<CassandraBackend::ReadObjectCallbackData*>(cbData);
auto func = [&requestParams](auto& params) {
requestParams.backend.readObject(params);
};
CassandraAsyncResult asyncResult{requestParams, fut, func};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
requestParams.result = result.getBytes();
}
}
std::optional<LedgerRange>
CassandraBackend::fetchLedgerRange() const
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
CassandraStatement statement{selectLedgerRange_};
CassandraResult result = executeSyncRead(statement);
if (!result)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows";
return {};
}
LedgerRange range;
range.maxSequence = range.minSequence = result.getUInt32();
if (result.nextRow())
{
range.maxSequence = result.getUInt32();
}
if (range.minSequence > range.maxSequence)
{
std::swap(range.minSequence, range.maxSequence);
}
return range;
}
void
CassandraBackend::open()
{
std::cout << config_ << std::endl;
auto getString = [this](std::string const& field) -> std::string {
if (config_.contains(field))
{
auto jsonStr = config_[field].as_string();
return {jsonStr.c_str(), jsonStr.size()};
}
return {""};
};
if (open_)
{
assert(false);
BOOST_LOG_TRIVIAL(error) << "database is already open";
return;
}
std::lock_guard<std::mutex> lock(mutex_);
CassCluster* cluster = cass_cluster_new();
if (!cluster)
throw std::runtime_error("nodestore:: Failed to create CassCluster");
std::string secureConnectBundle = getString("secure_connect_bundle");
if (!secureConnectBundle.empty())
{
/* Setup driver to connect to the cloud using the secure connection
* bundle */
if (cass_cluster_set_cloud_secure_connection_bundle(
cluster, secureConnectBundle.c_str()) != CASS_OK)
{
BOOST_LOG_TRIVIAL(error) << "Unable to configure cloud using the "
"secure connection bundle: "
<< secureConnectBundle;
throw std::runtime_error(
"nodestore: Failed to connect using secure connection "
"bundle");
return;
}
}
else
{
std::string contact_points = getString("contact_points");
if (contact_points.empty())
{
throw std::runtime_error(
"nodestore: Missing contact_points in Cassandra config");
}
CassError rc =
cass_cluster_set_contact_points(cluster, contact_points.c_str());
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra contact_points: "
<< contact_points << ", result: " << rc << ", "
<< cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
int port = config_.contains("port") ? config_["port"].as_int64() : 0;
if (port)
{
rc = cass_cluster_set_port(cluster, port);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra port: " << port
<< ", result: " << rc << ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
}
}
cass_cluster_set_token_aware_routing(cluster, cass_true);
CassError rc =
cass_cluster_set_protocol_version(cluster, CASS_PROTOCOL_VERSION_V4);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting cassandra protocol version: "
<< ", result: " << rc << ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
std::string username = getString("username");
if (username.size())
{
BOOST_LOG_TRIVIAL(debug)
<< "user = " << username.c_str()
<< " password = " << getString("password").c_str();
cass_cluster_set_credentials(
cluster, username.c_str(), getString("password").c_str());
}
unsigned int const workers = std::thread::hardware_concurrency();
rc = cass_cluster_set_num_threads_io(cluster, workers);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra io threads to " << workers
<< ", result: " << rc << ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
cass_cluster_set_request_timeout(cluster, 2000);
rc = cass_cluster_set_queue_size_io(
cluster,
maxRequestsOutstanding); // This number needs to scale w/ the
// number of request per sec
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra max core connections per "
"host"
<< ", result: " << rc << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
throw std::runtime_error(ss.str());
}
std::string certfile = getString("certfile");
if (certfile.size())
{
std::ifstream fileStream(
boost::filesystem::path(certfile).string(), std::ios::in);
if (!fileStream)
{
std::stringstream ss;
ss << "opening config file " << certfile;
throw std::system_error(errno, std::generic_category(), ss.str());
}
std::string cert(
std::istreambuf_iterator<char>{fileStream},
std::istreambuf_iterator<char>{});
if (fileStream.bad())
{
std::stringstream ss;
ss << "reading config file " << certfile;
throw std::system_error(errno, std::generic_category(), ss.str());
}
CassSsl* context = cass_ssl_new();
cass_ssl_set_verify_flags(context, CASS_SSL_VERIFY_NONE);
rc = cass_ssl_add_trusted_cert(context, cert.c_str());
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error setting Cassandra ssl context: " << rc
<< ", " << cass_error_desc(rc);
throw std::runtime_error(ss.str());
}
cass_cluster_set_ssl(cluster, context);
cass_ssl_free(context);
}
std::string keyspace = getString("keyspace");
if (keyspace.empty())
{
throw std::runtime_error(
"nodestore: Missing keyspace in Cassandra config");
}
std::string tableName = getString("table_name");
if (tableName.empty())
{
throw std::runtime_error(
"nodestore: Missing table name in Cassandra config");
}
cass_cluster_set_connect_timeout(cluster, 10000);
CassStatement* statement;
CassFuture* fut;
bool setupSessionAndTable = false;
while (!setupSessionAndTable)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
session_.reset(cass_session_new());
assert(session_);
fut = cass_session_connect_keyspace(
session_.get(), cluster, keyspace.c_str());
rc = cass_future_error_code(fut);
cass_future_free(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "nodestore: Error connecting Cassandra session keyspace: "
<< rc << ", " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
std::stringstream query;
query << "CREATE TABLE IF NOT EXISTS " << tableName << "flat"
<< " ( key blob, sequence bigint, object blob, PRIMARY "
"KEY(key, "
"sequence)) WITH CLUSTERING ORDER BY (sequence DESC)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "flat"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName
<< "flattransactions"
<< " ( hash blob PRIMARY KEY, sequence bigint, transaction "
"blob, metadata blob)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "flattransactions"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "keys"
<< " ( key blob, created bigint, deleted bigint, PRIMARY KEY "
"(key, created)) with clustering order by (created "
"desc) ";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "keys"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "books"
<< " ( book blob, sequence bigint, key blob, deleted_at "
"bigint, PRIMARY KEY "
"(book, key)) WITH CLUSTERING ORDER BY (key ASC)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "books"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "account_tx"
<< " ( account blob, seq_idx tuple<bigint, bigint>, "
" hash blob, "
"PRIMARY KEY "
"(account, seq_idx)) WITH "
"CLUSTERING ORDER BY (seq_idx desc)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "account_tx"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledgers"
<< " ( sequence bigint PRIMARY KEY, header blob )";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "ledgers"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_hashes"
<< " (hash blob PRIMARY KEY, sequence bigint)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "ledger_hashes"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_range"
<< " (is_latest boolean PRIMARY KEY, sequence bigint)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
{
std::stringstream ss;
ss << "nodestore: Error creating Cassandra table: " << rc << ", "
<< cass_error_desc(rc) << " - " << query.str();
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
query = {};
query << "SELECT * FROM " << tableName << "ledger_range"
<< " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
cass_future_free(fut);
cass_statement_free(statement);
if (rc != CASS_OK)
{
if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
{
BOOST_LOG_TRIVIAL(warning)
<< "table not here yet, sleeping 1s to "
"see if table creation propagates";
continue;
}
else
{
std::stringstream ss;
ss << "nodestore: Error checking for table: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
}
setupSessionAndTable = true;
}
cass_cluster_free(cluster);
bool setupPreparedStatements = false;
while (!setupPreparedStatements)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::stringstream query;
query << "INSERT INTO " << tableName << "flat"
<< " (key, sequence, object) VALUES (?, ?, ?)";
if (!insertObject_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "flattransactions"
<< " (hash, sequence, transaction, metadata) VALUES (?, ?, "
"?, ?)";
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "keys"
<< " (key, created, deleted) VALUES (?, ?, ?)";
if (!insertKey_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "books"
<< " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
if (!insertBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tableName << "books"
<< " (book, key, deleted_at) VALUES (?, ?, ?)";
if (!deleteBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT created FROM " << tableName << "keys"
<< " WHERE key = ? ORDER BY created desc LIMIT 1";
if (!getCreated_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT object, sequence FROM " << tableName << "flat"
<< " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
"LIMIT 1";
if (!selectObject_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT transaction,metadata FROM " << tableName
<< "flattransactions"
<< " WHERE hash = ?";
if (!selectTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tableName << "keys "
<< " WHERE TOKEN(key) >= ? and created <= ?"
<< " and deleted > ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
if (!selectLedgerPageKeys_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key,object FROM " << tableName << "flat "
<< " WHERE TOKEN(key) >= ? and sequence <= ? ORDER BY sequence "
"DESC "
<< " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING";
if (!selectLedgerPage_.prepareStatement(query, session_.get()))
continue;
/*
query = {};
query << "SELECT filterempty(key,object) FROM " << tableName <<
"flat "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
<< " PER PARTITION LIMIT 1 LIMIT ?"
<< " ALLOW FILTERING";
if (!upperBound2_.prepareStatement(query, session_.get()))
continue;
*/
query = {};
query << "SELECT TOKEN(key) FROM " << tableName << "flat "
<< " WHERE key = ? LIMIT 1";
if (!getToken_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tableName << "books "
<< " WHERE book = ? AND sequence <= ? AND deleted_at > ? AND"
" key > ? "
" ORDER BY key ASC LIMIT ? ALLOW FILTERING";
if (!getBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " INSERT INTO " << tableName << "account_tx"
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " SELECT hash,seq_idx FROM " << tableName << "account_tx"
<< " WHERE account = ? "
<< " AND seq_idx < ? LIMIT ?";
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " INSERT INTO " << tableName << "ledgers "
<< " (sequence, header) VALUES(?,?)";
if (!insertLedgerHeader_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " INSERT INTO " << tableName << "ledger_hashes"
<< " (hash, sequence) VALUES(?,?)";
if (!insertLedgerHash_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " update " << tableName << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence != ?";
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " select header from " << tableName
<< "ledgers where sequence = ?";
if (!selectLedgerBySeq_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " select sequence from " << tableName
<< "ledger_range where is_latest = true";
if (!selectLatestLedger_.prepareStatement(query, session_.get()))
continue;
query = {};
query << " SELECT sequence FROM " << tableName << "ledger_range WHERE "
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
setupPreparedStatements = true;
}
work_.emplace(ioContext_);
ioThread_ = std::thread{[this]() { ioContext_.run(); }};
open_ = true;
if (config_.contains("max_requests_outstanding"))
{
maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
}
BOOST_LOG_TRIVIAL(info) << "Opened database successfully";
}
} // namespace Backend
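open() repeats the same create-then-probe sequence for every table; the shape of that pattern, distilled into a hypothetical helper over the same driver API:

// Returns true once the probe SELECT succeeds, i.e. the table exists
// and has propagated. Mirrors the per-table loop in open().
bool
ensureTable(
    CassSession* session,
    std::string const& createQuery,
    std::string const& probeQuery)
{
    CassStatement* stmt = cass_statement_new(createQuery.c_str(), 0);
    CassFuture* fut = cass_session_execute(session, stmt);
    CassError rc = cass_future_error_code(fut);
    cass_future_free(fut);
    cass_statement_free(stmt);
    if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
        return false; // creation failed outright
    stmt = cass_statement_new(probeQuery.c_str(), 0);
    fut = cass_session_execute(session, stmt);
    rc = cass_future_error_code(fut);
    cass_future_free(fut);
    cass_statement_free(stmt);
    return rc == CASS_OK; // INVALID_QUERY here: not propagated yet, retry
}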

reporting/CassandraBackend.h Normal file

File diff suppressed because it is too large

View File

@@ -45,6 +45,44 @@ writeToLedgersDB(ripple::LedgerInfo const& info, PgQuery& pgQuery)
return res;
}
/*
bool
writeBooks(std::vector<BookDirectoryData> const& bookDirData, PgQuery& pg)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Writing " << bookDirData.size() << "books to Postgres";
try
{
std::stringstream booksCopyBuffer;
for (auto const& data : bookDirData)
{
std::string directoryIndex = ripple::strHex(data.directoryIndex);
std::string bookIndex = ripple::strHex(data.bookIndex);
auto ledgerSeq = data.ledgerSequence;
booksCopyBuffer << "\\\\x" << directoryIndex << '\t'
<< std::to_string(ledgerSeq) << '\t' << "\\\\x"
<< bookIndex << '\n';
}
pg.bulkInsert("books", booksCopyBuffer.str());
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Successfully inserted books";
return true;
}
catch (std::exception& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << "Caught exception inserting books : " << e.what();
assert(false);
return false;
}
}
*/
bool
writeToPostgres(
ripple::LedgerInfo const& info,
@@ -56,8 +94,8 @@ writeToPostgres(
try
{
// Create a PgQuery object to run multiple commands over the same
// connection in a single transaction block.
// Create a PgQuery object to run multiple commands over the
// same connection in a single transaction block.
PgQuery pg(pgPool);
auto res = pg("BEGIN");
if (!res || res.status() != PGRES_COMMAND_OK)
@@ -67,9 +105,10 @@ writeToPostgres(
throw std::runtime_error(msg.str());
}
// Writing to the ledgers db fails if the ledger already exists in the
// db. In this situation, the ETL process has detected there is another
// writer, and falls back to only publishing
// Writing to the ledgers db fails if the ledger already
// exists in the db. In this situation, the ETL process has
// detected there is another writer, and falls back to only
// publishing
if (!writeToLedgersDB(info, pg))
{
BOOST_LOG_TRIVIAL(warning)

View File

@@ -48,6 +48,26 @@ struct AccountTransactionsData
}
};
inline bool
isOffer(std::string const& object)
{
short offer_bytes = (object[1] << 8) | object[2];
return offer_bytes == 0x006f;
}
inline ripple::uint256
getBook(std::string const& offer)
{
ripple::SerialIter it{offer.data(), offer.size()};
ripple::SLE sle{it, {}};
ripple::uint256 book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book.data()[book.size() - 1 - i] = 0x00;
}
return book;
}
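isOffer() reads bytes 1-2 of the serialized object, which in the canonical XRPL encoding hold the LedgerEntryType (0x006f for an offer), and getBook() zeroes the low 8 bytes of the BookDirectory key, where directory pages store the exchange quality, leaving the key of the book's base page. A short sketch tying the two together (the blob source is hypothetical):

std::string blob = loadObjectSomehow(); // hypothetical serialized object
if (isOffer(blob))
{
    // base key shared by every directory page of this offer's order book
    ripple::uint256 book = getBook(blob);
}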
/// Write new ledger and transaction data to Postgres
/// @param info Ledger Info to write
/// @param accountTxData transaction data to write
@@ -60,4 +80,27 @@ writeToPostgres(
std::vector<AccountTransactionsData> const& accountTxData,
std::shared_ptr<PgPool> const& pgPool);
inline ripple::LedgerInfo
deserializeHeader(ripple::Slice data)
{
ripple::SerialIter sit(data.data(), data.size());
ripple::LedgerInfo info;
info.seq = sit.get32();
info.drops = sit.get64();
info.parentHash = sit.get256();
info.txHash = sit.get256();
info.accountHash = sit.get256();
info.parentCloseTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
info.closeFlags = sit.get8();
info.hash = sit.get256();
return info;
}
#endif

View File

@@ -19,6 +19,7 @@
*/
//==============================================================================
#include <ripple/protocol/STLedgerEntry.h>
#include <boost/asio/strand.hpp>
#include <boost/json.hpp>
#include <boost/json/src.hpp>
@@ -30,7 +31,7 @@
// Primarily used in read-only mode, to monitor when ledgers are validated
ETLSource::ETLSource(
boost::json::object const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& networkValidatedLedgers,
boost::asio::io_context& ioContext)
: ioc_(ioContext)
@@ -399,18 +400,19 @@ public:
process(
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
bool abort = false)
{
std::cout << "Processing calldata" << std::endl;
BOOST_LOG_TRIVIAL(info) << "Processing response. "
<< "Marker prefix = " << getMarkerPrefix();
if (abort)
{
std::cout << "AsyncCallData aborted";
BOOST_LOG_TRIVIAL(error) << "AsyncCallData aborted";
return CallStatus::ERRORED;
}
if (!status_.ok())
{
BOOST_LOG_TRIVIAL(debug)
BOOST_LOG_TRIVIAL(error)
<< "AsyncCallData status_ not ok: "
<< " code = " << status_.error_code()
<< " message = " << status_.error_message();
@@ -439,13 +441,31 @@ public:
call(stub, cq);
}
BOOST_LOG_TRIVIAL(info) << "Writing objects";
for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects()))
{
backend.store(
std::optional<ripple::uint256> book;
short offer_bytes = (obj.data()[1] << 8) | obj.data()[2];
if (offer_bytes == 0x006f)
{
ripple::SerialIter it{obj.data().data(), obj.data().size()};
ripple::SLE sle{it, {}};
book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book->data()[book->size() - 1 - i] = 0x00;
}
}
backend.writeLedgerObject(
std::move(*obj.mutable_key()),
request_.ledger().sequence(),
std::move(*obj.mutable_data()));
std::move(*obj.mutable_data()),
true,
false,
std::move(book));
}
BOOST_LOG_TRIVIAL(info) << "Wrote objects";
return more ? CallStatus::MORE : CallStatus::DONE;
}
@@ -455,6 +475,7 @@ public:
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq)
{
BOOST_LOG_TRIVIAL(info) << "Making next request. " << getMarkerPrefix();
context_ = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncResponseReader<
@@ -472,7 +493,7 @@ public:
if (next_->marker().size() == 0)
return "";
else
return std::string{next_->marker().data()[0]};
return ripple::strHex(std::string{next_->marker().data()[0]});
}
};
@@ -491,8 +512,8 @@ ETLSource::loadInitialLedger(uint32_t sequence)
std::vector<AsyncCallData> calls;
calls.emplace_back(sequence);
BOOST_LOG_TRIVIAL(debug) << "Starting data download for ledger " << sequence
<< ". Using source = " << toString();
BOOST_LOG_TRIVIAL(info) << "Starting data download for ledger " << sequence
<< ". Using source = " << toString();
for (auto& c : calls)
c.call(stub_, cq);
@@ -513,13 +534,13 @@ ETLSource::loadInitialLedger(uint32_t sequence)
}
else
{
BOOST_LOG_TRIVIAL(debug)
BOOST_LOG_TRIVIAL(info)
<< "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, cq, backend_, abort);
if (result != AsyncCallData::CallStatus::MORE)
{
numFinished++;
BOOST_LOG_TRIVIAL(debug)
BOOST_LOG_TRIVIAL(info)
<< "Finished a marker. "
<< "Current number of finished = " << numFinished;
}
@@ -554,15 +575,14 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
<< "ETLSource::fetchLedger - is_unlimited is "
"false. Make sure secure_gateway is set "
"correctly on the ETL source. source = "
<< toString() << " response = " << response.DebugString()
<< " status = " << status.error_message();
<< toString() << " status = " << status.error_message();
assert(false);
}
return {status, std::move(response)};
}
ETLLoadBalancer::ETLLoadBalancer(
boost::json::array const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext)
{

View File

@@ -25,7 +25,7 @@
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/ReportingBackend.h>
#include <reporting/BackendInterface.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
@@ -85,7 +85,7 @@ class ETLSource
// used for retrying connections
boost::asio::steady_timer timer_;
CassandraFlatMapBackend& backend_;
BackendInterface& backend_;
public:
bool
@@ -113,7 +113,7 @@ public:
/// Primarily used in read-only mode, to monitor when ledgers are validated
ETLSource(
boost::json::object const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& networkValidatedLedgers,
boost::asio::io_context& ioContext);
@@ -285,7 +285,7 @@ private:
public:
ETLLoadBalancer(
boost::json::array const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext);

View File

@@ -741,6 +741,14 @@ CREATE TABLE IF NOT EXISTS ledgers (
trans_set_hash bytea NOT NULL
);
CREATE TABLE IF NOT EXISTS objects (
key bytea NOT NULL,
ledger_seq bigint NOT NULL,
object bytea,
PRIMARY KEY(key, ledger_seq)
);
-- Index for lookups by ledger hash.
CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
USING hash (ledger_hash);
@@ -748,35 +756,30 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
-- Transactions table. Deletes from the ledger table
-- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS transactions (
ledger_seq bigint NOT NULL,
transaction_index bigint NOT NULL,
trans_id bytea NOT NULL,
nodestore_hash bytea NOT NULL,
constraint transactions_pkey PRIMARY KEY (ledger_seq, transaction_index),
constraint transactions_fkey FOREIGN KEY (ledger_seq)
REFERENCES ledgers (ledger_seq) ON DELETE CASCADE
hash bytea PRIMARY KEY,
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
transaction bytea NOT NULL,
metadata bytea NOT NULL
);
-- Index for lookups by transaction hash.
CREATE INDEX IF NOT EXISTS transactions_trans_id_idx ON transactions
USING hash (trans_id);
-- Table that maps accounts to transactions affecting them. Deletes from the
-- ledger table by way of transactions table cascade here based on ledger_seq.
-- ledger table cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS account_transactions (
account bytea NOT NULL,
ledger_seq bigint NOT NULL,
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
transaction_index bigint NOT NULL,
constraint account_transactions_pkey PRIMARY KEY (account, ledger_seq,
transaction_index),
constraint account_transactions_fkey FOREIGN KEY (ledger_seq,
transaction_index) REFERENCES transactions (
ledger_seq, transaction_index) ON DELETE CASCADE
hash bytea NOT NULL,
PRIMARY KEY (account, ledger_seq, transaction_index)
);
-- Table that maps a book to a list of offers in that book. Deletes from the ledger table
-- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS books (
book bytea NOT NULL,
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
deleted boolean NOT NULL,
offer_key bytea NOT NULL,
PRIMARY KEY(book, offer_key, deleted)
);
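-- Reading a book at a given ledger takes the newest row per offer_key at or
-- before the target sequence, then drops offers whose latest row is a
-- deletion. fetchBookOffers in the new Postgres backend issues a query of
-- this shape ($1/$2 stand in for the book and the target sequence):
--
--     SELECT offer_key FROM (
--         SELECT DISTINCT ON (offer_key) * FROM books
--         WHERE book = $1 AND ledger_seq <= $2
--         ORDER BY offer_key, ledger_seq DESC) sub
--     WHERE NOT deleted;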
-- Index to allow for fast cascading deletions and referential integrity.
CREATE INDEX IF NOT EXISTS fki_account_transactions_idx ON
account_transactions USING btree (ledger_seq, transaction_index);
-- Avoid inadvertent administrative tampering with committed data.
CREATE OR REPLACE RULE ledgers_update_protect AS ON UPDATE TO
@@ -785,44 +788,11 @@ CREATE OR REPLACE RULE transactions_update_protect AS ON UPDATE TO
transactions DO INSTEAD NOTHING;
CREATE OR REPLACE RULE account_transactions_update_protect AS ON UPDATE TO
account_transactions DO INSTEAD NOTHING;
CREATE OR REPLACE RULE objects_update_protect AS ON UPDATE TO
objects DO INSTEAD NOTHING;
CREATE OR REPLACE RULE books_update_protect AS ON UPDATE TO
books DO INSTEAD NOTHING;
-- Stored procedure to assist with the tx() RPC call. Takes transaction hash
-- as input. If found, returns the ledger sequence in which it was applied.
-- If not, returns the range of ledgers searched.
CREATE OR REPLACE FUNCTION tx (
_in_trans_id bytea
) RETURNS jsonb AS $$
DECLARE
_min_ledger bigint := min_ledger();
_min_seq bigint := (SELECT ledger_seq
FROM ledgers
WHERE ledger_seq = _min_ledger
FOR SHARE);
_max_seq bigint := max_ledger();
_ledger_seq bigint;
_nodestore_hash bytea;
BEGIN
IF _min_seq IS NULL THEN
RETURN jsonb_build_object('error', 'empty database');
END IF;
IF length(_in_trans_id) != 32 THEN
RETURN jsonb_build_object('error', '_in_trans_id size: '
|| to_char(length(_in_trans_id), '999'));
END IF;
EXECUTE 'SELECT nodestore_hash, ledger_seq
FROM transactions
WHERE trans_id = $1
AND ledger_seq BETWEEN $2 AND $3
' INTO _nodestore_hash, _ledger_seq USING _in_trans_id, _min_seq, _max_seq;
IF _nodestore_hash IS NULL THEN
RETURN jsonb_build_object('min_seq', _min_seq, 'max_seq', _max_seq);
END IF;
RETURN jsonb_build_object('nodestore_hash', _nodestore_hash, 'ledger_seq',
_ledger_seq);
END;
$$ LANGUAGE plpgsql;
-- Return the earliest ledger sequence intended for range operations
-- that protect the bottom of the range from deletion. Return NULL if empty.
@@ -845,195 +815,6 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- account_tx() RPC helper. From the rippled reporting process, only the
-- parameters without defaults are required. For the parameters with
-- defaults, validation should be done by rippled, such as:
-- _in_account_id should be a valid xrp base58 address.
-- _in_forward either true or false according to the published api
-- _in_limit should be validated and not simply passed through from
-- client.
--
-- For _in_ledger_index_min and _in_ledger_index_max, if passed in the
-- request, verify that their type is int and pass through as is.
-- For _ledger_hash, verify and convert from hex length 32 bytes and
-- prepend with \x (\\x C++).
--
-- For _in_ledger_index, if the input type is integer, then pass through
-- as is. If the type is string and contents = validated, then do not
-- set _in_ledger_index. Instead set _in_validated to TRUE.
--
-- There is no need for rippled to do any type of lookup on max/min
-- ledger range, lookup of hash, or the like. This function does those
-- things, including returning error responses on bad input. Only the above must
-- be done to set the correct search range.
--
-- If a marker is present in the request, verify the members 'ledger'
-- and 'seq' are integers and they correspond to _in_marker_seq
-- _in_marker_index.
-- To reiterate:
-- JSON input field 'ledger' corresponds to _in_marker_seq
-- JSON input field 'seq' corresponds to _in_marker_index
CREATE OR REPLACE FUNCTION account_tx (
_in_account_id bytea,
_in_forward bool,
_in_limit bigint,
_in_ledger_index_min bigint = NULL,
_in_ledger_index_max bigint = NULL,
_in_ledger_hash bytea = NULL,
_in_ledger_index bigint = NULL,
_in_validated bool = NULL,
_in_marker_seq bigint = NULL,
_in_marker_index bigint = NULL
) RETURNS jsonb AS $$
DECLARE
_min bigint;
_max bigint;
_sort_order text := (SELECT CASE WHEN _in_forward IS TRUE THEN
'ASC' ELSE 'DESC' END);
_marker bool;
_between_min bigint;
_between_max bigint;
_sql text;
_cursor refcursor;
_result jsonb;
_record record;
_tally bigint := 0;
_ret_marker jsonb;
_transactions jsonb[] := '{}';
BEGIN
IF _in_ledger_index_min IS NOT NULL OR
_in_ledger_index_max IS NOT NULL THEN
_min := (SELECT CASE WHEN _in_ledger_index_min IS NULL
THEN min_ledger() ELSE greatest(
_in_ledger_index_min, min_ledger()) END);
_max := (SELECT CASE WHEN _in_ledger_index_max IS NULL OR
_in_ledger_index_max = -1 THEN max_ledger() ELSE
least(_in_ledger_index_max, max_ledger()) END);
IF _max < _min THEN
RETURN jsonb_build_object('error', 'max is less than min ledger');
END IF;
ELSIF _in_ledger_hash IS NOT NULL OR _in_ledger_index IS NOT NULL
OR _in_validated IS TRUE THEN
IF _in_ledger_hash IS NOT NULL THEN
IF length(_in_ledger_hash) != 32 THEN
RETURN jsonb_build_object('error', '_in_ledger_hash size: '
|| to_char(length(_in_ledger_hash), '999'));
END IF;
EXECUTE 'SELECT ledger_seq
FROM ledgers
WHERE ledger_hash = $1'
INTO _min USING _in_ledger_hash::bytea;
ELSE
IF _in_ledger_index IS NOT NULL AND _in_validated IS TRUE THEN
RETURN jsonb_build_object('error',
'_in_ledger_index cannot be set and _in_validated true');
END IF;
IF _in_validated IS TRUE THEN
_in_ledger_index := max_ledger();
END IF;
_min := (SELECT ledger_seq
FROM ledgers
WHERE ledger_seq = _in_ledger_index);
END IF;
IF _min IS NULL THEN
RETURN jsonb_build_object('error', 'ledger not found');
END IF;
_max := _min;
ELSE
_min := min_ledger();
_max := max_ledger();
END IF;
IF _in_marker_seq IS NOT NULL OR _in_marker_index IS NOT NULL THEN
_marker := TRUE;
IF _in_marker_seq IS NULL OR _in_marker_index IS NULL THEN
-- The rippled implementation returns no transaction results
-- if either of these values is missing.
_between_min := 0;
_between_max := 0;
ELSE
IF _in_forward IS TRUE THEN
_between_min := _in_marker_seq;
_between_max := _max;
ELSE
_between_min := _min;
_between_max := _in_marker_seq;
END IF;
END IF;
ELSE
_marker := FALSE;
_between_min := _min;
_between_max := _max;
END IF;
IF _between_max < _between_min THEN
RETURN jsonb_build_object('error', 'ledger search range is '
|| to_char(_between_min, '999') || '-'
|| to_char(_between_max, '999'));
END IF;
_sql := format('
SELECT transactions.ledger_seq, transactions.transaction_index,
transactions.trans_id, transactions.nodestore_hash
FROM transactions
INNER JOIN account_transactions
ON transactions.ledger_seq =
account_transactions.ledger_seq
AND transactions.transaction_index =
account_transactions.transaction_index
WHERE account_transactions.account = $1
AND account_transactions.ledger_seq BETWEEN $2 AND $3
ORDER BY transactions.ledger_seq %s, transactions.transaction_index %s
', _sort_order, _sort_order);
OPEN _cursor FOR EXECUTE _sql USING _in_account_id, _between_min,
_between_max;
LOOP
FETCH _cursor INTO _record;
IF _record IS NULL THEN EXIT; END IF;
IF _marker IS TRUE THEN
IF _in_marker_seq = _record.ledger_seq THEN
IF _in_forward IS TRUE THEN
IF _in_marker_index > _record.transaction_index THEN
CONTINUE;
END IF;
ELSE
IF _in_marker_index < _record.transaction_index THEN
CONTINUE;
END IF;
END IF;
END IF;
_marker := FALSE;
END IF;
_tally := _tally + 1;
IF _tally > _in_limit THEN
_ret_marker := jsonb_build_object(
'ledger', _record.ledger_seq,
'seq', _record.transaction_index);
EXIT;
END IF;
-- Is the transaction index in the tx object?
_transactions := _transactions || jsonb_build_object(
'ledger_seq', _record.ledger_seq,
'transaction_index', _record.transaction_index,
'trans_id', _record.trans_id,
'nodestore_hash', _record.nodestore_hash);
END LOOP;
CLOSE _cursor;
_result := jsonb_build_object('ledger_index_min', _min,
'ledger_index_max', _max,
'transactions', _transactions);
IF _ret_marker IS NOT NULL THEN
_result := _result || jsonb_build_object('marker', _ret_marker);
END IF;
RETURN _result;
END;
$$ LANGUAGE plpgsql;
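-- For reference, the procedure removed here was invoked like this
-- (hypothetical account id), paging backward through the 20 most recent
-- transactions and passing any returned marker back in via _in_marker_seq
-- and _in_marker_index:
--
--     SELECT account_tx('\x0A1B...'::bytea, FALSE, 20);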
-- Trigger prior to insert on ledgers table. Validates length of hash fields.
-- Verifies ancestry based on ledger_hash & prev_hash as follows:
@@ -1535,36 +1316,3 @@ getLedger(
return info;
}
std::optional<LedgerRange>
getLedgerRange(std::shared_ptr<PgPool>& pgPool)
{
auto range = PgQuery(pgPool)("SELECT complete_ledgers()");
if (!range)
return {};
std::string res{range.c_str()};
try
{
size_t minVal = 0;
size_t maxVal = 0;
if (res == "empty" || res == "error" || res.empty())
return {};
else if (size_t delim = res.find('-'); delim != std::string::npos)
{
minVal = std::stol(res.substr(0, delim));
maxVal = std::stol(res.substr(delim + 1));
}
else
{
minVal = maxVal = std::stol(res);
}
return LedgerRange{minVal, maxVal};
}
catch (std::exception&)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Error parsing result of getCompleteLedgers()";
}
return {};
}

View File

@@ -524,8 +524,4 @@ getLedger(
std::variant<std::monostate, ripple::uint256, uint32_t> const& whichLedger,
std::shared_ptr<PgPool>& pgPool);
using LedgerRange = boost::icl::closed_interval<uint32_t>;
std::optional<LedgerRange>
getLedgerRange(std::shared_ptr<PgPool>& pgPool);
#endif // RIPPLE_CORE_PG_H_INCLUDED

View File

@@ -0,0 +1,535 @@
#include <boost/format.hpp>
#include <reporting/PostgresBackend.h>
namespace Backend {
PostgresBackend::PostgresBackend(boost::json::object const& config)
: pgPool_(make_PgPool(config))
{
}
void
PostgresBackend::writeLedger(
ripple::LedgerInfo const& ledgerInfo,
std::string&& ledgerHeader,
bool isFirst) const
{
PgQuery pgQuery(pgPool_);
BOOST_LOG_TRIVIAL(debug) << __func__;
auto cmd = boost::format(
R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
auto ledgerInsert = boost::str(
cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) %
ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() %
ledgerInfo.closeTime.time_since_epoch().count() %
ledgerInfo.parentCloseTime.time_since_epoch().count() %
ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags %
ripple::strHex(ledgerInfo.accountHash) %
ripple::strHex(ledgerInfo.txHash));
BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
<< "query string = " << ledgerInsert;
auto res = pgQuery(ledgerInsert.data());
abortWrite_ = !res;
}
void
PostgresBackend::writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const
{
if (abortWrite_)
return;
PgQuery pg(pgPool_);
for (auto const& record : data)
{
std::string txHash = ripple::strHex(record.txHash);
auto idx = record.transactionIndex;
auto ledgerSeq = record.ledgerSequence;
for (auto const& a : record.accounts)
{
std::string acct = ripple::strHex(a);
// txHash is already hex-encoded (see above); '\n' terminates the COPY row
accountTxBuffer_ << "\\\\x" << acct << '\t'
<< std::to_string(ledgerSeq) << '\t'
<< std::to_string(idx) << '\t' << "\\\\x"
<< txHash << '\n';
}
}
}
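// Note on the buffers above: rows are staged as tab-separated values with a
// doubly escaped \x hex prefix, the format later handed to pg.bulkInsert()
// (a COPY-style bulk load) in finishWrites(). A staged account_transactions
// row thus looks roughly like:
//
//     \x<account hex>\t<ledger_seq>\t<transaction_index>\t\x<tx hash hex>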
void
PostgresBackend::writeLedgerObject(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const
{
if (abortWrite_)
return;
objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(blob) << '\n';
if (book)
{
booksBuffer_ << "\\\\x" << ripple::strHex(*book) << '\t'
<< std::to_string(seq) << '\t' << isDeleted << '\t'
<< "\\\\x" << ripple::strHex(key) << '\n';
}
}
void
PostgresBackend::writeTransaction(
std::string&& hash,
uint32_t seq,
std::string&& transaction,
std::string&& metadata) const
{
if (abortWrite_)
return;
transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(transaction) << '\t' << "\\\\x"
<< ripple::strHex(metadata) << '\n';
}
uint32_t
checkResult(PgResult const& res, uint32_t numFieldsExpected)
{
if (!res)
{
assert(false);
throw std::runtime_error("null postgres response");
}
else if (res.status() != PGRES_TUPLES_OK)
{
std::stringstream msg;
msg << " : Postgres response should have been "
"PGRES_TUPLES_OK but instead was "
<< res.status() << " - msg = " << res.msg();
assert(false);
throw std::runtime_error(msg.str());
}
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " Postgres result msg : " << res.msg();
if (res.isNull() || res.ntuples() == 0)
{
return 0;
}
else if (res.ntuples() > 0)
{
if (res.nfields() != numFieldsExpected)
{
std::stringstream msg;
msg << "Wrong number of fields in Postgres "
"response. Expected "
<< numFieldsExpected << ", but got " << res.nfields();
assert(false);
throw std::runtime_error(msg.str());
}
}
return res.ntuples();
}
ripple::LedgerInfo
parseLedgerInfo(PgResult const& res)
{
char const* hash = res.c_str(0, 0);
char const* prevHash = res.c_str(0, 1);
char const* accountHash = res.c_str(0, 2);
char const* txHash = res.c_str(0, 3);
std::int64_t totalCoins = res.asBigInt(0, 4);
std::int64_t closeTime = res.asBigInt(0, 5);
std::int64_t parentCloseTime = res.asBigInt(0, 6);
std::int64_t closeTimeRes = res.asBigInt(0, 7);
std::int64_t closeFlags = res.asBigInt(0, 8);
std::int64_t ledgerSeq = res.asBigInt(0, 9);
using time_point = ripple::NetClock::time_point;
using duration = ripple::NetClock::duration;
ripple::LedgerInfo info;
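// Postgres returns bytea columns as text of the form "\x<hex>" (libpq's
// default hex output format), so the + 2 below skips that two-character
// prefix before parsing.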
if (!info.parentHash.parseHex(prevHash + 2))
throw std::runtime_error("parseLedgerInfo - error parsing parent hash");
if (!info.txHash.parseHex(txHash + 2))
throw std::runtime_error("parseLedgerInfo - error parsing tx map hash");
if (!info.accountHash.parseHex(accountHash + 2))
throw std::runtime_error(
"parseLedgerInfo - error parsing state map hash");
info.drops = totalCoins;
info.closeTime = time_point{duration{closeTime}};
info.parentCloseTime = time_point{duration{parentCloseTime}};
info.closeFlags = closeFlags;
info.closeTimeResolution = duration{closeTimeRes};
info.seq = ledgerSeq;
if (!info.hash.parseHex(hash + 2))
throw std::runtime_error("parseLedgerInfo - error parsing ledger hash");
info.validated = true;
return info;
}
std::optional<uint32_t>
PostgresBackend::fetchLatestLedgerSequence() const
{
PgQuery pgQuery(pgPool_);
auto res = pgQuery(
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1");
if (checkResult(res, 1))
return res.asBigInt(0, 0);
return {};
}
std::optional<ripple::LedgerInfo>
PostgresBackend::fetchLedgerBySequence(uint32_t sequence) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT * FROM ledgers WHERE sequence = "
<< std::to_string(sequence);
auto res = pgQuery(sql.str().data());
if (checkResult(res, 10))
return parseLedgerInfo(res);
return {};
}
std::optional<LedgerRange>
PostgresBackend::fetchLedgerRange() const
{
auto range = PgQuery(pgPool_)("SELECT complete_ledgers()");
if (!range)
return {};
std::string res{range.c_str()};
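// complete_ledgers() returns either "empty", "error", a single sequence
// such as "32570", or an inclusive range such as "32570-60000000".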
try
{
size_t minVal = 0;
size_t maxVal = 0;
if (res == "empty" || res == "error" || res.empty())
return {};
else if (size_t delim = res.find('-'); delim != std::string::npos)
{
minVal = std::stol(res.substr(0, delim));
maxVal = std::stol(res.substr(delim + 1));
}
else
{
minVal = maxVal = std::stol(res);
}
return LedgerRange{minVal, maxVal};
}
catch (std::exception&)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
<< "Error parsing result of getCompleteLedgers()";
}
return {};
}
std::optional<Blob>
PostgresBackend::fetchLedgerObject(
ripple::uint256 const& key,
uint32_t sequence) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT object FROM objects WHERE key = "
<< "\'\\x" << ripple::strHex(key) << "\'"
<< " AND sequence <= " << std::to_string(sequence)
<< " ORDER BY sequence DESC LIMIT 1";
auto res = pgQuery(sql.str().data());
if (checkResult(res, 1))
{
char const* object = res.c_str(0, 0);
std::string_view view{object};
std::vector<unsigned char> blob{view.begin(), view.end()};
return blob;
}
return {};
}
// returns a transaction, metadata pair
std::optional<TransactionAndMetadata>
PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_sequence FROM transactions "
"WHERE hash = "
<< "\'\\x" << ripple::strHex(hash) << "\'";
auto res = pgQuery(sql.str().data());
if (checkResult(res, 3))
{
char const* txn = res.c_str(0, 0);
char const* metadata = res.c_str(0, 1);
std::string_view txnView{txn};
std::string_view metadataView{metadata};
return {
{{txnView.begin(), txnView.end()},
{metadataView.begin(), metadataView.end()}}};
}
return {};
}
LedgerPage
PostgresBackend::fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
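// Keyset pagination over the versioned objects table: DISTINCT ON (key)
// keeps only the newest version of each key at or before ledgerSequence,
// the cursor (last key of the previous page) bounds the scan, and rows
// whose latest version is the empty blob (i.e. deleted) are filtered out.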
sql << "SELECT key,object FROM"
<< " (SELECT DISTINCT ON (key) * FROM objects"
<< " WHERE sequence <= " << std::to_string(ledgerSequence);
if (cursor)
sql << " AND key > \'x\\" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key, sequence DESC) sub"
<< " WHERE object != \'\\x\'"
<< " LIMIT " << std::to_string(limit);
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 2))
{
std::vector<LedgerObject> objects;
for (size_t i = 0; i < numRows; ++i)
{
ripple::uint256 key;
if (!key.parseHex(res.c_str(i, 0)))
throw std::runtime_error("Error parsing key from postgres");
char const* object = res.c_str(i, 1);
std::string_view view{object};
objects.push_back({std::move(key), {view.begin(), view.end()}});
}
if (numRows == limit)
return {objects, objects[objects.size() - 1].key};
else
return {objects, {}};
}
return {};
}
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
PostgresBackend::fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT key FROM"
<< " (SELECT DISTINCT ON (key) * FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(book)
<< "\' AND sequence <= " << std::to_string(ledgerSequence);
if (cursor)
sql << " AND key > \'" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key DESC, sequence DESC)"
<< " sub WHERE NOT deleted"
<< " LIMIT " << std::to_string(limit);
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1))
{
std::vector<ripple::uint256> keys;
for (size_t i = 0; i < numRows; ++i)
{
ripple::uint256 key;
if (!key.parseHex(res.c_str(i, 0)))
throw std::runtime_error("Error parsing key from postgres");
keys.push_back(std::move(key));
}
std::vector<Blob> blobs = fetchLedgerObjects(keys, ledgerSequence);
std::vector<LedgerObject> results;
std::transform(
blobs.begin(),
blobs.end(),
keys.begin(),
std::back_inserter(results),
[](auto& blob, auto& key) {
return LedgerObject{std::move(key), std::move(blob)};
});
return {results, results[results.size() - 1].key};
}
return {{}, {}};
}
std::vector<TransactionAndMetadata>
PostgresBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_sequence FROM transactions "
"WHERE ";
bool first = true;
for (auto const& hash : hashes)
{
if (!first)
sql << " OR ";
sql << "HASH = \'\\x" << ripple::strHex(hash) << "\'";
first = false;
}
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 3))
{
std::vector<TransactionAndMetadata> results;
for (size_t i = 0; i < numRows; ++i)
{
char const* txn = res.c_str(i, 0);
char const* metadata = res.c_str(i, 1);
std::string_view txnView{txn};
std::string_view metadataView{metadata};
results.push_back(
{{txnView.begin(), txnView.end()},
{metadataView.begin(), metadataView.end()}});
}
return results;
}
return {};
}
std::vector<Blob>
PostgresBackend::fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT object FROM objects WHERE";
bool first = true;
for (auto const& key : keys)
{
if (!first)
{
sql << " OR ";
first = false;
}
else
{
sql << " ( ";
}
sql << " key = "
<< "\'\\x" << ripple::strHex(key) << "\'";
}
sql << " ) "
<< " AND sequence <= " << std::to_string(sequence)
<< " ORDER BY sequence DESC LIMIT 1";
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1))
{
std::vector<Blob> results;
for (size_t i = 0; i < numRows; ++i)
{
char const* object = res.c_str(i, 0);
std::string_view view{object};
results.push_back({view.begin(), view.end()});
}
return results;
}
return {};
}
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
PostgresBackend::fetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursor) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT hash, ledger_sequence, transaction_index FROM "
"account_transactions WHERE account = "
<< ripple::strHex(account);
if (cursor)
sql << " AND ledger_sequence < " << cursor->ledgerSequence
<< " AND transaction_index < " << cursor->transactionIndex;
sql << " LIMIT " << std::to_string(limit);
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 3))
{
std::vector<ripple::uint256> hashes;
for (size_t i = 0; i < numRows; ++i)
{
ripple::uint256 hash;
if (!hash.parseHex(res.c_str(i, 0)))
throw std::runtime_error(
"Error parsing transaction hash from Postgres");
hashes.push_back(std::move(hash));
}
if (numRows == limit)
{
AccountTransactionsCursor retCursor{
res.asBigInt(numRows - 1, 1), res.asBigInt(numRows - 1, 2)};
return {fetchTransactions(hashes), {retCursor}};
}
else
{
return {fetchTransactions(hashes), {}};
}
}
return {};
}
void
PostgresBackend::open()
{
initSchema(pgPool_);
}
void
PostgresBackend::close()
{
}
void
PostgresBackend::startWrites() const
{
PgQuery pg(pgPool_);
auto res = pg("BEGIN");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "Postgres error creating transaction: " << res.msg();
throw std::runtime_error(msg.str());
}
}
bool
PostgresBackend::finishWrites() const
{
if (abortWrite_)
return false;
PgQuery pg(pgPool_);
pg.bulkInsert("transactions", transactionsBuffer_.str());
pg.bulkInsert("objects", objectsBuffer_.str());
pg.bulkInsert("books", booksBuffer_.str());
pg.bulkInsert("account_transactions", accountTxBuffer_.str());
auto res = pg("COMMIT");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "Postgres error committing transaction: " << res.msg();
throw std::runtime_error(msg.str());
}
transactionsBuffer_.str("");
transactionsBuffer_.clear();
objectsBuffer_.str("");
objectsBuffer_.clear();
booksBuffer_.str("");
booksBuffer_.clear();
accountTxBuffer_.str("");
accountTxBuffer_.clear();
return true;
}
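// Write lifecycle: startWrites() opens a transaction; the write* methods
// only stage COPY rows in the stringstream buffers; finishWrites() bulk
// loads the buffers and commits, so a ledger becomes visible atomically.
// If the initial ledger header insert conflicts, abortWrite_ turns every
// subsequent staged write into a no-op and finishWrites() returns false.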
} // namespace Backend

reporting/PostgresBackend.h Normal file
View File

@@ -0,0 +1,106 @@
#ifndef RIPPLE_APP_REPORTING_POSTGRESBACKEND_H_INCLUDED
#define RIPPLE_APP_REPORTING_POSTGRESBACKEND_H_INCLUDED
#include <boost/json.hpp>
#include <reporting/BackendInterface.h>
namespace Backend {
class PostgresBackend : public BackendInterface
{
private:
mutable std::stringstream objectsBuffer_;
mutable std::stringstream transactionsBuffer_;
mutable std::stringstream booksBuffer_;
mutable std::stringstream accountTxBuffer_;
mutable bool abortWrite_ = false;
public:
std::shared_ptr<PgPool> pgPool_;
PostgresBackend(boost::json::object const& config);
std::optional<uint32_t>
fetchLatestLedgerSequence() const override;
std::optional<ripple::LedgerInfo>
fetchLedgerBySequence(uint32_t sequence) const override;
std::optional<LedgerRange>
fetchLedgerRange() const override;
std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence)
const override;
// returns a transaction, metadata pair
std::optional<TransactionAndMetadata>
fetchTransaction(ripple::uint256 const& hash) const override;
LedgerPage
fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const override;
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const override;
std::vector<TransactionAndMetadata>
fetchTransactions(
std::vector<ripple::uint256> const& hashes) const override;
std::vector<Blob>
fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const override;
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
fetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::optional<AccountTransactionsCursor> const& cursor) const override;
void
writeLedger(
ripple::LedgerInfo const& ledgerInfo,
std::string&& ledgerHeader,
bool isFirst) const override;
void
writeLedgerObject(
std::string&& key,
uint32_t seq,
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const override;
void
writeTransaction(
std::string&& hash,
uint32_t seq,
std::string&& transaction,
std::string&& metadata) const override;
void
writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const override;
void
open() override;
void
close() override;
void
startWrites() const override;
bool
finishWrites() const override;
};
} // namespace Backend
#endif

View File

@@ -1,147 +0,0 @@
#include <reporting/ReportingBackend.h>
// Process the result of an asynchronous write. Retry on error
// @param fut cassandra future associated with the write
// @param cbData struct that holds the request parameters
void
flatMapWriteCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteCallbackData*>(cbData);
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
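// e.g. retry 0 waits 1ms, retry 5 waits 32ms, retry 10 and beyond cap
// at 1024ms.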
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.write(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
}
void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteTransactionCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteTransactionCallbackData*>(
cbData);
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeTransaction(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::ReadCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::ReadCallbackData*>(cbData);
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(warning) << "Cassandra fetch error : " << rc << " : "
<< cass_error_desc(rc) << " - retrying";
// Retry right away. The only time the cluster should ever be overloaded
// is when the very first ledger is being written in full (millions of
// writes at once), during which no reads should be occurring. If reads
// are timing out, the code/architecture should be modified to handle
// greater read load, as opposed to just exponential backoff
requestParams.backend.read(requestParams);
}
else
{
auto finish = [&requestParams]() {
size_t batchSize = requestParams.batchSize;
if (++(requestParams.numFinished) == batchSize)
requestParams.cv.notify_all();
};
CassResult const* res = cass_future_get_result(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
cass_byte_t const* buf;
std::size_t bufSize;
rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
std::vector<unsigned char> txn{buf, buf + bufSize};
cass_byte_t const* buf2;
std::size_t buf2Size;
rc =
cass_value_get_bytes(cass_row_get_column(row, 1), &buf2, &buf2Size);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error)
<< "Cassandra fetch get bytes error : " << rc << ", "
<< cass_error_desc(rc);
finish();
return;
}
std::vector<unsigned char> meta{buf2, buf2 + buf2Size};
requestParams.result = std::make_pair(std::move(txn), std::move(meta));
cass_result_free(res);
finish();
}
}

File diff suppressed because it is too large

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
#include <ripple/basics/StringUtilities.h>
#include <reporting/BackendFactory.h>
#include <reporting/DBHelpers.h>
#include <reporting/ReportingETL.h>
@@ -42,29 +44,6 @@ toString(ripple::LedgerInfo const& info)
<< " ParentHash : " << strHex(info.parentHash) << " }";
return ss.str();
}
ripple::LedgerInfo
deserializeHeader(ripple::Slice data)
{
ripple::SerialIter sit(data.data(), data.size());
ripple::LedgerInfo info;
info.seq = sit.get32();
info.drops = sit.get64();
info.parentHash = sit.get256();
info.txHash = sit.get256();
info.accountHash = sit.get256();
info.parentCloseTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
info.closeFlags = sit.get8();
info.hash = sit.get256();
return info;
}
} // namespace detail
std::vector<AccountTransactionsData>
@@ -98,7 +77,7 @@ ReportingETL::insertTransactions(
auto journal = ripple::debugLog();
accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal);
std::string keyStr{(const char*)sttx.getTransactionID().data(), 32};
flatMapBackend_.storeTransaction(
flatMapBackend_->writeTransaction(
std::move(keyStr),
ledger.seq,
std::move(*raw),
@@ -111,7 +90,7 @@ std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
// check that database is actually empty
auto ledger = getLedger(startingSequence, pgPool_);
auto ledger = flatMapBackend_->fetchLedgerBySequence(startingSequence);
if (ledger)
{
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
@@ -128,13 +107,14 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
if (!ledgerData)
return {};
ripple::LedgerInfo lgrInfo = detail::deserializeHeader(
ripple::makeSlice(ledgerData->ledger_header()));
ripple::LedgerInfo lgrInfo =
deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo);
flatMapBackend_->startWrites();
std::vector<AccountTransactionsData> accountTxData =
insertTransactions(lgrInfo, *ledgerData);
@@ -148,9 +128,11 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
if (!stopping_)
{
flatMapBackend_.sync();
writeToPostgres(lgrInfo, accountTxData, pgPool_);
flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
flatMapBackend_->writeLedger(
lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
}
flatMapBackend_->finishWrites();
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = "
<< ((end - start).count()) / 1000000000.0;
@@ -173,7 +155,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
size_t numAttempts = 0;
while (!stopping_)
{
auto ledger = getLedger(ledgerSequence, pgPool_);
auto ledger = flatMapBackend_->fetchLedgerBySequence(ledgerSequence);
if (!ledger)
{
@@ -262,18 +244,19 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
return response;
}
std::pair<ripple::LedgerInfo, std::vector<AccountTransactionsData>>
std::pair<ripple::LedgerInfo, bool>
ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Beginning ledger update";
ripple::LedgerInfo lgrInfo =
detail::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo);
flatMapBackend_->startWrites();
std::vector<AccountTransactionsData> accountTxData{
insertTransactions(lgrInfo, rawData)};
@@ -285,12 +268,43 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
{
flatMapBackend_.store(
bool isCreated = false;
bool isDeleted = false;
if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::CREATED)
isCreated = true;
else if (
obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::DELETED)
isDeleted = true;
std::optional<ripple::uint256> bookDir;
if (isCreated)
{
if (isOffer(obj.data()))
bookDir = getBook(obj.data());
}
else if (obj.book_of_deleted_offer().size())
{
bookDir =
ripple::uint256::fromVoid(obj.book_of_deleted_offer().data());
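// The low 8 bytes of a book directory key encode the quality (exchange
// rate) of that directory page; zeroing them recovers the book base key
// shared by all offers in the book.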
for (size_t i = 0; i < 8; ++i)
{
bookDir->data()[bookDir->size() - 1 - i] = 0x00;
}
}
assert(not(isCreated and isDeleted));
flatMapBackend_->writeLedgerObject(
std::move(*obj.mutable_key()),
lgrInfo.seq,
std::move(*obj.mutable_data()));
std::move(*obj.mutable_data()),
isCreated,
isDeleted,
std::move(bookDir));
}
flatMapBackend_.sync();
flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
flatMapBackend_->writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header()));
bool success = flatMapBackend_->finishWrites();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Inserted/modified/deleted all objects. Number of objects = "
@@ -299,7 +313,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Finished ledger update. " << detail::toString(lgrInfo);
return {lgrInfo, std::move(accountTxData)};
return {lgrInfo, success};
}
// Database must be populated when this starts
@@ -310,6 +324,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
* Behold, mortals! This function spawns three separate threads, which talk
* to each other via 2 different thread safe queues and 1 atomic variable.
* All threads and queues are function local. This function returns when all
* of the threads exit. There are two termination conditions: the first is
* if the load thread encounters a write conflict. In this case, the load
* thread sets writeConflict, an atomic bool, to true, which signals the
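The wiring described above is easiest to see in a standalone sketch. This is
not the code in this commit: ThreadSafeQueue here is a minimal stand-in for
the helper in reporting/ETLHelpers.h, and the extract/transform/load bodies
are placeholders; only the shape (two queues, one atomic flag, three
threads) mirrors the description.

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

// Minimal stand-in for the queue helper in reporting/ETLHelpers.h.
template <class T>
struct ThreadSafeQueue
{
    std::queue<T> q;
    std::mutex m;
    std::condition_variable cv;
    void push(T v)
    {
        {
            std::lock_guard<std::mutex> l{m};
            q.push(std::move(v));
        }
        cv.notify_one();
    }
    T pop()
    {
        std::unique_lock<std::mutex> l{m};
        cv.wait(l, [&] { return !q.empty(); });
        T v = std::move(q.front());
        q.pop();
        return v;
    }
};

int main()
{
    using RawData = int;  // placeholder for GetLedgerResponse
    ThreadSafeQueue<std::optional<RawData>> extracted;    // extract -> transform
    ThreadSafeQueue<std::optional<RawData>> transformed;  // transform -> load
    std::atomic_bool writeConflict{false};

    std::thread extracter([&] {
        for (RawData seq = 0; seq < 10 && !writeConflict; ++seq)
            extracted.push(seq);       // stands in for fetchLedgerDataAndDiff
        extracted.push(std::nullopt);  // empty optional terminates downstream
    });
    std::thread transformer([&] {
        while (auto raw = extracted.pop())
            transformed.push(*raw);  // stands in for buildNextLedger
        transformed.push(std::nullopt);
    });
    std::thread loader([&] {
        while (auto lgr = transformed.pop())
        {
            bool success = true;  // stands in for finishWrites()
            if (!success)
                writeConflict = true;  // tells the extracter to stop
        }
    });
    extracter.join();
    transformer.join();
    loader.join();
}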
@@ -331,7 +346,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
<< "Starting etl pipeline";
writing_ = true;
auto parent = getLedger(startSequence - 1, pgPool_);
auto parent = flatMapBackend_->fetchLedgerBySequence(startSequence - 1);
if (!parent)
{
assert(false);
@@ -412,21 +427,24 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
if (isStopping())
continue;
auto numTxns =
fetchResponse->transactions_list().transactions_size();
auto numObjects = fetchResponse->ledger_objects().objects_size();
auto start = std::chrono::system_clock::now();
auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse);
auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
auto end = std::chrono::system_clock::now();
if (!writeToPostgres(lgrInfo, accountTxData, pgPool_))
writeConflict = true;
auto duration = ((end - start).count()) / 1000000000.0;
auto numTxns = accountTxData.size();
BOOST_LOG_TRIVIAL(info)
<< "Load phase of etl : "
<< "Successfully published ledger! Ledger info: "
<< detail::toString(lgrInfo) << ". txn count = " << numTxns
<< ". load time = " << duration << ". load tps "
<< numTxns / duration;
if (!writeConflict)
<< ". object count = " << numObjects
<< ". load time = " << duration
<< ". load txns per second = " << numTxns / duration
<< ". load objs per second = " << numObjects / duration;
// success is false if the ledger was already written
if (success)
{
publishLedger(lgrInfo);
lastPublishedSequence = lgrInfo.seq;
@@ -462,12 +480,14 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
void
ReportingETL::monitor()
{
auto ledger = getLedger(std::monostate(), pgPool_);
if (!ledger)
std::optional<uint32_t> latestSequence =
flatMapBackend_->fetchLatestLedgerSequence();
if (!latestSequence)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Database is empty. Will download a ledger "
"from the network.";
std::optional<ripple::LedgerInfo> ledger;
if (startSequence_)
{
BOOST_LOG_TRIVIAL(info)
@@ -501,6 +521,8 @@ ReportingETL::monitor()
return;
}
}
if (ledger)
latestSequence = ledger->seq;
}
else
{
@@ -513,7 +535,7 @@ ReportingETL::monitor()
<< __func__ << " : "
<< "Database already populated. Picking up from the tip of history";
}
if (!ledger)
if (!latestSequence)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
@@ -524,7 +546,7 @@ ReportingETL::monitor()
{
// publishLedger(ledger);
}
uint32_t nextSequence = ledger->seq + 1;
uint32_t nextSequence = latestSequence.value() + 1;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
@@ -614,17 +636,13 @@ ReportingETL::ReportingETL(
boost::asio::io_context& ioc)
: publishStrand_(ioc)
, ioContext_(ioc)
, flatMapBackend_(
config.at("database").as_object().at("cassandra").as_object())
, pgPool_(make_PgPool(
config.at("database").as_object().at("postgres").as_object()))
, flatMapBackend_(Backend::makeBackend(config))
, loadBalancer_(
config.at("etl_sources").as_array(),
flatMapBackend_,
*flatMapBackend_,
networkValidatedLedgers_,
ioc)
{
flatMapBackend_.open();
initSchema(pgPool_);
flatMapBackend_->open();
}

View File

@@ -25,10 +25,10 @@
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/BackendInterface.h>
#include <reporting/ETLHelpers.h>
#include <reporting/ETLSource.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
@@ -59,8 +59,7 @@ struct AccountTransactionsData;
class ReportingETL
{
private:
CassandraFlatMapBackend flatMapBackend_;
std::shared_ptr<PgPool> pgPool_;
std::unique_ptr<BackendInterface> flatMapBackend_;
std::thread worker_;
boost::asio::io_context& ioContext_;
@@ -213,6 +212,7 @@ private:
ripple::LedgerInfo const& ledger,
org::xrpl::rpc::v1::GetLedgerResponse& data);
// TODO update this documentation
/// Build the next ledger using the previous ledger and the extracted data.
/// This function calls insertTransactions()
/// @note rawData should be data that corresponds to the ledger immediately
@@ -220,7 +220,7 @@ private:
/// @param parent the previous ledger
/// @param rawData data extracted from an ETL source
/// @return the newly built ledger and data to write to Postgres
std::pair<ripple::LedgerInfo, std::vector<AccountTransactionsData>>
std::pair<ripple::LedgerInfo, bool>
buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData);
/// Attempt to read the specified ledger from the database, and then publish
@@ -320,16 +320,10 @@ public:
return loadBalancer_;
}
CassandraFlatMapBackend&
BackendInterface&
getFlatMapBackend()
{
return flatMapBackend_;
}
std::shared_ptr<PgPool>&
getPgPool()
{
return pgPool_;
return *flatMapBackend_;
}
private:

Submodule rippled updated: 2978847d8d...e8b8ff3717

test.py
View File

@@ -52,7 +52,7 @@ async def ledger_data(ip, port, ledger, limit):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"limit":int(limit)}))
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"limit":int(limit),"binary":True}))
res = json.loads(await ws.recv())
print(res)
except websockets.exceptions.ConnectionClosedError as e:
@@ -66,7 +66,7 @@ async def ledger_data_full(ip, port, ledger):
marker = None
while True:
if marker is None:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger)}))
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":True}))
res = json.loads(await ws.recv())
print(res)
@@ -87,15 +87,40 @@ async def ledger_data_full(ip, port, ledger):
print(e)
async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency, get_issuer):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
taker_gets = {"currency": get_currency}
if get_issuer is not None:
taker_gets["issuer"] = get_issuer
taker_pays = {"currency": pay_currency}
if pay_issuer is not None:
taker_pays["issuer"] = pay_issuer
await ws.send(json.dumps({"command":"book_offers","ledger_index":int(ledger), "taker_pays":taker_pays, "taker_gets":taker_gets}))
res = json.loads(await ws.recv())
print(res)
except websockets.exceptions.ConnectionClosedError as e:
print(e)
parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full"])
parser.add_argument('action', choices=["account_info", "tx", "account_tx", "ledger_data", "ledger_data_full", "book_offers"])
parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080')
parser.add_argument('--hash')
parser.add_argument('--account', default="rLC64xxNif3GiY9FQnbaM4kcE6VvDhwRod")
parser.add_argument('--ledger')
parser.add_argument('--limit', default='200')
parser.add_argument('--taker_pays_issuer')
parser.add_argument('--taker_pays_currency')
parser.add_argument('--taker_gets_issuer')
parser.add_argument('--taker_gets_currency')
@@ -118,6 +143,9 @@ def run(args):
elif args.action == "ledger_data_full":
asyncio.get_event_loop().run_until_complete(
ledger_data_full(args.ip, args.port, args.ledger))
elif args.action == "book_offers":
asyncio.get_event_loop().run_until_complete(
book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer))
else:
print("incorrect arguments")

View File

@@ -35,49 +35,56 @@
#include <vector>
//------------------------------------------------------------------------------
enum RPCCommand { tx, account_tx, ledger, account_info, ledger_data };
enum RPCCommand {
tx,
account_tx,
ledger,
account_info,
ledger_data,
book_offers
};
std::unordered_map<std::string, RPCCommand> commandMap{
{"tx", tx},
{"account_tx", account_tx},
{"ledger", ledger},
{"account_info", account_info},
{"ledger_data", ledger_data}};
{"ledger_data", ledger_data},
{"book_offers", book_offers}};
boost::json::object
doAccountInfo(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& postgres);
BackendInterface const& backend);
boost::json::object
doTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool);
doTx(boost::json::object const& request, BackendInterface const& backend);
boost::json::object
doAccountTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool);
BackendInterface const& backend);
boost::json::object
doLedgerData(
boost::json::object const& request,
CassandraFlatMapBackend const& backend);
BackendInterface const& backend);
boost::json::object
doBookOffers(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
buildResponse(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool)
BackendInterface const& backend)
{
std::string command = request.at("command").as_string().c_str();
BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request;
boost::json::object response;
switch (commandMap[command])
{
case tx:
return doTx(request, backend, pgPool);
return doTx(request, backend);
break;
case account_tx:
return doAccountTx(request, backend, pgPool);
return doAccountTx(request, backend);
break;
case ledger:
break;
@@ -85,7 +92,10 @@ buildResponse(
return doLedgerData(request, backend);
break;
case account_info:
return doAccountInfo(request, backend, pgPool);
return doAccountInfo(request, backend);
break;
case book_offers:
return doBookOffers(request, backend);
break;
default:
BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command;
@@ -105,16 +115,14 @@ class session : public std::enable_shared_from_this<session>
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_;
std::string response_;
CassandraFlatMapBackend const& backend_;
std::shared_ptr<PgPool>& pgPool_;
BackendInterface const& backend_;
public:
// Take ownership of the socket
explicit session(
boost::asio::ip::tcp::socket&& socket,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool)
: ws_(std::move(socket)), backend_(backend), pgPool_(pgPool)
BackendInterface const& backend)
: ws_(std::move(socket)), backend_(backend)
{
}
@@ -190,7 +198,7 @@ public:
boost::json::value raw = boost::json::parse(msg);
// BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed";
boost::json::object request = raw.as_object();
auto response = buildResponse(request, backend_, pgPool_);
auto response = buildResponse(request, backend_);
BOOST_LOG_TRIVIAL(debug) << __func__ << response;
response_ = boost::json::serialize(response);
@@ -225,16 +233,14 @@ class listener : public std::enable_shared_from_this<listener>
{
boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_;
CassandraFlatMapBackend const& backend_;
std::shared_ptr<PgPool>& pgPool_;
BackendInterface const& backend_;
public:
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint,
CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pgPool)
: ioc_(ioc), acceptor_(ioc), backend_(backend), pgPool_(pgPool)
BackendInterface const& backend)
: ioc_(ioc), acceptor_(ioc), backend_(backend)
{
boost::beast::error_code ec;
@@ -299,8 +305,7 @@ private:
else
{
// Create the session and run it
std::make_shared<session>(std::move(socket), backend_, pgPool_)
->run();
std::make_shared<session>(std::move(socket), backend_)->run();
}
// Accept another connection
@@ -406,8 +411,7 @@ main(int argc, char* argv[])
std::make_shared<listener>(
ioc,
boost::asio::ip::tcp::endpoint{address, port},
etl.getFlatMapBackend(),
etl.getPgPool())
etl.getFlatMapBackend())
->run();
// Run the I/O service on the requested number of threads