Repeatedly log on amendment block (#829)

Fixes #364
This commit is contained in:
Alex Kremer
2023-09-13 13:34:02 +01:00
committed by GitHub
parent 91648f98ad
commit 6cfbfda014
12 changed files with 262 additions and 57 deletions

View File

@@ -163,6 +163,7 @@ if (tests)
unittests/etl/ExtractorTests.cpp
unittests/etl/TransformerTests.cpp
unittests/etl/CacheLoaderTests.cpp
unittests/etl/AmendmentBlockHandlerTests.cpp
# RPC
unittests/rpc/ErrorTests.cpp
unittests/rpc/BaseTests.cpp

View File

@@ -47,7 +47,8 @@ ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)
extractors.push_back(std::make_unique<ExtractorType>(
pipe, networkValidatedLedgers_, ledgerFetcher_, startSequence + i, finishSequence_, state_));
auto transformer = TransformerType{pipe, backend_, ledgerLoader_, ledgerPublisher_, startSequence, state_};
auto transformer =
TransformerType{pipe, backend_, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, startSequence, state_};
transformer.waitTillFinished(); // suspend current thread until exit condition is met
pipe.cleanup(); // TODO: this should probably happen automatically using destructor
@@ -110,12 +111,8 @@ ETLService::monitor()
}
catch (std::runtime_error const& e)
{
setAmendmentBlocked();
log_.fatal()
<< "Failed to load initial ledger, Exiting monitor loop: " << e.what()
<< " Possible cause: The ETL node is not compatible with the version of the rippled lib Clio is using.";
return;
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
return amendmentBlockHandler_.onAmendmentBlock();
}
if (ledger)
@@ -259,6 +256,7 @@ ETLService::ETLService(
, ledgerFetcher_(backend, balancer)
, ledgerLoader_(backend, balancer, ledgerFetcher_, state_)
, ledgerPublisher_(ioc, backend, subscriptions, state_)
, amendmentBlockHandler_(ioc, state_)
{
startSequence_ = config.maybeValue<uint32_t>("start_sequence");
finishSequence_ = config.maybeValue<uint32_t>("finish_sequence");

View File

@@ -24,6 +24,7 @@
#include <etl/LoadBalancer.h>
#include <etl/Source.h>
#include <etl/SystemState.h>
#include <etl/impl/AmendmentBlock.h>
#include <etl/impl/CacheLoader.h>
#include <etl/impl/ExtractionDataPipe.h>
#include <etl/impl/Extractor.h>
@@ -35,6 +36,7 @@
#include <util/log/Logger.h>
#include <ripple/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <boost/asio/steady_timer.hpp>
#include <grpcpp/grpcpp.h>
#include <memory>
@@ -76,7 +78,9 @@ class ETLService
using ExtractorType = etl::detail::Extractor<DataPipeType, NetworkValidatedLedgersType, LedgerFetcherType>;
using LedgerLoaderType = etl::detail::LedgerLoader<LoadBalancerType, LedgerFetcherType>;
using LedgerPublisherType = etl::detail::LedgerPublisher<SubscriptionManagerType>;
using TransformerType = etl::detail::Transformer<DataPipeType, LedgerLoaderType, LedgerPublisherType>;
using AmendmentBlockHandlerType = etl::detail::AmendmentBlockHandler<>;
using TransformerType =
etl::detail::Transformer<DataPipeType, LedgerLoaderType, LedgerPublisherType, AmendmentBlockHandlerType>;
util::Logger log_{"ETL"};
@@ -91,6 +95,7 @@ class ETLService
LedgerFetcherType ledgerFetcher_;
LedgerLoaderType ledgerLoader_;
LedgerPublisherType ledgerPublisher_;
AmendmentBlockHandlerType amendmentBlockHandler_;
SystemState state_;
@@ -267,14 +272,5 @@ private:
*/
void
doWork();
/**
* @brief Sets amendment blocked flag.
*/
void
setAmendmentBlocked()
{
state_.isAmendmentBlocked = true;
}
};
} // namespace etl

View File

@@ -36,10 +36,18 @@ struct SystemState
*/
bool isReadOnly = false;
std::atomic_bool isWriting = false; /**< @brief Whether the process is writing to the database. */
std::atomic_bool isStopping = false; /**< @brief Whether the software is stopping. */
std::atomic_bool writeConflict = false; /**< @brief Whether a write conflict was detected. */
std::atomic_bool isAmendmentBlocked = false; /**< @brief Whether we detected an amendment block. */
std::atomic_bool isWriting = false; /**< @brief Whether the process is writing to the database. */
std::atomic_bool isStopping = false; /**< @brief Whether the software is stopping. */
std::atomic_bool writeConflict = false; /**< @brief Whether a write conflict was detected. */
/**
* @brief Whether clio detected an amendment block.
*
* Being amendment blocked means that Clio was compiled with libxrpl that does not yet support some field that
* arrived from rippled and therefore can't extract the ledger diff. When this happens, Clio can't proceed with ETL
* and should log this error and only handle RPC requests.
*/
std::atomic_bool isAmendmentBlocked = false;
};
} // namespace etl

