Remove shards (#5066)

This commit is contained in:
John Freeman
2024-08-02 19:03:05 -05:00
committed by tequ
parent 16b4550d93
commit d27bc94249
111 changed files with 128 additions and 17228 deletions

View File

@@ -1,315 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright 2019 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
#include <test/jtx/TrustedPublisherServer.h>
#include <test/unit_test/FileDirGuard.h>
#include <xrpld/net/DatabaseDownloader.h>
#include <boost/filesystem/operations.hpp>
#include <condition_variable>
#include <mutex>
namespace ripple {
namespace test {
#define REPORT_FAILURE(D) reportFailure(D, __FILE__, __LINE__)
/** Unit tests for DatabaseDownloader.
    Exercises one successful HTTPS download (with and without certificate
    verification) plus the main failure paths: unresolvable hostname,
    refused connection, failed TLS handshake, and a file too large for
    the available disk space. */
class DatabaseDownloader_test : public beast::unit_test::suite
{
    // Spin up a TrustedPublisherServer to act as a small HTTP(S) server
    // to download from; the validator list contents are irrelevant here.
    std::shared_ptr<TrustedPublisherServer>
    createServer(jtx::Env& env, bool ssl = true)
    {
        std::vector<TrustedPublisherServer::Validator> list;
        list.push_back(TrustedPublisherServer::randomValidator());
        return make_TrustedPublisherServer(
            env.app().getIOService(),
            list,
            env.timeKeeper().now() + std::chrono::seconds{3600},
            // No future VLs
            {},
            ssl);
    }

    // Completion callback passed to DatabaseDownloader::download().
    // Records the destination path the downloader reports and lets the
    // test thread block until the (asynchronous) download finishes.
    struct DownloadCompleter
    {
        std::mutex m;
        std::condition_variable cv;
        bool called = false;
        // Destination path handed to operator(); empty on failure paths
        // where the downloader reports no file.
        boost::filesystem::path dest;

        // Invoked by the downloader when the transfer ends (success or
        // failure); wakes up waitComplete().
        void
        operator()(boost::filesystem::path dst)
        {
            std::unique_lock<std::mutex> lk(m);
            called = true;
            dest = std::move(dst);
            cv.notify_one();
        };

        // Block until operator() fires or 10 seconds elapse. Returns
        // true if the callback was invoked; resets `called` so the same
        // instance can be reused across downloads.
        bool
        waitComplete()
        {
            std::unique_lock<std::mutex> lk(m);
            auto stat = cv.wait_for(
                lk, std::chrono::seconds(10), [this] { return called; });
            called = false;
            return stat;
        };
    };
    // Shared completion callback, reused by every download in the suite.
    DownloadCompleter cb;

    // Owns a DatabaseDownloader for the duration of a scope and captures
    // its log output in sink_ so failures can dump the logs. stop() is
    // called from the destructor to shut the downloader down cleanly.
    struct Downloader
    {
        test::StreamSink sink_;
        beast::Journal journal_;
        std::shared_ptr<DatabaseDownloader> ptr_;

        Downloader(jtx::Env& env)
            : journal_{sink_}
            , ptr_{make_DatabaseDownloader(
                  env.app().getIOService(),
                  env.app().config(),
                  journal_)}
        {
        }

        ~Downloader()
        {
            ptr_->stop();
        }

        DatabaseDownloader*
        operator->()
        {
            return ptr_.get();
        }

        DatabaseDownloader const*
        operator->() const
        {
            return ptr_.get();
        }
    };

    // Fail the test, dumping the captured downloader logs plus the
    // downloader's current session/stopping state to aid diagnosis.
    // Invoked via the REPORT_FAILURE macro so file/line point at the
    // call site.
    void
    reportFailure(Downloader const& dl, char const* file, int line)
    {
        std::stringstream ss;
        ss << "Failed. LOGS:\n"
           << dl.sink_.messages().str()
           << "\nDownloadCompleter failure."
              "\nDatabaseDownloader session active? "
           << std::boolalpha << dl->sessionIsActive()
           << "\nDatabaseDownloader is stopping? " << std::boolalpha
           << dl->isStopping();
        fail(ss.str(), file, line);
    }

    // Download a small text file over SSL — with or without certificate
    // verification — and verify the file arrives at the requested path
    // with non-zero size.
    void
    testDownload(bool verify)
    {
        testcase << std::string("Basic download - SSL ") +
                (verify ? "Verify" : "No Verify");

        using namespace jtx;

        // Write the test CA certificate to disk so the client can
        // verify the server certificate when `verify` is set.
        ripple::test::detail::FileDirGuard cert{
            *this, "_cert", "ca.pem", TrustedPublisherServer::ca_cert()};
        Env env{*this, envconfig([&cert, &verify](std::unique_ptr<Config> cfg) {
                    if ((cfg->SSL_VERIFY = verify))  // yes, this is assignment
                        cfg->SSL_VERIFY_FILE = cert.file().string();
                    return cfg;
                })};

        Downloader dl{env};

        // create a TrustedPublisherServer as a simple HTTP
        // server to request from. Use the /textfile endpoint
        // to get a simple text file sent as response.
        auto server = createServer(env);
        log << "Downloading DB from " << server->local_endpoint() << std::endl;

        ripple::test::detail::FileDirGuard const data{
            *this, "downloads", "data", "", false, false};
        // initiate the download and wait for the callback
        // to be invoked
        auto stat = dl->download(
            server->local_endpoint().address().to_string(),
            std::to_string(server->local_endpoint().port()),
            "/textfile",
            11,
            data.file(),
            std::function<void(boost::filesystem::path)>{std::ref(cb)});
        if (!BEAST_EXPECT(stat))
        {
            REPORT_FAILURE(dl);
            return;
        }
        if (!BEAST_EXPECT(cb.waitComplete()))
        {
            REPORT_FAILURE(dl);
            return;
        }
        BEAST_EXPECT(cb.dest == data.file());
        if (!BEAST_EXPECT(boost::filesystem::exists(data.file())))
            return;
        BEAST_EXPECT(boost::filesystem::file_size(data.file()) > 0);
    }

    // Exercise the error paths. In each case the download must report
    // completion without producing a file, and the captured logs must
    // contain the async operation that failed.
    void
    testFailures()
    {
        testcase("Error conditions");

        using namespace jtx;

        Env env{*this};

        {
            // bad hostname
            boost::system::error_code ec;
            boost::asio::ip::tcp::resolver resolver{env.app().getIOService()};
            auto const results = resolver.resolve("badhostname", "443", ec);
            // we require an error in resolving this name in order
            // for this test to pass. Some networks might have DNS hijacking
            // that prevents NXDOMAIN, in which case the failure is not
            // possible, so we skip the test.
            if (ec)
            {
                Downloader dl{env};
                ripple::test::detail::FileDirGuard const datafile{
                    *this, "downloads", "data", "", false, false};
                BEAST_EXPECT(dl->download(
                    "badhostname",
                    "443",
                    "",
                    11,
                    datafile.file(),
                    std::function<void(boost::filesystem::path)>{
                        std::ref(cb)}));
                if (!BEAST_EXPECT(cb.waitComplete()))
                {
                    REPORT_FAILURE(dl);
                }
                BEAST_EXPECT(!boost::filesystem::exists(datafile.file()));
                BEAST_EXPECTS(
                    dl.sink_.messages().str().find("async_resolve") !=
                        std::string::npos,
                    dl.sink_.messages().str());
            }
        }
        {
            // can't connect: resolve the server's endpoint, then stop the
            // server before initiating the download.
            Downloader dl{env};
            ripple::test::detail::FileDirGuard const datafile{
                *this, "downloads", "data", "", false, false};
            auto server = createServer(env);
            auto host = server->local_endpoint().address().to_string();
            auto port = std::to_string(server->local_endpoint().port());
            log << "Downloading DB from " << server->local_endpoint()
                << std::endl;
            server->stop();
            BEAST_EXPECT(dl->download(
                host,
                port,
                "",
                11,
                datafile.file(),
                std::function<void(boost::filesystem::path)>{std::ref(cb)}));
            if (!BEAST_EXPECT(cb.waitComplete()))
            {
                REPORT_FAILURE(dl);
            }
            BEAST_EXPECT(!boost::filesystem::exists(datafile.file()));
            BEAST_EXPECTS(
                dl.sink_.messages().str().find("async_connect") !=
                    std::string::npos,
                dl.sink_.messages().str());
        }
        {
            // not ssl (failed handshake): server speaks plain HTTP while
            // the downloader attempts TLS.
            Downloader dl{env};
            ripple::test::detail::FileDirGuard const datafile{
                *this, "downloads", "data", "", false, false};
            auto server = createServer(env, false);
            log << "Downloading DB from " << server->local_endpoint()
                << std::endl;
            BEAST_EXPECT(dl->download(
                server->local_endpoint().address().to_string(),
                std::to_string(server->local_endpoint().port()),
                "",
                11,
                datafile.file(),
                std::function<void(boost::filesystem::path)>{std::ref(cb)}));
            if (!BEAST_EXPECT(cb.waitComplete()))
            {
                REPORT_FAILURE(dl);
            }
            BEAST_EXPECT(!boost::filesystem::exists(datafile.file()));
            BEAST_EXPECTS(
                dl.sink_.messages().str().find("async_handshake") !=
                    std::string::npos,
                dl.sink_.messages().str());
        }
        {
            // huge file (content length): the /textfile/huge endpoint
            // advertises a content length exceeding available disk space.
            Downloader dl{env};
            ripple::test::detail::FileDirGuard const datafile{
                *this, "downloads", "data", "", false, false};
            auto server = createServer(env);
            log << "Downloading DB from " << server->local_endpoint()
                << std::endl;
            BEAST_EXPECT(dl->download(
                server->local_endpoint().address().to_string(),
                std::to_string(server->local_endpoint().port()),
                "/textfile/huge",
                11,
                datafile.file(),
                std::function<void(boost::filesystem::path)>{std::ref(cb)}));
            if (!BEAST_EXPECT(cb.waitComplete()))
            {
                REPORT_FAILURE(dl);
            }
            BEAST_EXPECT(!boost::filesystem::exists(datafile.file()));
            BEAST_EXPECTS(
                dl.sink_.messages().str().find("Insufficient disk space") !=
                    std::string::npos,
                dl.sink_.messages().str());
        }
    }

public:
    void
    run() override
    {
        testDownload(true);
        testDownload(false);
        testFailures();
    }
};
#undef REPORT_FAILURE
BEAST_DEFINE_TESTSUITE(DatabaseDownloader, net, ripple);
} // namespace test
} // namespace ripple

File diff suppressed because it is too large Load Diff

View File

@@ -616,37 +616,6 @@ public:
std::strcmp(e.what(), "earliest_seq set more than once") ==
0);
}
// Verify default ledgers per shard
{
std::unique_ptr<Database> db =
Manager::instance().make_Database(
megabytes(4), scheduler, 2, nodeParams, journal_);
BEAST_EXPECT(
db->ledgersPerShard() == DEFAULT_LEDGERS_PER_SHARD);
}
// Set an invalid ledgers per shard
try
{
nodeParams.set("ledgers_per_shard", "100");
std::unique_ptr<Database> db =
Manager::instance().make_Database(
megabytes(4), scheduler, 2, nodeParams, journal_);
}
catch (std::runtime_error const& e)
{
BEAST_EXPECT(
std::strcmp(e.what(), "Invalid ledgers_per_shard") == 0);
}
// Set a valid ledgers per shard
nodeParams.set("ledgers_per_shard", "256");
std::unique_ptr<Database> db = Manager::instance().make_Database(
megabytes(4), scheduler, 2, nodeParams, journal_);
// Verify database uses the ledgers per shard
BEAST_EXPECT(db->ledgersPerShard() == 256);
}
}

