Refactor publish sequence logic for read-only mode

This commit is contained in:
CJ Cobb
2022-02-17 20:49:28 +00:00
committed by CJ Cobb
parent d9741c7c80
commit f84733cb20
7 changed files with 102 additions and 93 deletions

View File

@@ -38,7 +38,7 @@ retryOnTimeout(F func, size_t waitMs = 500)
}
template <class F>
void
auto
synchronous(F&& f)
{
boost::asio::io_context ctx;
@@ -47,18 +47,42 @@ synchronous(F&& f)
work.emplace(ctx);
boost::asio::spawn(strand, [&f, &work](boost::asio::yield_context yield) {
f(yield);
using R = typename std::result_of<F(boost::asio::yield_context&)>::type;
if constexpr (!std::is_same<R, void>::value)
{
R res;
boost::asio::spawn(
strand, [&f, &work, &res](boost::asio::yield_context yield) {
res = f(yield);
work.reset();
});
work.reset();
});
ctx.run();
return res;
}
else
{
boost::asio::spawn(
strand, [&f, &work](boost::asio::yield_context yield) {
f(yield);
work.reset();
});
ctx.run();
ctx.run();
}
}
template <class F>
auto
synchronousAndRetryOnTimeout(F&& f)
{
return retryOnTimeout([&]() { return synchronous(f); });
}
class BackendInterface
{
protected:
mutable std::shared_mutex rngMtx_;
std::optional<LedgerRange> range;
SimpleCache cache_;
@@ -109,10 +133,21 @@ public:
std::optional<LedgerRange>
fetchLedgerRange() const
{
std::lock_guard lk(mutex_);
std::shared_lock lck(rngMtx_);
return range;
}
void
updateRange(uint32_t newMax)
{
std::unique_lock lck(rngMtx_);
assert(!range || newMax >= range->maxSequence);
if (!range)
range = {newMax, newMax};
else
range->maxSequence = newMax;
}
std::optional<ripple::Fees>
fetchFees(std::uint32_t const seq, boost::asio::yield_context& yield) const;
@@ -216,12 +251,9 @@ public:
std::optional<LedgerRange>
hardFetchLedgerRange() const
{
std::optional<LedgerRange> range = {};
synchronous([&](boost::asio::yield_context& yield) {
range = hardFetchLedgerRange(yield);
return synchronous([&](boost::asio::yield_context yield) {
return hardFetchLedgerRange(yield);
});
return range;
}
virtual std::optional<LedgerRange>
@@ -234,16 +266,6 @@ public:
std::optional<LedgerRange>
hardFetchLedgerRangeNoThrow(boost::asio::yield_context& yield) const;
void
updateRange(std::uint32_t const newMax)
{
std::lock_guard lk(mutex_);
if (!range)
range = {newMax, newMax};
else
range->maxSequence = newMax;
}
virtual void
writeLedger(
ripple::LedgerInfo const& ledgerInfo,
@@ -306,7 +328,7 @@ private:
std::string&& blob) = 0;
virtual bool
doFinishWrites() const = 0;
doFinishWrites() = 0;
};
} // namespace Backend

View File

@@ -692,7 +692,7 @@ public:
boost::asio::yield_context& yield) const override;
bool
doFinishWrites() const override
doFinishWrites() override
{
if (!range || lastSync_ == 0 ||
ledgerSequence_ - syncInterval_ == lastSync_)

View File

@@ -717,7 +717,7 @@ PostgresBackend::startWrites() const
}
bool
PostgresBackend::doFinishWrites() const
PostgresBackend::doFinishWrites()
{
synchronous([&](boost::asio::yield_context yield) {
if (!abortWrite_)

View File

@@ -134,7 +134,7 @@ public:
startWrites() const override;
bool
doFinishWrites() const override;
doFinishWrites() override;
bool
doOnlineDelete(

View File

@@ -51,7 +51,7 @@ public:
getMostRecent()
{
std::unique_lock lck(m_);
cv_.wait(lck, [this]() { return max_ || stopping_; });
cv_.wait(lck, [this]() { return max_; });
return max_;
}
@@ -60,24 +60,19 @@ public:
/// @return true if the sequence was validated, false otherwise.
/// A return value of false means the wait timed out before the sequence
/// was validated by the network (only possible when maxWaitMs is supplied).
bool
waitUntilValidatedByNetwork(uint32_t sequence)
waitUntilValidatedByNetwork(
uint32_t sequence,
std::optional<uint32_t> maxWaitMs = {})
{
std::unique_lock lck(m_);
cv_.wait(lck, [sequence, this]() {
return (max_ && sequence <= *max_) || stopping_;
});
return !stopping_;
}
/// Puts the datastructure in the stopped state
/// Future calls to this datastructure will not block
/// This operation cannot be reversed
void
stop()
{
std::lock_guard lck(m_);
stopping_ = true;
cv_.notify_all();
auto pred = [sequence, this]() -> bool {
return (max_ && sequence <= *max_);
};
if (maxWaitMs)
cv_.wait_for(lck, std::chrono::milliseconds(*maxWaitMs));
else
cv_.wait(lck, pred);
return pred();
}
};

View File

@@ -249,7 +249,7 @@ PlainETLSource::onConnect(
std::string(BOOST_BEAST_VERSION_STRING) +
" coro-client");
req.set("X-User", "coro-client");
req.set("X-User", "coro-client");
}));
// Update the host_ string. This will provide the value of the

