Separate unit tests and integration tests (#1393)

Fixes #1391
This commit is contained in:
Alex Kremer
2024-05-07 15:12:48 +01:00
committed by GitHub
parent 98ef83d470
commit cbc856b190
177 changed files with 168 additions and 106 deletions

View File

@@ -0,0 +1,54 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlock.hpp"
#include "util/FakeAmendmentBlockAction.hpp"
#include "util/Fixtures.hpp"
#include "util/MockPrometheus.hpp"
#include <boost/asio/io_context.hpp>
#include <gtest/gtest.h>
#include <chrono>
#include <cstddef>
#include <functional>
using namespace testing;
using namespace etl;
struct AmendmentBlockHandlerTest : util::prometheus::WithPrometheus, NoLoggerFixture {
using AmendmentBlockHandlerType = impl::AmendmentBlockHandler<FakeAmendmentBlockAction>;
boost::asio::io_context ioc_;
};
TEST_F(AmendmentBlockHandlerTest, CallToOnAmendmentBlockSetsStateAndRepeatedlyCallsAction)
{
std::size_t callCount = 0;
SystemState state;
AmendmentBlockHandlerType handler{ioc_, state, std::chrono::nanoseconds{1}, {std::ref(callCount)}};
EXPECT_FALSE(state.isAmendmentBlocked);
handler.onAmendmentBlock();
EXPECT_TRUE(state.isAmendmentBlocked);
ioc_.run_for(std::chrono::milliseconds{1});
EXPECT_TRUE(callCount >= 10);
}

View File

@@ -0,0 +1,107 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/CacheLoaderSettings.hpp"
#include "util/config/Config.hpp"
#include <boost/json/parse.hpp>
#include <gtest/gtest.h>
namespace json = boost::json;
using namespace etl;
using namespace testing;
struct CacheLoaderSettingsTest : Test {};
TEST_F(CacheLoaderSettingsTest, DefaultSettingsParsedCorrectly)
{
auto const cfg = util::Config{json::parse(R"({})")};
auto const settings = make_CacheLoaderSettings(cfg);
auto const defaults = CacheLoaderSettings{};
EXPECT_EQ(settings, defaults);
}
TEST_F(CacheLoaderSettingsTest, NumThreadsCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"io_threads": 42})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.numThreads, 42);
}
TEST_F(CacheLoaderSettingsTest, NumDiffsCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"num_diffs": 42}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.numCacheDiffs, 42);
}
TEST_F(CacheLoaderSettingsTest, NumMarkersCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"num_markers": 42}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.numCacheMarkers, 42);
}
TEST_F(CacheLoaderSettingsTest, PageFetchSizeCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"page_fetch_size": 42}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.cachePageFetchSize, 42);
}
TEST_F(CacheLoaderSettingsTest, SyncLoadStyleCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "sYNC"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::SYNC);
EXPECT_TRUE(settings.isSync());
}
TEST_F(CacheLoaderSettingsTest, AsyncLoadStyleCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "aSynC"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::ASYNC);
EXPECT_TRUE(settings.isAsync());
}
TEST_F(CacheLoaderSettingsTest, NoLoadStyleCorrectlyPropagatedThroughConfig)
{
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "nONe"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::NONE);
EXPECT_TRUE(settings.isDisabled());
}
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "nO"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::NONE);
EXPECT_TRUE(settings.isDisabled());
}
}

View File

@@ -0,0 +1,256 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/Types.hpp"
#include "etl/CacheLoader.hpp"
#include "etl/CacheLoaderSettings.hpp"
#include "etl/FakeDiffProvider.hpp"
#include "etl/impl/CacheLoader.hpp"
#include "util/Fixtures.hpp"
#include "util/MockCache.hpp"
#include "util/MockPrometheus.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/config/Config.hpp"
#include <boost/json/parse.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <vector>
namespace json = boost::json;
using namespace etl;
using namespace util;
using namespace data;
using namespace testing;
namespace {
constexpr auto SEQ = 30;
struct CacheLoaderTest : util::prometheus::WithPrometheus, MockBackendTest {
void
SetUp() override
{
MockBackendTest::SetUp();
}
void
TearDown() override
{
MockBackendTest::TearDown();
}
protected:
DiffProvider diffProvider;
MockCache cache;
};
using Settings = etl::CacheLoaderSettings;
struct ParametrizedCacheLoaderTest : CacheLoaderTest, WithParamInterface<Settings> {};
}; // namespace
//
// Tests of implementation
//
INSTANTIATE_TEST_CASE_P(
CacheLoaderTest,
ParametrizedCacheLoaderTest,
Values(
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 2},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 4},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 8},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 16},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 128, .cachePageFetchSize = 24, .numThreads = 2},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 64, .cachePageFetchSize = 48, .numThreads = 4},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 64, .numThreads = 8},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 24, .cachePageFetchSize = 128, .numThreads = 16},
Settings{.numCacheDiffs = 128, .numCacheMarkers = 128, .cachePageFetchSize = 24, .numThreads = 2},
Settings{.numCacheDiffs = 1024, .numCacheMarkers = 64, .cachePageFetchSize = 48, .numThreads = 4},
Settings{.numCacheDiffs = 512, .numCacheMarkers = 48, .cachePageFetchSize = 64, .numThreads = 8},
Settings{.numCacheDiffs = 64, .numCacheMarkers = 24, .cachePageFetchSize = 128, .numThreads = 16}
),
[](auto const& info) {
auto const settings = info.param;
return fmt::format(
"diffs_{}__markers_{}__fetchSize_{}__threads_{}",
settings.numCacheDiffs,
settings.numCacheMarkers,
settings.cachePageFetchSize,
settings.numThreads
);
}
);
TEST_P(ParametrizedCacheLoaderTest, LoadCacheWithDifferentSettings)
{
auto const& settings = GetParam();
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 14;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey(_, SEQ, _)).Times(keysSize * loops).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
EXPECT_CALL(cache, updateImp).Times(loops);
EXPECT_CALL(cache, setFull).Times(1);
async::CoroExecutionContext ctx{settings.numThreads};
etl::impl::CursorFromFixDiffNumProvider const provider{backend, settings.numCacheDiffs};
etl::impl::CacheLoaderImpl<MockCache> loader{
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
};
loader.wait();
}
TEST_P(ParametrizedCacheLoaderTest, AutomaticallyCancelledAndAwaitedInDestructor)
{
auto const& settings = GetParam();
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 1024;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey(_, SEQ, _)).Times(AtMost(keysSize * loops)).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
EXPECT_CALL(cache, updateImp).Times(AtMost(loops));
EXPECT_CALL(cache, setFull).Times(AtMost(1));
async::CoroExecutionContext ctx{settings.numThreads};
etl::impl::CursorFromFixDiffNumProvider const provider{backend, settings.numCacheDiffs};
etl::impl::CacheLoaderImpl<MockCache> const loader{
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
};
// no loader.wait(): loader is immediately stopped and awaited in destructor
}
TEST_P(ParametrizedCacheLoaderTest, CacheDisabledLeadsToCancellation)
{
auto const& settings = GetParam();
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 1024;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey(_, SEQ, _)).Times(AtMost(keysSize * loops)).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
EXPECT_CALL(cache, isDisabled).WillOnce(Return(false)).WillRepeatedly(Return(true));
EXPECT_CALL(cache, updateImp).Times(AtMost(1));
EXPECT_CALL(cache, setFull).Times(0);
async::CoroExecutionContext ctx{settings.numThreads};
etl::impl::CursorFromFixDiffNumProvider const provider{backend, settings.numCacheDiffs};
etl::impl::CacheLoaderImpl<MockCache> loader{
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
};
loader.wait();
}
//
// Tests of public CacheLoader interface
//
TEST_F(CacheLoaderTest, SyncCacheLoaderWaitsTillFullyLoaded)
{
auto const cfg = util::Config(json::parse(R"({"cache": {"load": "sync"}})"));
CacheLoader loader{cfg, backend, cache};
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 14;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(32).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey).Times(keysSize * loops).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.Times(loops)
.WillRepeatedly(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
EXPECT_CALL(cache, updateImp).Times(loops);
EXPECT_CALL(cache, isFull).WillOnce(Return(false)).WillRepeatedly(Return(true));
EXPECT_CALL(cache, setFull).Times(1);
loader.load(SEQ);
}
TEST_F(CacheLoaderTest, AsyncCacheLoaderCanBeStopped)
{
auto const cfg = util::Config(json::parse(R"({"cache": {"load": "async"}})"));
CacheLoader loader{cfg, backend, cache};
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 14;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(AtMost(32)).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey).Times(AtMost(keysSize * loops)).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.Times(AtMost(loops))
.WillRepeatedly(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
EXPECT_CALL(cache, isDisabled).WillRepeatedly(Return(false));
EXPECT_CALL(cache, updateImp).Times(AtMost(loops));
EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false));
EXPECT_CALL(cache, setFull).Times(AtMost(1));
loader.load(SEQ);
loader.stop();
loader.wait();
}
TEST_F(CacheLoaderTest, DisabledCacheLoaderDoesNotLoadCache)
{
auto cfg = util::Config(json::parse(R"({"cache": {"load": "none"}})"));
CacheLoader loader{cfg, backend, cache};
EXPECT_CALL(cache, updateImp).Times(0);
EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false));
EXPECT_CALL(cache, setDisabled).Times(1);
loader.load(SEQ);
}

