mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-20 19:56:00 +00:00
@@ -297,7 +297,7 @@ BackendInterface::fetchLedgerPage(
|
||||
std::uint32_t const limit,
|
||||
bool outOfOrder,
|
||||
boost::asio::yield_context yield
|
||||
) const
|
||||
)
|
||||
{
|
||||
LedgerPage page;
|
||||
|
||||
@@ -326,6 +326,9 @@ BackendInterface::fetchLedgerPage(
|
||||
msg << " - " << ripple::strHex(keys[j]);
|
||||
}
|
||||
LOG(gLog.error()) << msg.str();
|
||||
|
||||
if (corruptionDetector_.has_value())
|
||||
corruptionDetector_->onCorruptionDetected();
|
||||
}
|
||||
}
|
||||
if (!keys.empty() && !reachedEnd)
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "data/DBHelpers.hpp"
|
||||
#include "data/LedgerCache.hpp"
|
||||
#include "data/Types.hpp"
|
||||
#include "etl/CorruptionDetector.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <boost/asio/executor_work_guard.hpp>
|
||||
@@ -44,6 +45,7 @@
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace data {
|
||||
@@ -138,6 +140,7 @@ protected:
|
||||
mutable std::shared_mutex rngMtx_;
|
||||
std::optional<LedgerRange> range;
|
||||
LedgerCache cache_;
|
||||
std::optional<etl::CorruptionDetector<LedgerCache>> corruptionDetector_;
|
||||
|
||||
public:
|
||||
BackendInterface() = default;
|
||||
@@ -162,6 +165,17 @@ public:
|
||||
return cache_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Sets the corruption detector.
|
||||
*
|
||||
* @param detector The corruption detector to set
|
||||
*/
|
||||
void
|
||||
setCorruptionDetector(etl::CorruptionDetector<LedgerCache> detector)
|
||||
{
|
||||
corruptionDetector_ = std::move(detector);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Fetches a specific ledger by sequence number.
|
||||
*
|
||||
@@ -435,7 +449,7 @@ public:
|
||||
std::uint32_t limit,
|
||||
bool outOfOrder,
|
||||
boost::asio::yield_context yield
|
||||
) const;
|
||||
);
|
||||
|
||||
/**
|
||||
* @brief Fetches the successor object.
|
||||
|
||||
@@ -45,6 +45,7 @@ LedgerCache::waitUntilCacheContainsSeq(uint32_t seq)
|
||||
{
|
||||
if (disabled_)
|
||||
return;
|
||||
|
||||
std::unique_lock lock(mtx_);
|
||||
cv_.wait(lock, [this, seq] { return latestSeq_ >= seq; });
|
||||
return;
|
||||
@@ -89,8 +90,9 @@ LedgerCache::update(std::vector<LedgerObject> const& objs, uint32_t seq, bool is
|
||||
std::optional<LedgerObject>
|
||||
LedgerCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
|
||||
{
|
||||
if (!full_)
|
||||
if (disabled_ or not full_)
|
||||
return {};
|
||||
|
||||
std::shared_lock const lck{mtx_};
|
||||
++successorReqCounter_.get();
|
||||
if (seq != latestSeq_)
|
||||
@@ -105,8 +107,9 @@ LedgerCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
|
||||
std::optional<LedgerObject>
|
||||
LedgerCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const
|
||||
{
|
||||
if (!full_)
|
||||
if (disabled_ or not full_)
|
||||
return {};
|
||||
|
||||
std::shared_lock const lck{mtx_};
|
||||
if (seq != latestSeq_)
|
||||
return {};
|
||||
@@ -120,6 +123,9 @@ LedgerCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const
|
||||
std::optional<Blob>
|
||||
LedgerCache::get(ripple::uint256 const& key, uint32_t seq) const
|
||||
{
|
||||
if (disabled_)
|
||||
return {};
|
||||
|
||||
std::shared_lock const lck{mtx_};
|
||||
if (seq > latestSeq_)
|
||||
return {};
|
||||
@@ -139,6 +145,12 @@ LedgerCache::setDisabled()
|
||||
disabled_ = true;
|
||||
}
|
||||
|
||||
bool
|
||||
LedgerCache::isDisabled() const
|
||||
{
|
||||
return disabled_;
|
||||
}
|
||||
|
||||
void
|
||||
LedgerCache::setFull()
|
||||
{
|
||||
|
||||
@@ -133,6 +133,12 @@ public:
|
||||
void
|
||||
setDisabled();
|
||||
|
||||
/**
|
||||
* @return true if the cache is disabled; false otherwise
|
||||
*/
|
||||
bool
|
||||
isDisabled() const;
|
||||
|
||||
/**
|
||||
* @brief Sets the full flag to true.
|
||||
*
|
||||
|
||||
67
src/etl/CorruptionDetector.hpp
Normal file
67
src/etl/CorruptionDetector.hpp
Normal file
@@ -0,0 +1,67 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etl/SystemState.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <functional>
|
||||
|
||||
namespace etl {
|
||||
|
||||
/**
|
||||
* @brief A helper to notify Clio operator about a corruption in the DB
|
||||
*
|
||||
* @tparam CacheType The type of the cache to disable on corruption
|
||||
*/
|
||||
template <typename CacheType>
|
||||
class CorruptionDetector {
|
||||
std::reference_wrapper<SystemState> state_;
|
||||
std::reference_wrapper<CacheType> cache_;
|
||||
|
||||
util::Logger log_{"ETL"};
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Construct a new Corruption Detector object
|
||||
*
|
||||
* @param state The system state
|
||||
* @param cache The cache to disable on corruption
|
||||
*/
|
||||
CorruptionDetector(SystemState& state, CacheType& cache) : state_{std::ref(state)}, cache_{std::ref(cache)}
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Notify the operator about a corruption in the DB.
|
||||
*/
|
||||
void
|
||||
onCorruptionDetected()
|
||||
{
|
||||
if (not state_.get().isCorruptionDetected) {
|
||||
state_.get().isCorruptionDetected = true;
|
||||
|
||||
LOG(log_.error()) << "Disabling the cache to avoid corrupting the DB further. Please investigate.";
|
||||
cache_.get().setDisabled();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace etl
|
||||
@@ -20,6 +20,8 @@
|
||||
#include "etl/ETLService.hpp"
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "data/LedgerCache.hpp"
|
||||
#include "etl/CorruptionDetector.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/Constants.hpp"
|
||||
#include "util/config/Config.hpp"
|
||||
@@ -279,5 +281,8 @@ ETLService::ETLService(
|
||||
state_.isReadOnly = config.valueOr("read_only", static_cast<bool>(state_.isReadOnly));
|
||||
extractorThreads_ = config.valueOr<uint32_t>("extractor_threads", extractorThreads_);
|
||||
txnThreshold_ = config.valueOr<size_t>("txn_threshold", txnThreshold_);
|
||||
|
||||
// This should probably be done in the backend factory but we don't have state available until here
|
||||
backend_->setCorruptionDetector(CorruptionDetector<data::LedgerCache>{state_, backend->cache()});
|
||||
}
|
||||
} // namespace etl
|
||||
|
||||
@@ -201,6 +201,17 @@ public:
|
||||
return state_.isAmendmentBlocked;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check whether Clio detected DB corruptions.
|
||||
*
|
||||
* @return true if corruption of DB was detected and cache was stopped.
|
||||
*/
|
||||
bool
|
||||
isCorruptionDetected() const
|
||||
{
|
||||
return state_.isCorruptionDetected;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get state of ETL as a JSON object
|
||||
*
|
||||
|
||||
@@ -65,6 +65,18 @@ struct SystemState {
|
||||
util::prometheus::Labels{},
|
||||
"Whether clio detected an amendment block"
|
||||
);
|
||||
|
||||
/**
|
||||
* @brief Whether clio detected a corruption that needs manual attention.
|
||||
*
|
||||
* When corruption is detected, Clio should disable cache and stop the cache loading process in order to prevent
|
||||
* further corruption.
|
||||
*/
|
||||
util::prometheus::Bool isCorruptionDetected = PrometheusService::boolMetric(
|
||||
"etl_corruption_detected",
|
||||
util::prometheus::Labels{},
|
||||
"Whether clio detected a corruption that needs manual attention"
|
||||
);
|
||||
};
|
||||
|
||||
} // namespace etl
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/context/detail/config.hpp>
|
||||
#include <ripple/basics/Blob.h>
|
||||
#include <ripple/basics/base_uint.h>
|
||||
#include <ripple/basics/strHex.h>
|
||||
@@ -112,7 +113,7 @@ private:
|
||||
spawnWorker(uint32_t const seq, size_t cachePageFetchSize)
|
||||
{
|
||||
return ctx_.execute([this, seq, cachePageFetchSize](auto token) {
|
||||
while (not token.isStopRequested()) {
|
||||
while (not token.isStopRequested() and not cache_.get().isDisabled()) {
|
||||
auto cursor = queue_.tryPop();
|
||||
if (not cursor.has_value()) {
|
||||
return; // queue is empty
|
||||
@@ -121,7 +122,7 @@ private:
|
||||
auto [start, end] = cursor.value();
|
||||
LOG(log_.debug()) << "Starting a cursor: " << ripple::strHex(start);
|
||||
|
||||
while (not token.isStopRequested()) {
|
||||
while (not token.isStopRequested() and not cache_.get().isDisabled()) {
|
||||
auto res = data::retryOnTimeout([this, seq, cachePageFetchSize, &start, token]() {
|
||||
return backend_->fetchLedgerPage(start, seq, cachePageFetchSize, false, token);
|
||||
});
|
||||
|
||||
@@ -111,6 +111,7 @@ public:
|
||||
*/
|
||||
struct CacheSection {
|
||||
std::size_t size = 0;
|
||||
bool isEnabled = false;
|
||||
bool isFull = false;
|
||||
ripple::LedgerIndex latestLedgerSeq = {};
|
||||
float objectHitRate = 1.0;
|
||||
@@ -132,6 +133,7 @@ public:
|
||||
ValidatedLedgerSection validatedLedger = {};
|
||||
CacheSection cache = {};
|
||||
bool isAmendmentBlocked = false;
|
||||
bool isCorruptionDetected = false;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -241,8 +243,10 @@ public:
|
||||
output.info.cache.latestLedgerSeq = backend_->cache().latestLedgerSequence();
|
||||
output.info.cache.objectHitRate = backend_->cache().getObjectHitRate();
|
||||
output.info.cache.successorHitRate = backend_->cache().getSuccessorHitRate();
|
||||
output.info.cache.isEnabled = not backend_->cache().isDisabled();
|
||||
output.info.uptime = counters_.get().uptime();
|
||||
output.info.isAmendmentBlocked = etl_->isAmendmentBlocked();
|
||||
output.info.isCorruptionDetected = etl_->isCorruptionDetected();
|
||||
|
||||
return output;
|
||||
}
|
||||
@@ -279,6 +283,9 @@ private:
|
||||
if (info.isAmendmentBlocked)
|
||||
jv.as_object()[JS(amendment_blocked)] = true;
|
||||
|
||||
if (info.isCorruptionDetected)
|
||||
jv.as_object()["corruption_detected"] = true;
|
||||
|
||||
if (info.rippledInfo) {
|
||||
auto const& rippledInfo = info.rippledInfo.value();
|
||||
|
||||
@@ -320,6 +327,7 @@ private:
|
||||
{
|
||||
jv = {
|
||||
{"size", cache.size},
|
||||
{"is_enabled", cache.isEnabled},
|
||||
{"is_full", cache.isFull},
|
||||
{"latest_ledger_seq", cache.latestLedgerSeq},
|
||||
{"object_hit_rate", cache.objectHitRate},
|
||||
|
||||
@@ -21,6 +21,7 @@ target_sources(
|
||||
etl/CursorFromAccountProviderTests.cpp
|
||||
etl/CursorFromDiffProviderTests.cpp
|
||||
etl/CursorFromFixDiffNumProviderTests.cpp
|
||||
etl/CorruptionDetectorTests.cpp
|
||||
etl/ETLStateTests.cpp
|
||||
etl/ExtractionDataPipeTests.cpp
|
||||
etl/ExtractorTests.cpp
|
||||
|
||||
@@ -17,15 +17,22 @@
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etl/CorruptionDetector.hpp"
|
||||
#include "etl/SystemState.hpp"
|
||||
#include "util/Fixtures.hpp"
|
||||
#include "util/MockPrometheus.hpp"
|
||||
#include "util/TestObject.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <ripple/basics/Blob.h>
|
||||
#include <ripple/basics/XRPAmount.h>
|
||||
#include <ripple/basics/base_uint.h>
|
||||
#include <ripple/protocol/Indexes.h>
|
||||
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
|
||||
using namespace data;
|
||||
using namespace util::prometheus;
|
||||
using namespace testing;
|
||||
@@ -72,3 +79,60 @@ TEST_F(BackendInterfaceTest, FetchFeesLegacySuccessPath)
|
||||
EXPECT_EQ(fees->reserve, XRPAmount(3));
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(BackendInterfaceTest, FetchLedgerPageSuccessPath)
|
||||
{
|
||||
using namespace ripple;
|
||||
backend->setRange(MINSEQ, MAXSEQ);
|
||||
|
||||
auto state = etl::SystemState{};
|
||||
backend->setCorruptionDetector(etl::CorruptionDetector{state, backend->cache()});
|
||||
|
||||
EXPECT_FALSE(backend->cache().isDisabled());
|
||||
EXPECT_CALL(*backend, doFetchSuccessorKey(_, _, _))
|
||||
.Times(10)
|
||||
.WillRepeatedly(Return(uint256{"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"}));
|
||||
EXPECT_CALL(*backend, doFetchLedgerObjects(_, _, _)).WillOnce(Return(std::vector<Blob>(10, Blob{'s'})));
|
||||
|
||||
runSpawn([this](auto yield) { backend->fetchLedgerPage(std::nullopt, MAXSEQ, 10, false, yield); });
|
||||
EXPECT_FALSE(backend->cache().isDisabled());
|
||||
}
|
||||
|
||||
TEST_F(BackendInterfaceTest, FetchLedgerPageDisablesCacheOnMissingData)
|
||||
{
|
||||
using namespace ripple;
|
||||
backend->setRange(MINSEQ, MAXSEQ);
|
||||
|
||||
auto state = etl::SystemState{};
|
||||
backend->setCorruptionDetector(etl::CorruptionDetector{state, backend->cache()});
|
||||
|
||||
EXPECT_FALSE(backend->cache().isDisabled());
|
||||
EXPECT_CALL(*backend, doFetchSuccessorKey(_, _, _))
|
||||
.Times(10)
|
||||
.WillRepeatedly(Return(uint256{"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"}));
|
||||
EXPECT_CALL(*backend, doFetchLedgerObjects(_, _, _))
|
||||
.WillOnce(Return(std::vector<Blob>{
|
||||
Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{}
|
||||
}));
|
||||
|
||||
runSpawn([this](auto yield) { backend->fetchLedgerPage(std::nullopt, MAXSEQ, 10, false, yield); });
|
||||
EXPECT_TRUE(backend->cache().isDisabled());
|
||||
}
|
||||
|
||||
TEST_F(BackendInterfaceTest, FetchLedgerPageWithoutCorruptionDetectorDoesNotDisableCacheOnMissingData)
|
||||
{
|
||||
using namespace ripple;
|
||||
backend->setRange(MINSEQ, MAXSEQ);
|
||||
|
||||
EXPECT_FALSE(backend->cache().isDisabled());
|
||||
EXPECT_CALL(*backend, doFetchSuccessorKey(_, _, _))
|
||||
.Times(10)
|
||||
.WillRepeatedly(Return(uint256{"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"}));
|
||||
EXPECT_CALL(*backend, doFetchLedgerObjects(_, _, _))
|
||||
.WillOnce(Return(std::vector<Blob>{
|
||||
Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{'s'}, Blob{}
|
||||
}));
|
||||
|
||||
runSpawn([this](auto yield) { backend->fetchLedgerPage(std::nullopt, MAXSEQ, 10, false, yield); });
|
||||
EXPECT_FALSE(backend->cache().isDisabled());
|
||||
}
|
||||
|
||||
@@ -114,6 +114,7 @@ TEST_P(ParametrizedCacheLoaderTest, LoadCacheWithDifferentSettings)
|
||||
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
|
||||
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
|
||||
|
||||
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
|
||||
EXPECT_CALL(cache, updateImp).Times(loops);
|
||||
EXPECT_CALL(cache, setFull).Times(1);
|
||||
|
||||
@@ -142,6 +143,7 @@ TEST_P(ParametrizedCacheLoaderTest, AutomaticallyCancelledAndAwaitedInDestructor
|
||||
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
|
||||
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
|
||||
|
||||
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
|
||||
EXPECT_CALL(cache, updateImp).Times(AtMost(loops));
|
||||
EXPECT_CALL(cache, setFull).Times(AtMost(1));
|
||||
|
||||
@@ -155,6 +157,35 @@ TEST_P(ParametrizedCacheLoaderTest, AutomaticallyCancelledAndAwaitedInDestructor
|
||||
// no loader.wait(): loader is immediately stopped and awaited in destructor
|
||||
}
|
||||
|
||||
TEST_P(ParametrizedCacheLoaderTest, CacheDisabledLeadsToCancellation)
|
||||
{
|
||||
auto const& settings = GetParam();
|
||||
auto const diffs = diffProvider.getLatestDiff();
|
||||
auto const loops = diffs.size() + 1;
|
||||
auto const keysSize = 1024;
|
||||
|
||||
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).WillRepeatedly(Return(diffs));
|
||||
EXPECT_CALL(*backend, doFetchSuccessorKey(_, SEQ, _)).Times(AtMost(keysSize * loops)).WillRepeatedly([this]() {
|
||||
return diffProvider.nextKey(keysSize);
|
||||
});
|
||||
|
||||
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
|
||||
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
|
||||
|
||||
EXPECT_CALL(cache, isDisabled).WillOnce(Return(false)).WillRepeatedly(Return(true));
|
||||
EXPECT_CALL(cache, updateImp).Times(AtMost(1));
|
||||
EXPECT_CALL(cache, setFull).Times(0);
|
||||
|
||||
async::CoroExecutionContext ctx{settings.numThreads};
|
||||
etl::impl::CursorFromFixDiffNumProvider const provider{backend, settings.numCacheDiffs};
|
||||
|
||||
etl::impl::CacheLoaderImpl<MockCache> loader{
|
||||
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
|
||||
};
|
||||
|
||||
loader.wait();
|
||||
}
|
||||
|
||||
//
|
||||
// Tests of public CacheLoader interface
|
||||
//
|
||||
@@ -176,6 +207,7 @@ TEST_F(CacheLoaderTest, SyncCacheLoaderWaitsTillFullyLoaded)
|
||||
.Times(loops)
|
||||
.WillRepeatedly(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
|
||||
|
||||
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
|
||||
EXPECT_CALL(cache, updateImp).Times(loops);
|
||||
EXPECT_CALL(cache, isFull).WillOnce(Return(false)).WillRepeatedly(Return(true));
|
||||
EXPECT_CALL(cache, setFull).Times(1);
|
||||
@@ -201,6 +233,7 @@ TEST_F(CacheLoaderTest, AsyncCacheLoaderCanBeStopped)
|
||||
.Times(AtMost(loops))
|
||||
.WillRepeatedly(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
|
||||
|
||||
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
|
||||
EXPECT_CALL(cache, updateImp).Times(AtMost(loops));
|
||||
EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false));
|
||||
EXPECT_CALL(cache, setFull).Times(AtMost(1));
|
||||
|
||||
47
unittests/etl/CorruptionDetectorTests.cpp
Normal file
47
unittests/etl/CorruptionDetectorTests.cpp
Normal file
@@ -0,0 +1,47 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etl/CorruptionDetector.hpp"
|
||||
#include "etl/SystemState.hpp"
|
||||
#include "util/Fixtures.hpp"
|
||||
#include "util/MockCache.hpp"
|
||||
#include "util/MockPrometheus.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
using namespace data;
|
||||
using namespace util::prometheus;
|
||||
using namespace testing;
|
||||
|
||||
struct CorruptionDetectorTest : NoLoggerFixture, WithPrometheus {};
|
||||
|
||||
TEST_F(CorruptionDetectorTest, DisableCacheOnCorruption)
|
||||
{
|
||||
using namespace ripple;
|
||||
auto state = etl::SystemState{};
|
||||
auto cache = MockCache{};
|
||||
auto detector = etl::CorruptionDetector{state, cache};
|
||||
|
||||
EXPECT_CALL(cache, setDisabled()).Times(1);
|
||||
|
||||
detector.onCorruptionDetected();
|
||||
|
||||
EXPECT_TRUE(state.isCorruptionDetected);
|
||||
}
|
||||
@@ -111,6 +111,7 @@ protected:
|
||||
EXPECT_TRUE(cache.contains("latest_ledger_seq"));
|
||||
EXPECT_TRUE(cache.contains("object_hit_rate"));
|
||||
EXPECT_TRUE(cache.contains("successor_hit_rate"));
|
||||
EXPECT_TRUE(cache.contains("is_enabled"));
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -141,19 +142,6 @@ protected:
|
||||
EXPECT_TRUE(info.contains("network_id"));
|
||||
EXPECT_EQ(info.at("network_id").as_int64(), 2);
|
||||
}
|
||||
|
||||
static void
|
||||
validateCacheOutput(rpc::ReturnType const& output)
|
||||
{
|
||||
auto const& result = output.result.value().as_object();
|
||||
auto const& info = result.at("info").as_object();
|
||||
auto const& cache = info.at("cache").as_object();
|
||||
EXPECT_EQ(cache.at("size").as_uint64(), 1u);
|
||||
EXPECT_EQ(cache.at("is_full").as_bool(), false);
|
||||
EXPECT_EQ(cache.at("latest_ledger_seq").as_uint64(), 30u);
|
||||
EXPECT_EQ(cache.at("object_hit_rate").as_double(), 1.0);
|
||||
EXPECT_EQ(cache.at("successor_hit_rate").as_double(), 1.0);
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(RPCServerInfoHandlerTest, NoLedgerInfoErrorsOutWithInternal)
|
||||
@@ -268,6 +256,87 @@ TEST_F(RPCServerInfoHandlerTest, AmendmentBlockedIsPresentIfSet)
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(RPCServerInfoHandlerTest, CorruptionDetectedIsPresentIfSet)
|
||||
{
|
||||
MockLoadBalancer* rawBalancerPtr = mockLoadBalancerPtr.get();
|
||||
MockCounters* rawCountersPtr = mockCountersPtr.get();
|
||||
MockETLService* rawETLServicePtr = mockETLServicePtr.get();
|
||||
|
||||
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 30, 3); // 3 seconds old
|
||||
EXPECT_CALL(*backend, fetchLedgerBySequence).WillOnce(Return(ledgerinfo));
|
||||
|
||||
auto const feeBlob = CreateLegacyFeeSettingBlob(1, 2, 3, 4, 0);
|
||||
EXPECT_CALL(*backend, doFetchLedgerObject).WillOnce(Return(feeBlob));
|
||||
|
||||
EXPECT_CALL(*rawBalancerPtr, forwardToRippled(testing::_, testing::Eq(CLIENTIP), testing::_))
|
||||
.WillOnce(Return(std::nullopt));
|
||||
|
||||
EXPECT_CALL(*rawCountersPtr, uptime).WillOnce(Return(std::chrono::seconds{1234}));
|
||||
|
||||
EXPECT_CALL(*rawETLServicePtr, isCorruptionDetected).WillOnce(Return(true));
|
||||
|
||||
auto const handler = AnyHandler{TestServerInfoHandler{
|
||||
backend, mockSubscriptionManagerPtr, mockLoadBalancerPtr, mockETLServicePtr, *mockCountersPtr
|
||||
}};
|
||||
|
||||
runSpawn([&](auto yield) {
|
||||
auto const req = json::parse("{}");
|
||||
auto const output = handler.process(req, Context{yield, {}, false, CLIENTIP});
|
||||
|
||||
validateNormalOutput(output);
|
||||
|
||||
auto const& info = output.result.value().as_object().at("info").as_object();
|
||||
EXPECT_TRUE(info.contains("corruption_detected"));
|
||||
EXPECT_EQ(info.at("corruption_detected").as_bool(), true);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(RPCServerInfoHandlerTest, CacheReportsEnabledFlagCorrectly)
|
||||
{
|
||||
MockLoadBalancer* rawBalancerPtr = mockLoadBalancerPtr.get();
|
||||
MockCounters* rawCountersPtr = mockCountersPtr.get();
|
||||
|
||||
auto const ledgerinfo = CreateLedgerInfo(LEDGERHASH, 30, 3); // 3 seconds old
|
||||
EXPECT_CALL(*backend, fetchLedgerBySequence).Times(2).WillRepeatedly(Return(ledgerinfo));
|
||||
|
||||
auto const feeBlob = CreateLegacyFeeSettingBlob(1, 2, 3, 4, 0);
|
||||
EXPECT_CALL(*backend, doFetchLedgerObject).Times(2).WillRepeatedly(Return(feeBlob));
|
||||
|
||||
EXPECT_CALL(*rawBalancerPtr, forwardToRippled(testing::_, testing::Eq(CLIENTIP), testing::_))
|
||||
.Times(2)
|
||||
.WillRepeatedly(Return(std::nullopt));
|
||||
|
||||
EXPECT_CALL(*rawCountersPtr, uptime).Times(2).WillRepeatedly(Return(std::chrono::seconds{1234}));
|
||||
|
||||
auto const handler = AnyHandler{TestServerInfoHandler{
|
||||
backend, mockSubscriptionManagerPtr, mockLoadBalancerPtr, mockETLServicePtr, *mockCountersPtr
|
||||
}};
|
||||
|
||||
runSpawn([&](auto yield) {
|
||||
auto const req = json::parse("{}");
|
||||
auto const output = handler.process(req, Context{yield, {}, false, CLIENTIP});
|
||||
|
||||
validateNormalOutput(output);
|
||||
|
||||
auto const& cache = output.result.value().as_object().at("info").as_object().at("cache").as_object();
|
||||
EXPECT_TRUE(cache.contains("is_enabled"));
|
||||
EXPECT_EQ(cache.at("is_enabled").as_bool(), true);
|
||||
});
|
||||
|
||||
backend->cache().setDisabled();
|
||||
|
||||
runSpawn([&](auto yield) {
|
||||
auto const req = json::parse("{}");
|
||||
auto const output = handler.process(req, Context{yield, {}, false, CLIENTIP});
|
||||
|
||||
validateNormalOutput(output);
|
||||
|
||||
auto const& cache = output.result.value().as_object().at("info").as_object().at("cache").as_object();
|
||||
EXPECT_TRUE(cache.contains("is_enabled"));
|
||||
EXPECT_EQ(cache.at("is_enabled").as_bool(), false);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(RPCServerInfoHandlerTest, AdminSectionPresentWhenAdminFlagIsSet)
|
||||
{
|
||||
MockLoadBalancer* rawBalancerPtr = mockLoadBalancerPtr.get();
|
||||
|
||||
@@ -270,76 +270,55 @@ using MockBackendTestStrict = MockBackendTestBase<::testing::StrictMock>;
|
||||
* @brief Fixture with a mock subscription manager
|
||||
*/
|
||||
struct MockSubscriptionManagerTest : virtual public NoLoggerFixture {
|
||||
void
|
||||
SetUp() override
|
||||
{
|
||||
mockSubscriptionManagerPtr = std::make_shared<MockSubscriptionManager>();
|
||||
}
|
||||
void
|
||||
TearDown() override
|
||||
{
|
||||
mockSubscriptionManagerPtr.reset();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::shared_ptr<MockSubscriptionManager> mockSubscriptionManagerPtr;
|
||||
std::shared_ptr<MockSubscriptionManager> mockSubscriptionManagerPtr = std::make_shared<MockSubscriptionManager>();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Fixture with a mock etl balancer
|
||||
*/
|
||||
struct MockLoadBalancerTest : virtual public NoLoggerFixture {
|
||||
void
|
||||
SetUp() override
|
||||
{
|
||||
mockLoadBalancerPtr = std::make_shared<MockLoadBalancer>();
|
||||
}
|
||||
void
|
||||
TearDown() override
|
||||
{
|
||||
mockLoadBalancerPtr.reset();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::shared_ptr<MockLoadBalancer> mockLoadBalancerPtr;
|
||||
std::shared_ptr<MockLoadBalancer> mockLoadBalancerPtr = std::make_shared<MockLoadBalancer>();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Fixture with a mock subscription manager
|
||||
*/
|
||||
struct MockETLServiceTest : virtual public NoLoggerFixture {
|
||||
void
|
||||
SetUp() override
|
||||
{
|
||||
mockETLServicePtr = std::make_shared<MockETLService>();
|
||||
}
|
||||
void
|
||||
TearDown() override
|
||||
{
|
||||
mockETLServicePtr.reset();
|
||||
}
|
||||
template <template <typename> typename MockType = ::testing::NiceMock>
|
||||
struct MockETLServiceTestBase : virtual public NoLoggerFixture {
|
||||
using Mock = MockType<MockETLService>;
|
||||
|
||||
protected:
|
||||
std::shared_ptr<MockETLService> mockETLServicePtr;
|
||||
std::shared_ptr<Mock> mockETLServicePtr = std::make_shared<Mock>();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Fixture with a "nice" ETLService mock.
|
||||
*
|
||||
* Use @see MockETLServiceTestNaggy during development to get unset call expectation warnings from the embeded mock.
|
||||
* Once the test is ready and you are happy you can switch to this fixture to mute the warnings.
|
||||
*/
|
||||
using MockETLServiceTest = MockETLServiceTestBase<::testing::NiceMock>;
|
||||
|
||||
/**
|
||||
* @brief Fixture with a "naggy" ETLService mock.
|
||||
*
|
||||
* Use this during development to get unset call expectation warnings from the embedded mock.
|
||||
*/
|
||||
using MockETLServiceTestNaggy = MockETLServiceTestBase<::testing::NaggyMock>;
|
||||
|
||||
/**
|
||||
* @brief Fixture with a "strict" ETLService mock.
|
||||
*/
|
||||
using MockETLServiceTestStrict = MockETLServiceTestBase<::testing::StrictMock>;
|
||||
|
||||
/**
|
||||
* @brief Fixture with mock counters
|
||||
*/
|
||||
struct MockCountersTest : virtual public NoLoggerFixture {
|
||||
void
|
||||
SetUp() override
|
||||
{
|
||||
mockCountersPtr = std::make_shared<MockCounters>();
|
||||
}
|
||||
void
|
||||
TearDown() override
|
||||
{
|
||||
mockCountersPtr.reset();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::shared_ptr<MockCounters> mockCountersPtr;
|
||||
std::shared_ptr<MockCounters> mockCountersPtr = std::make_shared<MockCounters>();
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -352,20 +331,20 @@ template <template <typename> typename MockType = ::testing::NiceMock>
|
||||
struct HandlerBaseTestBase : public MockBackendTestBase<MockType>,
|
||||
public util::prometheus::WithPrometheus,
|
||||
public SyncAsioContextTest,
|
||||
public MockETLServiceTest {
|
||||
public MockETLServiceTestBase<MockType> {
|
||||
protected:
|
||||
void
|
||||
SetUp() override
|
||||
{
|
||||
MockBackendTestBase<MockType>::SetUp();
|
||||
SyncAsioContextTest::SetUp();
|
||||
MockETLServiceTest::SetUp();
|
||||
MockETLServiceTestBase<MockType>::SetUp();
|
||||
}
|
||||
|
||||
void
|
||||
TearDown() override
|
||||
{
|
||||
MockETLServiceTest::TearDown();
|
||||
MockETLServiceTestBase<MockType>::TearDown();
|
||||
SyncAsioContextTest::TearDown();
|
||||
MockBackendTestBase<MockType>::TearDown();
|
||||
}
|
||||
|
||||
@@ -48,6 +48,8 @@ struct MockCache {
|
||||
|
||||
MOCK_METHOD(void, setDisabled, (), ());
|
||||
|
||||
MOCK_METHOD(bool, isDisabled, (), (const));
|
||||
|
||||
MOCK_METHOD(void, setFull, (), ());
|
||||
|
||||
MOCK_METHOD(bool, isFull, (), (const));
|
||||
|
||||
@@ -35,5 +35,6 @@ struct MockETLService {
|
||||
MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const));
|
||||
MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const));
|
||||
MOCK_METHOD(bool, isAmendmentBlocked, (), (const));
|
||||
MOCK_METHOD(bool, isCorruptionDetected, (), (const));
|
||||
MOCK_METHOD(std::optional<etl::ETLState>, getETLState, (), (const));
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user