View File

@@ -0,0 +1,96 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <etl/SystemState.h>
#include <util/log/Logger.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <functional>
namespace etl::detail {
struct AmendmentBlockAction
{
void
operator()()
{
static util::Logger log{"ETL"};
LOG(log.fatal())
<< "Can't process new ledgers: The current ETL source is not compatible with the version of the "
"libxrpl Clio is currently using. Please upgrade Clio to a newer version.";
}
};
template <typename ActionCallableType = AmendmentBlockAction>
class AmendmentBlockHandler
{
std::reference_wrapper<boost::asio::io_context> ctx_;
std::reference_wrapper<SystemState> state_;
boost::asio::steady_timer timer_;
std::chrono::milliseconds interval_;
ActionCallableType action_;
public:
template <typename DurationType = std::chrono::seconds>
AmendmentBlockHandler(
boost::asio::io_context& ioc,
SystemState& state,
DurationType interval = DurationType{1},
ActionCallableType&& action = ActionCallableType())
: ctx_{std::ref(ioc)}
, state_{std::ref(state)}
, timer_{ioc}
, interval_{std::chrono::duration_cast<std::chrono::milliseconds>(interval)}
, action_{std::move(action)}
{
}
~AmendmentBlockHandler()
{
boost::asio::post(ctx_.get(), [this]() { timer_.cancel(); });
}
void
onAmendmentBlock()
{
state_.get().isAmendmentBlocked = true;
startReportingTimer();
}
private:
void
startReportingTimer()
{
action_();
timer_.expires_after(interval_);
timer_.async_wait([this](auto ec) {
if (!ec)
boost::asio::post(ctx_.get(), [this] { startReportingTimer(); });
});
}
};
} // namespace etl::detail

View File

@@ -21,6 +21,7 @@
#include <data/BackendInterface.h>
#include <etl/SystemState.h>
#include <etl/impl/AmendmentBlock.h>
#include <etl/impl/LedgerLoader.h>
#include <util/LedgerUtils.h>
#include <util/Profiler.h>
@@ -47,7 +48,11 @@ namespace etl::detail {
/**
* @brief Transformer thread that prepares new ledger out of raw data from GRPC.
*/
template <typename DataPipeType, typename LedgerLoaderType, typename LedgerPublisherType>
template <
typename DataPipeType,
typename LedgerLoaderType,
typename LedgerPublisherType,
typename AmendmentBlockHandlerType>
class Transformer
{
using GetLedgerResponseType = typename LedgerLoaderType::GetLedgerResponseType;
@@ -59,6 +64,8 @@ class Transformer
std::shared_ptr<BackendInterface> backend_;
std::reference_wrapper<LedgerLoaderType> loader_;
std::reference_wrapper<LedgerPublisherType> publisher_;
std::reference_wrapper<AmendmentBlockHandlerType> amendmentBlockHandler_;
uint32_t startSequence_;
std::reference_wrapper<SystemState> state_; // shared state for ETL
@@ -76,12 +83,14 @@ public:
std::shared_ptr<BackendInterface> backend,
LedgerLoaderType& loader,
LedgerPublisherType& publisher,
AmendmentBlockHandlerType& amendmentBlockHandler,
uint32_t startSequence,
SystemState& state)
: pipe_(std::ref(pipe))
: pipe_{std::ref(pipe)}
, backend_{backend}
, loader_(std::ref(loader))
, publisher_(std::ref(publisher))
, loader_{std::ref(loader)}
, publisher_{std::ref(publisher)}
, amendmentBlockHandler_{std::ref(amendmentBlockHandler)}
, startSequence_{startSequence}
, state_{std::ref(state)}
{
@@ -185,11 +194,9 @@ private:
}
catch (std::runtime_error const& e)
{
setAmendmentBlocked();
LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();
log_.fatal()
<< "Failed to build next ledger: " << e.what()
<< " Possible cause: The ETL node is not compatible with the version of the rippled lib Clio is using.";
amendmentBlockHandler_.get().onAmendmentBlock();
return {ripple::LedgerHeader{}, false};
}
@@ -238,7 +245,7 @@ private:
LOG(log_.debug()) << "object neighbors not included. using cache";
if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1)
throw std::runtime_error("Cache is not full, but object neighbors were not included");
throw std::logic_error("Cache is not full, but object neighbors were not included");
auto const blob = obj.mutable_data();
auto checkBookBase = false;
@@ -288,7 +295,7 @@ private:
{
LOG(log_.debug()) << "object neighbors not included. using cache";
if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq)
throw std::runtime_error("Cache is not full, but object neighbors were not included");
throw std::logic_error("Cache is not full, but object neighbors were not included");
for (auto const& obj : cacheUpdates)
{
@@ -423,19 +430,6 @@ private:
{
state_.get().writeConflict = conflict;
}
/**
* @brief Sets the amendment blocked flag.
*
* Being amendment blocked means that Clio was compiled with libxrpl that does not yet support some field that
* arrived from rippled and therefore can't extract the ledger diff. When this happens, Clio can't proceed with ETL
* and should log this error and only handle RPC requests.
*/
void
setAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
}
};
} // namespace etl::detail