View File

@@ -0,0 +1,47 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/CorruptionDetector.hpp"
#include "etl/SystemState.hpp"
#include "util/Fixtures.hpp"
#include "util/MockCache.hpp"
#include "util/MockPrometheus.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
using namespace data;
using namespace util::prometheus;
using namespace testing;
struct CorruptionDetectorTest : NoLoggerFixture, WithPrometheus {};
TEST_F(CorruptionDetectorTest, DisableCacheOnCorruption)
{
using namespace ripple;
auto state = etl::SystemState{};
auto cache = MockCache{};
auto detector = etl::CorruptionDetector{state, cache};
EXPECT_CALL(cache, setDisabled()).Times(1);
detector.onCorruptionDetected();
EXPECT_TRUE(state.isCorruptionDetected);
}

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/Types.hpp"
#include "etl/impl/CursorFromAccountProvider.hpp"
#include "util/Fixtures.hpp"
#include "util/MockPrometheus.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <ripple/basics/base_uint.h>
#include <vector>
using namespace etl;
using namespace util;
using namespace data;
using namespace testing;
namespace {
constexpr auto SEQ = 30;
std::vector<ripple::uint256> const ACCOUNTROOTS = {
ripple::uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"},
ripple::uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD130B"},
ripple::uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E18"},
ripple::uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F04"},
ripple::uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"},
ripple::uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E01"},
ripple::uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B38E"},
ripple::uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE441"},
ripple::uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CEDF"},
};
struct CursorFromAccountProviderTests : util::prometheus::WithPrometheus, MockBackendTestNaggy {};
} // namespace
TEST_F(CursorFromAccountProviderTests, EnoughAccountRoots)
{
auto const numCursors = 9;
auto const pageSize = 100;
auto const provider = etl::impl::CursorFromAccountProvider{backend, numCursors, pageSize};
ON_CALL(*backend, fetchAccountRoots(numCursors, _, SEQ, _)).WillByDefault(Return(ACCOUNTROOTS));
EXPECT_CALL(*backend, fetchAccountRoots(_, _, _, _)).Times(1);
auto const cursors = provider.getCursors(SEQ);
ASSERT_EQ(cursors.size(), numCursors + 1);
EXPECT_EQ(cursors.front().start, firstKey);
EXPECT_EQ(cursors.back().end, lastKey);
}

View File

@@ -0,0 +1,108 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/Types.hpp"
#include "etl/impl/CursorFromDiffProvider.hpp"
#include "util/Fixtures.hpp"
#include "util/MockPrometheus.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <ripple/basics/base_uint.h>
#include <vector>
using namespace etl;
using namespace util;
using namespace data;
using namespace testing;
namespace {
constexpr auto SEQ = 30;
std::vector<data::LedgerObject> const DIFFS_FOR_SEQ = {
{.key = ripple::uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"}, .blob = Blob{}
}, // This object is removed in SEQ while it exists in SEQ-1
{.key = ripple::uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD130B"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E18"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F04"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E01"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B38E"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE441"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CEDF"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"BC0DAE09C0BFBC4A49AA94B849266588BFD6E1F554B184B5788AC55D6E07EB95"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"DCC8759A35CB946511763AA5553A82AA25F20B901C98C9BB74D423BCFAFF5F9D"}, .blob = Blob{'s'}},
};
std::vector<data::LedgerObject> const DIFFS_FOR_SEQ_MINUS1 = {
{.key = ripple::uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD1301"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E12"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F03"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"}, .blob = Blob{'s'}
}, // This object is changed in both SEQ and SEQ-1
{.key = ripple::uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E05"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B386"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE447"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CED8"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"BC0DAE09C0BFBC4A49AA94B849266588BFD6E1F554B184B5788AC55D6E07EB99"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"DCC8759A35CB946511763AA5553A82AA25F20B901C98C9BB74D423BCFAFF5F90"}, .blob = Blob{'s'}},
};
struct CursorFromDiffProviderTests : util::prometheus::WithPrometheus, MockBackendTestNaggy {};
} // namespace
TEST_F(CursorFromDiffProviderTests, MultipleDiffs)
{
auto const numCursors = 15;
auto const provider = etl::impl::CursorFromDiffProvider{backend, numCursors};
backend->setRange(SEQ - 10, SEQ);
ON_CALL(*backend, fetchLedgerDiff(SEQ, _)).WillByDefault(Return(DIFFS_FOR_SEQ));
ON_CALL(*backend, fetchLedgerDiff(SEQ - 1, _)).WillByDefault(Return(DIFFS_FOR_SEQ_MINUS1));
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(2);
auto const cursors = provider.getCursors(SEQ);
ASSERT_EQ(cursors.size(), numCursors + 1);
EXPECT_EQ(cursors.front().start, firstKey);
EXPECT_EQ(cursors.back().end, lastKey);
}
TEST_F(CursorFromDiffProviderTests, NotEnoughDiffs)
{
auto const numCursors = 35;
auto const provider = etl::impl::CursorFromDiffProvider{backend, numCursors};
auto const AVAILABLE_DIFFS = 10;
backend->setRange(SEQ - AVAILABLE_DIFFS + 1, SEQ);
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(std::vector<data::LedgerObject>{}));
ON_CALL(*backend, fetchLedgerDiff(SEQ, _)).WillByDefault(Return(DIFFS_FOR_SEQ));
ON_CALL(*backend, fetchLedgerDiff(SEQ - 1, _)).WillByDefault(Return(DIFFS_FOR_SEQ_MINUS1));
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(AVAILABLE_DIFFS);
auto const cursors = provider.getCursors(SEQ);
auto const removed = 2; // lost 2 objects because it is removed.
auto const repeated = 1; // repeated 1 object
ASSERT_EQ(cursors.size(), DIFFS_FOR_SEQ.size() + DIFFS_FOR_SEQ_MINUS1.size() - removed - repeated + 1);
EXPECT_EQ(cursors.front().start, firstKey);
EXPECT_EQ(cursors.back().end, lastKey);
}

View File

@@ -0,0 +1,86 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/Types.hpp"
#include "etl/FakeDiffProvider.hpp"
#include "etl/impl/CursorFromFixDiffNumProvider.hpp"
#include "util/Fixtures.hpp"
#include "util/MockPrometheus.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <cstddef>
using namespace etl;
using namespace util;
using namespace data;
using namespace testing;
namespace {
constexpr auto SEQ = 30;
struct CursorProviderTest : util::prometheus::WithPrometheus, MockBackendTestNaggy {
DiffProvider diffProvider;
};
struct ParametrizedCursorProviderTest : CursorProviderTest, WithParamInterface<std::size_t> {};
INSTANTIATE_TEST_CASE_P(
CursorProviderTest,
ParametrizedCursorProviderTest,
Values(32, 64, 128, 512, 1024, 3, 2, 1),
[](auto const& info) {
auto const diffs = info.param;
return fmt::format("diffs_{}", diffs);
}
);
}; // namespace
TEST_P(ParametrizedCursorProviderTest, GetCursorsWithDifferentProviderSettings)
{
auto const numDiffs = GetParam();
auto const diffs = diffProvider.getLatestDiff();
auto const provider = etl::impl::CursorFromFixDiffNumProvider{backend, numDiffs};
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(diffs));
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(numDiffs);
auto const cursors = provider.getCursors(SEQ);
ASSERT_EQ(cursors.size(), diffs.size() + 1);
EXPECT_EQ(cursors.front().start, firstKey);
EXPECT_EQ(cursors.back().end, lastKey);
}
TEST_F(CursorProviderTest, EmptyCursorIsHandledCorrectly)
{
auto const diffs = diffProvider.getLatestDiff();
auto const provider = etl::impl::CursorFromFixDiffNumProvider{backend, 0};
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(diffs));
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(0);
auto const cursors = provider.getCursors(SEQ);
ASSERT_EQ(cursors.size(), 1);
EXPECT_EQ(cursors.front().start, firstKey);
EXPECT_EQ(cursors.back().end, lastKey);
}

View File

