everything in cassandra

This commit is contained in:
CJ Cobb
2021-02-11 16:46:47 -05:00
parent 5205620c49
commit bb1205fd36
5 changed files with 481 additions and 87 deletions

View File

@@ -80,4 +80,27 @@ writeToPostgres(
std::vector<AccountTransactionsData> const& accountTxData,
std::shared_ptr<PgPool> const& pgPool);
inline ripple::LedgerInfo
deserializeHeader(ripple::Slice data)
{
ripple::SerialIter sit(data.data(), data.size());
ripple::LedgerInfo info;
info.seq = sit.get32();
info.drops = sit.get64();
info.parentHash = sit.get256();
info.txHash = sit.get256();
info.accountHash = sit.get256();
info.parentCloseTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
info.closeFlags = sit.get8();
info.hash = sit.get256();
return info;
}
#endif

View File

@@ -244,6 +244,76 @@ flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData)
delete &requestParams;
}
}
void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteLedgerHeaderCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteLedgerHeaderCallbackData*>(
cbData);
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeLedgerHeader(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
}
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData)
{
CassandraFlatMapBackend::WriteLedgerHashCallbackData& requestParams =
*static_cast<CassandraFlatMapBackend::WriteLedgerHashCallbackData*>(
cbData);
CassandraFlatMapBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeLedgerHash(requestParams, true);
});
}
else
{
--(backend.numRequestsOutstanding_);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
delete &requestParams;
}
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
@@ -901,7 +971,7 @@ CassandraFlatMapBackend::open()
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_range"
<< " (is_latest boolean PRIMARY KEY, sequence counter)";
<< " (is_latest boolean PRIMARY KEY, sequence bigint)";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
@@ -1343,28 +1413,72 @@ CassandraFlatMapBackend::open()
insertLedgerHash_ = cass_future_get_prepared(prepare_future);
query = {};
query << " UPDATE " << tableName << "ledger_range"
<< " SET sequence = sequence + ? WHERE is_latest = ?";
query << " update " << tableName << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence != ?";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// Wait for the statement to prepare and get the result
// wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// Handle error
// handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing getToken : " << rc << ", "
ss << "nodestore: error preparing gettoken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
updateLedgerRange_ = cass_future_get_prepared(prepare_future);
query = {};
query << " select header from ledgers where sequence = ?";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: error preparing gettoken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
selectLedgerBySeq_ = cass_future_get_prepared(prepare_future);
query = {};
query << " select sequence from ledgers_range where is_latest = true";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
// wait for the statement to prepare and get the result
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
// handle error
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: error preparing gettoken : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
selectLatestLedger_ = cass_future_get_prepared(prepare_future);
setupPreparedStatements = true;
}

View File