View File

@@ -66,12 +66,12 @@ public:
* @brief Send via shared_ptr of string, that enables SubscriptionManager to publish to clients.
*
* @param msg The message to send
* @throws Not supported unless implemented in child classes. Will always throw std::runtime_error.
* @throws Not supported unless implemented in child classes. Will always throw std::logic_error.
*/
virtual void
send(std::shared_ptr<std::string> msg)
{
throw std::runtime_error("web server can not send the shared payload");
throw std::logic_error("web server can not send the shared payload");
}
/**

View File

@@ -0,0 +1,50 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <util/FakeAmendmentBlockAction.h>
#include <util/Fixtures.h>
#include <etl/impl/AmendmentBlock.h>
#include <gtest/gtest.h>
using namespace testing;
using namespace etl;
class AmendmentBlockHandlerTest : public NoLoggerFixture
{
protected:
using AmendmentBlockHandlerType = detail::AmendmentBlockHandler<FakeAmendmentBlockAction>;
boost::asio::io_context ioc_;
};
TEST_F(AmendmentBlockHandlerTest, CallToOnAmendmentBlockSetsStateAndRepeatedlyCallsAction)
{
std::size_t callCount = 0;
SystemState state;
AmendmentBlockHandlerType handler{ioc_, state, std::chrono::nanoseconds{1}, {std::ref(callCount)}};
EXPECT_FALSE(state.isAmendmentBlocked);
handler.onAmendmentBlock();
EXPECT_TRUE(state.isAmendmentBlocked);
ioc_.run_for(std::chrono::milliseconds{1});
EXPECT_TRUE(callCount >= 10);
}

View File

@@ -20,6 +20,7 @@
#include <etl/impl/Transformer.h>
#include <util/FakeFetchResponse.h>
#include <util/Fixtures.h>
#include <util/MockAmendmentBlockHandler.h>
#include <util/MockExtractionDataPipe.h>
#include <util/MockLedgerLoader.h>
#include <util/MockLedgerPublisher.h>
@@ -47,11 +48,14 @@ protected:
using ExtractionDataPipeType = MockExtractionDataPipe;
using LedgerLoaderType = MockLedgerLoader;
using LedgerPublisherType = MockLedgerPublisher;
using TransformerType = etl::detail::Transformer<ExtractionDataPipeType, LedgerLoaderType, LedgerPublisherType>;
using AmendmentBlockHandlerType = MockAmendmentBlockHandler;
using TransformerType = etl::detail::
Transformer<ExtractionDataPipeType, LedgerLoaderType, LedgerPublisherType, AmendmentBlockHandlerType>;
ExtractionDataPipeType dataPipe_;
LedgerLoaderType ledgerLoader_;
LedgerPublisherType ledgerPublisher_;
AmendmentBlockHandlerType amendmentBlockHandler_;
SystemState state_;
std::unique_ptr<TransformerType> transformer_;
@@ -82,8 +86,8 @@ TEST_F(ETLTransformerTest, StopsOnWriteConflict)
EXPECT_CALL(dataPipe_, popNext).Times(0);
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0);
transformer_ =
std::make_unique<TransformerType>(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_);
transformer_ = std::make_unique<TransformerType>(
dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, 0, state_);
transformer_->waitTillFinished(); // explicitly joins the thread
}
@@ -114,8 +118,8 @@ TEST_F(ETLTransformerTest, StopsOnEmptyFetchResponse)
EXPECT_CALL(*rawBackendPtr, doFinishWrites).Times(AtLeast(1));
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(AtLeast(1));
transformer_ =
std::make_unique<TransformerType>(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_);
transformer_ = std::make_unique<TransformerType>(
dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, 0, state_);
// after 10ms we start spitting out empty responses which means the extractor is finishing up
// this is normally combined with stopping the entire thing by setting the isStopping flag.
@@ -147,6 +151,8 @@ TEST_F(ETLTransformerTest, DoesNotPublishIfCanNotBuildNextLedger)
// should not call publish
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0);
transformer_ =
std::make_unique<TransformerType>(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_);
transformer_ = std::make_unique<TransformerType>(
dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, 0, state_);
}
// TODO: implement tests for amendment block. requires more refactoring

View File

@@ -0,0 +1,33 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <functional>
struct FakeAmendmentBlockAction
{
std::reference_wrapper<std::size_t> callCount;
void
operator()()
{
++(callCount.get());
}
};

View File

@@ -160,10 +160,6 @@ private:
*/
struct SyncAsioContextTest : virtual public NoLoggerFixture
{
SyncAsioContextTest()
{
}
template <typename F>
void
runSpawn(F&& f)

View File

@@ -0,0 +1,27 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <gmock/gmock.h>
struct MockAmendmentBlockHandler
{
MOCK_METHOD(void, onAmendmentBlock, (), ());
};