Add config to run without valid etl (#946)

Fix #943
This commit is contained in:
cyan317
2023-10-20 16:22:25 +01:00
committed by GitHub
parent 1aab2b94b1
commit e062121917
10 changed files with 142 additions and 50 deletions

View File

@@ -26,6 +26,7 @@
// --- // ---
} }
}, },
"allow_no_etl": false, // Allow Clio to run without valid ETL source, otherwise Clio will stop if ETL check fails
"etl_sources": [ "etl_sources": [
{ {
"ip": "127.0.0.1", "ip": "127.0.0.1",

View File

@@ -208,8 +208,9 @@ public:
/** /**
* @brief Get the etl nodes' state * @brief Get the etl nodes' state
* @return the etl nodes' state, nullopt if etl nodes are not connected
*/ */
etl::ETLState std::optional<etl::ETLState>
getETLState() const noexcept getETLState() const noexcept
{ {
return loadBalancer_->getETLState(); return loadBalancer_->getETLState();

View File

@@ -17,26 +17,11 @@
*/ */
//============================================================================== //==============================================================================
#include <data/BackendInterface.h>
#include <etl/ETLState.h> #include <etl/ETLState.h>
#include <etl/Source.h>
#include <rpc/JS.h> #include <rpc/JS.h>
namespace etl { namespace etl {
ETLState
ETLState::fetchETLStateFromSource(Source const& source) noexcept
{
auto const serverInfoRippled = data::synchronous([&source](auto yield) {
return source.forwardToRippled({{"command", "server_info"}}, std::nullopt, yield);
});
if (serverInfoRippled)
return boost::json::value_to<ETLState>(boost::json::value(*serverInfoRippled));
return ETLState{};
}
ETLState ETLState
tag_invoke(boost::json::value_to_tag<ETLState>, boost::json::value const& jv) tag_invoke(boost::json::value_to_tag<ETLState>, boost::json::value const& jv)
{ {

View File

@@ -19,6 +19,8 @@
#pragma once #pragma once
#include <data/BackendInterface.h>
#include <boost/json.hpp> #include <boost/json.hpp>
#include <cstdint> #include <cstdint>
@@ -26,8 +28,6 @@
namespace etl { namespace etl {
class Source;
/** /**
* @brief This class is responsible for fetching and storing the state of the ETL information, such as the network id * @brief This class is responsible for fetching and storing the state of the ETL information, such as the network id
*/ */
@@ -36,9 +36,22 @@ struct ETLState {
/** /**
* @brief Fetch the ETL state from the rippled server * @brief Fetch the ETL state from the rippled server
* @param source The source to fetch the state from
* @return The ETL state, nullopt if source not available
*/ */
static ETLState template <typename Forward>
fetchETLStateFromSource(Source const& source) noexcept; static std::optional<ETLState>
fetchETLStateFromSource(Forward const& source) noexcept
{
auto const serverInfoRippled = data::synchronous([&source](auto yield) {
return source.forwardToRippled({{"command", "server_info"}}, std::nullopt, yield);
});
if (serverInfoRippled)
return boost::json::value_to<ETLState>(boost::json::value(*serverInfoRippled));
return std::nullopt;
}
}; };
ETLState ETLState

View File

@@ -83,33 +83,44 @@ LoadBalancer::LoadBalancer(
downloadRanges_ = 4; downloadRanges_ = 4;
} }
auto const allowNoEtl = config.valueOr("allow_no_etl", false);
auto const checkOnETLFailure = [this, allowNoEtl](std::string const& log) {
LOG(log_.error()) << log;
if (!allowNoEtl) {
LOG(log_.error()) << "Set allow_no_etl as true in config to allow clio run without valid ETL sources.";
throw std::logic_error("ETL configuration error.");
}
};
for (auto const& entry : config.array("etl_sources")) { for (auto const& entry : config.array("etl_sources")) {
std::unique_ptr<Source> source = make_Source(entry, ioc, backend, subscriptions, validatedLedgers, *this); std::unique_ptr<Source> source = make_Source(entry, ioc, backend, subscriptions, validatedLedgers, *this);
// checking etl node validity // checking etl node validity
auto const state = ETLState::fetchETLStateFromSource(*source); auto const stateOpt = ETLState::fetchETLStateFromSource(*source);
if (!state.networkID) { if (!stateOpt) {
LOG(log_.error()) << "Failed to fetch ETL state from source = " << source->toString() checkOnETLFailure(fmt::format(
<< " Please check the configuration and network"; "Failed to fetch ETL state from source = {} Please check the configuration and network",
throw std::logic_error("ETL node not available"); source->toString()
));
} else if (etlState_ && etlState_->networkID && stateOpt->networkID && etlState_->networkID != stateOpt->networkID) {
checkOnETLFailure(fmt::format(
"ETL sources must be on the same network. Source network id = {} does not match others network id = {}",
*(stateOpt->networkID),
*(etlState_->networkID)
));
} else {
etlState_ = stateOpt;
} }
if (etlState_ && etlState_->networkID != state.networkID) {
LOG(log_.error()) << "ETL sources must be on the same network. "
<< "Source network id = " << *(state.networkID)
<< " does not match others network id = " << *(etlState_->networkID);
throw std::logic_error("ETL nodes are not in the same network");
}
etlState_ = state;
sources_.push_back(std::move(source)); sources_.push_back(std::move(source));
LOG(log_.info()) << "Added etl source - " << sources_.back()->toString(); LOG(log_.info()) << "Added etl source - " << sources_.back()->toString();
} }
if (sources_.empty()) { if (sources_.empty())
LOG(log_.error()) << "No ETL sources configured. Please check the configuration"; checkOnETLFailure("No ETL sources configured. Please check the configuration");
throw std::logic_error("No ETL sources configured");
}
} }
LoadBalancer::~LoadBalancer() LoadBalancer::~LoadBalancer()
@@ -256,11 +267,14 @@ LoadBalancer::execute(Func f, uint32_t ledgerSequence)
return true; return true;
} }
ETLState std::optional<ETLState>
LoadBalancer::getETLState() const noexcept LoadBalancer::getETLState() noexcept
{ {
assert(etlState_); // etlState_ is set in the constructor if (!etlState_) {
return *etlState_; // retry ETLState fetch
etlState_ = ETLState::fetchETLStateFromSource(*this);
}
return etlState_;
} }
} // namespace etl } // namespace etl

View File

@@ -181,9 +181,10 @@ public:
/** /**
* @brief Return state of ETL nodes. * @brief Return state of ETL nodes.
* @return ETL state, nullopt if etl nodes not available
*/ */
ETLState std::optional<ETLState>
getETLState() const noexcept; getETLState() noexcept;
private: private:
/** /**

View File

@@ -98,7 +98,9 @@ public:
return Error{Status{RippledError::rpcEXCESSIVE_LGR_RANGE}}; return Error{Status{RippledError::rpcEXCESSIVE_LGR_RANGE}};
} }
auto const currentNetId = etl_->getETLState().networkID; std::optional<uint32_t> currentNetId = std::nullopt;
if (auto const& etlState = etl_->getETLState(); etlState.has_value())
currentNetId = etlState->networkID;
std::optional<data::TransactionAndMetadata> dbResponse; std::optional<data::TransactionAndMetadata> dbResponse;

View File

@@ -36,7 +36,7 @@ TEST_F(ETLStateTest, Error)
{ {
EXPECT_CALL(source, forwardToRippled).WillOnce(Return(std::nullopt)); EXPECT_CALL(source, forwardToRippled).WillOnce(Return(std::nullopt));
auto const state = etl::ETLState::fetchETLStateFromSource(source); auto const state = etl::ETLState::fetchETLStateFromSource(source);
EXPECT_FALSE(state.networkID.has_value()); EXPECT_FALSE(state);
} }
TEST_F(ETLStateTest, NetworkIdValid) TEST_F(ETLStateTest, NetworkIdValid)
@@ -52,8 +52,9 @@ TEST_F(ETLStateTest, NetworkIdValid)
); );
EXPECT_CALL(source, forwardToRippled).WillOnce(Return(json.as_object())); EXPECT_CALL(source, forwardToRippled).WillOnce(Return(json.as_object()));
auto const state = etl::ETLState::fetchETLStateFromSource(source); auto const state = etl::ETLState::fetchETLStateFromSource(source);
ASSERT_TRUE(state.networkID.has_value()); ASSERT_TRUE(state.has_value());
EXPECT_EQ(state.networkID.value(), 12); ASSERT_TRUE(state->networkID.has_value());
EXPECT_EQ(state->networkID.value(), 12);
} }
TEST_F(ETLStateTest, NetworkIdInvalid) TEST_F(ETLStateTest, NetworkIdInvalid)
@@ -69,5 +70,6 @@ TEST_F(ETLStateTest, NetworkIdInvalid)
); );
EXPECT_CALL(source, forwardToRippled).WillOnce(Return(json.as_object())); EXPECT_CALL(source, forwardToRippled).WillOnce(Return(json.as_object()));
auto const state = etl::ETLState::fetchETLStateFromSource(source); auto const state = etl::ETLState::fetchETLStateFromSource(source);
EXPECT_FALSE(state.networkID.has_value()); ASSERT_TRUE(state.has_value());
EXPECT_FALSE(state->networkID.has_value());
} }

View File

@@ -410,7 +410,7 @@ TEST_F(RPCTxTest, ReturnBinaryWithCTID)
TEST_F(RPCTxTest, MintNFT) TEST_F(RPCTxTest, MintNFT)
{ {
// Note: `inLedger` is API v1 only. See DefaultOutput_* // Note: `inLedger` is API v1 only. See DefaultOutput_*
auto const static OUT = fmt::format( auto static const OUT = fmt::format(
R"({{ R"({{
"Account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", "Account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn",
"Fee": "50", "Fee": "50",
@@ -756,9 +756,82 @@ TEST_F(RPCTxTest, ReturnCTIDForTxInput)
}); });
} }
TEST_F(RPCTxTest, NotReturnCTIDIfETLNotAvaiable)
{
auto constexpr static OUT = R"({
"Account":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn",
"Fee":"2",
"Sequence":100,
"SigningPubKey":"74657374",
"TakerGets":
{
"currency":"0158415500000000C1F76FF6ECB0BAC600000000",
"issuer":"rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun",
"value":"200"
},
"TakerPays":"300",
"TransactionType":"OfferCreate",
"hash":"2E2FBAAFF767227FE4381C4BE9855986A6B9F96C62F6E443731AB36F7BBB8A08",
"meta":
{
"AffectedNodes":
[
{
"CreatedNode":
{
"LedgerEntryType":"Offer",
"NewFields":
{
"TakerGets":"200",
"TakerPays":
{
"currency":"0158415500000000C1F76FF6ECB0BAC600000000",
"issuer":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn",
"value":"300"
}
}
}
}
],
"TransactionIndex":100,
"TransactionResult":"tesSUCCESS"
},
"date":123456,
"ledger_index":100,
"inLedger":100,
"validated": true
})";
auto const rawBackendPtr = dynamic_cast<MockBackend*>(mockBackendPtr.get());
TransactionAndMetadata tx;
tx.metadata = CreateMetaDataForCreateOffer(CURRENCY, ACCOUNT, 100, 200, 300).getSerializer().peekData();
tx.transaction =
CreateCreateOfferTransactionObject(ACCOUNT, 2, 100, CURRENCY, ACCOUNT2, 200, 300).getSerializer().peekData();
tx.date = 123456;
tx.ledgerSequence = 100;
EXPECT_CALL(*rawBackendPtr, fetchTransaction(ripple::uint256{TXNID}, _)).WillOnce(Return(tx));
auto const rawETLPtr = dynamic_cast<MockETLService*>(mockETLServicePtr.get());
ASSERT_NE(rawETLPtr, nullptr);
EXPECT_CALL(*rawETLPtr, getETLState).WillOnce(Return(std::nullopt));
runSpawn([this](auto yield) {
auto const handler = AnyHandler{TestTxHandler{mockBackendPtr, mockETLServicePtr}};
auto const req = json::parse(fmt::format(
R"({{
"command": "tx",
"transaction": "{}"
}})",
TXNID
));
auto const output = handler.process(req, Context{yield});
ASSERT_TRUE(output);
EXPECT_EQ(*output, json::parse(OUT));
});
}
TEST_F(RPCTxTest, ViaCTID) TEST_F(RPCTxTest, ViaCTID)
{ {
auto const static OUT = fmt::format( auto static const OUT = fmt::format(
R"({{ R"({{
"Account":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", "Account":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn",
"Fee":"2", "Fee":"2",

View File

@@ -32,5 +32,5 @@ struct MockETLService {
MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const)); MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const));
MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const)); MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const));
MOCK_METHOD(bool, isAmendmentBlocked, (), (const)); MOCK_METHOD(bool, isAmendmentBlocked, (), (const));
MOCK_METHOD(etl::ETLState, getETLState, (), (const)); MOCK_METHOD(std::optional<etl::ETLState>, getETLState, (), (const));
}; };