@@ -0,0 +1,78 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/ETLState.hpp"
#include "util/Fixtures.hpp"
#include "util/MockSource.hpp"
#include <boost/json/parse.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <optional>
namespace json = boost::json;
using namespace util;
using namespace testing;
struct ETLStateTest : public NoLoggerFixture {
MockSource source = MockSource{};
};
TEST_F(ETLStateTest, Error)
{
EXPECT_CALL(source, forwardToRippled).WillOnce(Return(std::nullopt));
auto const state = etl::ETLState::fetchETLStateFromSource(source);
EXPECT_FALSE(state);
}
TEST_F(ETLStateTest, NetworkIdValid)
{
auto const json = json::parse(
R"JSON({
"result": {
"info": {
"network_id": 12
}
}
})JSON"
);
EXPECT_CALL(source, forwardToRippled).WillOnce(Return(json.as_object()));
auto const state = etl::ETLState::fetchETLStateFromSource(source);
ASSERT_TRUE(state.has_value());
ASSERT_TRUE(state->networkID.has_value());
EXPECT_EQ(state->networkID.value(), 12);
}
TEST_F(ETLStateTest, NetworkIdInvalid)
{
auto const json = json::parse(
R"JSON({
"result": {
"info": {
"network_id2": 12
}
}
})JSON"
);
EXPECT_CALL(source, forwardToRippled).WillOnce(Return(json.as_object()));
auto const state = etl::ETLState::fetchETLStateFromSource(source);
ASSERT_TRUE(state.has_value());
EXPECT_FALSE(state->networkID.has_value());
}

View File

@@ -0,0 +1,83 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/ExtractionDataPipe.hpp"
#include "util/Fixtures.hpp"
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <thread>
constexpr static auto STRIDE = 4;
constexpr static auto START_SEQ = 1234;
class ETLExtractionDataPipeTest : public NoLoggerFixture {
protected:
etl::impl::ExtractionDataPipe<uint32_t> pipe_{STRIDE, START_SEQ};
};
TEST_F(ETLExtractionDataPipeTest, StrideMatchesInput)
{
EXPECT_EQ(pipe_.getStride(), STRIDE);
}
TEST_F(ETLExtractionDataPipeTest, PushedDataCanBeRetrievedAndMatchesOriginal)
{
for (std::size_t i = 0; i < 8; ++i)
pipe_.push(START_SEQ + i, START_SEQ + i);
for (std::size_t i = 0; i < 8; ++i) {
auto const data = pipe_.popNext(START_SEQ + i);
EXPECT_EQ(data.value(), START_SEQ + i);
}
}
TEST_F(ETLExtractionDataPipeTest, CallingFinishPushesAnEmptyOptional)
{
for (std::size_t i = 0; i < 4; ++i)
pipe_.finish(START_SEQ + i);
for (std::size_t i = 0; i < 4; ++i) {
auto const data = pipe_.popNext(START_SEQ + i);
EXPECT_FALSE(data.has_value());
}
}
TEST_F(ETLExtractionDataPipeTest, CallingCleanupUnblocksOtherThread)
{
std::atomic_bool unblocked = false;
auto bgThread = std::thread([this, &unblocked] {
for (std::size_t i = 0; i < 252; ++i)
pipe_.push(START_SEQ, 1234); // 251st element will block this thread here
unblocked = true;
});
// emulate waiting for above thread to push and get blocked
std::this_thread::sleep_for(std::chrono::milliseconds{100});
EXPECT_FALSE(unblocked);
pipe_.cleanup();
bgThread.join();
EXPECT_TRUE(unblocked);
}

View File

@@ -0,0 +1,164 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/SystemState.hpp"
#include "etl/impl/Extractor.hpp"
#include "util/FakeFetchResponse.hpp"
#include "util/Fixtures.hpp"
#include "util/MockExtractionDataPipe.hpp"
#include "util/MockLedgerFetcher.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockPrometheus.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <memory>
#include <optional>
using namespace testing;
using namespace etl;
struct ETLExtractorTest : util::prometheus::WithPrometheus, NoLoggerFixture {
using ExtractionDataPipeType = MockExtractionDataPipe;
using LedgerFetcherType = MockLedgerFetcher;
using ExtractorType = etl::impl::Extractor<ExtractionDataPipeType, MockNetworkValidatedLedgers, LedgerFetcherType>;
ExtractionDataPipeType dataPipe_;
std::shared_ptr<MockNetworkValidatedLedgers> networkValidatedLedgers_ =
std::make_shared<MockNetworkValidatedLedgers>();
LedgerFetcherType ledgerFetcher_;
SystemState state_;
std::unique_ptr<ExtractorType> extractor_;
void
SetUp() override
{
state_.isStopping = false;
state_.writeConflict = false;
state_.isReadOnly = false;
state_.isWriting = false;
}
void
TearDown() override
{
extractor_.reset();
}
};
TEST_F(ETLExtractorTest, StopsWhenCurrentSequenceExceedsFinishSequence)
{
auto const rawNetworkValidatedLedgersPtr = networkValidatedLedgers_.get();
ON_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).WillByDefault(Return(true));
EXPECT_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).Times(3);
ON_CALL(dataPipe_, getStride).WillByDefault(Return(4));
EXPECT_CALL(dataPipe_, getStride).Times(3);
auto response = FakeFetchResponse{};
ON_CALL(ledgerFetcher_, fetchDataAndDiff(_)).WillByDefault(Return(response));
EXPECT_CALL(ledgerFetcher_, fetchDataAndDiff).Times(3);
EXPECT_CALL(dataPipe_, push).Times(3);
EXPECT_CALL(dataPipe_, finish(0)).Times(1);
// expected to invoke for seq 0, 4, 8 and finally stop as seq will be greater than finishing seq
extractor_ = std::make_unique<ExtractorType>(dataPipe_, networkValidatedLedgers_, ledgerFetcher_, 0, 11, state_);
}
TEST_F(ETLExtractorTest, StopsOnWriteConflict)
{
EXPECT_CALL(dataPipe_, finish(0)).Times(1);
state_.writeConflict = true;
// despite finish sequence being far ahead, we set writeConflict and so exit the loop immediately
extractor_ = std::make_unique<ExtractorType>(dataPipe_, networkValidatedLedgers_, ledgerFetcher_, 0, 64, state_);
}
TEST_F(ETLExtractorTest, StopsOnServerShutdown)
{
EXPECT_CALL(dataPipe_, finish(0)).Times(1);
state_.isStopping = true;
// despite finish sequence being far ahead, we set isStopping and so exit the loop immediately
extractor_ = std::make_unique<ExtractorType>(dataPipe_, networkValidatedLedgers_, ledgerFetcher_, 0, 64, state_);
}
// stop extractor thread if fetcheResponse is empty
TEST_F(ETLExtractorTest, StopsIfFetchIsUnsuccessful)
{
auto const rawNetworkValidatedLedgersPtr = networkValidatedLedgers_.get();
ON_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).WillByDefault(Return(true));
EXPECT_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).Times(1);
ON_CALL(ledgerFetcher_, fetchDataAndDiff(_)).WillByDefault(Return(std::nullopt));
EXPECT_CALL(ledgerFetcher_, fetchDataAndDiff).Times(1);
EXPECT_CALL(dataPipe_, finish(0)).Times(1);
// we break immediately because fetchDataAndDiff returns nullopt
extractor_ = std::make_unique<ExtractorType>(dataPipe_, networkValidatedLedgers_, ledgerFetcher_, 0, 64, state_);
}
TEST_F(ETLExtractorTest, StopsIfWaitingUntilValidatedByNetworkTimesOut)
{
auto const rawNetworkValidatedLedgersPtr = networkValidatedLedgers_.get();
// note that in actual clio code we don't return false unless a timeout is specified and exceeded
ON_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).WillByDefault(Return(false));
EXPECT_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).Times(1);
EXPECT_CALL(dataPipe_, finish(0)).Times(1);
// we emulate waitUntilValidatedByNetwork timing out which would lead to shutdown of the extractor thread
extractor_ = std::make_unique<ExtractorType>(dataPipe_, networkValidatedLedgers_, ledgerFetcher_, 0, 64, state_);
}
TEST_F(ETLExtractorTest, SendsCorrectResponseToDataPipe)
{
auto const rawNetworkValidatedLedgersPtr = networkValidatedLedgers_.get();
ON_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).WillByDefault(Return(true));
EXPECT_CALL(*rawNetworkValidatedLedgersPtr, waitUntilValidatedByNetwork).Times(1);
ON_CALL(dataPipe_, getStride).WillByDefault(Return(4));
EXPECT_CALL(dataPipe_, getStride).Times(1);
auto response = FakeFetchResponse{1234};
auto optionalResponse = std::optional<FakeFetchResponse>{};
ON_CALL(ledgerFetcher_, fetchDataAndDiff(_)).WillByDefault(Return(response));
EXPECT_CALL(ledgerFetcher_, fetchDataAndDiff).Times(1);
EXPECT_CALL(dataPipe_, push).Times(1).WillOnce(SaveArg<1>(&optionalResponse));
EXPECT_CALL(dataPipe_, finish(0)).Times(1);
// expect to finish after just one response due to finishSequence set to 1
extractor_ = std::make_unique<ExtractorType>(dataPipe_, networkValidatedLedgers_, ledgerFetcher_, 0, 1, state_);
extractor_->waitTillFinished(); // this is what clio does too. waiting for the thread to join
EXPECT_TRUE(optionalResponse.has_value());
EXPECT_EQ(optionalResponse.value(), response);
}
TEST_F(ETLExtractorTest, CallsPipeFinishWithInitialSequenceAtExit)
{
EXPECT_CALL(dataPipe_, finish(123)).Times(1);
state_.isStopping = true;
extractor_ = std::make_unique<ExtractorType>(dataPipe_, networkValidatedLedgers_, ledgerFetcher_, 123, 234, state_);
}

