Files
rippled/src/test/app/LedgerReplay_test.cpp

1608 lines
46 KiB
C++

#include <test/jtx.h>
#include <test/jtx/envconfig.h>
#include <xrpld/app/ledger/BuildLedger.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/LedgerReplay.h>
#include <xrpld/app/ledger/LedgerReplayTask.h>
#include <xrpld/app/ledger/LedgerReplayer.h>
#include <xrpld/app/ledger/detail/LedgerDeltaAcquire.h>
#include <xrpld/app/ledger/detail/LedgerReplayMsgHandler.h>
#include <xrpld/app/ledger/detail/SkipListAcquire.h>
#include <xrpld/overlay/PeerSet.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpl/basics/Slice.h>
#include <chrono>
#include <thread>
namespace ripple {
namespace test {
struct LedgerReplay_test : public beast::unit_test::suite
{
void
run() override
{
testcase("Replay ledger");
using namespace jtx;
// Build a ledger normally
auto const alice = Account("alice");
auto const bob = Account("bob");
Env env(*this);
env.fund(XRP(100000), alice, bob);
env.close();
LedgerMaster& ledgerMaster = env.app().getLedgerMaster();
auto const lastClosed = ledgerMaster.getClosedLedger();
auto const lastClosedParent =
ledgerMaster.getLedgerByHash(lastClosed->info().parentHash);
auto const replayed = buildLedger(
LedgerReplay(lastClosedParent, lastClosed),
tapNONE,
env.app(),
env.journal);
BEAST_EXPECT(replayed->info().hash == lastClosed->info().hash);
}
};
enum class InboundLedgersBehavior {
Good,
DropAll,
};
/**
* Simulate a network InboundLedgers.
* Depending on the configured InboundLedgersBehavior,
* it either provides the ledger or not
*/
class MagicInboundLedgers : public InboundLedgers
{
public:
MagicInboundLedgers(
LedgerMaster& ledgerSource,
LedgerMaster& ledgerSink,
InboundLedgersBehavior bhvr)
: ledgerSource(ledgerSource), ledgerSink(ledgerSink), bhvr(bhvr)
{
}
virtual ~MagicInboundLedgers() = default;
virtual std::shared_ptr<Ledger const>
acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason)
override
{
if (bhvr == InboundLedgersBehavior::DropAll)
return {};
if (auto l = ledgerSource.getLedgerByHash(hash); l)
{
ledgerSink.storeLedger(l);
return l;
}
return {};
}
virtual void
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
}
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) override
{
return {};
}
virtual bool
gotLedgerData(
LedgerHash const& ledgerHash,
std::shared_ptr<Peer>,
std::shared_ptr<protocol::TMLedgerData>) override
{
return false;
}
virtual void
gotStaleData(std::shared_ptr<protocol::TMLedgerData> packet) override
{
}
virtual void
logFailure(uint256 const& h, std::uint32_t seq) override
{
}
virtual bool
isFailure(uint256 const& h) override
{
return false;
}
virtual void
clearFailures() override
{
}
virtual Json::Value
getInfo() override
{
return {};
}
virtual std::size_t
fetchRate() override
{
return 0;
}
virtual void
onLedgerFetched() override
{
}
virtual void
gotFetchPack() override
{
}
virtual void
sweep() override
{
}
virtual void
stop() override
{
}
virtual size_t
cacheSize() override
{
return 0;
}
LedgerMaster& ledgerSource;
LedgerMaster& ledgerSink;
InboundLedgersBehavior bhvr;
};
enum class PeerFeature {
LedgerReplayEnabled,
None,
};
/**
* Simulate a network peer.
* Depending on the configured PeerFeature,
* it either supports the ProtocolFeature::LedgerReplay or not
*/
class TestPeer : public Peer
{
public:
TestPeer(bool enableLedgerReplay)
: ledgerReplayEnabled_(enableLedgerReplay)
, nodePublicKey_(derivePublicKey(KeyType::ed25519, randomSecretKey()))
{
}
void
send(std::shared_ptr<Message> const& m) override
{
}
beast::IP::Endpoint
getRemoteAddress() const override
{
return {};
}
void
charge(Resource::Charge const& fee, std::string const& context = {})
override
{
}
id_t
id() const override
{
return 1234;
}
bool
cluster() const override
{
return false;
}
bool
isHighLatency() const override
{
return false;
}
int
getScore(bool) const override
{
return 0;
}
PublicKey const&
getNodePublic() const override
{
return nodePublicKey_;
}
Json::Value
json() override
{
return {};
}
bool
supportsFeature(ProtocolFeature f) const override
{
if (f == ProtocolFeature::LedgerReplay && ledgerReplayEnabled_)
return true;
return false;
}
std::optional<std::size_t>
publisherListSequence(PublicKey const&) const override
{
return {};
}
void
setPublisherListSequence(PublicKey const&, std::size_t const) override
{
}
uint256 const&
getClosedLedgerHash() const override
{
static uint256 hash{};
return hash;
}
bool
hasLedger(uint256 const& hash, std::uint32_t seq) const override
{
return true;
}
void
ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const override
{
}
bool
hasTxSet(uint256 const& hash) const override
{
return false;
}
void
cycleStatus() override
{
}
bool
hasRange(std::uint32_t uMin, std::uint32_t uMax) override
{
return false;
}
bool
compressionEnabled() const override
{
return false;
}
void
sendTxQueue() override
{
}
void
addTxQueue(uint256 const&) override
{
}
void
removeTxQueue(uint256 const&) override
{
}
bool
txReduceRelayEnabled() const override
{
return false;
}
std::string const&
fingerprint() const override
{
return fingerprint_;
}
std::string fingerprint_;
bool ledgerReplayEnabled_;
PublicKey nodePublicKey_;
};
enum class PeerSetBehavior {
Good,
Drop50,
DropAll,
DropSkipListReply,
DropLedgerDeltaReply,
Repeat,
};
/**
* Simulate a peerSet that supplies peers to ledger replay subtasks.
* It connects the ledger replay client side and server side message handlers.
* Depending on the configured PeerSetBehavior,
* it may drop or repeat some of the messages.
*/
struct TestPeerSet : public PeerSet
{
TestPeerSet(
LedgerReplayMsgHandler& me,
LedgerReplayMsgHandler& other,
PeerSetBehavior bhvr,
bool enableLedgerReplay)
: local(me)
, remote(other)
, dummyPeer(std::make_shared<TestPeer>(enableLedgerReplay))
, behavior(bhvr)
{
}
void
addPeers(
std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> hasItem,
std::function<void(std::shared_ptr<Peer> const&)> onPeerAdded) override
{
hasItem(dummyPeer);
onPeerAdded(dummyPeer);
}
void
sendRequest(
::google::protobuf::Message const& msg,
protocol::MessageType type,
std::shared_ptr<Peer> const& peer) override
{
int dropRate = 0;
if (behavior == PeerSetBehavior::Drop50)
dropRate = 50;
else if (behavior == PeerSetBehavior::DropAll)
dropRate = 100;
if ((rand() % 100 + 1) <= dropRate)
return;
switch (type)
{
case protocol::mtPROOF_PATH_REQ: {
if (behavior == PeerSetBehavior::DropSkipListReply)
return;
auto request = std::make_shared<protocol::TMProofPathRequest>(
dynamic_cast<protocol::TMProofPathRequest const&>(msg));
auto reply = std::make_shared<protocol::TMProofPathResponse>(
remote.processProofPathRequest(request));
local.processProofPathResponse(reply);
if (behavior == PeerSetBehavior::Repeat)
local.processProofPathResponse(reply);
break;
}
case protocol::mtREPLAY_DELTA_REQ: {
if (behavior == PeerSetBehavior::DropLedgerDeltaReply)
return;
auto request = std::make_shared<protocol::TMReplayDeltaRequest>(
dynamic_cast<protocol::TMReplayDeltaRequest const&>(msg));
auto reply = std::make_shared<protocol::TMReplayDeltaResponse>(
remote.processReplayDeltaRequest(request));
local.processReplayDeltaResponse(reply);
if (behavior == PeerSetBehavior::Repeat)
local.processReplayDeltaResponse(reply);
break;
}
default:
return;
}
}
std::set<Peer::id_t> const&
getPeerIds() const override
{
static std::set<Peer::id_t> emptyPeers;
return emptyPeers;
}
LedgerReplayMsgHandler& local;
LedgerReplayMsgHandler& remote;
std::shared_ptr<TestPeer> dummyPeer;
PeerSetBehavior behavior;
};
/**
* Build the TestPeerSet.
*/
class TestPeerSetBuilder : public PeerSetBuilder
{
public:
TestPeerSetBuilder(
LedgerReplayMsgHandler& me,
LedgerReplayMsgHandler& other,
PeerSetBehavior bhvr,
PeerFeature peerFeature)
: local(me)
, remote(other)
, behavior(bhvr)
, enableLedgerReplay(peerFeature == PeerFeature::LedgerReplayEnabled)
{
}
std::unique_ptr<PeerSet>
build() override
{
return std::make_unique<TestPeerSet>(
local, remote, behavior, enableLedgerReplay);
}
private:
LedgerReplayMsgHandler& local;
LedgerReplayMsgHandler& remote;
PeerSetBehavior behavior;
bool enableLedgerReplay;
};
/**
* Utility class for (1) creating ledgers with txns and
* (2) providing the ledgers via the ledgerMaster
*/
struct LedgerServer
{
struct Parameter
{
int initLedgers;
int initAccounts = 10;
int initAmount = 1'000'000;
int numTxPerLedger = 10;
int txAmount = 10;
};
LedgerServer(beast::unit_test::suite& suite, Parameter const& p)
: env(suite)
, app(env.app())
, ledgerMaster(env.app().getLedgerMaster())
, msgHandler(env.app(), env.app().getLedgerReplayer())
, param(p)
{
assert(param.initLedgers > 0);
createAccounts(param.initAccounts);
createLedgerHistory();
app.logs().threshold(beast::severities::kWarning);
}
/**
* @note close a ledger
*/
void
createAccounts(int newAccounts)
{
auto fundedAccounts = accounts.size();
for (int i = 0; i < newAccounts; ++i)
{
accounts.emplace_back(
"alice_" + std::to_string(fundedAccounts + i));
env.fund(jtx::XRP(param.initAmount), accounts.back());
}
env.close();
}
/**
* @note close a ledger
*/
void
sendPayments(int newTxes)
{
int fundedAccounts = accounts.size();
assert(fundedAccounts >= newTxes);
std::unordered_set<int> senders;
// somewhat random but reproducible
int r = ledgerMaster.getClosedLedger()->seq() * 7;
int fromIdx = 0;
int toIdx = 0;
auto updateIdx = [&]() {
assert(fundedAccounts > senders.size());
fromIdx = (fromIdx + r) % fundedAccounts;
while (senders.count(fromIdx) != 0)
fromIdx = (fromIdx + 1) % fundedAccounts;
senders.insert(fromIdx);
toIdx = (toIdx + r * 2) % fundedAccounts;
if (toIdx == fromIdx)
toIdx = (toIdx + 1) % fundedAccounts;
};
for (int i = 0; i < newTxes; ++i)
{
updateIdx();
env.apply(
pay(accounts[fromIdx],
accounts[toIdx],
jtx::drops(ledgerMaster.getClosedLedger()->fees().base) +
jtx::XRP(param.txAmount)),
jtx::seq(jtx::autofill),
jtx::fee(jtx::autofill),
jtx::sig(jtx::autofill));
}
env.close();
}
/**
* create ledger history
*/
void
createLedgerHistory()
{
for (int i = 0; i < param.initLedgers - 1; ++i)
{
sendPayments(param.numTxPerLedger);
}
}
jtx::Env env;
Application& app;
LedgerMaster& ledgerMaster;
LedgerReplayMsgHandler msgHandler;
Parameter param;
std::vector<jtx::Account> accounts;
};
enum class TaskStatus {
Failed,
Completed,
NotDone,
NotExist,
};
/**
* Ledger replay client side.
* It creates the LedgerReplayer which has the client side logic.
* The client side and server side message handlers are connect via
* the peerSet to pass the requests and responses.
* It also has utility functions for checking task status
*/
class LedgerReplayClient
{
public:
LedgerReplayClient(
beast::unit_test::suite& suite,
LedgerServer& server,
PeerSetBehavior behavior = PeerSetBehavior::Good,
InboundLedgersBehavior inboundBhvr = InboundLedgersBehavior::Good,
PeerFeature peerFeature = PeerFeature::LedgerReplayEnabled)
: env(suite, jtx::envconfig(), nullptr, beast::severities::kDisabled)
, app(env.app())
, ledgerMaster(env.app().getLedgerMaster())
, inboundLedgers(
server.app.getLedgerMaster(),
ledgerMaster,
inboundBhvr)
, serverMsgHandler(server.app, server.app.getLedgerReplayer())
, clientMsgHandler(env.app(), replayer)
, replayer(
env.app(),
inboundLedgers,
std::make_unique<TestPeerSetBuilder>(
clientMsgHandler,
serverMsgHandler,
behavior,
peerFeature))
{
}
void
addLedger(std::shared_ptr<Ledger const> const& l)
{
ledgerMaster.storeLedger(l);
}
bool
haveLedgers(uint256 const& finishLedgerHash, int totalReplay)
{
uint256 hash = finishLedgerHash;
int i = 0;
for (; i < totalReplay; ++i)
{
auto const l = ledgerMaster.getLedgerByHash(hash);
if (!l)
return false;
hash = l->info().parentHash;
}
return true;
}
bool
waitForLedgers(uint256 const& finishLedgerHash, int totalReplay)
{
int totalRound = 100;
for (int i = 0; i < totalRound; ++i)
{
if (haveLedgers(finishLedgerHash, totalReplay))
return true;
if (i < totalRound - 1)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return false;
}
bool
waitForDone()
{
int totalRound = 100;
for (int i = 0; i < totalRound; ++i)
{
bool allDone = true;
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
for (auto const& t : replayer.tasks_)
{
if (!t->finished())
{
allDone = false;
break;
}
}
}
if (allDone)
return true;
if (i < totalRound - 1)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return false;
}
std::vector<std::shared_ptr<LedgerReplayTask>>
getTasks()
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
return replayer.tasks_;
}
std::shared_ptr<LedgerReplayTask>
findTask(uint256 const& hash, int totalReplay)
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
auto i = std::find_if(
replayer.tasks_.begin(), replayer.tasks_.end(), [&](auto const& t) {
return t->parameter_.finishHash_ == hash &&
t->parameter_.totalLedgers_ == totalReplay;
});
if (i == replayer.tasks_.end())
return {};
return *i;
}
std::size_t
countDeltas()
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
return replayer.deltas_.size();
}
std::size_t
countSkipLists()
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
return replayer.skipLists_.size();
}
bool
countsAsExpected(
std::size_t tasks,
std::size_t skipLists,
std::size_t deltas)
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
return replayer.tasks_.size() == tasks &&
replayer.skipLists_.size() == skipLists &&
replayer.deltas_.size() == deltas;
}
std::shared_ptr<SkipListAcquire>
findSkipListAcquire(uint256 const& hash)
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
auto i = replayer.skipLists_.find(hash);
if (i == replayer.skipLists_.end())
return {};
return i->second.lock();
}
std::shared_ptr<LedgerDeltaAcquire>
findLedgerDeltaAcquire(uint256 const& hash)
{
std::unique_lock<std::mutex> lock(replayer.mtx_);
auto i = replayer.deltas_.find(hash);
if (i == replayer.deltas_.end())
return {};
return i->second.lock();
}
template <typename T>
TaskStatus
taskStatus(std::shared_ptr<T> const& t)
{
if (t->failed_)
return TaskStatus::Failed;
if (t->complete_)
return TaskStatus::Completed;
return TaskStatus::NotDone;
}
bool
asExpected(
std::shared_ptr<LedgerReplayTask> const& task,
TaskStatus taskExpect,
TaskStatus skiplistExpect,
std::vector<TaskStatus> const& deltaExpects)
{
if (taskStatus(task) == taskExpect)
{
if (taskStatus(task->skipListAcquirer_) == skiplistExpect)
{
if (task->deltas_.size() == deltaExpects.size())
{
for (int i = 0; i < deltaExpects.size(); ++i)
{
if (taskStatus(task->deltas_[i]) != deltaExpects[i])
return false;
}
return true;
}
}
}
return false;
}
bool
asExpected(
uint256 const& hash,
int totalReplay,
TaskStatus taskExpect,
TaskStatus skiplistExpect,
std::vector<TaskStatus> const& deltaExpects)
{
auto t = findTask(hash, totalReplay);
if (!t)
{
if (taskExpect == TaskStatus::NotExist)
return true;
return false;
}
return asExpected(t, taskExpect, skiplistExpect, deltaExpects);
}
bool
checkStatus(
uint256 const& hash,
int totalReplay,
TaskStatus taskExpect,
TaskStatus skiplistExpect,
std::vector<TaskStatus> const& deltaExpects)
{
auto t = findTask(hash, totalReplay);
if (!t)
{
if (taskExpect == TaskStatus::NotExist)
return true;
return false;
}
return asExpected(t, taskExpect, skiplistExpect, deltaExpects);
}
bool
waitAndCheckStatus(
uint256 const& hash,
int totalReplay,
TaskStatus taskExpect,
TaskStatus skiplistExpect,
std::vector<TaskStatus> const& deltaExpects)
{
if (!waitForDone())
return false;
return checkStatus(
hash, totalReplay, taskExpect, skiplistExpect, deltaExpects);
}
jtx::Env env;
Application& app;
LedgerMaster& ledgerMaster;
MagicInboundLedgers inboundLedgers;
LedgerReplayMsgHandler serverMsgHandler;
LedgerReplayMsgHandler clientMsgHandler;
LedgerReplayer replayer;
};
using namespace beast::severities;
void
logAll(
LedgerServer& server,
LedgerReplayClient& client,
beast::severities::Severity level = Severity::kTrace)
{
server.app.logs().threshold(level);
client.app.logs().threshold(level);
}
// logAll(net.server, net.client);
/*
* Create a LedgerServer and a LedgerReplayClient
*/
struct NetworkOfTwo
{
NetworkOfTwo(
beast::unit_test::suite& suite,
LedgerServer::Parameter const& param,
PeerSetBehavior behavior = PeerSetBehavior::Good,
InboundLedgersBehavior inboundBhvr = InboundLedgersBehavior::Good,
PeerFeature peerFeature = PeerFeature::LedgerReplayEnabled)
: server(suite, param)
, client(suite, server, behavior, inboundBhvr, peerFeature)
{
// logAll(server, client);
}
LedgerServer server;
LedgerReplayClient client;
};
/**
* Test cases:
* LedgerReplayer_test:
* -- process TMProofPathRequest and TMProofPathResponse
* -- process TMReplayDeltaRequest and TMReplayDeltaResponse
* -- update and merge LedgerReplayTask::TaskParameter
* -- process [ledger_replay] section in config
* -- peer handshake
* -- replay a range of ledgers that the local node already has
* -- replay a range of ledgers and fallback to InboundLedgers because
* peers do not support ProtocolFeature::LedgerReplay
* -- replay a range of ledgers and the network drops or repeats messages
* -- call stop() and the tasks and subtasks are removed
* -- process a bad skip list
* -- process a bad ledger delta
* -- replay ledger ranges with different overlaps
*
* LedgerReplayerTimeout_test:
* -- timeouts of SkipListAcquire
* -- timeouts of LedgerDeltaAcquire
*
* LedgerReplayerLong_test: (MANUAL)
* -- call replayer.replay() 4 times to replay 1000 ledgers
*/
struct LedgerReplayer_test : public beast::unit_test::suite
{
void
testProofPath()
{
testcase("ProofPath");
LedgerServer server(*this, {1});
auto const l = server.ledgerMaster.getClosedLedger();
{
// request, missing key
auto request = std::make_shared<protocol::TMProofPathRequest>();
request->set_ledgerhash(
l->info().hash.data(), l->info().hash.size());
request->set_type(protocol::TMLedgerMapType::lmACCOUNT_STATE);
auto reply = std::make_shared<protocol::TMProofPathResponse>(
server.msgHandler.processProofPathRequest(request));
BEAST_EXPECT(reply->has_error());
BEAST_EXPECT(!server.msgHandler.processProofPathResponse(reply));
}
{
// request, wrong hash
auto request = std::make_shared<protocol::TMProofPathRequest>();
request->set_type(protocol::TMLedgerMapType::lmACCOUNT_STATE);
request->set_key(
keylet::skip().key.data(), keylet::skip().key.size());
uint256 hash(1234567);
request->set_ledgerhash(hash.data(), hash.size());
auto reply = std::make_shared<protocol::TMProofPathResponse>(
server.msgHandler.processProofPathRequest(request));
BEAST_EXPECT(reply->has_error());
}
{
// good request
auto request = std::make_shared<protocol::TMProofPathRequest>();
request->set_ledgerhash(
l->info().hash.data(), l->info().hash.size());
request->set_type(protocol::TMLedgerMapType::lmACCOUNT_STATE);
request->set_key(
keylet::skip().key.data(), keylet::skip().key.size());
// generate response
auto reply = std::make_shared<protocol::TMProofPathResponse>(
server.msgHandler.processProofPathRequest(request));
BEAST_EXPECT(!reply->has_error());
BEAST_EXPECT(server.msgHandler.processProofPathResponse(reply));
{
// bad reply
// bad header
std::string r(reply->ledgerheader());
r.back()--;
reply->set_ledgerheader(r);
BEAST_EXPECT(
!server.msgHandler.processProofPathResponse(reply));
r.back()++;
reply->set_ledgerheader(r);
BEAST_EXPECT(server.msgHandler.processProofPathResponse(reply));
// bad proof path
reply->mutable_path()->RemoveLast();
BEAST_EXPECT(
!server.msgHandler.processProofPathResponse(reply));
}
}
}
void
testReplayDelta()
{
testcase("ReplayDelta");
LedgerServer server(*this, {1});
auto const l = server.ledgerMaster.getClosedLedger();
{
// request, missing hash
auto request = std::make_shared<protocol::TMReplayDeltaRequest>();
auto reply = std::make_shared<protocol::TMReplayDeltaResponse>(
server.msgHandler.processReplayDeltaRequest(request));
BEAST_EXPECT(reply->has_error());
BEAST_EXPECT(!server.msgHandler.processReplayDeltaResponse(reply));
// request, wrong hash
uint256 hash(1234567);
request->set_ledgerhash(hash.data(), hash.size());
reply = std::make_shared<protocol::TMReplayDeltaResponse>(
server.msgHandler.processReplayDeltaRequest(request));
BEAST_EXPECT(reply->has_error());
BEAST_EXPECT(!server.msgHandler.processReplayDeltaResponse(reply));
}
{
// good request
auto request = std::make_shared<protocol::TMReplayDeltaRequest>();
request->set_ledgerhash(
l->info().hash.data(), l->info().hash.size());
auto reply = std::make_shared<protocol::TMReplayDeltaResponse>(
server.msgHandler.processReplayDeltaRequest(request));
BEAST_EXPECT(!reply->has_error());
BEAST_EXPECT(server.msgHandler.processReplayDeltaResponse(reply));
{
// bad reply
// bad header
std::string r(reply->ledgerheader());
r.back()--;
reply->set_ledgerheader(r);
BEAST_EXPECT(
!server.msgHandler.processReplayDeltaResponse(reply));
r.back()++;
reply->set_ledgerheader(r);
BEAST_EXPECT(
server.msgHandler.processReplayDeltaResponse(reply));
// bad txns
reply->mutable_transaction()->RemoveLast();
BEAST_EXPECT(
!server.msgHandler.processReplayDeltaResponse(reply));
}
}
}
void
testTaskParameter()
{
testcase("TaskParameter");
auto makeSkipList = [](int count) -> std::vector<uint256> const {
std::vector<uint256> sList;
for (int i = 0; i < count; ++i)
sList.emplace_back(i);
return sList;
};
LedgerReplayTask::TaskParameter tp10(
InboundLedger::Reason::GENERIC, uint256(10), 10);
BEAST_EXPECT(!tp10.update(uint256(777), 5, makeSkipList(10)));
BEAST_EXPECT(!tp10.update(uint256(10), 5, makeSkipList(8)));
BEAST_EXPECT(tp10.update(uint256(10), 10, makeSkipList(10)));
// can merge to self
BEAST_EXPECT(tp10.canMergeInto(tp10));
// smaller task
LedgerReplayTask::TaskParameter tp9(
InboundLedger::Reason::GENERIC, uint256(9), 9);
BEAST_EXPECT(tp9.canMergeInto(tp10));
BEAST_EXPECT(!tp10.canMergeInto(tp9));
tp9.totalLedgers_++;
BEAST_EXPECT(!tp9.canMergeInto(tp10));
tp9.totalLedgers_--;
BEAST_EXPECT(tp9.canMergeInto(tp10));
tp9.reason_ = InboundLedger::Reason::CONSENSUS;
BEAST_EXPECT(!tp9.canMergeInto(tp10));
tp9.reason_ = InboundLedger::Reason::GENERIC;
BEAST_EXPECT(tp9.canMergeInto(tp10));
tp9.finishHash_ = uint256(1234);
BEAST_EXPECT(!tp9.canMergeInto(tp10));
tp9.finishHash_ = uint256(9);
BEAST_EXPECT(tp9.canMergeInto(tp10));
// larger task
LedgerReplayTask::TaskParameter tp20(
InboundLedger::Reason::GENERIC, uint256(20), 20);
BEAST_EXPECT(tp20.update(uint256(20), 20, makeSkipList(20)));
BEAST_EXPECT(tp10.canMergeInto(tp20));
BEAST_EXPECT(tp9.canMergeInto(tp20));
BEAST_EXPECT(!tp20.canMergeInto(tp10));
BEAST_EXPECT(!tp20.canMergeInto(tp9));
}
void
testConfig()
{
testcase("config test");
{
Config c;
BEAST_EXPECT(c.LEDGER_REPLAY == false);
}
{
Config c;
std::string toLoad(R"rippleConfig(
[ledger_replay]
1
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.LEDGER_REPLAY == true);
}
{
Config c;
std::string toLoad = (R"rippleConfig(
[ledger_replay]
0
)rippleConfig");
c.loadFromString(toLoad);
BEAST_EXPECT(c.LEDGER_REPLAY == false);
}
}
void
testHandshake()
{
testcase("handshake test");
auto handshake = [&](bool client, bool server, bool expecting) -> bool {
auto request =
ripple::makeRequest(true, false, client, false, false);
http_request_type http_request;
http_request.version(request.version());
http_request.base() = request.base();
bool serverResult =
peerFeatureEnabled(http_request, FEATURE_LEDGER_REPLAY, server);
if (serverResult != expecting)
return false;
beast::IP::Address addr =
boost::asio::ip::make_address("172.1.1.100");
jtx::Env serverEnv(*this);
serverEnv.app().config().LEDGER_REPLAY = server;
auto http_resp = ripple::makeResponse(
true,
http_request,
addr,
addr,
uint256{1},
1,
{1, 0},
serverEnv.app());
auto const clientResult =
peerFeatureEnabled(http_resp, FEATURE_LEDGER_REPLAY, client);
if (clientResult != expecting)
return false;
return true;
};
BEAST_EXPECT(handshake(false, false, false));
BEAST_EXPECT(handshake(false, true, false));
BEAST_EXPECT(handshake(true, false, false));
BEAST_EXPECT(handshake(true, true, true));
}
void
testAllLocal(int totalReplay)
{
testcase("local node has all the ledgers");
auto psBhvr = PeerSetBehavior::DropAll;
auto ilBhvr = InboundLedgersBehavior::DropAll;
auto peerFeature = PeerFeature::None;
NetworkOfTwo net(*this, {totalReplay + 1}, psBhvr, ilBhvr, peerFeature);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
for (int i = 0; i < totalReplay; ++i)
{
BEAST_EXPECT(l);
if (l)
{
net.client.ledgerMaster.storeLedger(l);
l = net.server.ledgerMaster.getLedgerByHash(
l->info().parentHash);
}
else
break;
}
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
std::vector<TaskStatus> deltaStatuses(
totalReplay - 1, TaskStatus::Completed);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses));
// sweep
net.client.replayer.sweep();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
void
testAllInboundLedgers(int totalReplay)
{
testcase("all the ledgers from InboundLedgers");
NetworkOfTwo net(
*this,
{totalReplay + 1},
PeerSetBehavior::DropAll,
InboundLedgersBehavior::Good,
PeerFeature::None);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
std::vector<TaskStatus> deltaStatuses(
totalReplay - 1, TaskStatus::Completed);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses));
// sweep
net.client.replayer.sweep();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
void
testPeerSetBehavior(PeerSetBehavior peerSetBehavior, int totalReplay = 4)
{
switch (peerSetBehavior)
{
case PeerSetBehavior::Good:
testcase("good network");
break;
case PeerSetBehavior::Drop50:
testcase("network drops 50% messages");
break;
case PeerSetBehavior::Repeat:
testcase("network repeats all messages");
break;
default:
return;
}
NetworkOfTwo net(
*this,
{totalReplay + 1},
peerSetBehavior,
InboundLedgersBehavior::DropAll,
PeerFeature::LedgerReplayEnabled);
// feed client with start ledger since InboundLedgers drops all
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
for (int i = 0; i < totalReplay - 1; ++i)
{
l = net.server.ledgerMaster.getLedgerByHash(l->info().parentHash);
}
net.client.ledgerMaster.storeLedger(l);
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
std::vector<TaskStatus> deltaStatuses(
totalReplay - 1, TaskStatus::Completed);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses));
BEAST_EXPECT(net.client.waitForLedgers(finalHash, totalReplay));
// sweep
net.client.replayer.sweep();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
void
testStop()
{
testcase("stop before timeout");
int totalReplay = 3;
NetworkOfTwo net(
*this,
{totalReplay + 1},
PeerSetBehavior::DropAll,
InboundLedgersBehavior::Good,
PeerFeature::LedgerReplayEnabled);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
std::vector<TaskStatus> deltaStatuses;
BEAST_EXPECT(net.client.checkStatus(
finalHash,
totalReplay,
TaskStatus::NotDone,
TaskStatus::NotDone,
deltaStatuses));
BEAST_EXPECT(net.client.countsAsExpected(1, 1, 0));
net.client.replayer.stop();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
void
testSkipListBadReply()
{
testcase("SkipListAcquire bad reply");
int totalReplay = 3;
NetworkOfTwo net(
*this,
{totalReplay + 1 + 1},
PeerSetBehavior::DropAll,
InboundLedgersBehavior::DropAll,
PeerFeature::LedgerReplayEnabled);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
auto skipList = net.client.findSkipListAcquire(finalHash);
std::uint8_t payload[55] = {
0x6A, 0x09, 0xE6, 0x67, 0xF3, 0xBC, 0xC9, 0x08, 0xB2};
auto item =
make_shamapitem(uint256(12345), Slice(payload, sizeof(payload)));
skipList->processData(l->seq(), item);
std::vector<TaskStatus> deltaStatuses;
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Failed,
TaskStatus::Failed,
deltaStatuses));
// add another task
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay + 1);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Failed,
TaskStatus::Failed,
deltaStatuses));
BEAST_EXPECT(net.client.countsAsExpected(2, 1, 0));
}
void
testLedgerDeltaBadReply()
{
testcase("LedgerDeltaAcquire bad reply");
int totalReplay = 3;
NetworkOfTwo net(
*this,
{totalReplay + 1},
PeerSetBehavior::DropLedgerDeltaReply,
InboundLedgersBehavior::DropAll,
PeerFeature::LedgerReplayEnabled);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
net.client.ledgerMaster.storeLedger(l);
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
auto delta = net.client.findLedgerDeltaAcquire(l->info().parentHash);
delta->processData(
l->info(), // wrong ledger info
std::map<std::uint32_t, std::shared_ptr<STTx const>>());
BEAST_EXPECT(net.client.taskStatus(delta) == TaskStatus::Failed);
BEAST_EXPECT(
net.client.taskStatus(net.client.findTask(
finalHash, totalReplay)) == TaskStatus::Failed);
// add another task
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay + 1);
BEAST_EXPECT(
net.client.taskStatus(net.client.findTask(
finalHash, totalReplay + 1)) == TaskStatus::Failed);
}
void
testLedgerReplayOverlap()
{
testcase("Overlap tasks");
int totalReplay = 5;
NetworkOfTwo net(
*this,
{totalReplay * 3 + 1},
PeerSetBehavior::Good,
InboundLedgersBehavior::Good,
PeerFeature::LedgerReplayEnabled);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
std::vector<TaskStatus> deltaStatuses(
totalReplay - 1, TaskStatus::Completed);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses));
BEAST_EXPECT(net.client.waitForLedgers(finalHash, totalReplay));
// same range, same reason
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
BEAST_EXPECT(net.client.countsAsExpected(1, 1, totalReplay - 1));
// same range, different reason
net.client.replayer.replay(
InboundLedger::Reason::CONSENSUS, finalHash, totalReplay);
BEAST_EXPECT(net.client.countsAsExpected(2, 1, totalReplay - 1));
// no overlap
for (int i = 0; i < totalReplay + 2; ++i)
{
l = net.server.ledgerMaster.getLedgerByHash(l->info().parentHash);
}
auto finalHash_early = l->info().hash;
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash_early, totalReplay);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash_early,
totalReplay,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses)); // deltaStatuses no change
BEAST_EXPECT(net.client.waitForLedgers(finalHash_early, totalReplay));
BEAST_EXPECT(net.client.countsAsExpected(3, 2, 2 * (totalReplay - 1)));
// partial overlap
l = net.server.ledgerMaster.getLedgerByHash(l->info().parentHash);
auto finalHash_moreEarly = l->info().parentHash;
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash_moreEarly, totalReplay);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash_moreEarly,
totalReplay,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses)); // deltaStatuses no change
BEAST_EXPECT(
net.client.waitForLedgers(finalHash_moreEarly, totalReplay));
BEAST_EXPECT(
net.client.countsAsExpected(4, 3, 2 * (totalReplay - 1) + 2));
// cover
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay * 3);
deltaStatuses =
std::vector<TaskStatus>(totalReplay * 3 - 1, TaskStatus::Completed);
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay * 3,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses)); // deltaStatuses changed
BEAST_EXPECT(net.client.waitForLedgers(finalHash, totalReplay * 3));
BEAST_EXPECT(net.client.countsAsExpected(5, 3, totalReplay * 3 - 1));
// sweep
net.client.replayer.sweep();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
void
run() override
{
testProofPath();
testReplayDelta();
testTaskParameter();
testConfig();
testHandshake();
testAllLocal(1);
testAllLocal(3);
testAllInboundLedgers(1);
testAllInboundLedgers(4);
testPeerSetBehavior(PeerSetBehavior::Good, 1);
testPeerSetBehavior(PeerSetBehavior::Good);
testPeerSetBehavior(PeerSetBehavior::Drop50);
testPeerSetBehavior(PeerSetBehavior::Repeat);
testStop();
testSkipListBadReply();
testLedgerDeltaBadReply();
testLedgerReplayOverlap();
}
};
struct LedgerReplayerTimeout_test : public beast::unit_test::suite
{
void
testSkipListTimeout()
{
testcase("SkipListAcquire timeout");
int totalReplay = 3;
NetworkOfTwo net(
*this,
{totalReplay + 1},
PeerSetBehavior::DropAll,
InboundLedgersBehavior::Good,
PeerFeature::LedgerReplayEnabled);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
std::vector<TaskStatus> deltaStatuses;
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Failed,
TaskStatus::Failed,
deltaStatuses));
// sweep
BEAST_EXPECT(net.client.countsAsExpected(1, 1, 0));
net.client.replayer.sweep();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
void
testLedgerDeltaTimeout()
{
testcase("LedgerDeltaAcquire timeout");
int totalReplay = 3;
NetworkOfTwo net(
*this,
{totalReplay + 1},
PeerSetBehavior::DropAll,
InboundLedgersBehavior::Good,
PeerFeature::LedgerReplayEnabled);
auto l = net.server.ledgerMaster.getClosedLedger();
uint256 finalHash = l->info().hash;
net.client.ledgerMaster.storeLedger(l);
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finalHash, totalReplay);
std::vector<TaskStatus> deltaStatuses(
totalReplay - 1, TaskStatus::Failed);
deltaStatuses.back() = TaskStatus::Completed; // in client ledgerMaster
BEAST_EXPECT(net.client.waitAndCheckStatus(
finalHash,
totalReplay,
TaskStatus::Failed,
TaskStatus::Completed,
deltaStatuses));
// sweep
BEAST_EXPECT(net.client.countsAsExpected(1, 1, totalReplay - 1));
net.client.replayer.sweep();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
void
run() override
{
testSkipListTimeout();
testLedgerDeltaTimeout();
}
};
struct LedgerReplayerLong_test : public beast::unit_test::suite
{
void
run() override
{
testcase("Acquire 1000 ledgers");
int totalReplay = 250;
int rounds = 4;
NetworkOfTwo net(
*this,
{totalReplay * rounds + 1},
PeerSetBehavior::Good,
InboundLedgersBehavior::Good,
PeerFeature::LedgerReplayEnabled);
std::vector<uint256> finishHashes;
auto l = net.server.ledgerMaster.getClosedLedger();
for (int i = 0; i < rounds; ++i)
{
finishHashes.push_back(l->info().hash);
for (int j = 0; j < totalReplay; ++j)
{
l = net.server.ledgerMaster.getLedgerByHash(
l->info().parentHash);
}
}
BEAST_EXPECT(finishHashes.size() == rounds);
for (int i = 0; i < rounds; ++i)
{
net.client.replayer.replay(
InboundLedger::Reason::GENERIC, finishHashes[i], totalReplay);
}
std::vector<TaskStatus> deltaStatuses(
totalReplay - 1, TaskStatus::Completed);
for (int i = 0; i < rounds; ++i)
{
BEAST_EXPECT(net.client.waitAndCheckStatus(
finishHashes[i],
totalReplay,
TaskStatus::Completed,
TaskStatus::Completed,
deltaStatuses));
}
BEAST_EXPECT(
net.client.waitForLedgers(finishHashes[0], totalReplay * rounds));
BEAST_EXPECT(net.client.countsAsExpected(
rounds, rounds, rounds * (totalReplay - 1)));
// sweep
net.client.replayer.sweep();
BEAST_EXPECT(net.client.countsAsExpected(0, 0, 0));
}
};
BEAST_DEFINE_TESTSUITE(LedgerReplay, app, ripple);
BEAST_DEFINE_TESTSUITE_PRIO(LedgerReplayer, app, ripple, 1);
BEAST_DEFINE_TESTSUITE(LedgerReplayerTimeout, app, ripple);
BEAST_DEFINE_TESTSUITE_MANUAL(LedgerReplayerLong, app, ripple);
} // namespace test
} // namespace ripple