mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-12 15:55:51 +00:00
ETL fixes
* Don't use cache if cache is more recent than extracted ledger
* Use hardFetchLedgerRangeNoThrow exclusively in ETL, to prevent race condition
This commit is contained in:
@@ -300,6 +300,8 @@ public:
|
|||||||
|
|
||||||
// Tell the database we have finished writing all data for a particular
|
// Tell the database we have finished writing all data for a particular
|
||||||
// ledger
|
// ledger
|
||||||
|
// TODO change the return value to represent different results. committed,
|
||||||
|
// write conflict, errored, successful but not committed
|
||||||
bool
|
bool
|
||||||
finishWrites(std::uint32_t const ledgerSequence);
|
finishWrites(std::uint32_t const ledgerSequence);
|
||||||
|
|
||||||
|
|||||||
@@ -694,7 +694,10 @@ public:
|
|||||||
bool
|
bool
|
||||||
doFinishWrites() override
|
doFinishWrites() override
|
||||||
{
|
{
|
||||||
if (!range || lastSync_ == 0 ||
|
// if db is empty, sync. if sync interval is 1, always sync.
|
||||||
|
// if we've never synced, sync. if it's been longer than the configured
|
||||||
|
// sync interval since we last synced, sync.
|
||||||
|
if (!range || syncInterval_ == 1 || lastSync_ == 0 ||
|
||||||
ledgerSequence_ - syncInterval_ >= lastSync_)
|
ledgerSequence_ - syncInterval_ >= lastSync_)
|
||||||
{
|
{
|
||||||
// wait for all other writes to finish
|
// wait for all other writes to finish
|
||||||
|
|||||||
@@ -1,5 +1,11 @@
|
|||||||
#include <backend/SimpleCache.h>
|
#include <backend/SimpleCache.h>
|
||||||
namespace Backend {
|
namespace Backend {
|
||||||
|
uint32_t
|
||||||
|
SimpleCache::latestLedgerSequence()
|
||||||
|
{
|
||||||
|
std::shared_lock lck{mtx_};
|
||||||
|
return latestSeq_;
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
SimpleCache::update(
|
SimpleCache::update(
|
||||||
|
|||||||
@@ -51,6 +51,9 @@ public:
|
|||||||
void
|
void
|
||||||
setFull();
|
setFull();
|
||||||
|
|
||||||
|
uint32_t
|
||||||
|
latestLedgerSequence();
|
||||||
|
|
||||||
// whether the cache has all data for the most recent ledger
|
// whether the cache has all data for the most recent ledger
|
||||||
bool
|
bool
|
||||||
isFull();
|
isFull();
|
||||||
|
|||||||
@@ -221,14 +221,14 @@ ReportingETL::publishLedger(
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
|
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
|
||||||
ReportingETL::fetchLedgerData(uint32_t idx)
|
ReportingETL::fetchLedgerData(uint32_t seq)
|
||||||
{
|
{
|
||||||
BOOST_LOG_TRIVIAL(debug)
|
BOOST_LOG_TRIVIAL(debug)
|
||||||
<< __func__ << " : "
|
<< __func__ << " : "
|
||||||
<< "Attempting to fetch ledger with sequence = " << idx;
|
<< "Attempting to fetch ledger with sequence = " << seq;
|
||||||
|
|
||||||
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
|
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
|
||||||
loadBalancer_->fetchLedger(idx, false, false);
|
loadBalancer_->fetchLedger(seq, false, false);
|
||||||
if (response)
|
if (response)
|
||||||
BOOST_LOG_TRIVIAL(trace)
|
BOOST_LOG_TRIVIAL(trace)
|
||||||
<< __func__ << " : "
|
<< __func__ << " : "
|
||||||
@@ -237,14 +237,18 @@ ReportingETL::fetchLedgerData(uint32_t idx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
|
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
|
||||||
ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
|
ReportingETL::fetchLedgerDataAndDiff(uint32_t seq)
|
||||||
{
|
{
|
||||||
BOOST_LOG_TRIVIAL(debug)
|
BOOST_LOG_TRIVIAL(debug)
|
||||||
<< __func__ << " : "
|
<< __func__ << " : "
|
||||||
<< "Attempting to fetch ledger with sequence = " << idx;
|
<< "Attempting to fetch ledger with sequence = " << seq;
|
||||||
|
|
||||||
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
|
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> response =
|
||||||
loadBalancer_->fetchLedger(idx, true, !backend_->cache().isFull());
|
loadBalancer_->fetchLedger(
|
||||||
|
seq,
|
||||||
|
true,
|
||||||
|
!backend_->cache().isFull() ||
|
||||||
|
backend_->cache().latestLedgerSequence() >= seq);
|
||||||
if (response)
|
if (response)
|
||||||
BOOST_LOG_TRIVIAL(trace)
|
BOOST_LOG_TRIVIAL(trace)
|
||||||
<< __func__ << " : "
|
<< __func__ << " : "
|
||||||
@@ -412,7 +416,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
|
|||||||
}
|
}
|
||||||
backend_->cache().update(cacheUpdates, lgrInfo.seq);
|
backend_->cache().update(cacheUpdates, lgrInfo.seq);
|
||||||
// rippled didn't send successor information, so use our cache
|
// rippled didn't send successor information, so use our cache
|
||||||
if (!rawData.object_neighbors_included() || backend_->cache().isFull())
|
if (!rawData.object_neighbors_included())
|
||||||
{
|
{
|
||||||
BOOST_LOG_TRIVIAL(debug)
|
BOOST_LOG_TRIVIAL(debug)
|
||||||
<< __func__ << " object neighbors not included. using cache";
|
<< __func__ << " object neighbors not included. using cache";
|
||||||
@@ -764,11 +768,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
|
|||||||
void
|
void
|
||||||
ReportingETL::monitor()
|
ReportingETL::monitor()
|
||||||
{
|
{
|
||||||
std::optional<uint32_t> latestSequence =
|
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||||
Backend::synchronousAndRetryOnTimeout([&](auto yield) {
|
if (!rng)
|
||||||
return backend_->fetchLatestLedgerSequence(yield);
|
|
||||||
});
|
|
||||||
if (!latestSequence)
|
|
||||||
{
|
{
|
||||||
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
|
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
|
||||||
<< "Database is empty. Will download a ledger "
|
<< "Database is empty. Will download a ledger "
|
||||||
@@ -808,7 +809,14 @@ ReportingETL::monitor()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ledger)
|
if (ledger)
|
||||||
latestSequence = ledger->seq;
|
rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
BOOST_LOG_TRIVIAL(error)
|
||||||
|
<< __func__ << " : "
|
||||||
|
<< "Failed to load initial ledger. Exiting monitor loop";
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -821,18 +829,9 @@ ReportingETL::monitor()
|
|||||||
<< __func__ << " : "
|
<< __func__ << " : "
|
||||||
<< "Database already populated. Picking up from the tip of history";
|
<< "Database already populated. Picking up from the tip of history";
|
||||||
}
|
}
|
||||||
if (!latestSequence)
|
assert(rng);
|
||||||
{
|
uint32_t nextSequence = rng->maxSequence + 1;
|
||||||
BOOST_LOG_TRIVIAL(error)
|
loadCacheAsync(rng->maxSequence);
|
||||||
<< __func__ << " : "
|
|
||||||
<< "Failed to load initial ledger. Exiting monitor loop";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
}
|
|
||||||
uint32_t nextSequence = latestSequence.value() + 1;
|
|
||||||
loadCacheAsync(*latestSequence);
|
|
||||||
|
|
||||||
BOOST_LOG_TRIVIAL(debug)
|
BOOST_LOG_TRIVIAL(debug)
|
||||||
<< __func__ << " : "
|
<< __func__ << " : "
|
||||||
@@ -840,9 +839,8 @@ ReportingETL::monitor()
|
|||||||
<< "Starting monitor loop. sequence = " << nextSequence;
|
<< "Starting monitor loop. sequence = " << nextSequence;
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
if (Backend::synchronousAndRetryOnTimeout([&](auto yield) {
|
if (auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||||
return backend_->fetchLedgerBySequence(nextSequence, yield);
|
rng && rng->maxSequence > nextSequence)
|
||||||
}))
|
|
||||||
{
|
{
|
||||||
publishLedger(nextSequence, {});
|
publishLedger(nextSequence, {});
|
||||||
++nextSequence;
|
++nextSequence;
|
||||||
@@ -930,32 +928,29 @@ void
|
|||||||
ReportingETL::monitorReadOnly()
|
ReportingETL::monitorReadOnly()
|
||||||
{
|
{
|
||||||
BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode";
|
BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode";
|
||||||
std::optional<uint32_t> latestSequence =
|
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||||
Backend::synchronousAndRetryOnTimeout([&](auto yield) {
|
uint32_t latestSequence;
|
||||||
return backend_->fetchLatestLedgerSequence(yield);
|
if (!rng)
|
||||||
});
|
if (auto net = networkValidatedLedgers_->getMostRecent())
|
||||||
if (!latestSequence)
|
latestSequence = *net;
|
||||||
latestSequence = networkValidatedLedgers_->getMostRecent();
|
else
|
||||||
if (!latestSequence)
|
return;
|
||||||
return;
|
loadCacheAsync(latestSequence);
|
||||||
loadCacheAsync(*latestSequence);
|
latestSequence++;
|
||||||
latestSequence = *latestSequence + 1;
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
// try to grab the next ledger
|
if (auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||||
if (Backend::synchronousAndRetryOnTimeout([&](auto yield) {
|
rng && rng->maxSequence > latestSequence)
|
||||||
return backend_->fetchLedgerBySequence(*latestSequence, yield);
|
|
||||||
}))
|
|
||||||
{
|
{
|
||||||
publishLedger(*latestSequence, {});
|
publishLedger(latestSequence, {});
|
||||||
latestSequence = *latestSequence + 1;
|
latestSequence = latestSequence + 1;
|
||||||
}
|
}
|
||||||
else // if we can't, wait until it's validated by the network, or 1
|
else // if we can't, wait until it's validated by the network, or 1
|
||||||
// second passes, whichever occurs first. Even if we don't hear
|
// second passes, whichever occurs first. Even if we don't hear
|
||||||
// from rippled, if ledgers are being written to the db, we
|
// from rippled, if ledgers are being written to the db, we
|
||||||
// publish them
|
// publish them
|
||||||
networkValidatedLedgers_->waitUntilValidatedByNetwork(
|
networkValidatedLedgers_->waitUntilValidatedByNetwork(
|
||||||
*latestSequence, 1000);
|
latestSequence, 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user