View File

@@ -0,0 +1,133 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/ForwardingCache.hpp"
#include <boost/json/object.hpp>
#include <gtest/gtest.h>
#include <chrono>
#include <thread>
using namespace etl::impl;
struct CacheEntryTests : public ::testing::Test {
CacheEntry entry_;
boost::json::object const object_ = {{"key", "value"}};
};
TEST_F(CacheEntryTests, PutAndGet)
{
EXPECT_FALSE(entry_.get());
entry_.put(object_);
auto result = entry_.get();
ASSERT_TRUE(result);
EXPECT_EQ(*result, object_);
}
TEST_F(CacheEntryTests, LastUpdated)
{
EXPECT_EQ(entry_.lastUpdated().time_since_epoch().count(), 0);
entry_.put(object_);
auto const lastUpdated = entry_.lastUpdated();
EXPECT_GE(
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - lastUpdated).count(), 0
);
entry_.put(boost::json::object{{"key", "new value"}});
auto const newLastUpdated = entry_.lastUpdated();
EXPECT_GT(newLastUpdated, lastUpdated);
EXPECT_GE(
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - newLastUpdated)
.count(),
0
);
}
TEST_F(CacheEntryTests, Invalidate)
{
entry_.put(object_);
entry_.invalidate();
EXPECT_FALSE(entry_.get());
}
TEST(ForwardingCacheTests, ShouldCache)
{
for (auto const& command : ForwardingCache::CACHEABLE_COMMANDS) {
auto const request = boost::json::object{{"command", command}};
EXPECT_TRUE(ForwardingCache::shouldCache(request));
}
auto const request = boost::json::object{{"command", "tx"}};
EXPECT_FALSE(ForwardingCache::shouldCache(request));
auto const requestWithoutCommand = boost::json::object{{"key", "value"}};
EXPECT_FALSE(ForwardingCache::shouldCache(requestWithoutCommand));
}
TEST(ForwardingCacheTests, Get)
{
ForwardingCache cache{std::chrono::seconds{100}};
auto const request = boost::json::object{{"command", "server_info"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
auto const result = cache.get(request);
ASSERT_TRUE(result);
EXPECT_EQ(*result, response);
}
TEST(ForwardingCacheTests, GetExpired)
{
ForwardingCache cache{std::chrono::milliseconds{1}};
auto const request = boost::json::object{{"command", "server_info"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
std::this_thread::sleep_for(std::chrono::milliseconds{2});
auto const result = cache.get(request);
EXPECT_FALSE(result);
}
TEST(ForwardingCacheTests, GetAndPutNotCommand)
{
ForwardingCache cache{std::chrono::seconds{100}};
auto const request = boost::json::object{{"key", "value"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
auto const result = cache.get(request);
EXPECT_FALSE(result);
}
TEST(ForwardingCache, Invalidate)
{
ForwardingCache cache{std::chrono::seconds{100}};
auto const request = boost::json::object{{"command", "server_info"}};
auto const response = boost::json::object{{"key", "value"}};
cache.put(request, response);
cache.invalidate();
EXPECT_FALSE(cache.get(request));
}

View File

@@ -0,0 +1,143 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/ForwardingSource.hpp"
#include "util/Fixtures.hpp"
#include "util/TestWsServer.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <gtest/gtest.h>
#include <chrono>
#include <optional>
#include <string>
#include <utility>
using namespace etl::impl;
struct ForwardingSourceTests : SyncAsioContextTest {
TestWsServer server_{ctx, "0.0.0.0", 11114};
ForwardingSource forwardingSource{"127.0.0.1", "11114", std::chrono::milliseconds{1}};
};
TEST_F(ForwardingSourceTests, ConnectionFailed)
{
runSpawn([&](boost::asio::yield_context yield) {
auto result = forwardingSource.forwardToRippled({}, {}, yield);
EXPECT_FALSE(result);
});
}
struct ForwardingSourceOperationsTests : ForwardingSourceTests {
std::string const message_ = R"({"data": "some_data"})";
boost::json::object const reply_ = {{"reply", "some_reply"}};
TestWsConnection
serverConnection(boost::asio::yield_context yield)
{
// First connection attempt is SSL handshake so it will fail
auto failedConnection = server_.acceptConnection(yield);
[&]() { ASSERT_FALSE(failedConnection); }();
auto connection = server_.acceptConnection(yield);
[&]() { ASSERT_TRUE(connection); }();
return std::move(connection).value();
}
};
TEST_F(ForwardingSourceOperationsTests, ReadFailed)
{
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
connection.close(yield);
});
runSpawn([&](boost::asio::yield_context yield) {
auto result = forwardingSource.forwardToRippled(boost::json::parse(message_).as_object(), {}, yield);
EXPECT_FALSE(result);
});
}
TEST_F(ForwardingSourceOperationsTests, ParseFailed)
{
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
auto receivedMessage = connection.receive(yield);
[&]() { ASSERT_TRUE(receivedMessage); }();
EXPECT_EQ(boost::json::parse(*receivedMessage), boost::json::parse(message_)) << *receivedMessage;
auto sendError = connection.send("invalid_json", yield);
[&]() { ASSERT_FALSE(sendError) << *sendError; }();
connection.close(yield);
});
runSpawn([&](boost::asio::yield_context yield) {
auto result = forwardingSource.forwardToRippled(boost::json::parse(message_).as_object(), {}, yield);
EXPECT_FALSE(result);
});
}
TEST_F(ForwardingSourceOperationsTests, GotNotAnObject)
{
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
auto receivedMessage = connection.receive(yield);
[&]() { ASSERT_TRUE(receivedMessage); }();
EXPECT_EQ(boost::json::parse(*receivedMessage), boost::json::parse(message_)) << *receivedMessage;
auto sendError = connection.send(R"(["some_value"])", yield);
[&]() { ASSERT_FALSE(sendError) << *sendError; }();
connection.close(yield);
});
runSpawn([&](boost::asio::yield_context yield) {
auto result = forwardingSource.forwardToRippled(boost::json::parse(message_).as_object(), {}, yield);
EXPECT_FALSE(result);
});
}
TEST_F(ForwardingSourceOperationsTests, Success)
{
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
auto receivedMessage = connection.receive(yield);
[&]() { ASSERT_TRUE(receivedMessage); }();
EXPECT_EQ(boost::json::parse(*receivedMessage), boost::json::parse(message_)) << *receivedMessage;
auto sendError = connection.send(boost::json::serialize(reply_), yield);
[&]() { ASSERT_FALSE(sendError) << *sendError; }();
});
runSpawn([&](boost::asio::yield_context yield) {
auto result = forwardingSource.forwardToRippled(boost::json::parse(message_).as_object(), "some_ip", yield);
[&]() { ASSERT_TRUE(result); }();
auto expectedReply = reply_;
expectedReply["forwarded"] = true;
EXPECT_EQ(*result, expectedReply) << *result;
});
}

View File

