Allow multiple paths for shard storage:

* Distinguish between recent and historical shards
* Allow multiple storage paths for historical shards
* Add documentation for this feature
* Add unit tests
Devon White
2020-06-16 14:06:55 -04:00
committed by Nik Bougalis
parent e5ff70f606
commit 6c268a3e9c
10 changed files with 1215 additions and 155 deletions

View File

@@ -48,6 +48,7 @@ struct ConfigSection
 // VFALCO TODO Rename and replace these macros with variables.
 #define SECTION_AMENDMENTS "amendments"
+#define SECTION_AMENDMENT_MAJORITY_TIME "amendment_majority_time"
 #define SECTION_CLUSTER_NODES "cluster_nodes"
 #define SECTION_COMPRESSION "compression"
 #define SECTION_DEBUG_LOGFILE "debug_logfile"
@@ -56,11 +57,11 @@ struct ConfigSection
 #define SECTION_FEE_ACCOUNT_RESERVE "fee_account_reserve"
 #define SECTION_FEE_OWNER_RESERVE "fee_owner_reserve"
 #define SECTION_FETCH_DEPTH "fetch_depth"
-#define SECTION_LEDGER_HISTORY "ledger_history"
+#define SECTION_HISTORICAL_SHARD_PATHS "historical_shard_paths"
 #define SECTION_INSIGHT "insight"
 #define SECTION_IPS "ips"
 #define SECTION_IPS_FIXED "ips_fixed"
-#define SECTION_AMENDMENT_MAJORITY_TIME "amendment_majority_time"
+#define SECTION_LEDGER_HISTORY "ledger_history"
 #define SECTION_NETWORK_QUORUM "network_quorum"
 #define SECTION_NODE_SEED "node_seed"
 #define SECTION_NODE_SIZE "node_size"

View File

@@ -177,7 +177,7 @@ The `download_path` field of the `shard_db` entry is used to determine where to
 type=NuDB
 path=/var/lib/rippled/db/shards/nudb
 download_path=/var/lib/rippled/db/shards/
-max_size_gb=50
+max_historical_shards=50
 ```
 ##### Resuming Partial Downloads

View File

@@ -1601,7 +1601,7 @@ rpcClient(
                     &jvOutput,
                     std::placeholders::_1),
                 headers);
-            isService.run();  // This blocks until there is no more
+            isService.run();  // This blocks until there are no more
                               // outstanding async calls.
         }
         if (jvOutput.isMember("result"))

View File

@@ -0,0 +1,127 @@
# Shard Storage Paths

## Overview

The shard database stores validated ledgers in logical groups called shards.
As of June 2020, a shard stores 16384 ledgers by default. To allow users to
store shards on multiple devices, the shard database can be configured with
several filesystem paths. Each path should refer to a directory on a distinct
filesystem; no two paths may reside on the same filesystem, since the server
would then count the same free space more than once and inaccurately estimate
the room available for storing shards. In the absence of a suitable
platform-agnostic solution, this restriction is enforced only on Linux. On
other platforms, the server employs a heuristic that issues a warning if it
suspects the restriction is violated.
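
On Linux, the enforcement compares the filesystem IDs that `statvfs` reports
for each configured path. A minimal, standalone sketch of that idea follows
(illustrative only; the server's actual check is
`DatabaseShardImp::checkHistoricalPaths`, shown in the diff below):

```cpp
// Sketch: flag configured paths that resolve to the same filesystem.
// Linux-only, since it relies on the f_fsid field reported by statvfs.
#include <sys/statvfs.h>

#include <string>
#include <unordered_map>
#include <vector>

bool
pathsOnDistinctFilesystems(std::vector<std::string> const& paths)
{
    // Group the paths by the filesystem ID statvfs reports for them.
    std::unordered_map<unsigned long, std::vector<std::string>> byFsid;
    for (auto const& path : paths)
    {
        struct statvfs info;
        if (statvfs(path.c_str(), &info) != 0)
            return false;  // Treat an unreadable path as a failure.
        byFsid[info.f_fsid].push_back(path);
    }

    // Any group with more than one member violates the restriction.
    for (auto const& entry : byFsid)
        if (entry.second.size() > 1)
            return false;
    return true;
}
```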

## Configuration

The `shard_db` and `historical_shard_paths` sections of the server's
configuration file determine where the server stores shards. At a minimum,
the `shard_db` section must contain a single `path` key. If this is the only
storage path provided, all shards are stored at that location. If the
configuration also lists one or more lines in the `historical_shard_paths`
section, all older shards are stored at those locations, while `path` is used
only for the current and previous shards. The goal is to let users provide an
efficient SSD for recent shards, which are accessed more frequently, and
large mechanical drives for older shards, which are accessed less frequently.

Below is a sample configuration snippet that provides a path for main storage
and several paths for historical storage:

```dosini
# This is the persistent datastore for shards. It is important for the health
# of the ripple network that rippled operators shard as much as practical.
# NuDB requires SSD storage. Helpful information can be found here
# https://ripple.com/build/history-sharding
[shard_db]
type=NuDB
# A single path for storing
# the current and previous
# shards:
# -------------------------
path=/var/lib/rippled/db/shards/nudb
# Path where shards are stored
# while being downloaded:
# ----------------------------
download_path=/var/lib/rippled/db/shards/
# The number of historical shards to store.
# The default value is 0, which means that
# the server won't store any historical
# shards - only the current and previous
# shards will be stored.
# ------------------------------------
max_historical_shards=100
# List of paths for storing older shards.
[historical_shard_paths]
/mnt/disk1
/mnt/disk2
/mnt/disk3
```
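
The server ingests these values in `DatabaseShardImp::initConfig` (see the
diff below). As a rough sketch of the validation involved, assuming a
hypothetical `ShardConfig` struct already populated from the file: duplicate
historical paths are discarded, and a configuration that lists the main
`path` among the historical paths is rejected.

```cpp
// Sketch: validate the shard storage settings after parsing. ShardConfig
// is a hypothetical container; the server's real checks live in initConfig.
#include <algorithm>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

struct ShardConfig
{
    std::string mainPath;                      // [shard_db] path
    std::uint32_t maxHistoricalShards = 0;     // max_historical_shards
    std::vector<std::string> historicalPaths;  // [historical_shard_paths]
};

void
validate(ShardConfig& config)
{
    if (config.mainPath.empty())
        throw std::runtime_error("'path' missing");

    // Ignore duplicate historical path entries.
    std::sort(config.historicalPaths.begin(), config.historicalPaths.end());
    config.historicalPaths.erase(
        std::unique(
            config.historicalPaths.begin(), config.historicalPaths.end()),
        config.historicalPaths.end());

    // The main path may not double as a historical path.
    for (auto const& path : config.historicalPaths)
        if (path == config.mainPath)
            throw std::runtime_error(
                "'path' cannot also be in 'historical_shard_paths'");
}
```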

## Shard Migration

When a new shard (*current shard*) is confirmed by the network, the recent
shards shift: the *previous shard* becomes a *historical shard*, the *current
shard* becomes the *previous shard*, and the new shard becomes the *current
shard*. These are just logical labels; the shards themselves don't change to
reflect being current, previous, or historical. However, if the server's
configuration specifies one or more paths for historical storage, the
formerly *previous shard* is migrated to one of the historical paths during
this shift. If multiple paths are provided, the server dynamically chooses
one with sufficient space to store the shard.

**Note:** As of June 2020, the shard database does not store the partial
shard currently being built by live network transactions, but this is planned
to change. When that feature is implemented, the *current shard* will refer
to this partial shard, and the *previous shard* will refer to the most
recently validated shard.
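
In pseudocode terms, the shift amounts to rotating two optional indexes,
roughly as below. This is a sketch with hypothetical names; the server's
version is `relocateOutdatedShards` in the diff below.

```cpp
// Sketch: rotate the recent shard labels when shard `confirmed` becomes
// the new current shard. The names here are illustrative.
#include <boost/optional.hpp>
#include <cstdint>

struct RecentShards
{
    boost::optional<std::uint32_t> current;
    boost::optional<std::uint32_t> previous;
};

// Hypothetical helper: the real server renames the shard directory onto
// a chosen historical path and reopens the shard there.
void
migrateToHistoricalPath(std::uint32_t shardIndex);

void
shift(RecentShards& recent, std::uint32_t confirmed)
{
    // The old previous shard becomes historical...
    if (recent.previous)
        migrateToHistoricalPath(*recent.previous);

    // ...the old current shard becomes the previous shard, and the
    // newly confirmed shard becomes the current shard.
    recent.previous = recent.current;
    recent.current = confirmed;
}
```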

### Selecting a Historical Storage Path

When multiple historical paths are provided, the path used for each
historical shard is selected at random. By using all available storage
devices, we create a uniform distribution of disk utilization for disks of
equivalent size (provided the disks are used only to store shards). In
theory, selecting devices this way also increases our chances for concurrent
access to stored shards; however, as of June 2020, concurrent shard access is
not implemented. Lastly, a storage path is included in the random selection
only if it has enough capacity to hold the next shard.
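
A sketch of that selection, assuming an average shard size is known (it
mirrors `chooseHistoricalPath` in the diff below, which draws from the
eligible paths with `std::sample` in the same way):

```cpp
// Sketch: choose a historical path uniformly at random among those with
// room for one more shard.
#include <boost/filesystem.hpp>

#include <algorithm>
#include <cstdint>
#include <random>
#include <vector>

boost::filesystem::path
pickHistoricalPath(
    std::vector<boost::filesystem::path> const& historicalPaths,
    std::uint64_t avgShardFileSz)
{
    namespace fs = boost::filesystem;

    // Keep only the paths that can hold the next shard.
    std::vector<fs::path> candidates;
    for (auto const& path : historicalPaths)
        if (fs::space(path).available >= avgShardFileSz)
            candidates.push_back(path);

    if (candidates.empty())
        return {};  // No path has sufficient capacity.

    // Draw one candidate uniformly at random.
    fs::path chosen;
    std::mt19937 rng{std::random_device{}()};
    std::sample(candidates.begin(), candidates.end(), &chosen, 1, rng);
    return chosen;
}
```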

## Shard Acquisition

When the server is acquiring shard history, the acquired shards are stored at
a path designated for historical storage (an entry in
`historical_shard_paths`). If no such path is provided, acquired shards are
stored at the location given by `path`.

## Storage Capacity

### Filesystem Capacity

When the shard database updates its record of disk utilization, it trusts
that the provided historical paths refer to distinct devices, or at least
distinct filesystems. If this requirement is violated, the database operates
with an inaccurate view of how many shards it can store. Violating the
requirement won't necessarily impede database operations, but the database
will fail to identify scenarios wherein storing the maximum number of
historical shards (as set by the `max_historical_shards` parameter in the
configuration file) would exceed the available storage space.
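
The estimate itself is a simple sum over the per-path free space, along the
lines of the sketch below (mirroring `sufficientStorage` in the diff;
`avgShardFileSz` stands in for the server's running average shard size). If
two paths share a filesystem, the same free bytes are counted twice, which is
exactly the failure mode described above.

```cpp
// Sketch: can `numShards` more shards fit across the historical paths?
// Sound only if every path is on a distinct filesystem; a shared
// filesystem would have its free space counted more than once.
#include <boost/filesystem.hpp>

#include <cstdint>
#include <vector>

bool
canHold(
    std::vector<boost::filesystem::path> const& paths,
    std::uint64_t numShards,
    std::uint64_t avgShardFileSz)
{
    for (auto const& path : paths)
    {
        // Whole shards that still fit on this filesystem.
        auto const fits =
            boost::filesystem::space(path).available / avgShardFileSz;
        if (numShards <= fits)
            return true;
        numShards -= fits;
    }
    return false;  // Collectively out of space.
}
```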

### Shard Migration

During a recent shard shift, if the server has already reached the configured
limit of stored historical shards, the formerly *previous shard* is not moved
to a historical drive (or kept at `path`); instead, it is dropped and removed
from the filesystem.

### Shard Acquisition

Once the configured limit of stored historical shards has been reached, shard
acquisition halts and no additional shards are acquired.

View File

@@ -32,6 +32,10 @@
 #include <boost/algorithm/string/predicate.hpp>
 
+#if BOOST_OS_LINUX
+#include <sys/statvfs.h>
+#endif
+
 namespace ripple {
 namespace NodeStore {
@@ -82,95 +86,135 @@ DatabaseShardImp::init()
     try
     {
         using namespace boost::filesystem;
-        if (exists(dir_))
+
+        // Consolidate the main storage path and all
+        // historical paths
+        std::vector<path> paths{dir_};
+        paths.insert(
+            paths.end(), historicalPaths_.begin(), historicalPaths_.end());
+
+        for (auto const& path : paths)
         {
-            if (!is_directory(dir_))
+            try
             {
-                JLOG(j_.error()) << "'path' must be a directory";
-                return false;
+                if (exists(path))
+                {
+                    if (!is_directory(path))
+                    {
+                        JLOG(j_.error()) << path << " must be a directory";
+                        return false;
+                    }
+                }
+                else if (!create_directories(path))
+                {
+                    JLOG(j_.error())
+                        << "failed to create path: " + path.string();
+                    return false;
+                }
+            }
+            catch (...)
+            {
+                JLOG(j_.error())
+                    << "failed to create path: " + path.string();
+                return false;
             }
         }
-        else
-            create_directories(dir_);
+
+        if (!app_.config().standalone() && !historicalPaths_.empty())
+        {
+            // Check historical paths for duplicated
+            // filesystems
+            if (!checkHistoricalPaths())
+                return false;
+        }
 
         ctx_ = std::make_unique<nudb::context>();
         ctx_->start();
 
         // Find shards
-        for (auto const& d : directory_iterator(dir_))
+        for (auto const& path : paths)
         {
-            if (!is_directory(d))
-                continue;
-
-            // Check shard directory name is numeric
-            auto dirName = d.path().stem().string();
-            if (!std::all_of(dirName.begin(), dirName.end(), [](auto c) {
-                    return ::isdigit(static_cast<unsigned char>(c));
-                }))
-            {
-                continue;
-            }
-
-            auto const shardIndex{std::stoul(dirName)};
-            if (shardIndex < earliestShardIndex())
-            {
-                JLOG(j_.error()) << "shard " << shardIndex
-                                 << " comes before earliest shard index "
-                                 << earliestShardIndex();
-                return false;
-            }
-
-            auto const shardDir{dir_ / std::to_string(shardIndex)};
-
-            // Check if a previous import failed
-            if (is_regular_file(shardDir / importMarker_))
-            {
-                JLOG(j_.warn()) << "shard " << shardIndex
-                                << " previously failed import, removing";
-                remove_all(shardDir);
-                continue;
-            }
-
-            auto shard{
-                std::make_unique<Shard>(app_, *this, shardIndex, j_)};
-            if (!shard->open(scheduler_, *ctx_))
-            {
-                // Remove corrupted or legacy shard
-                shard->removeOnDestroy();
-                JLOG(j_.warn())
-                    << "shard " << shardIndex << " removed, "
-                    << (shard->isLegacy() ? "legacy" : "corrupted")
-                    << " shard";
-                continue;
-            }
-
-            if (shard->isFinal())
-            {
-                shards_.emplace(
-                    shardIndex,
-                    ShardInfo(std::move(shard), ShardInfo::State::final));
-            }
-            else if (shard->isBackendComplete())
-            {
-                auto const result{shards_.emplace(
-                    shardIndex,
-                    ShardInfo(std::move(shard), ShardInfo::State::none))};
-                finalizeShard(
-                    result.first->second, true, lock, boost::none);
-            }
-            else
-            {
-                if (acquireIndex_ != 0)
+            for (auto const& d : directory_iterator(path))
+            {
+                if (!is_directory(d))
+                    continue;
+
+                auto const shardDir = d.path();
+
+                // Check shard directory name is numeric
+                auto dirName = shardDir.stem().string();
+                if (!std::all_of(
+                        dirName.begin(), dirName.end(), [](auto c) {
+                            return ::isdigit(static_cast<unsigned char>(c));
+                        }))
+                {
+                    continue;
+                }
+
+                auto const shardIndex{std::stoul(dirName)};
+                if (shardIndex < earliestShardIndex())
                 {
                     JLOG(j_.error())
-                        << "more than one shard being acquired";
+                        << "shard " << shardIndex
+                        << " comes before earliest shard index "
+                        << earliestShardIndex();
                     return false;
                 }
 
-                shards_.emplace(
-                    shardIndex,
-                    ShardInfo(std::move(shard), ShardInfo::State::acquire));
-                acquireIndex_ = shardIndex;
+                // Check if a previous import failed
+                if (is_regular_file(shardDir / importMarker_))
+                {
+                    JLOG(j_.warn())
+                        << "shard " << shardIndex
+                        << " previously failed import, removing";
+                    remove_all(shardDir);
+                    continue;
+                }
+
+                auto shard{std::make_unique<Shard>(
+                    app_, *this, shardIndex, shardDir.parent_path(), j_)};
+                if (!shard->open(scheduler_, *ctx_))
+                {
+                    // Remove corrupted or legacy shard
+                    shard->removeOnDestroy();
+                    JLOG(j_.warn())
+                        << "shard " << shardIndex << " removed, "
+                        << (shard->isLegacy() ? "legacy" : "corrupted")
+                        << " shard";
+                    continue;
+                }
+
+                if (shard->isFinal())
+                {
+                    shards_.emplace(
+                        shardIndex,
+                        ShardInfo(
+                            std::move(shard), ShardInfo::State::final));
+                }
+                else if (shard->isBackendComplete())
+                {
+                    auto const result{shards_.emplace(
+                        shardIndex,
+                        ShardInfo(
+                            std::move(shard), ShardInfo::State::none))};
+                    finalizeShard(
+                        result.first->second, true, lock, boost::none);
+                }
+                else
+                {
+                    if (acquireIndex_ != 0)
+                    {
+                        JLOG(j_.error())
+                            << "more than one shard being acquired";
+                        return false;
+                    }
+
+                    shards_.emplace(
+                        shardIndex,
+                        ShardInfo(
+                            std::move(shard), ShardInfo::State::acquire));
+                    acquireIndex_ = shardIndex;
+                }
             }
         }
     }
@@ -178,6 +222,7 @@ DatabaseShardImp::init()
     {
         JLOG(j_.error())
             << "exception " << e.what() << " in function " << __func__;
+        return false;
     }
 
     updateStatus(lock);
@@ -209,20 +254,6 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
         if (!canAdd_)
             return boost::none;
 
-        // Check available storage space
-        if (fileSz_ + avgShardFileSz_ > maxFileSz_)
-        {
-            JLOG(j_.debug()) << "maximum storage size reached";
-            canAdd_ = false;
-            return boost::none;
-        }
-        if (avgShardFileSz_ > available())
-        {
-            JLOG(j_.error()) << "insufficient storage space available";
-            canAdd_ = false;
-            return boost::none;
-        }
-
         shardIndex = findAcquireIndex(validLedgerSeq, lock);
     }
@@ -236,7 +267,27 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
         return boost::none;
     }
 
-    auto shard{std::make_unique<Shard>(app_, *this, *shardIndex, j_)};
+    auto const pathDesignation = [this, shardIndex = *shardIndex]() {
+        std::lock_guard lock(mutex_);
+        return prepareForNewShard(shardIndex, numHistoricalShards(lock), lock);
+    }();
+
+    if (!pathDesignation)
+        return false;
+
+    auto const needsHistoricalPath =
+        *pathDesignation == PathDesignation::historical;
+
+    auto shard = [this, shardIndex, needsHistoricalPath] {
+        std::lock_guard lock(mutex_);
+        return std::make_unique<Shard>(
+            app_,
+            *this,
+            *shardIndex,
+            (needsHistoricalPath ? chooseHistoricalPath(lock) : ""),
+            j_);
+    }();
+
     if (!shard->open(scheduler_, *ctx_))
         return boost::none;
@@ -292,10 +343,19 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
         return false;
     }
 
-    // Check available storage space
-    if (fileSz_ + avgShardFileSz_ > maxFileSz_)
-        return fail("maximum storage size reached");
-    if (avgShardFileSz_ > available())
+    // Any shard earlier than the two most recent shards
+    // is a historical shard
+    bool const isHistoricalShard = shardIndex < shardBoundaryIndex(lock);
+    auto const numHistShards = numHistoricalShards(lock);
+
+    // Check shard count and available storage space
+    if (isHistoricalShard && numHistShards >= maxHistoricalShards_)
+        return fail("maximum number of historical shards reached");
+    if (!sufficientStorage(
+            1,
+            isHistoricalShard ? PathDesignation::historical
+                              : PathDesignation::none,
+            lock))
         return fail("insufficient storage space available");
 
     shards_.emplace(shardIndex, ShardInfo(nullptr, ShardInfo::State::import));
@@ -392,15 +452,30 @@ DatabaseShardImp::importShard(
             return false;
         }
 
-        dstDir = dir_ / std::to_string(shardIndex);
+        auto const pathDesignation =
+            prepareForNewShard(shardIndex, numHistoricalShards(lock), lock);
+
+        if (!pathDesignation)
+        {
+            JLOG(j_.error()) << "shard " << shardIndex << " failed to import";
+            return false;
+        }
+
+        auto const needsHistoricalPath =
+            *pathDesignation == PathDesignation::historical;
+
+        dstDir = needsHistoricalPath ? chooseHistoricalPath(lock) : dir_;
     }
 
+    dstDir /= std::to_string(shardIndex);
+
     // Rename source directory to the shard database directory
     if (!renameDir(srcDir, dstDir))
         return false;
 
     // Create the new shard
-    auto shard{std::make_unique<Shard>(app_, *this, shardIndex, j_)};
+    auto shard{std::make_unique<Shard>(
+        app_, *this, shardIndex, dstDir.parent_path(), j_)};
     if (!shard->open(scheduler_, *ctx_) || !shard->isBackendComplete())
     {
         JLOG(j_.error()) << "shard " << shardIndex << " failed to import";
@@ -688,23 +763,21 @@ DatabaseShardImp::import(Database& source)
             }
         }
 
+        auto numHistShards = this->numHistoricalShards(lock);
+
         // Import the shards
         for (std::uint32_t shardIndex = earliestIndex;
              shardIndex <= latestIndex;
              ++shardIndex)
         {
-            if (fileSz_ + avgShardFileSz_ > maxFileSz_)
-            {
-                JLOG(j_.error()) << "maximum storage size reached";
-                canAdd_ = false;
-                break;
-            }
-            if (avgShardFileSz_ > available())
-            {
-                JLOG(j_.error()) << "insufficient storage space available";
-                canAdd_ = false;
-                break;
-            }
+            auto const pathDesignation =
+                prepareForNewShard(shardIndex, numHistShards, lock);
+
+            if (!pathDesignation)
+                break;
+
+            auto const needsHistoricalPath =
+                *pathDesignation == PathDesignation::historical;
 
             // Skip if already stored
             if (shardIndex == acquireIndex_ ||
@@ -741,13 +814,18 @@ DatabaseShardImp::import(Database& source)
                 continue;
             }
 
+            auto const path =
+                needsHistoricalPath ? chooseHistoricalPath(lock) : dir_;
+
             // Create the new shard
-            auto shard{std::make_unique<Shard>(app_, *this, shardIndex, j_)};
+            auto shard =
+                std::make_unique<Shard>(app_, *this, shardIndex, path, j_);
             if (!shard->open(scheduler_, *ctx_))
                 continue;
 
             // Create a marker file to signify an import in progress
-            auto const shardDir{dir_ / std::to_string(shardIndex)};
+            auto const shardDir{path / std::to_string(shardIndex)};
             auto const markerFile{shardDir / importMarker_};
             {
                 std::ofstream ofs{markerFile.string()};
@@ -818,6 +896,9 @@ DatabaseShardImp::import(Database& source)
                     ShardInfo(std::move(shard), ShardInfo::State::none))};
                 finalizeShard(
                     result.first->second, true, lock, boost::none);
+
+                if (shardIndex < shardBoundaryIndex(lock))
+                    ++numHistShards;
             }
             catch (std::exception const& e)
             {
@@ -1074,19 +1155,28 @@ DatabaseShardImp::initConfig(std::lock_guard<std::mutex>&)
         return fail("'path' missing");
 
     {
-        std::uint64_t sz;
-        if (!get_if_exists<std::uint64_t>(section, "max_size_gb", sz))
-            return fail("'max_size_gb' missing");
-
-        if ((sz << 30) < sz)
-            return fail("'max_size_gb' overflow");
-
-        // Minimum storage space required (in gigabytes)
-        if (sz < 10)
-            return fail("'max_size_gb' must be at least 10");
-
-        // Convert to bytes
-        maxFileSz_ = sz << 30;
+        get_if_exists(section, "max_historical_shards", maxHistoricalShards_);
+
+        Section const& historicalShardPaths =
+            config.section(SECTION_HISTORICAL_SHARD_PATHS);
+
+        auto values = historicalShardPaths.values();
+
+        std::sort(values.begin(), values.end());
+        values.erase(std::unique(values.begin(), values.end()), values.end());
+
+        for (auto const& s : values)
+        {
+            auto const dir = path(s);
+            if (dir_ == dir)
+            {
+                return fail(
+                    "the 'path' cannot also be in the "
+                    "'historical_shard_path' section");
+            }
+
+            historicalPaths_.push_back(s);
+        }
     }
 
     if (section.exists("ledgers_per_shard"))
@@ -1245,6 +1335,43 @@ DatabaseShardImp::finalizeShard(
             return;
 
         it->second.state = ShardInfo::State::final;
         updateStatus(lock);
+
+        auto const boundaryIndex = shardBoundaryIndex(lock);
+        auto const isHistoricalShard = shardIndex < boundaryIndex;
+
+        if (isHistoricalShard)
+        {
+            if (!historicalPaths_.empty() &&
+                shard->getDir().parent_path() == dir_)
+            {
+                // This is a historical shard that wasn't
+                // placed at a separate historical path
+                JLOG(j_.warn()) << "shard " << shardIndex
+                                << " is not stored at a historical path";
+            }
+        }
+        else
+        {
+            // Not a historical shard. Shift recent shards
+            // if necessary
+            relocateOutdatedShards(lock);
+
+            assert(!boundaryIndex || shardIndex - boundaryIndex <= 1);
+
+            auto& recentShard = shardIndex == boundaryIndex
+                ? secondLatestShardIndex_
+                : latestShardIndex_;
+
+            // Set the appropriate recent shard
+            // index
+            recentShard = shardIndex;
+
+            if (shard->getDir().parent_path() != dir_)
+            {
+                JLOG(j_.warn()) << "shard " << shard->index()
+                                << " is not stored at the path";
+            }
+        }
     }
 
     setFileStats();
@@ -1298,12 +1425,16 @@ DatabaseShardImp::setFileStats()
     fdRequired_ = sumFd;
     avgShardFileSz_ = (numShards == 0 ? fileSz_ : fileSz_ / numShards);
 
-    if (fileSz_ >= maxFileSz_)
+    if (auto const count = numHistoricalShards(lock);
+        count >= maxHistoricalShards_)
     {
-        JLOG(j_.warn()) << "maximum storage size reached";
+        JLOG(j_.warn()) << "maximum number of historical shards reached";
         canAdd_ = false;
     }
-    else if (maxFileSz_ - fileSz_ > available())
+    else if (!sufficientStorage(
+                 maxHistoricalShards_ - count,
+                 PathDesignation::historical,
+                 lock))
     {
         JLOG(j_.warn())
             << "maximum shard store size exceeds available storage space";
@@ -1350,19 +1481,59 @@ DatabaseShardImp::getCache(std::uint32_t seq)
     return std::make_pair(pCache, nCache);
 }
 
-std::uint64_t
-DatabaseShardImp::available() const
+bool
+DatabaseShardImp::sufficientStorage(
+    std::uint32_t numShards,
+    PathDesignation pathDesignation,
+    std::lock_guard<std::mutex> const&) const
 {
     try
     {
-        return boost::filesystem::space(dir_).available;
+        std::vector<std::uint64_t> capacities;
+
+        if (pathDesignation == PathDesignation::historical &&
+            !historicalPaths_.empty())
+        {
+            capacities.reserve(historicalPaths_.size());
+
+            for (auto const& path : historicalPaths_)
+            {
+                // Get the available storage for each historical path
+                auto const availableSpace =
+                    boost::filesystem::space(path).available;
+
+                capacities.push_back(availableSpace);
+            }
+        }
+        else
+        {
+            // Get the available storage for the main shard path
+            capacities.push_back(boost::filesystem::space(dir_).available);
+        }
+
+        for (std::uint64_t const capacity : capacities)
+        {
+            // Leverage all the historical shard paths to
+            // see if collectively they can fit the specified
+            // number of shards. For this to work properly,
+            // each historical path must correspond to a separate
+            // physical device or filesystem.
+
+            auto const shardCap = capacity / avgShardFileSz_;
+            if (numShards <= shardCap)
+                return true;
+
+            numShards -= shardCap;
+        }
     }
     catch (std::exception const& e)
    {
         JLOG(j_.error()) << "exception " << e.what() << " in function "
                          << __func__;
-        return 0;
+        return false;
     }
+
+    return false;
 }
 
 bool
@@ -1402,7 +1573,7 @@ DatabaseShardImp::storeLedgerInShard(
 }
 
 void
-DatabaseShardImp::removeFailedShard(std::shared_ptr<Shard> shard)
+DatabaseShardImp::removeFailedShard(std::shared_ptr<Shard>& shard)
 {
     {
         std::lock_guard lock(mutex_);
@@ -1410,15 +1581,367 @@ DatabaseShardImp::removeFailedShard(std::shared_ptr<Shard> shard)
if (shard->index() == acquireIndex_)
acquireIndex_ = 0;
if (shard->index() == latestShardIndex_)
latestShardIndex_ = boost::none;
if (shard->index() == secondLatestShardIndex_)
secondLatestShardIndex_ = boost::none;
if ((shards_.erase(shard->index()) > 0) && shard->isFinal())
updateStatus(lock);
}
shard->removeOnDestroy();
// Reset the shared_ptr to invoke the shard's
// destructor and remove it from the server
shard.reset();
setFileStats();
}
std::uint32_t
DatabaseShardImp::shardBoundaryIndex(std::lock_guard<std::mutex> const&) const
{
auto const validIndex = app_.getLedgerMaster().getValidLedgerIndex();
// Shards with an index earlier than recentShardBoundaryIndex
// are considered historical. The three shards at or later than
// this index consist of the two most recently validated shards
// and the shard still in the process of being built by live
// transactions.
return NodeStore::seqToShardIndex(validIndex, ledgersPerShard_) - 1;
}
std::uint32_t
DatabaseShardImp::numHistoricalShards(
std::lock_guard<std::mutex> const& lock) const
{
auto const recentShardBoundaryIndex = shardBoundaryIndex(lock);
return std::count_if(
shards_.begin(),
shards_.end(),
[recentShardBoundaryIndex](auto const& entry) {
return entry.first < recentShardBoundaryIndex;
});
}
void
DatabaseShardImp::relocateOutdatedShards(
std::lock_guard<std::mutex> const& lock)
{
if (auto& cur = latestShardIndex_, &prev = secondLatestShardIndex_;
cur || prev)
{
auto const latestShardIndex = NodeStore::seqToShardIndex(
app_.getLedgerMaster().getValidLedgerIndex(), ledgersPerShard_);
auto const separateHistoricalPath = !historicalPaths_.empty();
auto const removeShard =
[this](std::uint32_t const shardIndex) -> void {
canAdd_ = false;
if (auto it = shards_.find(shardIndex); it != shards_.end())
{
if (it->second.shard)
removeFailedShard(it->second.shard);
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
}
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
};
auto const keepShard =
[this, &lock, removeShard, separateHistoricalPath](
std::uint32_t const shardIndex) -> bool {
if (numHistoricalShards(lock) >= maxHistoricalShards_)
{
JLOG(j_.error())
<< "maximum number of historical shards reached";
removeShard(shardIndex);
return false;
}
if (separateHistoricalPath &&
!sufficientStorage(1, PathDesignation::historical, lock))
{
JLOG(j_.error()) << "insufficient storage space available";
removeShard(shardIndex);
return false;
}
return true;
};
// Move a shard from the main shard path to a historical shard
// path by copying the contents, and creating a new shard.
auto const moveShard = [this,
&lock](std::uint32_t const shardIndex) -> void {
auto const dst = chooseHistoricalPath(lock);
if (auto it = shards_.find(shardIndex); it != shards_.end())
{
if (auto& shard = it->second.shard)
{
// Close any open file descriptors before moving
// the shard dir. Don't call removeOnDestroy since
// that would attempt to close the fds after the
// directory has been moved.
shard->closeAll();
try
{
// Move the shard directory to the new path
boost::filesystem::rename(
shard->getDir().string(),
dst / std::to_string(shardIndex));
}
catch (...)
{
JLOG(j_.error())
<< "shard " << shardIndex
<< " failed to move to historical storage";
return;
}
// Create a shard instance at the new location
shard = std::make_unique<Shard>(
app_, *this, shardIndex, dst, j_);
// Open the new shard
if (!shard->open(scheduler_, *ctx_))
{
JLOG(j_.error())
<< "shard " << shardIndex
<< " failed to open in historical storage";
shard->removeOnDestroy();
shard.reset();
}
}
else
{
JLOG(j_.warn())
<< "can't find shard to move to historical path";
}
}
else
{
JLOG(j_.warn())
<< "can't find shard to move to historical path";
}
};
// See if either of the recent shards
// needs to be updated
bool const curNotSynched =
latestShardIndex_ && *latestShardIndex_ != latestShardIndex;
bool const prevNotSynched = secondLatestShardIndex_ &&
*secondLatestShardIndex_ != latestShardIndex - 1;
// A new shard has been published. Move outdated shards
// to historical storage as needed
if (curNotSynched || prevNotSynched)
{
if (prev)
{
// Move the formerly second latest shard to
// historical storage
if (keepShard(*prev) && separateHistoricalPath)
{
moveShard(*prev);
}
prev = boost::none;
}
if (cur)
{
// The formerly latest shard is now
// the second latest
if (cur == latestShardIndex - 1)
{
prev = cur;
}
// The formerly latest shard is no
// longer a 'recent' shard
else
{
// Move the formerly latest shard to
// historical storage
if (keepShard(*cur) && separateHistoricalPath)
{
moveShard(*cur);
}
}
cur = boost::none;
}
}
}
}
auto
DatabaseShardImp::prepareForNewShard(
std::uint32_t shardIndex,
std::uint32_t numHistoricalShards,
std::lock_guard<std::mutex> const& lock) -> boost::optional<PathDesignation>
{
// Any shard earlier than the two most recent shards
// is a historical shard
auto const boundaryIndex = shardBoundaryIndex(lock);
auto const isHistoricalShard = shardIndex < boundaryIndex;
auto const designation = isHistoricalShard && !historicalPaths_.empty()
? PathDesignation::historical
: PathDesignation::none;
// Check shard count and available storage space
if (isHistoricalShard && numHistoricalShards >= maxHistoricalShards_)
{
JLOG(j_.error()) << "maximum number of historical shards reached";
canAdd_ = false;
return boost::none;
}
if (!sufficientStorage(1, designation, lock))
{
JLOG(j_.error()) << "insufficient storage space available";
canAdd_ = false;
return boost::none;
}
return designation;
}
boost::filesystem::path
DatabaseShardImp::chooseHistoricalPath(std::lock_guard<std::mutex> const&) const
{
// If not configured with separate historical paths,
// use the main path (dir_) by default.
if (historicalPaths_.empty())
return dir_;
boost::filesystem::path historicalShardPath;
std::vector<boost::filesystem::path> potentialPaths;
for (boost::filesystem::path const& path : historicalPaths_)
{
if (boost::filesystem::space(path).available >= avgShardFileSz_)
potentialPaths.push_back(path);
}
if (potentialPaths.empty())
{
JLOG(j_.error()) << "failed to select a historical shard path";
return "";
}
std::sample(
potentialPaths.begin(),
potentialPaths.end(),
&historicalShardPath,
1,
default_prng());
return historicalShardPath;
}
bool
DatabaseShardImp::checkHistoricalPaths() const
{
#if BOOST_OS_LINUX
// Each historical shard path must correspond
// to a directory on a distinct device or filesystem.
// Currently, this constraint is enforced only on
// Linux.
std::unordered_map<int, std::vector<std::string>> filesystemIDs(
historicalPaths_.size());
for (auto const& path : historicalPaths_)
{
struct statvfs buffer;
if (statvfs(path.c_str(), &buffer))
{
JLOG(j_.error())
<< "failed to acquire stats for 'historical_shard_path': "
<< path;
return false;
}
filesystemIDs[buffer.f_fsid].push_back(path.string());
}
bool ret = true;
for (auto const& entry : filesystemIDs)
{
// Check to see if any of the paths
// are stored on the same filesystem
if (entry.second.size() > 1)
{
// Two or more historical storage paths
// correspond to the same filesystem.
JLOG(j_.error())
<< "The following paths correspond to the same filesystem: "
<< boost::algorithm::join(entry.second, ", ")
<< ". Each configured historical storage path should"
" be on a unique device or filesystem.";
ret = false;
}
}
return ret;
#else
// The requirement that each historical storage path
// corresponds to a distinct device or filesystem is
// enforced only on Linux, so on other platforms
// keep track of the available capacities for each
// path. Issue a warning if we suspect any of the paths
// may violate this requirement.
// Map byte counts to each path that
// shares that byte count.
std::unordered_map<std::uint64_t, std::vector<std::string>>
uniqueCapacities(historicalPaths_.size());
for (auto const& path : historicalPaths_)
uniqueCapacities[boost::filesystem::space(path).available].push_back(
path.string());
for (auto const& entry : uniqueCapacities)
{
// Check to see if any paths have the
// same amount of available bytes.
if (entry.second.size() > 1)
{
// Two or more historical storage paths may
// correspond to the same device or
// filesystem.
JLOG(j_.warn())
<< "Each of the following paths have " << entry.first
<< " bytes free, and may be located on the same device"
" or filesystem: "
<< boost::algorithm::join(entry.second, ", ")
<< ". Each configured historical storage path should"
" be on a unique device or filesystem.";
}
}
#endif
return true;
}
//------------------------------------------------------------------------------
std::unique_ptr<DatabaseShard>

View File

@@ -193,6 +193,11 @@
         State state{State::none};
     };
 
+    enum class PathDesignation : uint8_t {
+        none,        // No path specified
+        historical   // Needs a historical path
+    };
+
     Application& app_;
     Stoppable& parent_;
     mutable std::mutex mutex_;
@@ -222,8 +227,11 @@
     // The name associated with the backend used with the shard store
     std::string backendName_;
 
-    // Maximum storage space the shard store can utilize (in bytes)
-    std::uint64_t maxFileSz_;
+    // Maximum number of historical shards to store.
+    std::uint32_t maxHistoricalShards_{0};
+
+    // Contains historical shard paths
+    std::vector<boost::filesystem::path> historicalPaths_;
 
     // Storage space utilized by the shard store (in bytes)
     std::uint64_t fileSz_{0};
@@ -242,6 +250,16 @@
     // File name used to mark shards being imported from node store
     static constexpr auto importMarker_ = "import";
 
+    // latestShardIndex_ and secondLatestShardIndex_ hold the indexes
+    // of the shards most recently confirmed by the network. These
+    // values are not updated in real time and are modified only
+    // when adding shards to the database, in order to determine where
+    // pending shards will be stored on the filesystem. A value of
+    // boost::none indicates that the corresponding shard is not held
+    // by the database.
+    boost::optional<std::uint32_t> latestShardIndex_;
+    boost::optional<std::uint32_t> secondLatestShardIndex_;
+
     // Initialize settings from the configuration file
     // Lock must be held
     bool
@@ -286,9 +304,17 @@
     std::pair<std::shared_ptr<PCache>, std::shared_ptr<NCache>>
     getCache(std::uint32_t seq);
 
-    // Returns available storage space
-    std::uint64_t
-    available() const;
+    // Returns true if the filesystem has enough storage
+    // available to hold the specified number of shards.
+    // The value of pathDesignation determines whether
+    // the shard(s) in question are historical and thus
+    // meant to be stored at a path designated for historical
+    // shards.
+    bool
+    sufficientStorage(
+        std::uint32_t numShards,
+        PathDesignation pathDesignation,
+        std::lock_guard<std::mutex> const&) const;
 
     bool
     storeLedgerInShard(
@@ -296,7 +322,39 @@
         std::shared_ptr<Ledger const> const& ledger);
 
     void
-    removeFailedShard(std::shared_ptr<Shard> shard);
+    removeFailedShard(std::shared_ptr<Shard>& shard);
+
+    // Returns the index that represents the logical
+    // partition between historical and recent shards
+    std::uint32_t
+    shardBoundaryIndex(std::lock_guard<std::mutex> const&) const;
+
+    std::uint32_t
+    numHistoricalShards(std::lock_guard<std::mutex> const& lock) const;
+
+    // Shifts the recent and second most recent (by index)
+    // shards as new shards become available on the network.
+    // Older shards are moved to a historical shard path.
+    void
+    relocateOutdatedShards(std::lock_guard<std::mutex> const& lock);
+
+    // Checks whether the shard can be stored. If
+    // the new shard can't be stored, returns
+    // boost::none. Otherwise returns an enum
+    // indicating whether the new shard should be
+    // placed in a separate directory for historical
+    // shards.
+    boost::optional<PathDesignation>
+    prepareForNewShard(
+        std::uint32_t shardIndex,
+        std::uint32_t numHistoricalShards,
+        std::lock_guard<std::mutex> const& lock);
+
+    boost::filesystem::path
+    chooseHistoricalPath(std::lock_guard<std::mutex> const&) const;
+
+    bool
+    checkHistoricalPaths() const;
 };
 
 }  // namespace NodeStore

View File

@@ -22,7 +22,6 @@
 #include <ripple/basics/StringUtilities.h>
 #include <ripple/core/ConfigSections.h>
 #include <ripple/nodestore/Manager.h>
-#include <ripple/nodestore/impl/DatabaseShardImp.h>
 #include <ripple/nodestore/impl/Shard.h>
 #include <ripple/protocol/digest.h>
@@ -39,6 +38,16 @@ Shard::Shard(
     DatabaseShard const& db,
     std::uint32_t index,
     beast::Journal j)
+    : Shard(app, db, index, "", j)
+{
+}
+
+Shard::Shard(
+    Application& app,
+    DatabaseShard const& db,
+    std::uint32_t index,
+    boost::filesystem::path const& dir,
+    beast::Journal j)
     : app_(app)
     , index_(index)
     , firstSeq_(db.firstLedgerSeq(index))
@@ -46,7 +55,7 @@ Shard::Shard(
     , maxLedgers_(
           index == db.earliestShardIndex() ? lastSeq_ - firstSeq_ + 1
                                            : db.ledgersPerShard())
-    , dir_(db.getRootDir() / std::to_string(index_))
+    , dir_((dir.empty() ? db.getRootDir() : dir) / std::to_string(index_))
     , j_(j)
 {
     if (index_ < db.earliestShardIndex())
@@ -227,6 +236,15 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx)
     return true;
 }
 
+void
+Shard::closeAll()
+{
+    backend_.reset();
+    lgrSQLiteDB_.reset();
+    txSQLiteDB_.reset();
+    acquireInfo_.reset();
+}
+
 boost::optional<std::uint32_t>
 Shard::prepare()
 {

View File

@@ -51,6 +51,13 @@ class DatabaseShard;
 class Shard final
 {
 public:
+    Shard(
+        Application& app,
+        DatabaseShard const& db,
+        std::uint32_t index,
+        boost::filesystem::path const& dir,
+        beast::Journal j);
+
     Shard(
         Application& app,
         DatabaseShard const& db,
@@ -62,6 +69,9 @@ public:
     bool
     open(Scheduler& scheduler, nudb::context& ctx);
 
+    void
+    closeAll();
+
     boost::optional<std::uint32_t>
     prepare();

View File

@@ -23,11 +23,10 @@
 #include <ripple/core/ConfigSections.h>
 #include <ripple/nodestore/DatabaseShard.h>
 #include <ripple/nodestore/DummyScheduler.h>
-#include <ripple/nodestore/Manager.h>
 #include <ripple/nodestore/impl/DecodedBlob.h>
-#include <ripple/nodestore/impl/EncodedBlob.h>
 #include <ripple/nodestore/impl/Shard.h>
 #include <chrono>
+#include <numeric>
 #include <test/jtx.h>
 #include <test/nodestore/TestBase.h>
@@ -39,6 +38,7 @@ namespace NodeStore {
 class DatabaseShard_test : public TestBase
 {
     static constexpr std::uint32_t maxSizeGb = 10;
+    static constexpr std::uint32_t maxHistoricalShards = 100;
     static constexpr std::uint32_t ledgersPerShard = 256;
     static constexpr std::uint32_t earliestSeq = ledgersPerShard + 1;
     static constexpr std::uint32_t dataSizeMax = 4;
@@ -61,7 +61,7 @@ class DatabaseShard_test : public TestBase
      * ledger */
     std::vector<int> nAccounts_;
     /* payAccounts_[i][j] = {from, to} is the pair which consists of two
-     * number of acoounts: source and destinations, which participate in
+     * number of accounts: source and destinations, which participate in
      * j-th payment on i-th ledger */
     std::vector<std::vector<std::pair<int, int>>> payAccounts_;
     /* xrpAmount_[i] is the amount for all payments on i-th ledger */
@@ -147,26 +147,31 @@ class DatabaseShard_test : public TestBase
     }
 
     bool
-    makeLedgers(test::jtx::Env& env_)
+    makeLedgers(test::jtx::Env& env_, std::uint32_t startIndex = 0)
     {
-        for (std::uint32_t i = 3; i <= ledgersPerShard; ++i)
+        if (startIndex == 0)
         {
-            if (!env_.close())
-                return false;
-            std::shared_ptr<const Ledger> ledger =
-                env_.app().getLedgerMaster().getClosedLedger();
-            if (ledger->info().seq != i)
-                return false;
+            for (std::uint32_t i = 3; i <= ledgersPerShard; ++i)
+            {
+                if (!env_.close())
+                    return false;
+                std::shared_ptr<const Ledger> ledger =
+                    env_.app().getLedgerMaster().getClosedLedger();
+                if (ledger->info().seq != i)
+                    return false;
+            }
         }
 
         for (std::uint32_t i = 0; i < ledgersPerShard * nShards_; ++i)
         {
+            auto const index = i + (startIndex * ledgersPerShard);
+
             makeLedgerData(env_, i);
             if (!env_.close())
                 return false;
             std::shared_ptr<const Ledger> ledger =
                 env_.app().getLedgerMaster().getClosedLedger();
-            if (ledger->info().seq != i + ledgersPerShard + 1)
+            if (ledger->info().seq != index + ledgersPerShard + 1)
                 return false;
             ledgers_.push_back(ledger);
         }
@@ -446,8 +451,8 @@ class DatabaseShard_test : public TestBase
         cfg->overwrite(ConfigSection::shardDatabase(), "path", shardDir);
         cfg->overwrite(
             ConfigSection::shardDatabase(),
-            "max_size_gb",
-            std::to_string(maxSizeGb));
+            "max_historical_shards",
+            std::to_string(maxHistoricalShards));
         cfg->overwrite(
             ConfigSection::shardDatabase(),
             "ledgers_per_shard",
@@ -495,7 +500,11 @@ class DatabaseShard_test : public TestBase
     }
 
     std::optional<int>
-    createShard(TestData& data, DatabaseShard& db, int maxShardNumber)
+    createShard(
+        TestData& data,
+        DatabaseShard& db,
+        int maxShardNumber = 1,
+        int ledgerOffset = 0)
     {
         int shardNumber = -1;
 
@@ -505,7 +514,8 @@ class DatabaseShard_test : public TestBase
         if (!BEAST_EXPECT(ind != boost::none))
             return {};
         shardNumber = db.seqToShardIndex(*ind);
-        int arrInd = *ind - ledgersPerShard - 1;
+        int arrInd =
+            *ind - (ledgersPerShard * ledgerOffset) - ledgersPerShard - 1;
         BEAST_EXPECT(
             arrInd >= 0 && arrInd < maxShardNumber * ledgersPerShard);
         BEAST_EXPECT(saveLedger(db, *data.ledgers_[arrInd]));
@@ -978,6 +988,317 @@ class DatabaseShard_test : public TestBase
}
}
void
testImportWithHistoricalPaths(
std::string const& backendType,
std::uint64_t const seedValue)
{
using namespace test::jtx;
// Test importing with multiple historical
// paths
{
beast::temp_dir shardDir;
std::array<beast::temp_dir, 4> historicalDirs;
std::array<boost::filesystem::path, 4> historicalPaths;
std::transform(
historicalDirs.begin(),
historicalDirs.end(),
historicalPaths.begin(),
[](const beast::temp_dir& dir) { return dir.path(); });
beast::temp_dir nodeDir;
auto c = testConfig(
"importWithHistoricalPaths",
backendType,
shardDir.path(),
nodeDir.path());
auto& historyPaths = c->section(SECTION_HISTORICAL_SHARD_PATHS);
historyPaths.append(
{historicalPaths[0].string(),
historicalPaths[1].string(),
historicalPaths[2].string(),
historicalPaths[3].string()});
Env env{*this, std::move(c)};
DatabaseShard* db = env.app().getShardStore();
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
auto const ledgerCount = 4;
TestData data(seedValue, 4, ledgerCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i)
BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i]));
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0));
db->import(ndb);
for (std::uint32_t i = 1; i <= ledgerCount; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110));
auto const mainPathCount = std::distance(
boost::filesystem::directory_iterator(shardDir.path()),
boost::filesystem::directory_iterator());
// Only the two most recent shards
// should be stored at the main path
BEAST_EXPECT(mainPathCount == 2);
auto const historicalPathCount = std::accumulate(
historicalPaths.begin(),
historicalPaths.end(),
0,
[](int const sum, boost::filesystem::path const& path) {
return sum +
std::distance(
boost::filesystem::directory_iterator(path),
boost::filesystem::directory_iterator());
});
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == ledgerCount - 2);
}
// Test importing with a single historical
// path
{
beast::temp_dir shardDir;
beast::temp_dir historicalDir;
beast::temp_dir nodeDir;
auto c = testConfig(
"importWithSingleHistoricalPath",
backendType,
shardDir.path(),
nodeDir.path());
auto& historyPaths = c->section(SECTION_HISTORICAL_SHARD_PATHS);
historyPaths.append({historicalDir.path()});
Env env{*this, std::move(c)};
DatabaseShard* db = env.app().getShardStore();
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
auto const ledgerCount = 4;
TestData data(seedValue * 2, 4, ledgerCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i)
BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i]));
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0));
db->import(ndb);
for (std::uint32_t i = 1; i <= ledgerCount; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110));
auto const mainPathCount = std::distance(
boost::filesystem::directory_iterator(shardDir.path()),
boost::filesystem::directory_iterator());
// Only the two most recent shards
// should be stored at the main path
BEAST_EXPECT(mainPathCount == 2);
auto const historicalPathCount = std::distance(
boost::filesystem::directory_iterator(historicalDir.path()),
boost::filesystem::directory_iterator());
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == ledgerCount - 2);
}
}
void
testPrepareWithHistoricalPaths(
std::string const& backendType,
std::uint64_t const seedValue)
{
using namespace test::jtx;
// Test importing with multiple historical
// paths
{
beast::temp_dir shardDir;
std::array<beast::temp_dir, 4> historicalDirs;
std::array<boost::filesystem::path, 4> historicalPaths;
std::transform(
historicalDirs.begin(),
historicalDirs.end(),
historicalPaths.begin(),
[](const beast::temp_dir& dir) { return dir.path(); });
beast::temp_dir nodeDir;
auto c = testConfig(
"prepareWithHistoricalPaths", backendType, shardDir.path());
auto& historyPaths = c->section(SECTION_HISTORICAL_SHARD_PATHS);
historyPaths.append(
{historicalPaths[0].string(),
historicalPaths[1].string(),
historicalPaths[2].string(),
historicalPaths[3].string()});
Env env{*this, std::move(c)};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
auto const ledgerCount = 4;
TestData data(seedValue, 4, ledgerCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
BEAST_EXPECT(db->getCompleteShards() == "");
std::uint64_t bitMask = 0;
// Add ten shards to the Shard Database
for (std::uint32_t i = 0; i < ledgerCount; ++i)
{
auto n = createShard(data, *db, ledgerCount);
if (!BEAST_EXPECT(n && *n >= 1 && *n <= ledgerCount))
return;
bitMask |= 1ll << *n;
BEAST_EXPECT(
db->getCompleteShards() == bitmask2Rangeset(bitMask));
}
auto mainPathCount = std::distance(
boost::filesystem::directory_iterator(shardDir.path()),
boost::filesystem::directory_iterator());
// Only the two most recent shards
// should be stored at the main path
BEAST_EXPECT(mainPathCount == 2);
// Confirm recent shard locations
std::set<boost::filesystem::path> mainPathShards{
shardDir.path() / boost::filesystem::path("3"),
shardDir.path() / boost::filesystem::path("4")};
std::set<boost::filesystem::path> actual(
boost::filesystem::directory_iterator(shardDir.path()),
boost::filesystem::directory_iterator());
BEAST_EXPECT(mainPathShards == actual);
const auto generateHistoricalStems = [&historicalPaths, &actual] {
for (auto const& path : historicalPaths)
{
for (auto const& shard :
boost::filesystem::directory_iterator(path))
{
actual.insert(boost::filesystem::path(shard).stem());
}
}
};
// Confirm historical shard locations
std::set<boost::filesystem::path> historicalPathShards;
std::generate_n(
std::inserter(
historicalPathShards, historicalPathShards.begin()),
2,
[n = 1]() mutable { return std::to_string(n++); });
actual.clear();
generateHistoricalStems();
BEAST_EXPECT(historicalPathShards == actual);
auto historicalPathCount = std::accumulate(
historicalPaths.begin(),
historicalPaths.end(),
0,
[](int const sum, boost::filesystem::path const& path) {
return sum +
std::distance(
boost::filesystem::directory_iterator(path),
boost::filesystem::directory_iterator());
});
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == ledgerCount - 2);
data = TestData(seedValue * 2, 4, ledgerCount);
if (!BEAST_EXPECT(data.makeLedgers(env, ledgerCount)))
return;
// Add ten more shards to the Shard Database
// to exercise recent shard rotation
for (std::uint32_t i = 0; i < ledgerCount; ++i)
{
auto n = createShard(data, *db, ledgerCount * 2, ledgerCount);
if (!BEAST_EXPECT(
n && *n >= 1 + ledgerCount && *n <= ledgerCount * 2))
return;
bitMask |= 1ll << *n;
BEAST_EXPECT(
db->getCompleteShards() == bitmask2Rangeset(bitMask));
}
mainPathCount = std::distance(
boost::filesystem::directory_iterator(shardDir.path()),
boost::filesystem::directory_iterator());
// Only the two most recent shards
// should be stored at the main path
BEAST_EXPECT(mainPathCount == 2);
// Confirm recent shard locations
mainPathShards = {
shardDir.path() / boost::filesystem::path("7"),
shardDir.path() / boost::filesystem::path("8")};
actual = {
boost::filesystem::directory_iterator(shardDir.path()),
boost::filesystem::directory_iterator()};
BEAST_EXPECT(mainPathShards == actual);
// Confirm historical shard locations
historicalPathShards.clear();
std::generate_n(
std::inserter(
historicalPathShards, historicalPathShards.begin()),
6,
[n = 1]() mutable { return std::to_string(n++); });
actual.clear();
generateHistoricalStems();
BEAST_EXPECT(historicalPathShards == actual);
historicalPathCount = std::accumulate(
historicalPaths.begin(),
historicalPaths.end(),
0,
[](int const sum, boost::filesystem::path const& path) {
return sum +
std::distance(
boost::filesystem::directory_iterator(path),
boost::filesystem::directory_iterator());
});
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == (ledgerCount * 2) - 2);
}
}
void
testAll(std::string const& backendType)
{
@@ -991,6 +1312,8 @@ class DatabaseShard_test : public TestBase
         testCorruptedDatabase(backendType, seedValue + 40);
         testIllegalFinalKey(backendType, seedValue + 50);
         testImport(backendType, seedValue + 60);
+        testImportWithHistoricalPaths(backendType, seedValue + 80);
+        testPrepareWithHistoricalPaths(backendType, seedValue + 90);
     }
 
 public:

View File

@@ -62,7 +62,7 @@
         auto c = jtx::envconfig();
         auto& section = c->section(ConfigSection::shardDatabase());
         section.set("path", tempDir.path());
-        section.set("max_size_gb", "100");
+        section.set("max_historical_shards", "20");
         c->setupControl(true, true, true);
 
         jtx::Env env(*this, std::move(c));
@@ -111,7 +111,7 @@
         auto c = jtx::envconfig();
         auto& section = c->section(ConfigSection::shardDatabase());
         section.set("path", tempDir.path());
-        section.set("max_size_gb", "100");
+        section.set("max_historical_shards", "20");
         c->setupControl(true, true, true);
 
         jtx::Env env(*this, std::move(c));
@@ -165,7 +165,7 @@
         auto c = jtx::envconfig();
         auto& section = c->section(ConfigSection::shardDatabase());
         section.set("path", tempDir.path());
-        section.set("max_size_gb", "100");
+        section.set("max_historical_shards", "20");
         section.set("ledgers_per_shard", "256");
         section.set("earliest_seq", "257");
         auto& sectionNode = c->section(ConfigSection::nodeDatabase());
@@ -263,7 +263,7 @@
         auto c = jtx::envconfig();
         auto& section = c->section(ConfigSection::shardDatabase());
         section.set("path", tempDir.path());
-        section.set("max_size_gb", "100");
+        section.set("max_historical_shards", "20");
         section.set("ledgers_per_shard", "256");
         section.set("earliest_seq", "257");
         auto& sectionNode = c->section(ConfigSection::nodeDatabase());
@@ -360,7 +360,7 @@
         auto c = jtx::envconfig();
         auto& section = c->section(ConfigSection::shardDatabase());
         section.set("path", tempDir.path());
-        section.set("max_size_gb", "100");
+        section.set("max_historical_shards", "20");
         section.set("ledgers_per_shard", "256");
         section.set("shard_verification_retry_interval", "1");
         section.set("shard_verification_max_attempts", "10000");