View File

@@ -139,37 +139,22 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - Updating cache";
std::vector<Backend::LedgerObject> diff;
auto fetchDiffSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context& yield) {
diff = backend_->fetchLedgerDiff(lgrInfo.seq, yield);
std::vector<Backend::LedgerObject> diff =
Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerDiff(lgrInfo.seq, yield);
});
};
Backend::retryOnTimeout(fetchDiffSynchronous);
backend_->cache().update(diff, lgrInfo.seq);
backend_->updateRange(lgrInfo.seq);
}
backend_->updateRange(lgrInfo.seq);
std::optional<ripple::Fees> fees = {};
std::vector<Backend::TransactionAndMetadata> transactions = {};
std::optional<ripple::Fees> fees = Backend::synchronousAndRetryOnTimeout(
[&](auto yield) { return backend_->fetchFees(lgrInfo.seq, yield); });
auto fetchFeesSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context& yield) {
fees = backend_->fetchFees(lgrInfo.seq, yield);
std::vector<Backend::TransactionAndMetadata> transactions =
Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
});
};
auto fetchTxSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context& yield) {
transactions =
backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
});
};
Backend::retryOnTimeout(fetchFeesSynchronous);
Backend::retryOnTimeout(fetchTxSynchronous);
auto ledgerRange = backend_->fetchLedgerRange();
assert(ledgerRange);
@@ -222,15 +207,9 @@ ReportingETL::publishLedger(
}
else
{
std::optional<ripple::LedgerInfo> lgr = {};
auto fetchLedgerSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context& yield) {
lgr =
backend_->fetchLedgerBySequence(ledgerSequence, yield);
});
};
Backend::retryOnTimeout(fetchLedgerSynchronous);
auto lgr = Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerBySequence(ledgerSequence, yield);
});
assert(lgr);
publishLedger(*lgr);
@@ -721,7 +700,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
});
}
backend_->updateRange(lgrInfo.seq);
lastPublishedSequence = lgrInfo.seq;
}
writeConflict = !success;
@@ -782,11 +760,10 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
void
ReportingETL::monitor()
{
std::optional<uint32_t> latestSequence = {};
Backend::synchronous([&](boost::asio::yield_context& yield) {
latestSequence = backend_->fetchLatestLedgerSequence(yield);
});
std::optional<uint32_t> latestSequence =
Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLatestLedgerSequence(yield);
});
if (!latestSequence)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
@@ -916,21 +893,36 @@ void
ReportingETL::monitorReadOnly()
{
BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode";
std::optional<uint32_t> mostRecent =
networkValidatedLedgers_->getMostRecent();
if (!mostRecent)
std::optional<uint32_t> latestSequence =
Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLatestLedgerSequence(yield);
});
if (!latestSequence)
latestSequence = networkValidatedLedgers_->getMostRecent();
if (!latestSequence)
return;
uint32_t sequence = *mostRecent;
std::thread t{[this, sequence]() {
std::thread t{[this, latestSequence]() {
BOOST_LOG_TRIVIAL(info) << "Loading cache";
loadBalancer_->loadInitialLedger(sequence, true);
loadBalancer_->loadInitialLedger(*latestSequence, true);
}};
t.detach();
while (!stopping_ &&
networkValidatedLedgers_->waitUntilValidatedByNetwork(sequence))
latestSequence = *latestSequence + 1;
while (true)
{
publishLedger(sequence, {});
++sequence;
// try to grab the next ledger
if (Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerBySequence(*latestSequence, yield);
}))
{
publishLedger(*latestSequence, {});
latestSequence = *latestSequence + 1;
}
else // if we can't, wait until it's validated by the network, or 1
// second passes, whichever occurs first. Even if we don't hear
// from rippled, if ledgers are being written to the db, we
// publish them
networkValidatedLedgers_->waitUntilValidatedByNetwork(
*latestSequence, 1000);
}
}