@@ -0,0 +1,151 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/GrpcSource.hpp"
#include "util/Fixtures.hpp"
#include "util/MockBackend.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockXrpLedgerAPIService.hpp"
#include "util/TestObject.hpp"
#include "util/config/Config.hpp"
#include <gmock/gmock.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/status.h>
#include <gtest/gtest.h>
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <org/xrpl/rpc/v1/get_ledger_data.pb.h>
#include <ripple/basics/base_uint.h>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
using namespace etl::impl;
struct GrpcSourceTests : NoLoggerFixture, util::prometheus::WithPrometheus, tests::util::WithMockXrpLedgerAPIService {
GrpcSourceTests()
: WithMockXrpLedgerAPIService("localhost:55051")
, mockBackend_(std::make_shared<testing::StrictMock<MockBackend>>(util::Config{}))
, grpcSource_("127.0.0.1", "55051", mockBackend_)
{
}
std::shared_ptr<testing::StrictMock<MockBackend>> mockBackend_;
testing::StrictMock<GrpcSource> grpcSource_;
};
TEST_F(GrpcSourceTests, fetchLedger)
{
uint32_t const sequence = 123;
bool const getObjects = true;
bool const getObjectNeighbors = false;
EXPECT_CALL(mockXrpLedgerAPIService, GetLedger)
.WillOnce([&](grpc::ServerContext* /*context*/,
org::xrpl::rpc::v1::GetLedgerRequest const* request,
org::xrpl::rpc::v1::GetLedgerResponse* response) {
EXPECT_EQ(request->ledger().sequence(), sequence);
EXPECT_TRUE(request->transactions());
EXPECT_TRUE(request->expand());
EXPECT_EQ(request->get_objects(), getObjects);
EXPECT_EQ(request->get_object_neighbors(), getObjectNeighbors);
EXPECT_EQ(request->user(), "ETL");
response->set_validated(true);
response->set_is_unlimited(false);
response->set_object_neighbors_included(false);
return grpc::Status{};
});
auto const [status, response] = grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
ASSERT_TRUE(status.ok());
EXPECT_TRUE(response.validated());
EXPECT_FALSE(response.is_unlimited());
EXPECT_FALSE(response.object_neighbors_included());
}
TEST_F(GrpcSourceTests, fetchLedgerNoStub)
{
testing::StrictMock<GrpcSource> wrongGrpcSource{"wrong", "wrong", mockBackend_};
auto const [status, _response] = wrongGrpcSource.fetchLedger(0, false, false);
EXPECT_EQ(status.error_code(), grpc::StatusCode::INTERNAL);
}
TEST_F(GrpcSourceTests, loadInitialLedgerNoStub)
{
testing::StrictMock<GrpcSource> wrongGrpcSource{"wrong", "wrong", mockBackend_};
auto const [data, success] = wrongGrpcSource.loadInitialLedger(0, 0, false);
EXPECT_TRUE(data.empty());
EXPECT_FALSE(success);
}
struct GrpcSourceLoadInitialLedgerTests : GrpcSourceTests {
uint32_t const sequence_ = 123;
uint32_t const numMarkers_ = 4;
bool const cacheOnly_ = false;
};
TEST_F(GrpcSourceLoadInitialLedgerTests, GetLedgerDataFailed)
{
EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData)
.Times(numMarkers_)
.WillRepeatedly([&](grpc::ServerContext* /*context*/,
org::xrpl::rpc::v1::GetLedgerDataRequest const* request,
org::xrpl::rpc::v1::GetLedgerDataResponse* /*response*/) {
EXPECT_EQ(request->ledger().sequence(), sequence_);
EXPECT_EQ(request->user(), "ETL");
return grpc::Status{grpc::StatusCode::NOT_FOUND, "Not found"};
});
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, cacheOnly_);
EXPECT_TRUE(data.empty());
EXPECT_FALSE(success);
}
TEST_F(GrpcSourceLoadInitialLedgerTests, worksFine)
{
auto const key = ripple::uint256{4};
std::string const keyStr{reinterpret_cast<char const*>(key.data()), ripple::uint256::size()};
auto const object = CreateTicketLedgerObject("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", sequence_);
auto const objectData = object.getSerializer().peekData();
EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData)
.Times(numMarkers_)
.WillRepeatedly([&](grpc::ServerContext* /*context*/,
org::xrpl::rpc::v1::GetLedgerDataRequest const* request,
org::xrpl::rpc::v1::GetLedgerDataResponse* response) {
EXPECT_EQ(request->ledger().sequence(), sequence_);
EXPECT_EQ(request->user(), "ETL");
response->set_is_unlimited(true);
auto newObject = response->mutable_ledger_objects()->add_objects();
newObject->set_key(key.data(), ripple::uint256::size());
newObject->set_data(objectData.data(), objectData.size());
return grpc::Status{};
});
EXPECT_CALL(*mockBackend_, writeNFTs).Times(numMarkers_);
EXPECT_CALL(*mockBackend_, writeLedgerObject).Times(numMarkers_);
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, cacheOnly_);
EXPECT_TRUE(success);
EXPECT_EQ(data, std::vector<std::string>(4, keyStr));
}

View File

