clean up ledger page interface. wire up account_tx

This commit is contained in:
CJ Cobb
2021-02-23 16:13:48 -05:00
parent 5534d9c75e
commit 78c6bde902
11 changed files with 97 additions and 63 deletions

View File

@@ -21,8 +21,8 @@
#include <ripple/protocol/STLedgerEntry.h>
#include <boost/json.hpp>
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
// {
// account: <ident>,
@@ -43,7 +43,7 @@
boost::json::object
doAccountInfo(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& postgres)
{
boost::json::object response;

View File

@@ -18,8 +18,8 @@
//==============================================================================
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
std::vector<std::pair<
std::shared_ptr<ripple::STTx const>,
@@ -27,7 +27,7 @@ std::vector<std::pair<
doAccountTxStoredProcedure(
ripple::AccountID const& account,
std::shared_ptr<PgPool>& pgPool,
CassandraFlatMapBackend const& backend)
BackendInterface const& backend)
{
pg_params dbParams;
@@ -129,7 +129,7 @@ doAccountTxStoredProcedure(
boost::json::object
doAccountTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
{
boost::json::object response;
@@ -148,16 +148,43 @@ doAccountTx(
return response;
}
std::optional<BackendInterface::AccountTransactionsCursor> cursor;
if (request.contains("cursor"))
{
auto const& obj = request.at("cursor").as_object();
std::optional<uint32_t> ledgerSequence;
if (obj.contains("ledger_sequence"))
{
ledgerSequence = (uint32_t)obj.at("ledger_sequence").as_int64();
}
std::optional<uint32_t> transactionIndex;
if (obj.contains("transaction_index"))
{
transactionIndex = (uint32_t)obj.at("transaction_index").as_int64();
}
if (!ledgerSequence || !transactionIndex)
{
response["error"] =
"malformed cursor. include transaction_index and "
"ledger_sequence in an object named \"cursor\"";
return response;
}
cursor = {*ledgerSequence, *transactionIndex};
}
boost::json::array txns;
auto res = doAccountTxStoredProcedure(*account, pgPool, backend);
for (auto const& [sttx, meta] : res)
auto [blobs, retCursor] =
backend.fetchAccountTransactions(*account, cursor);
for (auto const& txnPlusMeta : blobs)
{
boost::json::object obj;
obj["transaction"] = getJson(*sttx);
auto [txn, meta] = deserializeTxPlusMeta(txnPlusMeta);
obj["transaction"] = getJson(*txn);
obj["metadata"] = getJson(*meta);
txns.push_back(obj);
}
response["transactions"] = txns;
response["cursor"] = {};
return response;
}

View File

@@ -7,9 +7,9 @@
#include <boost/json.hpp>
#include <algorithm>
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/DBHelpers.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
std::optional<std::uint32_t>
ledgerSequenceFromRequest(

View File

@@ -21,7 +21,7 @@
#include <ripple/protocol/STLedgerEntry.h>
#include <boost/json.hpp>
#include <handlers/RPCHelpers.h>
#include <reporting/ReportingBackend.h>
#include <reporting/BackendInterface.h>
// Get state nodes from a ledger
// Inputs:
// limit: integer, maximum number of entries
@@ -38,21 +38,23 @@
boost::json::object
doLedgerData(
boost::json::object const& request,
CassandraFlatMapBackend const& backend)
BackendInterface const& backend)
{
boost::json::object response;
uint32_t ledger = request.at("ledger_index").as_int64();
std::optional<int64_t> marker = request.contains("marker")
? request.at("marker").as_int64()
: std::optional<int64_t>{};
ripple::uint256 cursor;
if (request.contains("cursor"))
{
cursor.parseHex(request.at("cursor").as_string().c_str());
}
bool binary =
request.contains("binary") ? request.at("binary").as_bool() : false;
size_t limit = request.contains("limit") ? request.at("limit").as_int64()
: (binary ? 2048 : 256);
BackendInterface::LedgerPage page;
auto start = std::chrono::system_clock::now();
page = backend.fetchLedgerPage(marker, ledger, limit);
page = backend.fetchLedgerPage(cursor, ledger, limit);
auto end = std::chrono::system_clock::now();
@@ -61,7 +63,7 @@ doLedgerData(
.count();
boost::json::array objects;
std::vector<BackendInterface::LedgerObject>& results = page.objects;
std::optional<int64_t>& returnedMarker = page.cursor;
std::optional<ripple::uint256> const& returnedCursor = page.cursor;
BOOST_LOG_TRIVIAL(debug)
<< "doUpperBound returned " << results.size() << " results";
for (auto const& [key, object] : results)
@@ -79,8 +81,8 @@ doLedgerData(
objects.push_back(getJson(sle));
}
response["objects"] = objects;
if (returnedMarker)
response["marker"] = returnedMarker.value();
if (returnedCursor)
response["marker"] = ripple::strHex(*returnedCursor);
response["num_results"] = results.size();
response["db_time"] = time;

View File

@@ -18,8 +18,8 @@
//==============================================================================
#include <handlers/RPCHelpers.h>
#include <reporting/BackendInterface.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
// {
// transaction: <hex>
@@ -28,7 +28,7 @@
boost::json::object
doTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& postgres)
{
boost::json::object response;

View File

@@ -31,7 +31,7 @@
// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource::ETLSource(
boost::json::object const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& networkValidatedLedgers,
boost::asio::io_context& ioContext)
: ioc_(ioContext)
@@ -400,7 +400,7 @@ public:
process(
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
bool abort = false)
{
BOOST_LOG_TRIVIAL(info) << "Processing response. "
@@ -582,7 +582,7 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
}
ETLLoadBalancer::ETLLoadBalancer(
boost::json::array const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext)
{

View File

@@ -25,7 +25,7 @@
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/ReportingBackend.h>
#include <reporting/BackendInterface.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
@@ -85,7 +85,7 @@ class ETLSource
// used for retrying connections
boost::asio::steady_timer timer_;
CassandraFlatMapBackend& backend_;
BackendInterface& backend_;
public:
bool
@@ -113,7 +113,7 @@ public:
/// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource(
boost::json::object const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& networkValidatedLedgers,
boost::asio::io_context& ioContext);
@@ -285,7 +285,7 @@ private:
public:
ETLLoadBalancer(
boost::json::array const& config,
CassandraFlatMapBackend& backend,
BackendInterface& backend,
NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext);

View File

@@ -55,7 +55,7 @@ flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
class CassandraFlatMapBackend : BackendInterface
class CassandraFlatMapBackend : public BackendInterface
{
private:
// convenience function for one-off queries. For normal reads and writes,
@@ -141,7 +141,7 @@ public:
{
}
~CassandraFlatMapBackend()
~CassandraFlatMapBackend() override
{
if (open_)
close();
@@ -915,16 +915,23 @@ public:
}
BackendInterface::LedgerPage
fetchLedgerPage(
std::optional<BackendInterface::LedgerCursor> const& cursor,
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const override
{
BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound";
CassStatement* statement = cass_prepared_bind(upperBound_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
int64_t cursorVal = cursor.has_value() ? cursor.value() : INT64_MIN;
CassError rc = cass_statement_bind_int64(statement, 0, cursorVal);
int64_t intCursor = INT64_MIN;
if (cursor)
{
auto token = getToken(cursor->data());
if (token)
intCursor = *token;
}
CassError rc = cass_statement_bind_int64(statement, 0, intCursor);
if (rc != CASS_OK)
{
cass_statement_free(statement);
@@ -1017,9 +1024,7 @@ public:
{
results.push_back({keys[i], objs[i]});
}
auto token = getToken(results[results.size() - 1].key.data());
assert(token);
return {results, token};
return {results, keys[keys.size() - 1]};
}
return {{}, {}};

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <ripple/basics/StringUtilities.h>
#include <reporting/BackendFactory.h>
#include <reporting/DBHelpers.h>
#include <reporting/ReportingETL.h>
@@ -76,7 +77,7 @@ ReportingETL::insertTransactions(
auto journal = ripple::debugLog();
accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal);
std::string keyStr{(const char*)sttx.getTransactionID().data(), 32};
flatMapBackend_.writeTransaction(
flatMapBackend_->writeTransaction(
std::move(keyStr),
ledger.seq,
std::move(*raw),
@@ -89,7 +90,7 @@ std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
// check that database is actually empty
auto ledger = flatMapBackend_.fetchLedgerBySequence(startingSequence);
auto ledger = flatMapBackend_->fetchLedgerBySequence(startingSequence);
if (ledger)
{
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
@@ -128,9 +129,9 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
for (auto& data : accountTxData)
{
flatMapBackend_.writeAccountTransactions(std::move(data));
flatMapBackend_->writeAccountTransactions(std::move(data));
}
bool success = flatMapBackend_.writeLedger(
bool success = flatMapBackend_->writeLedger(
lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
}
auto end = std::chrono::system_clock::now();
@@ -155,7 +156,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
size_t numAttempts = 0;
while (!stopping_)
{
auto ledger = flatMapBackend_.fetchLedgerBySequence(ledgerSequence);
auto ledger = flatMapBackend_->fetchLedgerBySequence(ledgerSequence);
if (!ledger)
{
@@ -292,7 +293,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
}
assert(not(isCreated and isDeleted));
flatMapBackend_.writeLedgerObject(
flatMapBackend_->writeLedgerObject(
std::move(*obj.mutable_key()),
lgrInfo.seq,
std::move(*obj.mutable_data()),
@@ -302,9 +303,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
}
for (auto& data : accountTxData)
{
flatMapBackend_.writeAccountTransactions(std::move(data));
flatMapBackend_->writeAccountTransactions(std::move(data));
}
bool success = flatMapBackend_.writeLedger(
bool success = flatMapBackend_->writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
@@ -347,7 +348,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
<< "Starting etl pipeline";
writing_ = true;
auto parent = flatMapBackend_.fetchLedgerBySequence(startSequence - 1);
auto parent = flatMapBackend_->fetchLedgerBySequence(startSequence - 1);
if (!parent)
{
assert(false);
@@ -482,7 +483,7 @@ void
ReportingETL::monitor()
{
std::optional<uint32_t> latestSequence =
flatMapBackend_.fetchLatestLedgerSequence();
flatMapBackend_->fetchLatestLedgerSequence();
if (!latestSequence)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
@@ -637,17 +638,16 @@ ReportingETL::ReportingETL(
boost::asio::io_context& ioc)
: publishStrand_(ioc)
, ioContext_(ioc)
, flatMapBackend_(
config.at("database").as_object().at("cassandra").as_object())
, flatMapBackend_(makeBackend(config))
, pgPool_(make_PgPool(
config.at("database").as_object().at("postgres").as_object()))
, loadBalancer_(
config.at("etl_sources").as_array(),
flatMapBackend_,
*flatMapBackend_,
networkValidatedLedgers_,
ioc)
{
flatMapBackend_.open();
flatMapBackend_->open();
initSchema(pgPool_);
}

View File

@@ -25,10 +25,10 @@
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/BackendInterface.h>
#include <reporting/ETLHelpers.h>
#include <reporting/ETLSource.h>
#include <reporting/Pg.h>
#include <reporting/ReportingBackend.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
@@ -59,7 +59,7 @@ struct AccountTransactionsData;
class ReportingETL
{
private:
CassandraFlatMapBackend flatMapBackend_;
std::unique_ptr<BackendInterface> flatMapBackend_;
std::shared_ptr<PgPool> pgPool_;
std::thread worker_;
@@ -321,10 +321,10 @@ public:
return loadBalancer_;
}
CassandraFlatMapBackend&
BackendInterface&
getFlatMapBackend()
{
return flatMapBackend_;
return *flatMapBackend_;
}
std::shared_ptr<PgPool>&

View File

@@ -54,32 +54,32 @@ std::unordered_map<std::string, RPCCommand> commandMap{
boost::json::object
doAccountInfo(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& postgres);
boost::json::object
doTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool);
boost::json::object
doAccountTx(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool);
boost::json::object
doLedgerData(
boost::json::object const& request,
CassandraFlatMapBackend const& backend);
BackendInterface const& backend);
boost::json::object
doBookOffers(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool);
boost::json::object
buildResponse(
boost::json::object const& request,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
{
std::string command = request.at("command").as_string().c_str();
@@ -122,14 +122,14 @@ class session : public std::enable_shared_from_this<session>
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_;
std::string response_;
CassandraFlatMapBackend const& backend_;
BackendInterface const& backend_;
std::shared_ptr<PgPool>& pgPool_;
public:
// Take ownership of the socket
explicit session(
boost::asio::ip::tcp::socket&& socket,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
: ws_(std::move(socket)), backend_(backend), pgPool_(pgPool)
{
@@ -242,14 +242,14 @@ class listener : public std::enable_shared_from_this<listener>
{
boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_;
CassandraFlatMapBackend const& backend_;
BackendInterface const& backend_;
std::shared_ptr<PgPool>& pgPool_;
public:
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint,
CassandraFlatMapBackend const& backend,
BackendInterface const& backend,
std::shared_ptr<PgPool>& pgPool)
: ioc_(ioc), acceptor_(ioc), backend_(backend), pgPool_(pgPool)
{