View File

@@ -1,420 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx/Env.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpld/nodestore/DatabaseShard.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/beast/utility/temp_dir.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/jss.h>
namespace ripple {
namespace test {
/** Tests for the `node_to_shard` RPC command.
    Covers: every action failing with rpcNOT_ENABLED when no shard store
    is configured; starting a node-store-to-shard-store import and
    polling its status to completion; and stopping an import while it is
    in progress. */
class NodeToShardRPC_test : public beast::unit_test::suite
{
    // Returns true when the status RPC result indicates the import is
    // no longer running. On completion, also sanity-checks that every
    // expected shard is accounted for (incomplete + finalized).
    bool
    importCompleted(
        NodeStore::DatabaseShard* shardStore,
        std::uint8_t const numberOfShards,
        Json::Value const& result)
    {
        auto const info = shardStore->getShardInfo();

        // Assume completed if the import isn't running
        auto const completed =
            result[jss::error_message] == "Database import not running";

        if (completed)
        {
            BEAST_EXPECT(
                info->incomplete().size() + info->finalized().size() ==
                numberOfShards);
        }

        return completed;
    }

public:
    // Without a shard store configured, every node_to_shard action
    // (status, start, and status again) must fail with rpcNOT_ENABLED.
    void
    testDisabled()
    {
        testcase("Disabled");

        beast::temp_dir tempDir;

        jtx::Env env = [&] {
            auto c = jtx::envconfig();
            // Node store only — deliberately no shard store section.
            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
            sectionNode.set("type", "memory");
            sectionNode.set("earliest_seq", "257");
            sectionNode.set("ledgers_per_shard", "256");
            c->setupControl(true, true, true);
            c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");

            return jtx::Env(*this, std::move(c));
        }();

        std::uint8_t const numberOfShards = 10;

        // Create some ledgers so that we can initiate a
        // shard store database import.
        for (int i = 0; i < 256 * (numberOfShards + 1); ++i)
        {
            env.close();
        }

        {
            // Confirm the shard store really is absent.
            auto shardStore = env.app().getShardStore();
            if (!BEAST_EXPECT(!shardStore))
                return;
        }

        {
            // Try the node_to_shard status RPC command. Should fail.
            Json::Value jvParams;
            jvParams[jss::action] = "status";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(result[jss::error_code] == rpcNOT_ENABLED);
        }

        {
            // Try to start a shard store import via the RPC
            // interface. Should fail.
            Json::Value jvParams;
            jvParams[jss::action] = "start";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(result[jss::error_code] == rpcNOT_ENABLED);
        }

        {
            // Try the node_to_shard status RPC command. Should fail.
            Json::Value jvParams;
            jvParams[jss::action] = "status";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(result[jss::error_code] == rpcNOT_ENABLED);
        }
    }