@@ -0,0 +1,307 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/LedgerPublisher.hpp"
#include "util/Fixtures.hpp"
#include "util/MockCache.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp"
#include "util/TestObject.hpp"
#include "util/config/Config.hpp"
#include <boost/json/parse.hpp>
#include <fmt/core.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <ripple/basics/chrono.h>
#include <ripple/protocol/Indexes.h>
#include <ripple/protocol/LedgerHeader.h>
#include <chrono>
#include <vector>
using namespace testing;
using namespace etl;
namespace json = boost::json;
using namespace std::chrono;
static auto constexpr ACCOUNT = "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn";
static auto constexpr ACCOUNT2 = "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun";
static auto constexpr LEDGERHASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
static auto constexpr SEQ = 30;
static auto constexpr AGE = 800;
struct ETLLedgerPublisherTest : util::prometheus::WithPrometheus,
MockBackendTest,
SyncAsioContextTest,
MockSubscriptionManagerTest {
void
SetUp() override
{
MockBackendTest::SetUp();
SyncAsioContextTest::SetUp();
MockSubscriptionManagerTest::SetUp();
}
void
TearDown() override
{
MockSubscriptionManagerTest::TearDown();
SyncAsioContextTest::TearDown();
MockBackendTest::TearDown();
}
protected:
util::Config cfg{json::parse("{}")};
MockCache mockCache;
};
TEST_F(ETLLedgerPublisherTest, PublishLedgerInfoIsWritingFalse)
{
SystemState dummyState;
dummyState.isWriting = false;
auto const dummyLedgerInfo = CreateLedgerInfo(LEDGERHASH, SEQ, AGE);
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerInfo);
ON_CALL(*backend, fetchLedgerDiff(SEQ, _)).WillByDefault(Return(std::vector<LedgerObject>{}));
EXPECT_CALL(*backend, fetchLedgerDiff(SEQ, _)).Times(1);
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), SEQ);
EXPECT_CALL(mockCache, updateImp).Times(1);
ctx.run();
EXPECT_TRUE(backend->fetchLedgerRange());
EXPECT_EQ(backend->fetchLedgerRange().value().minSequence, SEQ);
EXPECT_EQ(backend->fetchLedgerRange().value().maxSequence, SEQ);
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerInfoIsWritingTrue)
{
SystemState dummyState;
dummyState.isWriting = true;
auto const dummyLedgerInfo = CreateLedgerInfo(LEDGERHASH, SEQ, AGE);
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerInfo);
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(0);
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), SEQ);
ctx.run();
EXPECT_FALSE(backend->fetchLedgerRange());
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerInfoInRange)
{
SystemState dummyState;
dummyState.isWriting = true;
auto const dummyLedgerInfo = CreateLedgerInfo(LEDGERHASH, SEQ, 0); // age is 0
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
backend->setRange(SEQ - 1, SEQ);
publisher.publish(dummyLedgerInfo);
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(0);
// mock fetch fee
EXPECT_CALL(*backend, doFetchLedgerObject).Times(1);
ON_CALL(*backend, doFetchLedgerObject(ripple::keylet::fees().key, SEQ, _))
.WillByDefault(Return(CreateLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
// mock fetch transactions
EXPECT_CALL(*backend, fetchAllTransactionsInLedger).Times(1);
TransactionAndMetadata t1;
t1.transaction = CreatePaymentTransactionObject(ACCOUNT, ACCOUNT2, 100, 3, SEQ).getSerializer().peekData();
t1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT, ACCOUNT2, 110, 30).getSerializer().peekData();
t1.ledgerSequence = SEQ;
ON_CALL(*backend, fetchAllTransactionsInLedger(SEQ, _))
.WillByDefault(Return(std::vector<TransactionAndMetadata>{t1}));
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), SEQ);
MockSubscriptionManager* rawSubscriptionManagerPtr =
dynamic_cast<MockSubscriptionManager*>(mockSubscriptionManagerPtr.get());
EXPECT_CALL(*rawSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", SEQ - 1, SEQ), 1)).Times(1);
EXPECT_CALL(*rawSubscriptionManagerPtr, pubBookChanges).Times(1);
// mock 1 transaction
EXPECT_CALL(*rawSubscriptionManagerPtr, pubTransaction).Times(1);
ctx.run();
// last publish time should be set
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerInfoCloseTimeGreaterThanNow)
{
SystemState dummyState;
dummyState.isWriting = true;
ripple::LedgerInfo dummyLedgerInfo = CreateLedgerInfo(LEDGERHASH, SEQ, 0);
auto const nowPlus10 = system_clock::now() + seconds(10);
auto const closeTime = duration_cast<seconds>(nowPlus10.time_since_epoch()).count() - rippleEpochStart;
dummyLedgerInfo.closeTime = ripple::NetClock::time_point{seconds{closeTime}};
backend->setRange(SEQ - 1, SEQ);
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerInfo);
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(0);
// mock fetch fee
EXPECT_CALL(*backend, doFetchLedgerObject).Times(1);
ON_CALL(*backend, doFetchLedgerObject(ripple::keylet::fees().key, SEQ, _))
.WillByDefault(Return(CreateLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
// mock fetch transactions
EXPECT_CALL(*backend, fetchAllTransactionsInLedger).Times(1);
TransactionAndMetadata t1;
t1.transaction = CreatePaymentTransactionObject(ACCOUNT, ACCOUNT2, 100, 3, SEQ).getSerializer().peekData();
t1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT, ACCOUNT2, 110, 30).getSerializer().peekData();
t1.ledgerSequence = SEQ;
ON_CALL(*backend, fetchAllTransactionsInLedger(SEQ, _))
.WillByDefault(Return(std::vector<TransactionAndMetadata>{t1}));
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), SEQ);
MockSubscriptionManager* rawSubscriptionManagerPtr =
dynamic_cast<MockSubscriptionManager*>(mockSubscriptionManagerPtr.get());
EXPECT_CALL(*rawSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", SEQ - 1, SEQ), 1)).Times(1);
EXPECT_CALL(*rawSubscriptionManagerPtr, pubBookChanges).Times(1);
// mock 1 transaction
EXPECT_CALL(*rawSubscriptionManagerPtr, pubTransaction).Times(1);
ctx.run();
// last publish time should be set
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue)
{
SystemState dummyState;
dummyState.isStopping = true;
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
EXPECT_FALSE(publisher.publish(SEQ, {}));
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttampt)
{
SystemState dummyState;
dummyState.isStopping = false;
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
static auto constexpr MAX_ATTEMPT = 2;
EXPECT_CALL(*backend, hardFetchLedgerRange).Times(MAX_ATTEMPT);
LedgerRange const range{.minSequence = SEQ - 1, .maxSequence = SEQ - 1};
ON_CALL(*backend, hardFetchLedgerRange(_)).WillByDefault(Return(range));
EXPECT_FALSE(publisher.publish(SEQ, MAX_ATTEMPT));
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse)
{
SystemState dummyState;
dummyState.isStopping = false;
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
LedgerRange const range{.minSequence = SEQ, .maxSequence = SEQ};
ON_CALL(*backend, hardFetchLedgerRange(_)).WillByDefault(Return(range));
EXPECT_CALL(*backend, hardFetchLedgerRange).Times(1);
auto const dummyLedgerInfo = CreateLedgerInfo(LEDGERHASH, SEQ, AGE);
ON_CALL(*backend, fetchLedgerBySequence(SEQ, _)).WillByDefault(Return(dummyLedgerInfo));
EXPECT_CALL(*backend, fetchLedgerBySequence).Times(1);
ON_CALL(*backend, fetchLedgerDiff(SEQ, _)).WillByDefault(Return(std::vector<LedgerObject>{}));
EXPECT_CALL(*backend, fetchLedgerDiff(SEQ, _)).Times(1);
EXPECT_CALL(mockCache, updateImp).Times(1);
EXPECT_TRUE(publisher.publish(SEQ, {}));
ctx.run();
}
TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
{
SystemState dummyState;
dummyState.isWriting = true;
auto const dummyLedgerInfo = CreateLedgerInfo(LEDGERHASH, SEQ, 0); // age is 0
impl::LedgerPublisher publisher(ctx, backend, mockCache, mockSubscriptionManagerPtr, dummyState);
backend->setRange(SEQ - 1, SEQ);
publisher.publish(dummyLedgerInfo);
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(0);
// mock fetch fee
EXPECT_CALL(*backend, doFetchLedgerObject).Times(1);
ON_CALL(*backend, doFetchLedgerObject(ripple::keylet::fees().key, SEQ, _))
.WillByDefault(Return(CreateLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
// mock fetch transactions
EXPECT_CALL(*backend, fetchAllTransactionsInLedger).Times(1);
// t1 index > t2 index
TransactionAndMetadata t1;
t1.transaction = CreatePaymentTransactionObject(ACCOUNT, ACCOUNT2, 100, 3, SEQ).getSerializer().peekData();
t1.metadata = CreatePaymentTransactionMetaObject(ACCOUNT, ACCOUNT2, 110, 30, 2).getSerializer().peekData();
t1.ledgerSequence = SEQ;
t1.date = 1;
TransactionAndMetadata t2;
t2.transaction = CreatePaymentTransactionObject(ACCOUNT, ACCOUNT2, 100, 3, SEQ).getSerializer().peekData();
t2.metadata = CreatePaymentTransactionMetaObject(ACCOUNT, ACCOUNT2, 110, 30, 1).getSerializer().peekData();
t2.ledgerSequence = SEQ;
t2.date = 2;
ON_CALL(*backend, fetchAllTransactionsInLedger(SEQ, _))
.WillByDefault(Return(std::vector<TransactionAndMetadata>{t1, t2}));
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), SEQ);
MockSubscriptionManager* rawSubscriptionManagerPtr =
dynamic_cast<MockSubscriptionManager*>(mockSubscriptionManagerPtr.get());
EXPECT_CALL(*rawSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", SEQ - 1, SEQ), 2)).Times(1);
EXPECT_CALL(*rawSubscriptionManagerPtr, pubBookChanges).Times(1);
// should call pubTransaction t2 first (greater tx index)
Sequence const s;
EXPECT_CALL(*rawSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s);
EXPECT_CALL(*rawSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s);
ctx.run();
// last publish time should be set
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}

View File

@@ -0,0 +1,187 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/Source.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/value_to.hpp>
#include <gmock/gmock.h>
#include <grpcpp/support/status.h>
#include <gtest/gtest.h>
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <chrono>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
using namespace etl;
using testing::Return;
using testing::StrictMock;
struct GrpcSourceMock {
using FetchLedgerReturnType = std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>;
MOCK_METHOD(FetchLedgerReturnType, fetchLedger, (uint32_t, bool, bool));
using LoadLedgerReturnType = std::pair<std::vector<std::string>, bool>;
MOCK_METHOD(LoadLedgerReturnType, loadInitialLedger, (uint32_t, uint32_t, bool));
};
struct SubscriptionSourceMock {
MOCK_METHOD(void, run, ());
MOCK_METHOD(bool, hasLedger, (uint32_t), (const));
MOCK_METHOD(bool, isConnected, (), (const));
MOCK_METHOD(void, setForwarding, (bool));
MOCK_METHOD(std::chrono::steady_clock::time_point, lastMessageTime, (), (const));
MOCK_METHOD(std::string, validatedRange, (), (const));
MOCK_METHOD(void, stop, ());
};
struct ForwardingSourceMock {
MOCK_METHOD(void, constructor, (std::string const&, std::string const&, std::chrono::steady_clock::duration));
using ForwardToRippledReturnType = std::optional<boost::json::object>;
using ClientIpOpt = std::optional<std::string>;
MOCK_METHOD(
ForwardToRippledReturnType,
forwardToRippled,
(boost::json::object const&, ClientIpOpt const&, boost::asio::yield_context),
(const)
);
};
struct SourceTest : public ::testing::Test {
boost::asio::io_context ioc_;
StrictMock<GrpcSourceMock> grpcSourceMock_;
std::shared_ptr<StrictMock<SubscriptionSourceMock>> subscriptionSourceMock_ =
std::make_shared<StrictMock<SubscriptionSourceMock>>();
StrictMock<ForwardingSourceMock> forwardingSourceMock_;
SourceImpl<
StrictMock<GrpcSourceMock>&,
std::shared_ptr<StrictMock<SubscriptionSourceMock>>,
StrictMock<ForwardingSourceMock>&>
source_{
"some_ip",
"some_ws_port",
"some_grpc_port",
grpcSourceMock_,
subscriptionSourceMock_,
forwardingSourceMock_
};
};
TEST_F(SourceTest, run)
{
EXPECT_CALL(*subscriptionSourceMock_, run());
source_.run();
}
TEST_F(SourceTest, isConnected)
{
EXPECT_CALL(*subscriptionSourceMock_, isConnected()).WillOnce(testing::Return(true));
EXPECT_TRUE(source_.isConnected());
}
TEST_F(SourceTest, setForwarding)
{
EXPECT_CALL(*subscriptionSourceMock_, setForwarding(true));
source_.setForwarding(true);
}
TEST_F(SourceTest, toJson)
{
EXPECT_CALL(*subscriptionSourceMock_, validatedRange()).WillOnce(Return(std::string("some_validated_range")));
EXPECT_CALL(*subscriptionSourceMock_, isConnected()).WillOnce(Return(true));
auto const lastMessageTime = std::chrono::steady_clock::now();
EXPECT_CALL(*subscriptionSourceMock_, lastMessageTime()).WillOnce(Return(lastMessageTime));
auto const json = source_.toJson();
EXPECT_EQ(boost::json::value_to<std::string>(json.at("validated_range")), "some_validated_range");
EXPECT_EQ(boost::json::value_to<std::string>(json.at("is_connected")), "1");
EXPECT_EQ(boost::json::value_to<std::string>(json.at("ip")), "some_ip");
EXPECT_EQ(boost::json::value_to<std::string>(json.at("ws_port")), "some_ws_port");
EXPECT_EQ(boost::json::value_to<std::string>(json.at("grpc_port")), "some_grpc_port");
auto lastMessageAgeStr = boost::json::value_to<std::string>(json.at("last_msg_age_seconds"));
EXPECT_GE(std::stoi(lastMessageAgeStr), 0);
}
TEST_F(SourceTest, toString)
{
EXPECT_CALL(*subscriptionSourceMock_, validatedRange()).WillOnce(Return(std::string("some_validated_range")));
auto const str = source_.toString();
EXPECT_EQ(
str,
"{validated range: some_validated_range, ip: some_ip, web socket port: some_ws_port, grpc port: some_grpc_port}"
);
}
TEST_F(SourceTest, hasLedger)
{
uint32_t const ledgerSeq = 123;
EXPECT_CALL(*subscriptionSourceMock_, hasLedger(ledgerSeq)).WillOnce(Return(true));
EXPECT_TRUE(source_.hasLedger(ledgerSeq));
}
TEST_F(SourceTest, fetchLedger)
{
uint32_t const ledgerSeq = 123;
EXPECT_CALL(grpcSourceMock_, fetchLedger(ledgerSeq, true, false));
auto const [actualStatus, actualResponse] = source_.fetchLedger(ledgerSeq);
EXPECT_EQ(actualStatus.error_code(), grpc::StatusCode::OK);
}
TEST_F(SourceTest, loadInitialLedger)
{
uint32_t const ledgerSeq = 123;
uint32_t const numMarkers = 3;
EXPECT_CALL(grpcSourceMock_, loadInitialLedger(ledgerSeq, numMarkers, false))
.WillOnce(Return(std::make_pair(std::vector<std::string>{}, true)));
auto const [actualLedgers, actualSuccess] = source_.loadInitialLedger(ledgerSeq, numMarkers);
EXPECT_TRUE(actualLedgers.empty());
EXPECT_TRUE(actualSuccess);
}
TEST_F(SourceTest, forwardToRippled)
{
boost::json::object const request = {{"some_key", "some_value"}};
std::optional<std::string> const clientIp = "some_client_ip";
EXPECT_CALL(forwardingSourceMock_, forwardToRippled(request, clientIp, testing::_)).WillOnce(Return(request));
boost::asio::io_context ioContext;
boost::asio::spawn(ioContext, [&](boost::asio::yield_context yield) {
auto const response = source_.forwardToRippled(request, clientIp, yield);
EXPECT_EQ(response, request);
});
ioContext.run();
}

View File

@@ -0,0 +1,70 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/SubscriptionSourceDependencies.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockSubscriptionManager.hpp"
#include <boost/json/object.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <memory>
using namespace etl::impl;
using testing::StrictMock;
struct SubscriptionSourceDependenciesTest : testing::Test {
std::shared_ptr<StrictMock<MockNetworkValidatedLedgers>> networkValidatedLedgers_ =
std::make_shared<StrictMock<MockNetworkValidatedLedgers>>();
std::shared_ptr<StrictMock<MockSubscriptionManager>> subscriptionManager_ =
std::make_shared<StrictMock<MockSubscriptionManager>>();
SubscriptionSourceDependencies dependencies_{networkValidatedLedgers_, subscriptionManager_};
};
TEST_F(SubscriptionSourceDependenciesTest, ForwardProposedTransaction)
{
boost::json::object const txJson = {{"tx", "json"}};
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(txJson));
dependencies_.forwardProposedTransaction(txJson);
}
TEST_F(SubscriptionSourceDependenciesTest, ForwardValidation)
{
boost::json::object const validationJson = {{"validation", "json"}};
EXPECT_CALL(*subscriptionManager_, forwardValidation(validationJson));
dependencies_.forwardValidation(validationJson);
}
TEST_F(SubscriptionSourceDependenciesTest, ForwardManifest)
{
boost::json::object const manifestJson = {{"manifest", "json"}};
EXPECT_CALL(*subscriptionManager_, forwardManifest(manifestJson));
dependencies_.forwardManifest(manifestJson);
}
TEST_F(SubscriptionSourceDependenciesTest, PushValidatedLedger)
{
uint32_t const idx = 42;
EXPECT_CALL(*networkValidatedLedgers_, push(idx));
dependencies_.pushValidatedLedger(idx);
}