@@ -49,6 +49,10 @@ void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
class CassandraFlatMapBackend
{
private:
@@ -104,6 +108,8 @@ private:
const CassPrepared* insertLedgerHash_ = nullptr;
const CassPrepared* updateLedgerRange_ = nullptr;
const CassPrepared* updateLedgerHeader_ = nullptr;
const CassPrepared* selectLedgerBySeq_ = nullptr;
const CassPrepared* selectLatestLedger_ = nullptr;
// io_context used for exponential backoff for write retries
mutable boost::asio::io_context ioContext_;
@@ -370,19 +376,179 @@ public:
return {{}, {}};
}
void
struct WriteLedgerHeaderCallbackData
{
CassandraFlatMapBackend const* backend;
uint32_t sequence;
std::string header;
uint32_t currentRetries = 0;
WriteLedgerHeaderCallbackData(
CassandraFlatMapBackend const* f,
uint32_t sequence,
std::string&& header)
: backend(f), sequence(sequence), header(std::move(header))
{
}
};
struct WriteLedgerHashCallbackData
{
CassandraFlatMapBackend const* backend;
ripple::uint256 hash;
uint32_t sequence;
uint32_t currentRetries = 0;
WriteLedgerHashCallbackData(
CassandraFlatMapBackend const* f,
ripple::uint256 hash,
uint32_t sequence)
: backend(f), hash(hash), sequence(sequence)
{
}
};
bool
writeLedger(
ripple::LedgerInfo const& ledgerInfo,
std::string&& header,
bool isFirst = false)
bool isFirst = false) const
{
WriteLedgerHeaderCallbackData* headerCb =
new WriteLedgerHeaderCallbackData(
this, ledgerInfo.seq, std::move(header));
WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData(
this, ledgerInfo.hash, ledgerInfo.seq);
++numRequestsOutstanding_;
++numRequestsOutstanding_;
writeLedgerHeader(*headerCb, false);
writeLedgerHash(*hashCb, false);
// wait for all other writes to finish
sync();
// write range
if (isFirst)
{
CassStatement* statement = cass_prepared_bind(updateLedgerRange_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc =
cass_statement_bind_int64(statement, 0, ledgerInfo.seq);
rc = cass_statement_bind_bool(statement, 1, cass_false);
rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq);
CassFuture* fut;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra write error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
cass_statement_free(statement);
}
CassStatement* statement = cass_prepared_bind(updateLedgerRange_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
// TODO check rc
CassError rc = cass_statement_bind_int64(statement, 0, ledgerInfo.seq);
assert(rc == CASS_OK);
rc = cass_statement_bind_bool(statement, 1, cass_true);
assert(rc == CASS_OK);
rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq);
assert(rc == CASS_OK);
CassFuture* fut;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra write error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
cass_statement_free(statement);
CassResult const* res = cass_future_get_result(fut);
cass_future_free(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
BOOST_LOG_TRIVIAL(error) << "Cassandra write error: no rows";
cass_result_free(res);
return false;
}
cass_bool_t success;
rc = cass_value_get_bool(cass_row_get_column(row, 0), &success);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra write error: " << rc << ", "
<< cass_error_desc(rc);
return false;
}
cass_result_free(res);
return success == cass_true;
}
void
writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
CassStatement* statement = cass_prepared_bind(insertLedgerHash_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_bytes(
statement, 0, static_cast<cass_byte_t const*>(cb.hash.data()), 32);
assert(rc == CASS_OK);
rc = cass_statement_bind_int64(statement, 1, cb.sequence);
assert(rc == CASS_OK);
// actually do the write
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
cass_future_set_callback(
fut, flatMapWriteLedgerHashCallback, static_cast<void*>(&cb));
cass_future_free(fut);
}
void
writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const
{
// write header
unsigned char* headerRaw = (unsigned char*)header.data();
++numRequestsOutstanding_;
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
throttleCv_.wait(lck, [this]() {
return numRequestsOutstanding_ < maxRequestsOutstanding;
});
}
}
unsigned char* headerRaw = (unsigned char*)cb.header.data();
CassStatement* statement = cass_prepared_bind(insertLedgerHeader_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_int64(statement, 0, ledgerInfo.seq);
CassError rc = cass_statement_bind_int64(statement, 0, cb.sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
@@ -395,7 +561,7 @@ public:
statement,
1,
static_cast<cass_byte_t const*>(headerRaw),
header.size());
cb.header.size());
if (rc != CASS_OK)
{
cass_statement_free(statement);
@@ -405,38 +571,136 @@ public:
return;
}
// actually do the write
// write hash
++numRequestsOutstanding_;
statement = cass_prepared_bind(insertLedgerHash_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
rc = cass_statement_bind_bytes(
statement,
0,
static_cast<cass_byte_t const*>(ledgerInfo.hash.data()),
32);
CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement);
rc = cass_statement_bind_int64(statement, 1, ledgerInfo.seq);
// actually do the write
cass_future_set_callback(
fut, flatMapWriteLedgerHeaderCallback, static_cast<void*>(&cb));
cass_future_free(fut);
}
// wait for all other writes to finish
sync();
// write range
if (isFirst)
std::optional<uint32_t>
getLatestLedgerSequence()
{
statement = cass_prepared_bind(updateLedgerRange_);
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now();
CassStatement* statement = cass_prepared_bind(selectLatestLedger_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
rc = cass_statement_bind_bool(statement, 0, cass_false);
rc = cass_statement_bind_int64(statement, 1, ledgerInfo.seq);
// execute
CassFuture* fut;
CassError rc;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
statement = cass_prepared_bind(updateLedgerRange_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
rc = cass_statement_bind_bool(statement, 0, cass_true);
rc = cass_statement_bind_int64(statement, 1, 1);
// actually execute the statement
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows";
cass_result_free(res);
return {};
}
cass_int64_t sequence;
rc = cass_value_get_int64(cass_row_get_column(row, 0), &sequence);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
cass_result_free(res);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "Fetched from cassandra in "
<< std::chrono::duration_cast<std::chrono::microseconds>(
end - start)
.count()
<< " microseconds";
return sequence;
}
// Synchronously fetch the object with key key and store the result in pno
std::optional<ripple::LedgerInfo>
getLedgerBySequence(uint32_t sequence)
{
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
auto start = std::chrono::system_clock::now();
CassStatement* statement = cass_prepared_bind(selectLedgerBySeq_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_int64(statement, 0, sequence);
if (rc != CASS_OK)
{
cass_statement_free(statement);
BOOST_LOG_TRIVIAL(error)
<< "Binding Cassandra ledger fetch query: " << rc << ", "
<< cass_error_desc(rc);
return {};
}
CassFuture* fut;
do
{
fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
std::stringstream ss;
ss << "Cassandra fetch error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
}
} while (rc != CASS_OK);
CassResult const* res = cass_future_get_result(fut);
cass_statement_free(statement);
cass_future_free(fut);
CassRow const* row = cass_result_first_row(res);
if (!row)
{
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows";
cass_result_free(res);
return {};
}
cass_byte_t const* buf;
std::size_t bufSize;
rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
if (rc != CASS_OK)
{
cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
<< ", " << cass_error_desc(rc);
return {};
}
std::vector<unsigned char> result{buf, buf + bufSize};
ripple::LedgerInfo lgrInfo =
deserializeHeader(ripple::makeSlice(result));
cass_result_free(res);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "Fetched from cassandra in "
<< std::chrono::duration_cast<std::chrono::microseconds>(
end - start)
.count()
<< " microseconds";
return lgrInfo;
}
// Synchronously fetch the object with key key and store the result in
// pno
// @param key the key of the object
// @param pno object in which to store the result
// @return result status of query
@@ -1674,7 +1938,7 @@ public:
}
void
sync()
sync() const
{
std::unique_lock<std::mutex> lck(syncMutex_);
@@ -1691,6 +1955,10 @@ public:
flatMapWriteBookCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
friend void
flatMapReadCallback(CassFuture* fut, void* cbData);

View File

@@ -43,29 +43,6 @@ toString(ripple::LedgerInfo const& info)
<< " ParentHash : " << strHex(info.parentHash) << " }";
return ss.str();
}
ripple::LedgerInfo
deserializeHeader(ripple::Slice data)
{
ripple::SerialIter sit(data.data(), data.size());
ripple::LedgerInfo info;
info.seq = sit.get32();
info.drops = sit.get64();
info.parentHash = sit.get256();
info.txHash = sit.get256();
info.accountHash = sit.get256();
info.parentCloseTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
info.closeFlags = sit.get8();
info.hash = sit.get256();
return info;
}
} // namespace detail
std::vector<AccountTransactionsData>
@@ -112,7 +89,7 @@ std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
// check that database is actually empty
auto ledger = getLedger(startingSequence, pgPool_);
auto ledger = flatMapBackend_.getLedgerBySequence(startingSequence);
if (ledger)
{
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
@@ -129,8 +106,8 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
if (!ledgerData)
return {};
ripple::LedgerInfo lgrInfo = detail::deserializeHeader(
ripple::makeSlice(ledgerData->ledger_header()));
ripple::LedgerInfo lgrInfo =
deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
@@ -174,7 +151,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
size_t numAttempts = 0;
while (!stopping_)
{
auto ledger = getLedger(ledgerSequence, pgPool_);
auto ledger = flatMapBackend_.getLedgerBySequence(ledgerSequence);
if (!ledger)
{
@@ -263,14 +240,14 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
return response;
}
std::pair<ripple::LedgerInfo, std::vector<AccountTransactionsData>>
std::pair<ripple::LedgerInfo, bool>
ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Beginning ledger update";
ripple::LedgerInfo lgrInfo =
detail::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
@@ -319,7 +296,12 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
isDeleted,
std::move(bookDir));
}
flatMapBackend_.sync();
for (auto& data : accountTxData)
{
flatMapBackend_.storeAccountTx(std::move(data));
}
bool success = flatMapBackend_.writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Inserted/modified/deleted all objects. Number of objects = "
@@ -328,7 +310,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Finished ledger update. " << detail::toString(lgrInfo);
return {lgrInfo, std::move(accountTxData)};
return {lgrInfo, success};
}
// Database must be populated when this starts
@@ -361,7 +343,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
<< "Starting etl pipeline";
writing_ = true;
auto parent = getLedger(startSequence - 1, pgPool_);
auto parent = flatMapBackend_.getLedgerBySequence(startSequence - 1);
if (!parent)
{
assert(false);
@@ -443,20 +425,22 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
continue;
auto start = std::chrono::system_clock::now();
auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse);
auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
auto end = std::chrono::system_clock::now();
if (!writeToPostgres(lgrInfo, accountTxData, pgPool_))
writeConflict = true;
auto duration = ((end - start).count()) / 1000000000.0;
auto numTxns = accountTxData.size();
auto numTxns = fetchResponse->hashes_list().hashes_size();
auto numObjects = fetchResponse->ledger_objects().objects_size();
BOOST_LOG_TRIVIAL(info)
<< "Load phase of etl : "
<< "Successfully published ledger! Ledger info: "
<< detail::toString(lgrInfo) << ". txn count = " << numTxns
<< ". load time = " << duration << ". load tps "
<< numTxns / duration;
if (!writeConflict)
<< ". object count = " << numObjects
<< ". load time = " << duration
<< ". load txns per second = " << numTxns / duration
<< ". load objs per second = " << numObjects / duration;
// success is false if the ledger was already written
if (success)
{
publishLedger(lgrInfo);
lastPublishedSequence = lgrInfo.seq;
@@ -492,12 +476,14 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
void
ReportingETL::monitor()
{
auto ledger = getLedger(std::monostate(), pgPool_);
if (!ledger)
std::optional<uint32_t> latestSequence =
flatMapBackend_.getLatestLedgerSequence();
if (!latestSequence)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Database is empty. Will download a ledger "
"from the network.";
std::optional<ripple::LedgerInfo> ledger;
if (startSequence_)
{
BOOST_LOG_TRIVIAL(info)
@@ -531,6 +517,8 @@ ReportingETL::monitor()
return;
}
}
if (ledger)
latestSequence = ledger->seq;
}
else
{
@@ -543,7 +531,7 @@ ReportingETL::monitor()
<< __func__ << " : "
<< "Database already populated. Picking up from the tip of history";
}
if (!ledger)
if (!latestSequence)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " : "
@@ -554,7 +542,7 @@ ReportingETL::monitor()
{
// publishLedger(ledger);
}
uint32_t nextSequence = ledger->seq + 1;
uint32_t nextSequence = latestSequence.value() + 1;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "

View File

@@ -213,6 +213,7 @@ private:
ripple::LedgerInfo const& ledger,
org::xrpl::rpc::v1::GetLedgerResponse& data);
// TODO update this documentation
/// Build the next ledger using the previous ledger and the extracted data.
/// This function calls insertTransactions()
/// @note rawData should be data that corresponds to the ledger immediately
@@ -220,7 +221,7 @@ private:
/// @param parent the previous ledger
/// @param rawData data extracted from an ETL source
/// @return the newly built ledger and data to write to Postgres
std::pair<ripple::LedgerInfo, std::vector<AccountTransactionsData>>
std::pair<ripple::LedgerInfo, bool>
buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData);
/// Attempt to read the specified ledger from the database, and then publish