    // Start an import via RPC, then poll the status RPC until the
    // import completes, checking the reported shard index range along
    // the way.
    void
    testStart()
    {
        testcase("Start");

        beast::temp_dir tempDir;

        jtx::Env env = [&] {
            auto c = jtx::envconfig();
            // Shard store rooted in a temp dir, node store in memory.
            auto& section = c->section(ConfigSection::shardDatabase());
            section.set("path", tempDir.path());
            section.set("max_historical_shards", "20");
            section.set("ledgers_per_shard", "256");
            section.set("earliest_seq", "257");
            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
            sectionNode.set("type", "memory");
            sectionNode.set("earliest_seq", "257");
            sectionNode.set("ledgers_per_shard", "256");
            c->setupControl(true, true, true);
            c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");

            return jtx::Env(*this, std::move(c));
        }();

        std::uint8_t const numberOfShards = 10;

        // Create some ledgers so that we can initiate a
        // shard store database import.
        for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
                 (numberOfShards + 1);
             ++i)
        {
            env.close();
        }

        auto shardStore = env.app().getShardStore();
        if (!BEAST_EXPECT(shardStore))
            return;

        {
            // Initiate a shard store import via the RPC
            // interface.
            Json::Value jvParams;
            jvParams[jss::action] = "start";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(
                result[jss::message] == "Database import initiated...");
        }

        while (!shardStore->getDatabaseImportSequence())
        {
            // Wait until the import starts
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }

        {
            // Verify that the import is in progress with
            // the node_to_shard status RPC command
            Json::Value jvParams;
            jvParams[jss::action] = "status";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(
                result[jss::status] == "success" ||
                importCompleted(shardStore, numberOfShards, result));

            std::chrono::seconds const maxWait{180};

            {
                // Poll until shard 1 finalizes (or the import finishes
                // first), verifying the status fields while in flight.
                auto const start = std::chrono::system_clock::now();
                while (true)
                {
                    // Verify that the status object accurately
                    // reflects import progress.
                    auto const completeShards =
                        shardStore->getShardInfo()->finalized();

                    if (!completeShards.empty())
                    {
                        auto const result = env.rpc(
                            "json",
                            "node_to_shard",
                            to_string(jvParams))[jss::result];

                        if (!importCompleted(
                                shardStore, numberOfShards, result))
                        {
                            BEAST_EXPECT(result[jss::firstShardIndex] == 1);
                            BEAST_EXPECT(result[jss::lastShardIndex] == 10);
                        }
                    }

                    if (boost::icl::contains(completeShards, 1))
                    {
                        auto const result = env.rpc(
                            "json",
                            "node_to_shard",
                            to_string(jvParams))[jss::result];

                        BEAST_EXPECT(
                            result[jss::currentShardIndex] >= 1 ||
                            importCompleted(
                                shardStore, numberOfShards, result));

                        break;
                    }

                    // 100ms poll interval with an overall timeout so a
                    // stalled import can't hang the suite.
                    if (std::this_thread::sleep_for(
                            std::chrono::milliseconds{100});
                        std::chrono::system_clock::now() - start > maxWait)
                    {
                        BEAST_EXPECTS(
                            false,
                            "Import timeout: could just be a slow machine.");
                        break;
                    }
                }
            }

            {
                // Wait for the import to complete
                // (i.e. the last shard, index 10, is finalized). On
                // timeout, `result` from the enclosing scope is used
                // to check whether the import finished anyway.
                auto const start = std::chrono::system_clock::now();
                while (!boost::icl::contains(
                    shardStore->getShardInfo()->finalized(), 10))
                {
                    if (std::this_thread::sleep_for(
                            std::chrono::milliseconds{100});
                        std::chrono::system_clock::now() - start > maxWait)
                    {
                        BEAST_EXPECT(importCompleted(
                            shardStore, numberOfShards, result));
                        break;
                    }
                }
            }
        }
    }