View File

@@ -0,0 +1,519 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/impl/SubscriptionSource.hpp"
#include "util/Fixtures.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockSubscriptionManager.hpp"
#include "util/TestWsServer.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/serialize.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
#include <optional>
#include <string>
#include <utility>
using namespace etl::impl;
using testing::MockFunction;
using testing::StrictMock;
struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
SubscriptionSourceConnectionTests()
{
subscriptionSource_->run();
}
boost::asio::io_context ioContext_;
TestWsServer wsServer_{ioContext_, "0.0.0.0", 11113};
template <typename T>
using StrictMockPtr = std::shared_ptr<StrictMock<T>>;
StrictMockPtr<MockNetworkValidatedLedgers> networkValidatedLedgers_ =
std::make_shared<StrictMock<MockNetworkValidatedLedgers>>();
StrictMockPtr<MockSubscriptionManager> subscriptionManager_ =
std::make_shared<StrictMock<MockSubscriptionManager>>();
StrictMock<MockFunction<void()>> onConnectHook_;
StrictMock<MockFunction<void()>> onDisconnectHook_;
StrictMock<MockFunction<void()>> onLedgerClosedHook_;
std::unique_ptr<SubscriptionSource> subscriptionSource_ = std::make_unique<SubscriptionSource>(
ioContext_,
"127.0.0.1",
"11113",
networkValidatedLedgers_,
subscriptionManager_,
onConnectHook_.AsStdFunction(),
onDisconnectHook_.AsStdFunction(),
onLedgerClosedHook_.AsStdFunction(),
std::chrono::milliseconds(1),
std::chrono::milliseconds(1)
);
[[maybe_unused]] TestWsConnection
serverConnection(boost::asio::yield_context yield)
{
// The first one is an SSL attempt
auto failedConnection = wsServer_.acceptConnection(yield);
[&]() { ASSERT_FALSE(failedConnection); }();
auto connection = wsServer_.acceptConnection(yield);
[&]() { ASSERT_TRUE(connection) << connection.error().message(); }();
auto message = connection->receive(yield);
[&]() {
ASSERT_TRUE(message);
EXPECT_EQ(
message.value(),
R"({"command":"subscribe","streams":["ledger","manifests","validations","transactions_proposed"]})"
);
}();
return std::move(connection).value();
}
};
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed)
{
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed)
{
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceConnectionTests, ReadError)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
for (int i = 0; i < 2; ++i) {
auto connection = serverConnection(yield);
connection.close(yield);
}
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceConnectionTests, IsConnected)
{
EXPECT_FALSE(subscriptionSource_->isConnected());
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
EXPECT_TRUE(subscriptionSource_->isConnected());
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests {
[[maybe_unused]] TestWsConnection
connectAndSendMessage(std::string const message, boost::asio::yield_context yield)
{
auto connection = serverConnection(yield);
auto error = connection.send(message, yield);
[&]() { ASSERT_FALSE(error) << *error; }();
return connection;
}
};
TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage("something", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
serverConnection(yield);
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotResult)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndex)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"ledger_index":123}})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"ledger_index":"123"}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
serverConnection(yield);
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"validated_ledgers":123}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
serverConnection(yield);
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
{
EXPECT_FALSE(subscriptionSource_->hasLedger(123));
EXPECT_FALSE(subscriptionSource_->hasLedger(124));
EXPECT_FALSE(subscriptionSource_->hasLedger(455));
EXPECT_FALSE(subscriptionSource_->hasLedger(456));
EXPECT_FALSE(subscriptionSource_->hasLedger(457));
EXPECT_FALSE(subscriptionSource_->hasLedger(32));
EXPECT_FALSE(subscriptionSource_->hasLedger(31));
EXPECT_FALSE(subscriptionSource_->hasLedger(789));
EXPECT_FALSE(subscriptionSource_->hasLedger(790));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"validated_ledgers":"123-456,789,32"}})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
EXPECT_TRUE(subscriptionSource_->hasLedger(123));
EXPECT_TRUE(subscriptionSource_->hasLedger(124));
EXPECT_TRUE(subscriptionSource_->hasLedger(455));
EXPECT_TRUE(subscriptionSource_->hasLedger(456));
EXPECT_FALSE(subscriptionSource_->hasLedger(457));
EXPECT_TRUE(subscriptionSource_->hasLedger(32));
EXPECT_FALSE(subscriptionSource_->hasLedger(31));
EXPECT_TRUE(subscriptionSource_->hasLedger(789));
EXPECT_FALSE(subscriptionSource_->hasLedger(790));
EXPECT_EQ(subscriptionSource_->validatedRange(), "123-456,789,32");
}
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"validated_ledgers":"123-456-789,32"}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
serverConnection(yield);
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
{
EXPECT_FALSE(subscriptionSource_->hasLedger(1));
EXPECT_FALSE(subscriptionSource_->hasLedger(1));
EXPECT_FALSE(subscriptionSource_->hasLedger(2));
EXPECT_FALSE(subscriptionSource_->hasLedger(3));
EXPECT_FALSE(subscriptionSource_->hasLedger(4));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"ledger_index":123,"validated_ledgers":"1-3"}})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
EXPECT_EQ(subscriptionSource_->validatedRange(), "1-3");
EXPECT_FALSE(subscriptionSource_->hasLedger(0));
EXPECT_TRUE(subscriptionSource_->hasLedger(1));
EXPECT_TRUE(subscriptionSource_->hasLedger(2));
EXPECT_TRUE(subscriptionSource_->hasLedger(3));
EXPECT_FALSE(subscriptionSource_->hasLedger(4));
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosed)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
{
subscriptionSource_->setForwarding(true);
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type": "ledgerClosed"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onLedgerClosedHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type": "ledgerClosed","ledger_index": 123})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed","ledger_index":"123"}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
serverConnection(yield);
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed","validated_ledgers":123})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
serverConnection(yield);
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers)
{
EXPECT_FALSE(subscriptionSource_->hasLedger(0));
EXPECT_FALSE(subscriptionSource_->hasLedger(1));
EXPECT_FALSE(subscriptionSource_->hasLedger(2));
EXPECT_FALSE(subscriptionSource_->hasLedger(3));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed","validated_ledgers":"1-2"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
EXPECT_FALSE(subscriptionSource_->hasLedger(0));
EXPECT_TRUE(subscriptionSource_->hasLedger(1));
EXPECT_TRUE(subscriptionSource_->hasLedger(2));
EXPECT_FALSE(subscriptionSource_->hasLedger(3));
EXPECT_EQ(subscriptionSource_->validatedRange(), "1-2");
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLedgers)
{
EXPECT_FALSE(subscriptionSource_->hasLedger(0));
EXPECT_FALSE(subscriptionSource_->hasLedger(1));
EXPECT_FALSE(subscriptionSource_->hasLedger(2));
EXPECT_FALSE(subscriptionSource_->hasLedger(3));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection =
connectAndSendMessage(R"({"type":"ledgerClosed","ledger_index":123,"validated_ledgers":"1-2"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
EXPECT_FALSE(subscriptionSource_->hasLedger(0));
EXPECT_TRUE(subscriptionSource_->hasLedger(1));
EXPECT_TRUE(subscriptionSource_->hasLedger(2));
EXPECT_FALSE(subscriptionSource_->hasLedger(3));
EXPECT_EQ(subscriptionSource_->validatedRange(), "1-2");
}
TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingFalse)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"transaction":"some_transaction_data"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
{
subscriptionSource_->setForwarding(true);
boost::json::object const message = {{"transaction", "some_transaction_data"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message));
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"validationReceived"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue)
{
subscriptionSource_->setForwarding(true);
boost::json::object const message = {{"type", "validationReceived"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
EXPECT_CALL(*subscriptionManager_, forwardValidation(message));
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotManiefstReceivedIsForwardingFalse)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"manifestReceived"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue)
{
subscriptionSource_->setForwarding(true);
boost::json::object const message = {{"type", "manifestReceived"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
EXPECT_CALL(*subscriptionManager_, forwardManifest(message));
ioContext_.run();
}
TEST_F(SubscriptionSourceReadTests, LastMessageTime)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage("some_message", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_->stop(); });
ioContext_.run();
auto const actualLastTimeMessage = subscriptionSource_->lastMessageTime();
auto const now = std::chrono::steady_clock::now();
auto const diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - actualLastTimeMessage);
EXPECT_LT(diff, std::chrono::milliseconds(100));
}

