Files
clio/src/etlng/impl/LedgerPublisher.hpp
2025-06-18 21:40:11 +01:00

283 lines
11 KiB
C++

//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "etl/SystemState.hpp"
#include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/impl/Loading.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/Mutex.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Prometheus.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <fmt/core.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/protocol/Fees.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/STObject.h>
#include <xrpl/protocol/Serializer.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace etlng::impl {
/**
* @brief Publishes ledgers in a synchronized fashion.
*
* If ETL is started far behind the network, ledgers will be written and published very rapidly. Monitoring processes
* will publish ledgers as they are written. However, to publish a ledger, the monitoring process needs to read all of
* the transactions for that ledger from the database. Reading the transactions from the database requires network
* calls, which can be slow. It is imperative however that the monitoring processes keep up with the writer, else the
* monitoring processes will not be able to detect if the writer failed. Therefore, publishing each ledger (which
* includes reading all of the transactions from the database) is done from the application wide asio io_service, and a
* strand is used to ensure ledgers are published in order.
*/
class LedgerPublisher : public etlng::LedgerPublisherInterface {
util::Logger log_{"ETL"};
boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
std::reference_wrapper<etl::SystemState const> state_; // shared state for ETL
util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
"etl_last_publish_seconds",
{},
"Seconds since epoch of the last published ledger"
);
util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
public:
/**
* @brief Create an instance of the publisher
*/
LedgerPublisher(
boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLServiceNg
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
etl::SystemState const& state
)
: publishStrand_{boost::asio::make_strand(ioc)}
, backend_{std::move(backend)}
, subscriptions_{std::move(subscriptions)}
, state_{std::cref(state)}
{
}
/**
* @brief Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers
* stream.
*
* @param ledgerSequence the sequence of the ledger to publish
* @param maxAttempts the number of times to attempt to read the ledger from the database
* @param attemptsDelay the delay between attempts to read the ledger from the database
* @return Whether the ledger was found in the database and published
*/
bool
publish(
uint32_t ledgerSequence,
std::optional<uint32_t> maxAttempts,
std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
) override
{
LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
while (not state_.get().isStopping) {
auto range = backend_->hardFetchLedgerRangeNoThrow();
if (!range || range->maxSequence < ledgerSequence) {
++numAttempts;
LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
// We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
if (maxAttempts && numAttempts >= maxAttempts) {
LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
return false;
}
std::this_thread::sleep_for(attemptsDelay);
continue;
}
auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerBySequence(ledgerSequence, yield);
});
ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
publish(*lgr);
return true;
}
return false;
}
/**
* @brief Publish the passed ledger asynchronously.
*
* All ledgers are published thru publishStrand_ which ensures that all publishes are performed in a serial fashion.
*
* @param lgrInfo the ledger to publish
*/
void
publish(ripple::LedgerHeader const& lgrInfo)
{
boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
setLastClose(lgrInfo.closeTime);
auto age = lastCloseAgeSeconds();
// if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
if (age < kMAX_LEDGER_AGE_SECONDS) {
std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchFees(lgrInfo.seq, yield);
});
ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
});
auto const ledgerRange = backend_->fetchLedgerRange();
ASSERT(ledgerRange.has_value(), "Ledger range must exist");
auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
// order with transaction index
std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
ripple::STObject const object1(iter1, ripple::sfMetadata);
ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
ripple::STObject const object2(iter2, ripple::sfMetadata);
return object1.getFieldU32(ripple::sfTransactionIndex) <
object2.getFieldU32(ripple::sfTransactionIndex);
});
for (auto const& txAndMeta : transactions)
subscriptions_->pubTransaction(txAndMeta, lgrInfo);
subscriptions_->pubBookChanges(lgrInfo, transactions);
setLastPublishTime();
LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
} else {
LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
}
});
// we track latest publish-requested seq, not necessarily already published
setLastPublishedSequence(lgrInfo.seq);
}
/**
* @brief Get time passed since last publish, in seconds
*/
std::uint32_t
lastPublishAgeSeconds() const override
{
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
.count();
}
/**
* @brief Get last publish time as a time point
*/
std::chrono::time_point<std::chrono::system_clock>
getLastPublish() const override
{
return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
}};
}
/**
* @brief Get time passed since last ledger close, in seconds
*/
std::uint32_t
lastCloseAgeSeconds() const override
{
auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
if (now < (kRIPPLE_EPOCH_START + closeTime))
return 0;
return now - (kRIPPLE_EPOCH_START + closeTime);
}
/**
* @brief Get the sequence of the last schueduled ledger to publish, Be aware that the ledger may not have been
* published to network
*/
std::optional<uint32_t>
getLastPublishedSequence() const
{
return *lastPublishedSequence_.lock();
}
private:
void
setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
{
auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
*closeTime = lastCloseTime;
}
void
setLastPublishTime()
{
using namespace std::chrono;
auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
lastPublishSeconds_.get().set(nowSeconds);
}
void
setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
{
auto lastPublishSeq = lastPublishedSequence_.lock();
*lastPublishSeq = lastPublishedSequence;
}
};
} // namespace etlng::impl