    // Start an import, wait for at least one finalized shard, issue a
    // "stop", then poll until the status RPC reports the import is no
    // longer running.
    void
    testStop()
    {
        testcase("Stop");

        beast::temp_dir tempDir;

        jtx::Env env = [&] {
            auto c = jtx::envconfig();
            auto& section = c->section(ConfigSection::shardDatabase());
            section.set("path", tempDir.path());
            section.set("max_historical_shards", "20");
            section.set("ledgers_per_shard", "256");
            section.set("earliest_seq", "257");
            auto& sectionNode = c->section(ConfigSection::nodeDatabase());
            sectionNode.set("type", "memory");
            sectionNode.set("earliest_seq", "257");
            sectionNode.set("ledgers_per_shard", "256");
            c->setupControl(true, true, true);
            c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");

            // Logging disabled to keep the (long-running) test quiet.
            return jtx::Env(
                *this, std::move(c), nullptr, beast::severities::kDisabled);
        }();

        std::uint8_t const numberOfShards = 10;

        // Create some ledgers so that we can initiate a
        // shard store database import.
        for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
                 (numberOfShards + 1);
             ++i)
        {
            env.close();
        }

        auto shardStore = env.app().getShardStore();
        if (!BEAST_EXPECT(shardStore))
            return;

        {
            // Initiate a shard store import via the RPC
            // interface.
            Json::Value jvParams;
            jvParams[jss::action] = "start";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(
                result[jss::message] == "Database import initiated...");
        }

        {
            // Verify that the import is in progress with
            // the node_to_shard status RPC command
            Json::Value jvParams;
            jvParams[jss::action] = "status";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(
                result[jss::status] == "success" ||
                importCompleted(shardStore, numberOfShards, result));

            std::chrono::seconds const maxWait{30};
            auto const start = std::chrono::system_clock::now();

            while (shardStore->getShardInfo()->finalized().empty())
            {
                // Wait for at least one shard to complete
                if (std::this_thread::sleep_for(std::chrono::milliseconds{100});
                    std::chrono::system_clock::now() - start > maxWait)
                {
                    BEAST_EXPECTS(
                        false, "Import timeout: could just be a slow machine.");
                    break;
                }
            }
        }

        {
            // Request that the running import be halted. If the import
            // already finished, importCompleted() accepts that instead.
            Json::Value jvParams;
            jvParams[jss::action] = "stop";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            BEAST_EXPECT(
                result[jss::message] == "Database import halt initiated..." ||
                importCompleted(shardStore, numberOfShards, result));
        }

        std::chrono::seconds const maxWait{30};
        auto const start = std::chrono::system_clock::now();

        while (true)
        {
            // Wait until we can verify that the import has
            // stopped
            Json::Value jvParams;
            jvParams[jss::action] = "status";

            auto const result = env.rpc(
                "json", "node_to_shard", to_string(jvParams))[jss::result];

            // When the import has stopped, polling the
            // status returns an error
            if (result.isMember(jss::error))
            {
                if (BEAST_EXPECT(result.isMember(jss::error_message)))
                {
                    BEAST_EXPECT(
                        result[jss::error_message] ==
                        "Database import not running");
                }

                break;
            }

            if (std::this_thread::sleep_for(std::chrono::milliseconds{100});
                std::chrono::system_clock::now() - start > maxWait)
            {
                BEAST_EXPECTS(
                    false, "Import timeout: could just be a slow machine.");
                break;
            }
        }
    }

    void
    run() override
    {
        testDisabled();
        testStart();
        testStop();
    }
};
BEAST_DEFINE_TESTSUITE(NodeToShardRPC, rpc, ripple);
} // namespace test
} // namespace ripple

View File