View File

@@ -0,0 +1,162 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/SystemState.hpp"
#include "etl/impl/Transformer.hpp"
#include "util/FakeFetchResponse.hpp"
#include "util/Fixtures.hpp"
#include "util/MockAmendmentBlockHandler.hpp"
#include "util/MockExtractionDataPipe.hpp"
#include "util/MockLedgerLoader.hpp"
#include "util/MockLedgerPublisher.hpp"
#include "util/MockPrometheus.hpp"
#include "util/StringUtils.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
#include <optional>
#include <thread>
using namespace testing;
using namespace etl;
// taken from BackendTests
constexpr static auto RAW_HEADER =
"03C3141A01633CD656F91B4EBB5EB89B791BD34DBC8A04BB6F407C5335BC54351E"
"DD733898497E809E04074D14D271E4832D7888754F9230800761563A292FA2315A"
"6DB6FE30CC5909B285080FCD6773CC883F9FE0EE4D439340AC592AADB973ED3CF5"
"3E2232B33EF57CECAC2816E3122816E31A0A00F8377CD95DFA484CFAE282656A58"
"CE5AA29652EFFD80AC59CD91416E4E13DBBE";
struct ETLTransformerTest : util::prometheus::WithPrometheus, MockBackendTest {
using DataType = FakeFetchResponse;
using ExtractionDataPipeType = MockExtractionDataPipe;
using LedgerLoaderType = MockLedgerLoader;
using LedgerPublisherType = MockLedgerPublisher;
using AmendmentBlockHandlerType = MockAmendmentBlockHandler;
using TransformerType = etl::impl::
Transformer<ExtractionDataPipeType, LedgerLoaderType, LedgerPublisherType, AmendmentBlockHandlerType>;
ExtractionDataPipeType dataPipe_;
LedgerLoaderType ledgerLoader_;
LedgerPublisherType ledgerPublisher_;
AmendmentBlockHandlerType amendmentBlockHandler_;
SystemState state_;
std::unique_ptr<TransformerType> transformer_;
void
SetUp() override
{
MockBackendTest::SetUp();
state_.isStopping = false;
state_.writeConflict = false;
state_.isReadOnly = false;
state_.isWriting = false;
}
void
TearDown() override
{
transformer_.reset();
MockBackendTest::TearDown();
}
};
TEST_F(ETLTransformerTest, StopsOnWriteConflict)
{
state_.writeConflict = true;
EXPECT_CALL(dataPipe_, popNext).Times(0);
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0);
transformer_ = std::make_unique<TransformerType>(
dataPipe_, backend, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, 0, state_
);
transformer_->waitTillFinished(); // explicitly joins the thread
}
TEST_F(ETLTransformerTest, StopsOnEmptyFetchResponse)
{
backend->cache().setFull(); // to avoid throwing exception in updateCache
auto const blob = hexStringToBinaryString(RAW_HEADER);
auto const response = std::make_optional<FakeFetchResponse>(blob);
ON_CALL(dataPipe_, popNext).WillByDefault([this, &response](auto) -> std::optional<FakeFetchResponse> {
if (state_.isStopping)
return std::nullopt;
return response; // NOLINT (performance-no-automatic-move)
});
ON_CALL(*backend, doFinishWrites).WillByDefault(Return(true));
// TODO: most of this should be hidden in a smaller entity that is injected into the transformer thread
EXPECT_CALL(dataPipe_, popNext).Times(AtLeast(1));
EXPECT_CALL(*backend, startWrites).Times(AtLeast(1));
EXPECT_CALL(*backend, writeLedger(_, _)).Times(AtLeast(1));
EXPECT_CALL(ledgerLoader_, insertTransactions).Times(AtLeast(1));
EXPECT_CALL(*backend, writeAccountTransactions).Times(AtLeast(1));
EXPECT_CALL(*backend, writeNFTs).Times(AtLeast(1));
EXPECT_CALL(*backend, writeNFTTransactions).Times(AtLeast(1));
EXPECT_CALL(*backend, doFinishWrites).Times(AtLeast(1));
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(AtLeast(1));
transformer_ = std::make_unique<TransformerType>(
dataPipe_, backend, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, 0, state_
);
// after 10ms we start spitting out empty responses which means the extractor is finishing up
// this is normally combined with stopping the entire thing by setting the isStopping flag.
std::this_thread::sleep_for(std::chrono::milliseconds{10});
state_.isStopping = true;
}
TEST_F(ETLTransformerTest, DoesNotPublishIfCanNotBuildNextLedger)
{
backend->cache().setFull(); // to avoid throwing exception in updateCache
auto const blob = hexStringToBinaryString(RAW_HEADER);
auto const response = std::make_optional<FakeFetchResponse>(blob);
ON_CALL(dataPipe_, popNext).WillByDefault(Return(response));
ON_CALL(*backend, doFinishWrites).WillByDefault(Return(false)); // emulate write failure
// TODO: most of this should be hidden in a smaller entity that is injected into the transformer thread
EXPECT_CALL(dataPipe_, popNext).Times(AtLeast(1));
EXPECT_CALL(*backend, startWrites).Times(AtLeast(1));
EXPECT_CALL(*backend, writeLedger(_, _)).Times(AtLeast(1));
EXPECT_CALL(ledgerLoader_, insertTransactions).Times(AtLeast(1));
EXPECT_CALL(*backend, writeAccountTransactions).Times(AtLeast(1));
EXPECT_CALL(*backend, writeNFTs).Times(AtLeast(1));
EXPECT_CALL(*backend, writeNFTTransactions).Times(AtLeast(1));
EXPECT_CALL(*backend, doFinishWrites).Times(AtLeast(1));
// should not call publish
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0);
transformer_ = std::make_unique<TransformerType>(
dataPipe_, backend, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, 0, state_
);
}
// TODO: implement tests for amendment block. requires more refactoring