Cover LoadBalancer with tests (#1394)

Fixes #680. Fixes #1222.
This commit is contained in:
Sergey Kuznetsov
2024-05-15 14:02:36 +01:00
committed by GitHub
parent f74b89cc8d
commit da10535bc0
61 changed files with 2769 additions and 1466 deletions

View File

@@ -21,9 +21,10 @@
#include "data/BackendInterface.hpp"
#include "etl/ETLHelpers.hpp"
#include "etl/ETLService.hpp"
#include "etl/ETLState.hpp"
#include "etl/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/Constants.hpp"
#include "util/Random.hpp"
#include "util/log/Logger.hpp"
@@ -57,19 +58,23 @@ LoadBalancer::make_LoadBalancer(
Config const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
SourceFactory sourceFactory
)
{
return std::make_shared<LoadBalancer>(config, ioc, backend, subscriptions, validatedLedgers);
return std::make_shared<LoadBalancer>(
config, ioc, std::move(backend), std::move(subscriptions), std::move(validatedLedgers), std::move(sourceFactory)
);
}
LoadBalancer::LoadBalancer(
Config const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
SourceFactory sourceFactory
)
{
auto const forwardingCacheTimeout = config.valueOr<float>("forwarding_cache_timeout", 0.f);
@@ -81,7 +86,8 @@ LoadBalancer::LoadBalancer(
static constexpr std::uint32_t MAX_DOWNLOAD = 256;
if (auto value = config.maybeValue<uint32_t>("num_markers"); value) {
downloadRanges_ = std::clamp(*value, 1u, MAX_DOWNLOAD);
ASSERT(*value > 0 and *value <= MAX_DOWNLOAD, "'num_markers' value in config must be in range 1-256");
downloadRanges_ = *value;
} else if (backend->fetchLedgerRange()) {
downloadRanges_ = 4;
}
@@ -98,7 +104,7 @@ LoadBalancer::LoadBalancer(
};
for (auto const& entry : config.array("etl_sources")) {
auto source = make_Source(
auto source = sourceFactory(
entry,
ioc,
backend,
@@ -113,12 +119,12 @@ LoadBalancer::LoadBalancer(
);
// checking etl node validity
auto const stateOpt = ETLState::fetchETLStateFromSource(source);
auto const stateOpt = ETLState::fetchETLStateFromSource(*source);
if (!stateOpt) {
checkOnETLFailure(fmt::format(
"Failed to fetch ETL state from source = {} Please check the configuration and network",
source.toString()
source->toString()
));
} else if (etlState_ && etlState_->networkID && stateOpt->networkID &&
etlState_->networkID != stateOpt->networkID) {
@@ -132,7 +138,7 @@ LoadBalancer::LoadBalancer(
}
sources_.push_back(std::move(source));
LOG(log_.info()) << "Added etl source - " << sources_.back().toString();
LOG(log_.info()) << "Added etl source - " << sources_.back()->toString();
}
if (sources_.empty())
@@ -140,8 +146,8 @@ LoadBalancer::LoadBalancer(
// This is made separate from source creation to prevent UB in case one of the sources will call
// chooseForwardingSource while we are still filling the sources_ vector
for (auto& source : sources_) {
source.run();
for (auto const& source : sources_) {
source->run();
}
}
@@ -150,53 +156,57 @@ LoadBalancer::~LoadBalancer()
sources_.clear();
}
std::pair<std::vector<std::string>, bool>
LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly)
std::vector<std::string>
LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly, std::chrono::steady_clock::duration retryAfter)
{
std::vector<std::string> response;
auto const success = execute(
execute(
[this, &response, &sequence, cacheOnly](auto& source) {
auto [data, res] = source.loadInitialLedger(sequence, downloadRanges_, cacheOnly);
auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, cacheOnly);
if (!res) {
LOG(log_.error()) << "Failed to download initial ledger. Sequence = " << sequence
<< " source = " << source.toString();
LOG(log_.error()) << "Failed to download initial ledger."
<< " Sequence = " << sequence << " source = " << source->toString();
} else {
response = std::move(data);
}
return res;
},
sequence
sequence,
retryAfter
);
return {std::move(response), success};
return response;
}
LoadBalancer::OptionalGetLedgerResponseType
LoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors)
LoadBalancer::fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors,
std::chrono::steady_clock::duration retryAfter
)
{
GetLedgerResponseType response;
bool const success = execute(
execute(
[&response, ledgerSequence, getObjects, getObjectNeighbors, log = log_](auto& source) {
auto [status, data] = source.fetchLedger(ledgerSequence, getObjects, getObjectNeighbors);
auto [status, data] = source->fetchLedger(ledgerSequence, getObjects, getObjectNeighbors);
response = std::move(data);
if (status.ok() && response.validated()) {
LOG(log.info()) << "Successfully fetched ledger = " << ledgerSequence
<< " from source = " << source.toString();
<< " from source = " << source->toString();
return true;
}
LOG(log.warn()) << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString()
<< ", error_code: " << status.error_code() << ", error_msg: " << status.error_message()
<< ", source = " << source.toString();
<< ", source = " << source->toString();
return false;
},
ledgerSequence
ledgerSequence,
retryAfter
);
if (success) {
return response;
}
return {};
return response;
}
std::optional<boost::json::object>
@@ -212,15 +222,14 @@ LoadBalancer::forwardToRippled(
}
}
std::size_t sourceIdx = 0;
if (!sources_.empty())
sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests.");
std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
auto numAttempts = 0u;
std::optional<boost::json::object> response;
while (numAttempts < sources_.size()) {
if (auto res = sources_[sourceIdx].forwardToRippled(request, clientIp, yield)) {
if (auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, yield)) {
response = std::move(res);
break;
}
@@ -240,42 +249,41 @@ LoadBalancer::toJson() const
{
boost::json::array ret;
for (auto& src : sources_)
ret.push_back(src.toJson());
ret.push_back(src->toJson());
return ret;
}
template <typename Func>
bool
LoadBalancer::execute(Func f, uint32_t ledgerSequence)
void
LoadBalancer::execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter)
{
std::size_t sourceIdx = 0;
if (!sources_.empty())
sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
ASSERT(not sources_.empty(), "ETL sources must be configured to execute functions.");
size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1);
auto numAttempts = 0;
size_t numAttempts = 0;
while (true) {
auto& source = sources_[sourceIdx];
LOG(log_.debug()) << "Attempting to execute func. ledger sequence = " << ledgerSequence
<< " - source = " << source.toString();
<< " - source = " << source->toString();
// Originally, it was (source->hasLedger(ledgerSequence) || true)
/* Sometimes rippled has ledger but doesn't actually know. However,
but this does NOT happen in the normal case and is safe to remove
This || true is only needed when loading full history standalone */
if (source.hasLedger(ledgerSequence)) {
if (source->hasLedger(ledgerSequence)) {
bool const res = f(source);
if (res) {
LOG(log_.debug()) << "Successfully executed func at source = " << source.toString()
LOG(log_.debug()) << "Successfully executed func at source = " << source->toString()
<< " - ledger sequence = " << ledgerSequence;
break;
}
LOG(log_.warn()) << "Failed to execute func at source = " << source.toString()
LOG(log_.warn()) << "Failed to execute func at source = " << source->toString()
<< " - ledger sequence = " << ledgerSequence;
} else {
LOG(log_.warn()) << "Ledger not present at source = " << source.toString()
LOG(log_.warn()) << "Ledger not present at source = " << source->toString()
<< " - ledger sequence = " << ledgerSequence;
}
sourceIdx = (sourceIdx + 1) % sources_.size();
@@ -283,10 +291,9 @@ LoadBalancer::execute(Func f, uint32_t ledgerSequence)
if (numAttempts % sources_.size() == 0) {
LOG(log_.info()) << "Ledger sequence " << ledgerSequence
<< " is not yet available from any configured sources. Sleeping and trying again";
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(retryAfter);
}
}
return true;
}
std::optional<ETLState>
@@ -304,10 +311,11 @@ LoadBalancer::chooseForwardingSource()
{
hasForwardingSource_ = false;
for (auto& source : sources_) {
if (source.isConnected()) {
source.setForwarding(true);
if (not hasForwardingSource_ and source->isConnected()) {
source->setForwarding(true);
hasForwardingSource_ = true;
return;
} else {
source->setForwarding(false);
}
}
}