@@ -2591,231 +2591,6 @@ static RPCCallTestData const rpcCallTestArray[] = {
]
})"},
// download_shard
// --------------------------------------------------------------
{"download_shard: minimal.",
__LINE__,
{
"download_shard",
"20",
"url_NotValidated",
},
RPCCallTestData::no_exception,
R"({
"method" : "download_shard",
"params" : [
{
"api_version" : %API_VER%,
"shards" : [
{
"index" : 20,
"url" : "url_NotValidated"
}
]
}
]
})"},
{"download_shard:",
__LINE__,
{
"download_shard",
"20",
"url_NotValidated",
},
RPCCallTestData::no_exception,
R"({
"method" : "download_shard",
"params" : [
{
"api_version" : %API_VER%,
"shards" : [
{
"index" : 20,
"url" : "url_NotValidated"
}
]
}
]
})"},
{"download_shard: many shards.",
__LINE__,
{
"download_shard",
"200000000",
"url_NotValidated0",
"199999999",
"url_NotValidated1",
"199999998",
"url_NotValidated2",
"199999997",
"url_NotValidated3",
},
RPCCallTestData::no_exception,
R"({
"method" : "download_shard",
"params" : [
{
"api_version" : %API_VER%,
"shards" : [
{
"index" : 200000000,
"url" : "url_NotValidated0"
},
{
"index" : 199999999,
"url" : "url_NotValidated1"
},
{
"index" : 199999998,
"url" : "url_NotValidated2"
},
{
"index" : 199999997,
"url" : "url_NotValidated3"
}
]
}
]
})"},
{"download_shard: many shards.",
__LINE__,
{
"download_shard",
"2000000",
"url_NotValidated0",
"2000001",
"url_NotValidated1",
"2000002",
"url_NotValidated2",
"2000003",
"url_NotValidated3",
"2000004",
"url_NotValidated4",
},
RPCCallTestData::no_exception,
R"({
"method" : "download_shard",
"params" : [
{
"api_version" : %API_VER%,
"shards" : [
{
"index" : 2000000,
"url" : "url_NotValidated0"
},
{
"index" : 2000001,
"url" : "url_NotValidated1"
},
{
"index" : 2000002,
"url" : "url_NotValidated2"
},
{
"index" : 2000003,
"url" : "url_NotValidated3"
},
{
"index" : 2000004,
"url" : "url_NotValidated4"
}
]
}
]
})"},
{"download_shard: too few arguments.",
__LINE__,
{"download_shard", "20"},
RPCCallTestData::no_exception,
R"({
"method" : "download_shard",
"params" : [
{
"error" : "badSyntax",
"error_code" : 1,
"error_message" : "Syntax error."
}
]
})"},
{// Note: this should return an error but not throw.
"download_shard: novalidate too few arguments.",
__LINE__,
{"download_shard", "novalidate", "20"},
RPCCallTestData::bad_cast,
R"()"},
{"download_shard: novalidate at end.",
__LINE__,
{
"download_shard",
"20",
"url_NotValidated",
"novalidate",
},
RPCCallTestData::no_exception,
R"({
"method" : "download_shard",
"params" : [
{
"api_version" : %API_VER%,
"shards" : [
{
"index" : 20,
"url" : "url_NotValidated"
}
]
}
]
})"},
{"download_shard: novalidate in middle.",
__LINE__,
{
"download_shard",
"20",
"url_NotValidated20",
"novalidate",
"200",
"url_NotValidated200",
},
RPCCallTestData::no_exception,
R"({
"method" : "download_shard",
"params" : [
{
"error" : "invalidParams",
"error_code" : 31,
"error_message" : "Invalid parameters."
}
]
})"},
{// Note: this should return an error but not throw.
"download_shard: arguments swapped.",
__LINE__,
{
"download_shard",
"url_NotValidated",
"20",
},
RPCCallTestData::bad_cast,
R"()"},
{"download_shard: index too small.",
__LINE__,
{
"download_shard",
"-1",
"url_NotValidated",
},
RPCCallTestData::bad_cast,
R"()"},
{"download_shard: index too big.",
__LINE__,
{
"download_shard",
"4294967296",
"url_NotValidated",
},
RPCCallTestData::bad_cast,
R"()"},
// feature
// ---------------------------------------------------------------------
{"feature: minimal.",
@@ -4278,75 +4053,6 @@ static RPCCallTestData const rpcCallTestArray[] = {
]
})"},
// node_to_shard
// -------------------------------------------------------------------
{"node_to_shard: status.",
__LINE__,
{"node_to_shard", "status"},
RPCCallTestData::no_exception,
R"({
"method" : "node_to_shard",
"params" : [
{
"api_version" : %API_VER%,
"action" : "status"
}
]
})"},
{"node_to_shard: start.",
__LINE__,
{"node_to_shard", "start"},
RPCCallTestData::no_exception,
R"({
"method" : "node_to_shard",
"params" : [
{
"api_version" : %API_VER%,
"action" : "start"
}
]
})"},
{"node_to_shard: stop.",
__LINE__,
{"node_to_shard", "stop"},
RPCCallTestData::no_exception,
R"({
"method" : "node_to_shard",
"params" : [
{
"api_version" : %API_VER%,
"action" : "stop"
}
]
})"},
{"node_to_shard: too many arguments.",
__LINE__,
{"node_to_shard", "start", "stop"},
RPCCallTestData::no_exception,
R"({
"method" : "node_to_shard",
"params" : [
{
"error" : "badSyntax",
"error_code" : 1,
"error_message" : "Syntax error."
}
]
})"},
{"node_to_shard: invalid argument.",
__LINE__,
{"node_to_shard", "invalid"},
RPCCallTestData::no_exception,
R"({
"method" : "node_to_shard",
"params" : [
{
"api_version" : %API_VER%,
"action" : "invalid"
}
]
})"},
// owner_info
// ------------------------------------------------------------------
{"owner_info: minimal.",

View File

@@ -1,705 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx/CaptureLogs.h>
#include <test/jtx/Env.h>
#include <test/jtx/TrustedPublisherServer.h>
#include <test/jtx/envconfig.h>
#include <test/nodestore/TestBase.h>
#include <xrpld/app/rdb/ShardArchive.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpld/nodestore/DummyScheduler.h>
#include <xrpld/nodestore/Manager.h>
#include <xrpld/nodestore/detail/DecodedBlob.h>
#include <xrpld/rpc/ShardArchiveHandler.h>
#include <xrpl/beast/utility/temp_dir.h>
#include <xrpl/protocol/jss.h>
namespace ripple {
namespace test {
class ShardArchiveHandler_test : public beast::unit_test::suite
{
using Downloads = std::vector<std::pair<std::uint32_t, std::string>>;
// Spin up a TrustedPublisherServer to serve as a plain HTTP(S) endpoint
// for the download tests; the single random validator it publishes is
// irrelevant to these tests.
std::shared_ptr<TrustedPublisherServer>
createServer(jtx::Env& env, bool ssl = true)
{
    std::vector<TrustedPublisherServer::Validator> validators{
        TrustedPublisherServer::randomValidator()};
    auto const expiration =
        env.timeKeeper().now() + std::chrono::seconds{3600};
    return make_TrustedPublisherServer(
        env.app().getIOService(),
        validators,
        expiration,
        {},  // no future VLs
        ssl);
}
public:
// Test the shard downloading module by queueing
// a download and verifying the contents of the
// state database.
void
testSingleDownloadAndStateDB()
{
    testcase("testSingleDownloadAndStateDB");
    beast::temp_dir tempDir;

    auto c = jtx::envconfig();
    auto& section = c->section(ConfigSection::shardDatabase());
    section.set("path", tempDir.path());
    section.set("max_historical_shards", "20");
    c->setupControl(true, true, true);

    jtx::Env env(*this, std::move(c));
    auto handler = env.app().getShardArchiveHandler();
    BEAST_EXPECT(handler);
    // A fresh start (no crash recovery) must not yield a RecoveryHandler.
    BEAST_EXPECT(dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);

    // Queue a single download for shard index 1.
    std::string const rawUrl = "https://foo:443/1.tar.lz4";
    parsedURL url;
    parseUrl(url, rawUrl);
    handler->add(1, {url, rawUrl});

    {
        // Read the handler's archive state DB directly (under its lock)
        // and verify exactly one row, matching the queued URL, was
        // persisted.
        std::lock_guard<std::mutex> lock(handler->m_);

        std::uint64_t rowCount = 0;
        readArchiveDB(
            *handler->sqlDB_, [&](std::string const& url, int state) {
                BEAST_EXPECT(state == 1);
                BEAST_EXPECT(url == rawUrl);
                ++rowCount;
            });

        BEAST_EXPECT(rowCount == 1);
    }

    handler->release();
}
// Test the shard downloading module by queueing
// three downloads and verifying the contents of
// the state database.
void
testDownloadsAndStateDB()
{
    testcase("testDownloadsAndStateDB");
    beast::temp_dir tempDir;

    auto c = jtx::envconfig();
    auto& section = c->section(ConfigSection::shardDatabase());
    section.set("path", tempDir.path());
    section.set("max_historical_shards", "20");
    c->setupControl(true, true, true);

    jtx::Env env(*this, std::move(c));
    auto handler = env.app().getShardArchiveHandler();
    BEAST_EXPECT(handler);
    // A fresh start (no crash recovery) must not yield a RecoveryHandler.
    BEAST_EXPECT(dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);

    // Queue three downloads, shard indexes 1-3.
    Downloads const dl = {
        {1, "https://foo:443/1.tar.lz4"},
        {2, "https://foo:443/2.tar.lz4"},
        {3, "https://foo:443/3.tar.lz4"}};

    for (auto const& entry : dl)
    {
        parsedURL url;
        parseUrl(url, entry.second);
        handler->add(entry.first, {url, entry.second});
    }

    {
        // Verify the state DB rows match the queued downloads in order.
        // NOTE(review): the callback's `state` column is compared to the
        // shard index here — presumably readArchiveDB's second column is
        // the shard index; confirm against readArchiveDB's schema.
        std::lock_guard<std::mutex> lock(handler->m_);

        std::uint64_t pos = 0;
        readArchiveDB(
            *handler->sqlDB_, [&](std::string const& url, int state) {
                BEAST_EXPECT(state == dl[pos].first);
                BEAST_EXPECT(url == dl[pos].second);
                ++pos;
            });

        BEAST_EXPECT(pos == dl.size());
    }

    handler->release();
}
// Test the shard downloading module by initiating
// and completing ten downloads and verifying the
// contents of the filesystem and the handler's
// archives.
void
testDownloadsAndFileSystem()
{
    testcase("testDownloadsAndFileSystem");
    beast::temp_dir tempDir;

    auto c = jtx::envconfig();
    {
        auto& section{c->section(ConfigSection::shardDatabase())};
        section.set("path", tempDir.path());
        section.set("max_historical_shards", "20");
        section.set("ledgers_per_shard", "256");
        section.set("earliest_seq", "257");
    }
    {
        auto& section{c->section(ConfigSection::nodeDatabase())};
        section.set("ledgers_per_shard", "256");
        section.set("earliest_seq", "257");
    }
    c->setupControl(true, true, true);

    // Logging disabled: this test closes thousands of ledgers.
    jtx::Env env(
        *this, std::move(c), nullptr, beast::severities::kDisabled);

    std::uint8_t const numberOfDownloads = 10;

    // Create some ledgers so that the ShardArchiveHandler
    // can verify the last ledger hash for the shard
    // downloads.
    for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
             (numberOfDownloads + 1);
         ++i)
    {
        env.close();
    }

    auto handler = env.app().getShardArchiveHandler();
    BEAST_EXPECT(handler);
    // A fresh start (no crash recovery) must not yield a RecoveryHandler.
    BEAST_EXPECT(dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);

    // Capture the server's endpoint, then stop it: the downloads are
    // expected to fail and be removed from the handler's queue.
    auto server = createServer(env);
    auto host = server->local_endpoint().address().to_string();
    auto port = std::to_string(server->local_endpoint().port());
    server->stop();

    // Build ten download URLs pointing at the (now stopped) server.
    Downloads const dl = [count = numberOfDownloads, &host, &port] {
        Downloads ret;

        for (int i = 1; i <= count; ++i)
        {
            ret.push_back(
                {i,
                 (boost::format("https://%s:%d/%d.tar.lz4") % host % port %
                  i)
                     .str()});
        }

        return ret;
    }();

    for (auto const& entry : dl)
    {
        parsedURL url;
        parseUrl(url, entry.second);
        handler->add(entry.first, {url, entry.second});
    }

    BEAST_EXPECT(handler->start());

    auto stateDir =
        RPC::ShardArchiveHandler::getDownloadDirectory(env.app().config());

    // Poll (dropping the lock between checks) until the handler drains
    // its archive queue, with a 60s overall timeout.
    std::unique_lock<std::mutex> lock(handler->m_);

    BEAST_EXPECT(
        boost::filesystem::exists(stateDir) || handler->archives_.empty());

    using namespace std::chrono_literals;
    auto waitMax = 60s;

    while (!handler->archives_.empty())
    {
        lock.unlock();
        std::this_thread::sleep_for(1s);

        if (waitMax -= 1s; waitMax <= 0s)
        {
            BEAST_EXPECT(false);
            break;
        }

        lock.lock();
    }

    // Once the queue is empty the download directory must be gone.
    BEAST_EXPECT(!boost::filesystem::exists(stateDir));
}
// Test the shard downloading module by initiating
// and completing ten downloads and verifying the
// contents of the filesystem and the handler's
// archives. Then restart the application and ensure
// that the handler is created and started automatically.
void
testDownloadsAndRestart()
{
    testcase("testDownloadsAndRestart");
    beast::temp_dir tempDir;

    // Phase 1: run an environment, queue ten downloads, preserve a copy
    // of the handler's state DB, let the queue drain, then restore the
    // state DB so the next startup sees unfinished work.
    {
        auto c = jtx::envconfig();
        {
            auto& section{c->section(ConfigSection::shardDatabase())};
            section.set("path", tempDir.path());
            section.set("max_historical_shards", "20");
            section.set("ledgers_per_shard", "256");
            section.set("earliest_seq", "257");
        }
        {
            auto& section{c->section(ConfigSection::nodeDatabase())};
            section.set("ledgers_per_shard", "256");
            section.set("earliest_seq", "257");
        }
        c->setupControl(true, true, true);

        jtx::Env env(
            *this, std::move(c), nullptr, beast::severities::kDisabled);

        std::uint8_t const numberOfDownloads = 10;

        // Create some ledgers so that the ShardArchiveHandler
        // can verify the last ledger hash for the shard
        // downloads.
        for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
                 (numberOfDownloads + 1);
             ++i)
        {
            env.close();
        }

        auto handler = env.app().getShardArchiveHandler();
        BEAST_EXPECT(handler);
        // Fresh start: must not be a RecoveryHandler yet.
        BEAST_EXPECT(
            dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);

        // Stop the server up front — the downloads are expected to fail
        // and drain from the queue.
        auto server = createServer(env);
        auto host = server->local_endpoint().address().to_string();
        auto port = std::to_string(server->local_endpoint().port());
        server->stop();

        // Build ten download URLs pointing at the stopped server.
        Downloads const dl = [count = numberOfDownloads, &host, &port] {
            Downloads ret;

            for (int i = 1; i <= count; ++i)
            {
                ret.push_back(
                    {i,
                     (boost::format("https://%s:%d/%d.tar.lz4") % host %
                      port % i)
                         .str()});
            }

            return ret;
        }();

        for (auto const& entry : dl)
        {
            parsedURL url;
            parseUrl(url, entry.second);
            handler->add(entry.first, {url, entry.second});
        }

        // Snapshot the state DB (with the queued downloads) before the
        // handler starts processing and eventually deletes it.
        auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
            env.app().config());

        boost::filesystem::copy_file(
            stateDir / stateDBName,
            boost::filesystem::path(tempDir.path()) / stateDBName);

        BEAST_EXPECT(handler->start());

        // Poll (dropping the lock between checks) until the queue
        // drains, with a 60s timeout.
        std::unique_lock<std::mutex> lock(handler->m_);

        BEAST_EXPECT(
            boost::filesystem::exists(stateDir) ||
            handler->archives_.empty());

        using namespace std::chrono_literals;
        auto waitMax = 60s;

        while (!handler->archives_.empty())
        {
            lock.unlock();
            std::this_thread::sleep_for(1s);

            if (waitMax -= 1s; waitMax <= 0s)
            {
                BEAST_EXPECT(false);
                break;
            }

            lock.lock();
        }

        BEAST_EXPECT(!boost::filesystem::exists(stateDir));

        // Put the saved state DB back so the next startup finds
        // apparently-unfinished downloads and enters recovery.
        boost::filesystem::create_directory(stateDir);

        boost::filesystem::copy_file(
            boost::filesystem::path(tempDir.path()) / stateDBName,
            stateDir / stateDBName);
    }

    // Phase 2: restart against the restored state DB; the app must
    // create a RecoveryHandler automatically and drain the queue again.
    auto c = jtx::envconfig();
    {
        auto& section{c->section(ConfigSection::shardDatabase())};
        section.set("path", tempDir.path());
        section.set("max_historical_shards", "20");
        section.set("shard_verification_retry_interval", "1");
        section.set("shard_verification_max_attempts", "10000");
        section.set("ledgers_per_shard", "256");
        section.set("earliest_seq", "257");
    }
    {
        auto& section{c->section(ConfigSection::nodeDatabase())};
        section.set("ledgers_per_shard", "256");
        section.set("earliest_seq", "257");
    }
    c->setupControl(true, true, true);

    jtx::Env env(
        *this, std::move(c), nullptr, beast::severities::kDisabled);

    std::uint8_t const numberOfDownloads = 10;

    // Create some ledgers so that the ShardArchiveHandler
    // can verify the last ledger hash for the shard
    // downloads.
    for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
             (numberOfDownloads + 1);
         ++i)
    {
        env.close();
    }

    // This time the handler must be a RecoveryHandler, started
    // automatically from the persisted state DB.
    auto handler = env.app().getShardArchiveHandler();
    BEAST_EXPECT(dynamic_cast<RPC::RecoveryHandler*>(handler) != nullptr);

    auto stateDir =
        RPC::ShardArchiveHandler::getDownloadDirectory(env.app().config());

    // Same drain-and-verify polling as phase 1.
    std::unique_lock<std::mutex> lock(handler->m_);

    BEAST_EXPECT(
        boost::filesystem::exists(stateDir) || handler->archives_.empty());

    using namespace std::chrono_literals;
    auto waitMax = 60s;

    while (!handler->archives_.empty())
    {
        lock.unlock();
        std::this_thread::sleep_for(1s);

        if (waitMax -= 1s; waitMax <= 0s)
        {
            BEAST_EXPECT(false);
            break;
        }

        lock.lock();
    }

    BEAST_EXPECT(!boost::filesystem::exists(stateDir));
}
    // Ensure that downloads fail when the shard
    // database cannot store any more shards
    void
    testShardCountFailure()
    {
        testcase("testShardCountFailure");
        std::string capturedLogs;

        // Scenario 1: max_historical_shards = 1 but ten shards are
        // queued; start() must refuse and log which shards exceed
        // the limit.
        {
            beast::temp_dir tempDir;
            auto c = jtx::envconfig();
            {
                auto& section{c->section(ConfigSection::shardDatabase())};
                section.set("path", tempDir.path());
                section.set("max_historical_shards", "1");
                section.set("ledgers_per_shard", "256");
                section.set("earliest_seq", "257");
            }
            {
                auto& section{c->section(ConfigSection::nodeDatabase())};
                section.set("ledgers_per_shard", "256");
                section.set("earliest_seq", "257");
            }
            c->setupControl(true, true, true);

            // Capture log output so the error text can be asserted
            // after the Env (and its loggers) are torn down.
            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
            jtx::Env env(*this, std::move(c), std::move(logs));

            std::uint8_t const numberOfDownloads = 10;

            // Create some ledgers so that the ShardArchiveHandler
            // can verify the last ledger hash for the shard
            // downloads.
            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
                     (numberOfDownloads + 1);
                 ++i)
            {
                env.close();
            }

            auto handler = env.app().getShardArchiveHandler();
            BEAST_EXPECT(handler);
            BEAST_EXPECT(
                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);

            // Server is only needed for a plausible host:port.
            auto server = createServer(env);
            auto host = server->local_endpoint().address().to_string();
            auto port = std::to_string(server->local_endpoint().port());
            server->stop();

            // Build (index, URL) pairs for shards 1..numberOfDownloads.
            Downloads const dl = [count = numberOfDownloads, &host, &port] {
                Downloads ret;

                for (int i = 1; i <= count; ++i)
                {
                    ret.push_back(
                        {i,
                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
                          port % i)
                             .str()});
                }

                return ret;
            }();

            for (auto const& entry : dl)
            {
                parsedURL url;

                parseUrl(url, entry.second);
                handler->add(entry.first, {url, entry.second});
            }

            // Too many shards queued for the configured limit.
            BEAST_EXPECT(!handler->start());

            // Releasing the handler must clean up its download dir.
            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
                env.app().config());

            handler->release();
            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
        }

        auto const expectedErrorMessage =
            "shards 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 maximum number of historical "
            "shards reached";
        BEAST_EXPECT(
            capturedLogs.find(expectedErrorMessage) != std::string::npos);

        // Scenario 2: max_historical_shards = 0; even a single queued
        // shard must be rejected, with the singular log message.
        {
            beast::temp_dir tempDir;
            auto c = jtx::envconfig();
            {
                auto& section{c->section(ConfigSection::shardDatabase())};
                section.set("path", tempDir.path());
                section.set("max_historical_shards", "0");
                section.set("ledgers_per_shard", "256");
                section.set("earliest_seq", "257");
            }
            {
                auto& section{c->section(ConfigSection::nodeDatabase())};
                section.set("ledgers_per_shard", "256");
                section.set("earliest_seq", "257");
            }
            c->setupControl(true, true, true);

            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
            jtx::Env env(*this, std::move(c), std::move(logs));

            std::uint8_t const numberOfDownloads = 1;

            // Create some ledgers so that the ShardArchiveHandler
            // can verify the last ledger hash for the shard
            // downloads.
            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
                     ((numberOfDownloads * 3) + 1);
                 ++i)
            {
                env.close();
            }

            auto handler = env.app().getShardArchiveHandler();
            BEAST_EXPECT(handler);
            BEAST_EXPECT(
                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);

            auto server = createServer(env);
            auto host = server->local_endpoint().address().to_string();
            auto port = std::to_string(server->local_endpoint().port());
            server->stop();

            Downloads const dl = [count = numberOfDownloads, &host, &port] {
                Downloads ret;

                for (int i = 1; i <= count; ++i)
                {
                    ret.push_back(
                        {i,
                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
                          port % i)
                             .str()});
                }

                return ret;
            }();

            for (auto const& entry : dl)
            {
                parsedURL url;

                parseUrl(url, entry.second);
                handler->add(entry.first, {url, entry.second});
            }

            BEAST_EXPECT(!handler->start());

            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
                env.app().config());

            handler->release();
            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
        }

        auto const expectedErrorMessage2 =
            "shard 1 maximum number of historical shards reached";
        BEAST_EXPECT(
            capturedLogs.find(expectedErrorMessage2) != std::string::npos);
    }
    // Ensure that downloads fail when the shard
    // database has already stored one of the
    // queued shards
    void
    testRedundantShardFailure()
    {
        testcase("testRedundantShardFailure");
        std::string capturedLogs;

        {
            beast::temp_dir tempDir;
            auto c = jtx::envconfig();
            {
                auto& section{c->section(ConfigSection::shardDatabase())};
                section.set("path", tempDir.path());
                section.set("max_historical_shards", "1");
                section.set("ledgers_per_shard", "256");
                section.set("earliest_seq", "257");
            }
            {
                auto& section{c->section(ConfigSection::nodeDatabase())};
                section.set("ledgers_per_shard", "256");
                section.set("earliest_seq", "257");
            }
            c->setupControl(true, true, true);

            // Capture log output (at debug severity) so the error
            // text can be asserted after the Env is torn down.
            std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
            jtx::Env env(
                *this,
                std::move(c),
                std::move(logs),
                beast::severities::kDebug);

            std::uint8_t const numberOfDownloads = 10;

            // Create some ledgers so that the ShardArchiveHandler
            // can verify the last ledger hash for the shard
            // downloads.
            for (int i = 0; i < env.app().getShardStore()->ledgersPerShard() *
                     (numberOfDownloads + 1);
                 ++i)
            {
                env.close();
            }

            // Pre-queue shard 1 for import directly in the shard
            // store so the handler's queue collides with it.
            BEAST_EXPECT(env.app().getShardStore()->prepareShards({1}));

            auto handler = env.app().getShardArchiveHandler();
            BEAST_EXPECT(handler);
            BEAST_EXPECT(
                dynamic_cast<RPC::RecoveryHandler*>(handler) == nullptr);

            // Server is only needed for a plausible host:port.
            auto server = createServer(env);
            auto host = server->local_endpoint().address().to_string();
            auto port = std::to_string(server->local_endpoint().port());
            server->stop();

            // Build (index, URL) pairs for shards 1..numberOfDownloads;
            // shard 1 duplicates the prepared shard above.
            Downloads const dl = [count = numberOfDownloads, &host, &port] {
                Downloads ret;

                for (int i = 1; i <= count; ++i)
                {
                    ret.push_back(
                        {i,
                         (boost::format("https://%s:%d/%d.tar.lz4") % host %
                          port % i)
                             .str()});
                }

                return ret;
            }();

            for (auto const& entry : dl)
            {
                parsedURL url;

                parseUrl(url, entry.second);
                handler->add(entry.first, {url, entry.second});
            }

            // The duplicate shard must cause start() to refuse.
            BEAST_EXPECT(!handler->start());

            auto stateDir = RPC::ShardArchiveHandler::getDownloadDirectory(
                env.app().config());

            handler->release();
            BEAST_EXPECT(!boost::filesystem::exists(stateDir));
        }

        auto const expectedErrorMessage =
            "shard 1 is already queued for import";
        BEAST_EXPECT(
            capturedLogs.find(expectedErrorMessage) != std::string::npos);
    }
void
run() override
{
testSingleDownloadAndStateDB();
testDownloadsAndStateDB();
testDownloadsAndFileSystem();
testDownloadsAndRestart();
testShardCountFailure();
testRedundantShardFailure();
}
};
BEAST_DEFINE_TESTSUITE_PRIO(ShardArchiveHandler, app, ripple, 3);
} // namespace test
} // namespace ripple

View File

@@ -20,7 +20,6 @@
#ifndef RIPPLE_SHAMAP_TESTS_COMMON_H_INCLUDED
#define RIPPLE_SHAMAP_TESTS_COMMON_H_INCLUDED
#include <xrpld/nodestore/DatabaseShard.h>
#include <xrpld/nodestore/DummyScheduler.h>
#include <xrpld/nodestore/Manager.h>
#include <xrpld/shamap/Family.h>
@@ -81,12 +80,14 @@ public:
return j_;
}
std::shared_ptr<FullBelowCache> getFullBelowCache(std::uint32_t) override
std::shared_ptr<FullBelowCache>
getFullBelowCache() override
{
return fbCache_;
}
std::shared_ptr<TreeNodeCache> getTreeNodeCache(std::uint32_t) override
std::shared_ptr<TreeNodeCache>
getTreeNodeCache() override
{
return tnCache_;
}
@@ -98,12 +99,6 @@ public:
tnCache_->sweep();
}
bool
isShardBacked() const override
{
return true;
}
void
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash)
override