Make earliest ledger sequence configurable

This commit is contained in:
Miguel Portilla
2018-02-16 13:28:05 -05:00
committed by Nikolaos D. Bougalis
parent 8d9dffcf84
commit 0b18b36186
22 changed files with 304 additions and 160 deletions

View File

@@ -824,6 +824,10 @@
# require administrative RPC call "can_delete" # require administrative RPC call "can_delete"
# to enable online deletion of ledger records. # to enable online deletion of ledger records.
# #
# earliest_seq The default is 32570 to match the XRP ledger
# network's earliest allowed sequence. Alternate
# networks may set this value. Minimum value of 1.
#
# Notes: # Notes:
# The 'node_db' entry configures the primary, persistent storage. # The 'node_db' entry configures the primary, persistent storage.
# #

View File

@@ -111,7 +111,7 @@ InboundLedger::init(ScopedLockType& collectionLock)
if (mFailed) if (mFailed)
return; return;
} }
else if (shardStore && mSeq >= NodeStore::genesisSeq) else if (shardStore && mSeq >= shardStore->earliestSeq())
{ {
if (auto l = shardStore->fetchLedger(mHash, mSeq)) if (auto l = shardStore->fetchLedger(mHash, mSeq))
{ {

View File

@@ -1542,8 +1542,8 @@ LedgerMaster::fetchForHistory(
ledger = app_.getInboundLedgers().acquire( ledger = app_.getInboundLedgers().acquire(
*hash, missing, reason); *hash, missing, reason);
if (!ledger && if (!ledger &&
missing > NodeStore::genesisSeq && missing != fetch_seq_ &&
missing != fetch_seq_) missing > app_.getNodeStore().earliestSeq())
{ {
JLOG(m_journal.trace()) JLOG(m_journal.trace())
<< "fetchForHistory want fetch pack " << missing; << "fetchForHistory want fetch pack " << missing;
@@ -1603,12 +1603,12 @@ LedgerMaster::fetchForHistory(
if (reason == InboundLedger::Reason::SHARD) if (reason == InboundLedger::Reason::SHARD)
// Do not fetch ledger sequences lower // Do not fetch ledger sequences lower
// than the shard's first ledger sequence // than the shard's first ledger sequence
fetchSz = NodeStore::DatabaseShard::firstSeq( fetchSz = app_.getShardStore()->firstLedgerSeq(
NodeStore::DatabaseShard::seqToShardIndex(missing)); app_.getShardStore()->seqToShardIndex(missing));
else else
// Do not fetch ledger sequences lower // Do not fetch ledger sequences lower
// than the genesis ledger sequence // than the earliest ledger sequence
fetchSz = NodeStore::genesisSeq; fetchSz = app_.getNodeStore().earliestSeq();
fetchSz = missing >= fetchSz ? fetchSz = missing >= fetchSz ?
std::min(ledger_fetch_size_, (missing - fetchSz) + 1) : 0; std::min(ledger_fetch_size_, (missing - fetchSz) + 1) : 0;
try try
@@ -1666,7 +1666,8 @@ void LedgerMaster::doAdvance (ScopedLockType& sl)
{ {
ScopedLockType sl(mCompleteLock); ScopedLockType sl(mCompleteLock);
missing = prevMissing(mCompleteLedgers, missing = prevMissing(mCompleteLedgers,
mPubLedger->info().seq, NodeStore::genesisSeq); mPubLedger->info().seq,
app_.getNodeStore().earliestSeq());
} }
if (missing) if (missing)
{ {

View File

@@ -203,6 +203,20 @@ SHAMapStoreImp::SHAMapStoreImp (
} }
if (! setup_.shardDatabase.empty()) if (! setup_.shardDatabase.empty())
{ {
// The node and shard stores must use
// the same earliest ledger sequence
std::array<std::uint32_t, 2> seq;
if (get_if_exists<std::uint32_t>(
setup_.nodeDatabase, "earliest_seq", seq[0]))
{
if (get_if_exists<std::uint32_t>(
setup_.shardDatabase, "earliest_seq", seq[1]) &&
seq[0] != seq[1])
{
Throw<std::runtime_error>("earliest_seq set more than once");
}
}
boost::filesystem::path dbPath = boost::filesystem::path dbPath =
get<std::string>(setup_.shardDatabase, "path"); get<std::string>(setup_.shardDatabase, "path");
if (dbPath.empty()) if (dbPath.empty())
@@ -231,9 +245,6 @@ SHAMapStoreImp::SHAMapStoreImp (
if (! setup_.standalone) if (! setup_.standalone)
Throw<std::runtime_error>( Throw<std::runtime_error>(
"ledgers_per_shard only honored in stand alone"); "ledgers_per_shard only honored in stand alone");
if (lps == 0 || lps % 256 != 0)
Throw<std::runtime_error>(
"ledgers_per_shard must be a multiple of 256");
} }
} }
} }
@@ -259,7 +270,7 @@ SHAMapStoreImp::makeDatabase (std::string const& name,
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>( auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
"NodeStore.main", scheduler_, readThreads, parent, "NodeStore.main", scheduler_, readThreads, parent,
std::move(writableBackend), std::move(archiveBackend), std::move(writableBackend), std::move(archiveBackend),
nodeStoreJournal_); setup_.nodeDatabase, nodeStoreJournal_);
fdlimit_ += dbr->fdlimit(); fdlimit_ += dbr->fdlimit();
dbRotating_ = dbr.get(); dbRotating_ = dbr.get();
db.reset(dynamic_cast<NodeStore::Database*>(dbr.release())); db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));

View File

@@ -27,6 +27,7 @@
#include <ripple/nodestore/impl/Tuning.h> #include <ripple/nodestore/impl/Tuning.h>
#include <ripple/nodestore/Scheduler.h> #include <ripple/nodestore/Scheduler.h>
#include <ripple/nodestore/NodeObject.h> #include <ripple/nodestore/NodeObject.h>
#include <ripple/protocol/SystemParameters.h>
#include <thread> #include <thread>
@@ -63,7 +64,7 @@ public:
@param journal Destination for logging output. @param journal Destination for logging output.
*/ */
Database(std::string name, Stoppable& parent, Scheduler& scheduler, Database(std::string name, Stoppable& parent, Scheduler& scheduler,
int readThreads, beast::Journal j); int readThreads, Section const& config, beast::Journal j);
/** Destroy the node store. /** Destroy the node store.
All pending operations are completed, pending writes flushed, All pending operations are completed, pending writes flushed,
@@ -209,6 +210,14 @@ public:
void void
onStop(); onStop();
/** @return The earliest ledger sequence allowed
*/
std::uint32_t
earliestSeq() const
{
return earliestSeq_;
}
protected: protected:
beast::Journal j_; beast::Journal j_;
Scheduler& scheduler_; Scheduler& scheduler_;
@@ -265,6 +274,10 @@ private:
// current read generation // current read generation
uint64_t readGen_ {0}; uint64_t readGen_ {0};
// The default is 32570 to match the XRP ledger network's earliest
// allowed sequence. Alternate networks may set this value.
std::uint32_t earliestSeq_ {XRP_LEDGER_EARLIEST_SEQ};
virtual virtual
std::shared_ptr<NodeObject> std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) = 0; fetchFrom(uint256 const& hash, std::uint32_t seq) = 0;

View File

@@ -33,9 +33,14 @@ namespace NodeStore {
class DatabaseRotating : public Database class DatabaseRotating : public Database
{ {
public: public:
DatabaseRotating(std::string const& name, Stoppable& parent, DatabaseRotating(
Scheduler& scheduler, int readThreads, beast::Journal journal) std::string const& name,
: Database(name, parent, scheduler, readThreads, journal) Stoppable& parent,
Scheduler& scheduler,
int readThreads,
Section const& config,
beast::Journal journal)
: Database(name, parent, scheduler, readThreads, config, journal)
{} {}
virtual virtual

View File

@@ -45,12 +45,15 @@ public:
@param config The configuration for the database @param config The configuration for the database
@param journal Destination for logging output @param journal Destination for logging output
*/ */
DatabaseShard(std::string const& name, Stoppable& parent, DatabaseShard(
Scheduler& scheduler, int readThreads, std::string const& name,
Section const& config, beast::Journal journal) Stoppable& parent,
: Database(name, parent, scheduler, readThreads, journal) Scheduler& scheduler,
int readThreads,
Section const& config,
beast::Journal journal)
: Database(name, parent, scheduler, readThreads, config, journal)
{ {
get_if_exists<std::uint32_t>(config, "ledgers_per_shard", lps_);
} }
/** Initialize the database /** Initialize the database
@@ -120,55 +123,47 @@ public:
void void
validate() = 0; validate() = 0;
/** @return The number of ledgers stored in a shard /** @return The maximum number of ledgers stored in a shard
*/ */
static virtual
std::uint32_t std::uint32_t
ledgersPerShard() ledgersPerShard() const = 0;
{
return lps_; /** @return The earliest shard index
} */
virtual
std::uint32_t
earliestShardIndex() const = 0;
/** Calculates the shard index for a given ledger sequence /** Calculates the shard index for a given ledger sequence
@param seq ledger sequence @param seq ledger sequence
@return The shard index of the ledger sequence @return The shard index of the ledger sequence
*/ */
static virtual
std::uint32_t std::uint32_t
seqToShardIndex(std::uint32_t seq) seqToShardIndex(std::uint32_t seq) const = 0;
{
assert(seq >= genesisSeq);
return (seq - 1) / lps_;
}
/** Calculates the first ledger sequence for a given shard index /** Calculates the first ledger sequence for a given shard index
@param shardIndex The shard index considered @param shardIndex The shard index considered
@return The first ledger sequence pertaining to the shard index @return The first ledger sequence pertaining to the shard index
*/ */
static virtual
std::uint32_t std::uint32_t
firstSeq(std::uint32_t shardIndex) firstLedgerSeq(std::uint32_t shardIndex) const = 0;
{
return 1 + (shardIndex * lps_);
}
/** Calculates the last ledger sequence for a given shard index /** Calculates the last ledger sequence for a given shard index
@param shardIndex The shard index considered @param shardIndex The shard index considered
@return The last ledger sequence pertaining to the shard index @return The last ledger sequence pertaining to the shard index
*/ */
static virtual
std::uint32_t std::uint32_t
lastSeq(std::uint32_t shardIndex) lastLedgerSeq(std::uint32_t shardIndex) const = 0;
{
return (shardIndex + 1) * lps_;
}
protected: /** The number of ledgers in a shard */
// The number of ledgers stored in a shard, default is 16384 static constexpr std::uint32_t ledgersPerShardDefault {16384u};
static std::uint32_t lps_;
}; };
} }

View File

@@ -49,9 +49,6 @@ enum Status
/** A batch of NodeObjects to write at once. */ /** A batch of NodeObjects to write at once. */
using Batch = std::vector <std::shared_ptr<NodeObject>>; using Batch = std::vector <std::shared_ptr<NodeObject>>;
// System constant/invariant
static constexpr std::uint32_t genesisSeq {32570u};
} }
} }

View File

@@ -25,12 +25,25 @@
namespace ripple { namespace ripple {
namespace NodeStore { namespace NodeStore {
Database::Database(std::string name, Stoppable& parent, Database::Database(
Scheduler& scheduler, int readThreads, beast::Journal journal) std::string name,
Stoppable& parent,
Scheduler& scheduler,
int readThreads,
Section const& config,
beast::Journal journal)
: Stoppable(name, parent) : Stoppable(name, parent)
, j_(journal) , j_(journal)
, scheduler_(scheduler) , scheduler_(scheduler)
{ {
std::uint32_t seq;
if (get_if_exists<std::uint32_t>(config, "earliest_seq", seq))
{
if (seq < 1)
Throw<std::runtime_error>("Invalid earliest_seq");
earliestSeq_ = seq;
}
while (readThreads-- > 0) while (readThreads-- > 0)
readThreads_.emplace_back(&Database::threadEntry, this); readThreads_.emplace_back(&Database::threadEntry, this);
} }

View File

@@ -33,10 +33,15 @@ public:
DatabaseNodeImp(DatabaseNodeImp const&) = delete; DatabaseNodeImp(DatabaseNodeImp const&) = delete;
DatabaseNodeImp& operator=(DatabaseNodeImp const&) = delete; DatabaseNodeImp& operator=(DatabaseNodeImp const&) = delete;
DatabaseNodeImp(std::string const& name, DatabaseNodeImp(
Scheduler& scheduler, int readThreads, Stoppable& parent, std::string const& name,
std::unique_ptr<Backend> backend, beast::Journal j) Scheduler& scheduler,
: Database(name, parent, scheduler, readThreads, j) int readThreads,
Stoppable& parent,
std::unique_ptr<Backend> backend,
Section const& config,
beast::Journal j)
: Database(name, parent, scheduler, readThreads, config, j)
, pCache_(std::make_shared<TaggedCache<uint256, NodeObject>>( , pCache_(std::make_shared<TaggedCache<uint256, NodeObject>>(
name, cacheTargetSize, cacheTargetSeconds, stopwatch(), j)) name, cacheTargetSize, cacheTargetSeconds, stopwatch(), j))
, nCache_(std::make_shared<KeyCache<uint256>>( , nCache_(std::make_shared<KeyCache<uint256>>(

View File

@@ -26,10 +26,15 @@ namespace ripple {
namespace NodeStore { namespace NodeStore {
DatabaseRotatingImp::DatabaseRotatingImp( DatabaseRotatingImp::DatabaseRotatingImp(
std::string const& name, Scheduler& scheduler, int readThreads, std::string const& name,
Stoppable& parent, std::unique_ptr<Backend> writableBackend, Scheduler& scheduler,
std::unique_ptr<Backend> archiveBackend, beast::Journal j) int readThreads,
: DatabaseRotating(name, parent, scheduler, readThreads, j) Stoppable& parent,
std::unique_ptr<Backend> writableBackend,
std::unique_ptr<Backend> archiveBackend,
Section const& config,
beast::Journal j)
: DatabaseRotating(name, parent, scheduler, readThreads, config, j)
, pCache_(std::make_shared<TaggedCache<uint256, NodeObject>>( , pCache_(std::make_shared<TaggedCache<uint256, NodeObject>>(
name, cacheTargetSize, cacheTargetSeconds, stopwatch(), j)) name, cacheTargetSize, cacheTargetSeconds, stopwatch(), j))
, nCache_(std::make_shared<KeyCache<uint256>>( , nCache_(std::make_shared<KeyCache<uint256>>(

View File

@@ -32,10 +32,14 @@ public:
DatabaseRotatingImp(DatabaseRotatingImp const&) = delete; DatabaseRotatingImp(DatabaseRotatingImp const&) = delete;
DatabaseRotatingImp& operator=(DatabaseRotatingImp const&) = delete; DatabaseRotatingImp& operator=(DatabaseRotatingImp const&) = delete;
DatabaseRotatingImp(std::string const& name, DatabaseRotatingImp(
Scheduler& scheduler, int readThreads, Stoppable& parent, std::string const& name,
Scheduler& scheduler,
int readThreads,
Stoppable& parent,
std::unique_ptr<Backend> writableBackend, std::unique_ptr<Backend> writableBackend,
std::unique_ptr<Backend> archiveBackend, std::unique_ptr<Backend> archiveBackend,
Section const& config,
beast::Journal j); beast::Journal j);
~DatabaseRotatingImp() override ~DatabaseRotatingImp() override

View File

@@ -30,7 +30,7 @@
namespace ripple { namespace ripple {
namespace NodeStore { namespace NodeStore {
std::uint32_t DatabaseShard::lps_ {16384u}; constexpr std::uint32_t DatabaseShard::ledgersPerShardDefault;
DatabaseShardImp::DatabaseShardImp(Application& app, DatabaseShardImp::DatabaseShardImp(Application& app,
std::string const& name, Stoppable& parent, Scheduler& scheduler, std::string const& name, Stoppable& parent, Scheduler& scheduler,
@@ -40,8 +40,14 @@ DatabaseShardImp::DatabaseShardImp(Application& app,
, config_(config) , config_(config)
, dir_(get<std::string>(config, "path")) , dir_(get<std::string>(config, "path"))
, maxDiskSpace_(get<std::uint64_t>(config, "max_size_gb") << 30) , maxDiskSpace_(get<std::uint64_t>(config, "max_size_gb") << 30)
, avgShardSz_(lps_ * (192 * 1024)) , ledgersPerShard_(get<std::uint32_t>(
config, "ledgers_per_shard", ledgersPerShardDefault))
, earliestShardIndex_(seqToShardIndex(earliestSeq()))
, avgShardSz_(ledgersPerShard() * (192 * 1024))
{ {
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
Throw<std::runtime_error>(
"ledgers_per_shard must be a multiple of 256");
} }
DatabaseShardImp::~DatabaseShardImp() DatabaseShardImp::~DatabaseShardImp()
@@ -83,7 +89,6 @@ DatabaseShardImp::init()
return true; return true;
} }
auto const genesisShardIndex {seqToShardIndex(genesisSeq)};
// Find shards // Find shards
for (auto const& d : directory_iterator(dir_)) for (auto const& d : directory_iterator(dir_))
{ {
@@ -93,10 +98,15 @@ DatabaseShardImp::init()
if (!std::all_of(dirName.begin(), dirName.end(), ::isdigit)) if (!std::all_of(dirName.begin(), dirName.end(), ::isdigit))
continue; continue;
auto const shardIndex {std::stoul(dirName)}; auto const shardIndex {std::stoul(dirName)};
if (shardIndex < genesisShardIndex) if (shardIndex < earliestShardIndex())
continue; {
JLOG(j_.fatal()) <<
"Invalid shard index " << shardIndex <<
". Earliest shard index " << earliestShardIndex();
return false;
}
auto shard = std::make_unique<Shard>( auto shard = std::make_unique<Shard>(
shardIndex, cacheSz_, cacheAge_, j_); *this, shardIndex, cacheSz_, cacheAge_, j_);
if (!shard->open(config_, scheduler_, dir_)) if (!shard->open(config_, scheduler_, dir_))
return false; return false;
usedDiskSpace_ += shard->fileSize(); usedDiskSpace_ += shard->fileSize();
@@ -106,7 +116,7 @@ DatabaseShardImp::init()
{ {
if (incomplete_) if (incomplete_)
{ {
JLOG(j_.error()) << JLOG(j_.fatal()) <<
"More than one control file found"; "More than one control file found";
return false; return false;
} }
@@ -118,7 +128,7 @@ DatabaseShardImp::init()
// New Shard Store, calculate file descriptor requirements // New Shard Store, calculate file descriptor requirements
if (maxDiskSpace_ > space(dir_).free) if (maxDiskSpace_ > space(dir_).free)
{ {
JLOG(j_.warn()) << JLOG(j_.error()) <<
"Insufficient disk space"; "Insufficient disk space";
} }
fdLimit_ = 1 + (fdLimit_ * fdLimit_ = 1 + (fdLimit_ *
@@ -171,7 +181,7 @@ DatabaseShardImp::prepare(std::uint32_t validLedgerSeq)
int const sz {std::max(shardCacheSz, cacheSz_ / std::max( int const sz {std::max(shardCacheSz, cacheSz_ / std::max(
1, static_cast<int>(complete_.size() + 1)))}; 1, static_cast<int>(complete_.size() + 1)))};
incomplete_ = std::make_unique<Shard>( incomplete_ = std::make_unique<Shard>(
*shardIndex, sz, cacheAge_, j_); *this, *shardIndex, sz, cacheAge_, j_);
if (!incomplete_->open(config_, scheduler_, dir_)) if (!incomplete_->open(config_, scheduler_, dir_))
{ {
incomplete_.reset(); incomplete_.reset();
@@ -617,7 +627,7 @@ DatabaseShardImp::findShardIndexToAdd(
std::uint32_t validLedgerSeq, std::lock_guard<std::mutex>&) std::uint32_t validLedgerSeq, std::lock_guard<std::mutex>&)
{ {
auto maxShardIndex {seqToShardIndex(validLedgerSeq)}; auto maxShardIndex {seqToShardIndex(validLedgerSeq)};
if (validLedgerSeq != lastSeq(maxShardIndex)) if (validLedgerSeq != lastLedgerSeq(maxShardIndex))
--maxShardIndex; --maxShardIndex;
auto const numShards {complete_.size() + (incomplete_ ? 1 : 0)}; auto const numShards {complete_.size() + (incomplete_ ? 1 : 0)};
@@ -625,15 +635,13 @@ DatabaseShardImp::findShardIndexToAdd(
if (numShards >= maxShardIndex + 1) if (numShards >= maxShardIndex + 1)
return boost::none; return boost::none;
auto const genesisShardIndex {seqToShardIndex(genesisSeq)};
if (maxShardIndex < 1024 || float(numShards) / maxShardIndex > 0.5f) if (maxShardIndex < 1024 || float(numShards) / maxShardIndex > 0.5f)
{ {
// Small or mostly full index space to sample // Small or mostly full index space to sample
// Find the available indexes and select one at random // Find the available indexes and select one at random
std::vector<std::uint32_t> available; std::vector<std::uint32_t> available;
available.reserve(maxShardIndex - numShards + 1); available.reserve(maxShardIndex - numShards + 1);
for (std::uint32_t i = genesisShardIndex; i <= maxShardIndex; ++i) for (std::uint32_t i = earliestShardIndex(); i <= maxShardIndex; ++i)
{ {
if (complete_.find(i) == complete_.end() && if (complete_.find(i) == complete_.end() &&
(!incomplete_ || incomplete_->index() != i)) (!incomplete_ || incomplete_->index() != i))
@@ -649,7 +657,7 @@ DatabaseShardImp::findShardIndexToAdd(
// chances of running more than 30 times is less than 1 in a billion // chances of running more than 30 times is less than 1 in a billion
for (int i = 0; i < 40; ++i) for (int i = 0; i < 40; ++i)
{ {
auto const r = rand_int(genesisShardIndex, maxShardIndex); auto const r = rand_int(earliestShardIndex(), maxShardIndex);
if (complete_.find(r) == complete_.end() && if (complete_.find(r) == complete_.end() &&
(!incomplete_ || incomplete_->index() != r)) (!incomplete_ || incomplete_->index() != r))
return r; return r;

View File

@@ -60,6 +60,41 @@ public:
void void
validate() override; validate() override;
std::uint32_t
ledgersPerShard() const override
{
return ledgersPerShard_;
}
std::uint32_t
earliestShardIndex() const override
{
return earliestShardIndex_;
}
std::uint32_t
seqToShardIndex(std::uint32_t seq) const override
{
assert(seq >= earliestSeq());
return (seq - 1) / ledgersPerShard_;
}
std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const override
{
assert(shardIndex >= earliestShardIndex_);
if (shardIndex <= earliestShardIndex_)
return earliestSeq();
return 1 + (shardIndex * ledgersPerShard_);
}
std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const override
{
assert(shardIndex >= earliestShardIndex_);
return (shardIndex + 1) * ledgersPerShard_;
}
std::string std::string
getName() const override getName() const override
{ {
@@ -125,6 +160,14 @@ private:
// Disk space used to store the shards (in bytes) // Disk space used to store the shards (in bytes)
std::uint64_t usedDiskSpace_ {0}; std::uint64_t usedDiskSpace_ {0};
// Each shard stores 16384 ledgers. The earliest shard may store
// less if the earliest ledger sequence truncates its beginning.
// The value should only be altered for unit tests.
std::uint32_t const ledgersPerShard_;
// The earliest shard index
std::uint32_t const earliestShardIndex_;
// Average disk space a shard requires (in bytes) // Average disk space a shard requires (in bytes)
std::uint64_t avgShardSz_; std::uint64_t avgShardSz_;

View File

@@ -72,18 +72,18 @@ ManagerImp::make_Database (
Scheduler& scheduler, Scheduler& scheduler,
int readThreads, int readThreads,
Stoppable& parent, Stoppable& parent,
Section const& backendParameters, Section const& config,
beast::Journal journal) beast::Journal journal)
{ {
auto backend {make_Backend( auto backend {make_Backend(config, scheduler, journal)};
backendParameters, scheduler, journal)};
backend->open(); backend->open();
return std::make_unique <DatabaseNodeImp> ( return std::make_unique <DatabaseNodeImp>(
name, name,
scheduler, scheduler,
readThreads, readThreads,
parent, parent,
std::move(backend), std::move(backend),
config,
journal); journal);
} }

View File

@@ -65,7 +65,7 @@ public:
Scheduler& scheduler, Scheduler& scheduler,
int readThreads, int readThreads,
Stoppable& parent, Stoppable& parent,
Section const& backendParameters, Section const& config,
beast::Journal journal) override; beast::Journal journal) override;
}; };

View File

@@ -28,14 +28,13 @@
namespace ripple { namespace ripple {
namespace NodeStore { namespace NodeStore {
Shard::Shard(std::uint32_t index, int cacheSz, Shard::Shard(DatabaseShard const& db, std::uint32_t index,
PCache::clock_type::rep cacheAge, int cacheSz, PCache::clock_type::rep cacheAge, beast::Journal& j)
beast::Journal& j)
: index_(index) : index_(index)
, firstSeq_(std::max(genesisSeq, , firstSeq_(db.firstLedgerSeq(index))
DatabaseShard::firstSeq(index))) , lastSeq_(std::max(firstSeq_, db.lastLedgerSeq(index)))
, lastSeq_(std::max(firstSeq_, , maxLedgers_(index == db.earliestShardIndex() ?
DatabaseShard::lastSeq(index))) lastSeq_ - firstSeq_ + 1 : db.ledgersPerShard())
, pCache_(std::make_shared<PCache>( , pCache_(std::make_shared<PCache>(
"shard " + std::to_string(index_), "shard " + std::to_string(index_),
cacheSz, cacheAge, stopwatch(), j)) cacheSz, cacheAge, stopwatch(), j))
@@ -44,7 +43,7 @@ Shard::Shard(std::uint32_t index, int cacheSz,
stopwatch(), cacheSz, cacheAge)) stopwatch(), cacheSz, cacheAge))
, j_(j) , j_(j)
{ {
if (index_ < DatabaseShard::seqToShardIndex(genesisSeq)) if (index_ < db.earliestShardIndex())
Throw<std::runtime_error>("Shard: Invalid index"); Throw<std::runtime_error>("Shard: Invalid index");
} }
@@ -102,16 +101,7 @@ Shard::open(Section config, Scheduler& scheduler,
" invalid control file"; " invalid control file";
return false; return false;
} }
if (boost::icl::length(storedSeqs_) >= maxLedgers_)
auto const genesisShardIndex {
DatabaseShard::seqToShardIndex(genesisSeq)};
auto const genesisNumLedgers {
DatabaseShard::ledgersPerShard() - (
genesisSeq - DatabaseShardImp::firstSeq(
genesisShardIndex))};
if (boost::icl::length(storedSeqs_) ==
(index_ == genesisShardIndex ? genesisNumLedgers :
DatabaseShard::ledgersPerShard()))
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"shard " << index_ << "shard " << index_ <<
@@ -140,15 +130,7 @@ Shard::setStored(std::shared_ptr<Ledger const> const& l)
" already stored"; " already stored";
return false; return false;
} }
auto const genesisShardIndex { if (boost::icl::length(storedSeqs_) >= maxLedgers_ - 1)
DatabaseShard::seqToShardIndex(genesisSeq)};
auto const genesisNumLedgers {
DatabaseShard::ledgersPerShard() - (
genesisSeq - DatabaseShardImp::firstSeq(
genesisShardIndex))};
if (boost::icl::length(storedSeqs_) >=
(index_ == genesisShardIndex ? genesisNumLedgers :
DatabaseShard::ledgersPerShard()) - 1)
{ {
if (backend_->fdlimit() != 0) if (backend_->fdlimit() != 0)
{ {

View File

@@ -36,8 +36,9 @@ namespace NodeStore {
using PCache = TaggedCache<uint256, NodeObject>; using PCache = TaggedCache<uint256, NodeObject>;
using NCache = KeyCache<uint256>; using NCache = KeyCache<uint256>;
class DatabaseShard;
/* A range of historical ledgers backed by a nodestore. /* A range of historical ledgers backed by a node store.
Shards are indexed and store `ledgersPerShard`. Shards are indexed and store `ledgersPerShard`.
Shard `i` stores ledgers starting with sequence: `1 + (i * ledgersPerShard)` Shard `i` stores ledgers starting with sequence: `1 + (i * ledgersPerShard)`
and ending with sequence: `(i + 1) * ledgersPerShard`. and ending with sequence: `(i + 1) * ledgersPerShard`.
@@ -46,9 +47,8 @@ using NCache = KeyCache<uint256>;
class Shard class Shard
{ {
public: public:
Shard(std::uint32_t index, int cacheSz, Shard(DatabaseShard const& db, std::uint32_t index, int cacheSz,
PCache::clock_type::rep cacheAge, PCache::clock_type::rep cacheAge, beast::Journal& j);
beast::Journal& j);
bool bool
open(Section config, Scheduler& scheduler, open(Section config, Scheduler& scheduler,
@@ -117,6 +117,11 @@ private:
// Last ledger sequence in this shard // Last ledger sequence in this shard
std::uint32_t const lastSeq_; std::uint32_t const lastSeq_;
// The maximum number of ledgers this shard can store
// The earliest shard may store less ledgers than
// subsequent shards
std::uint32_t const maxLedgers_;
// Database positive cache // Database positive cache
std::shared_ptr<PCache> pCache_; std::shared_ptr<PCache> pCache_;

View File

@@ -368,8 +368,9 @@ PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const
if (std::find(recentLedgers_.begin(), if (std::find(recentLedgers_.begin(),
recentLedgers_.end(), hash) != recentLedgers_.end()) recentLedgers_.end(), hash) != recentLedgers_.end())
return true; return true;
return seq != 0 && boost::icl::contains( return seq >= app_.getNodeStore().earliestSeq() &&
shards_, NodeStore::DatabaseShard::seqToShardIndex(seq)); boost::icl::contains(shards_,
(seq - 1) / NodeStore::DatabaseShard::ledgersPerShardDefault);
} }
void void
@@ -1729,11 +1730,14 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
// need to inject the NodeStore interfaces. // need to inject the NodeStore interfaces.
std::uint32_t seq {obj.has_ledgerseq() ? obj.ledgerseq() : 0}; std::uint32_t seq {obj.has_ledgerseq() ? obj.ledgerseq() : 0};
auto hObj {app_.getNodeStore ().fetch (hash, seq)}; auto hObj {app_.getNodeStore ().fetch (hash, seq)};
if (!hObj && seq >= NodeStore::genesisSeq) if (!hObj)
{ {
if (auto shardStore = app_.getShardStore()) if (auto shardStore = app_.getShardStore())
{
if (seq >= shardStore->earliestSeq())
hObj = shardStore->fetch(hash, seq); hObj = shardStore->fetch(hash, seq);
} }
}
if (hObj) if (hObj)
{ {
protocol::TMIndexedObject& newObj = *reply.add_objects (); protocol::TMIndexedObject& newObj = *reply.add_objects ();
@@ -2181,9 +2185,9 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (packet.has_ledgerseq()) if (packet.has_ledgerseq())
{ {
seq = packet.ledgerseq(); seq = packet.ledgerseq();
if (seq >= NodeStore::genesisSeq)
{
if (auto shardStore = app_.getShardStore()) if (auto shardStore = app_.getShardStore())
{
if (seq >= shardStore->earliestSeq())
ledger = shardStore->fetchLedger(ledgerhash, seq); ledger = shardStore->fetchLedger(ledgerhash, seq);
} }
} }

View File

@@ -64,6 +64,11 @@ systemCurrencyCode ()
return code; return code;
} }
/** The XRP ledger network's earliest allowed sequence */
static
std::uint32_t constexpr
XRP_LEDGER_EARLIEST_SEQ {32570};
} // ripple } // ripple
#endif #endif

View File

@@ -140,7 +140,6 @@ public:
} }
if (testPersistence) if (testPersistence)
{
{ {
// Re-open the database without the ephemeral DB // Re-open the database without the ephemeral DB
std::unique_ptr <Database> db = Manager::instance().make_Database ( std::unique_ptr <Database> db = Manager::instance().make_Database (
@@ -155,12 +154,71 @@ public:
std::sort (copy.begin (), copy.end (), LessThan{}); std::sort (copy.begin (), copy.end (), LessThan{});
BEAST_EXPECT(areBatchesEqual (batch, copy)); BEAST_EXPECT(areBatchesEqual (batch, copy));
} }
if (type == "memory")
{
// Earliest ledger sequence tests
{
// Verify default earliest ledger sequence
std::unique_ptr<Database> db =
Manager::instance().make_Database(
"test", scheduler, 2, parent, nodeParams, j);
BEAST_EXPECT(db->earliestSeq() == XRP_LEDGER_EARLIEST_SEQ);
}
// Set an invalid earliest ledger sequence
try
{
nodeParams.set("earliest_seq", "0");
std::unique_ptr<Database> db =
Manager::instance().make_Database(
"test", scheduler, 2, parent, nodeParams, j);
}
catch (std::runtime_error const& e)
{
BEAST_EXPECT(std::strcmp(e.what(),
"Invalid earliest_seq") == 0);
}
{
// Set a valid earliest ledger sequence
nodeParams.set("earliest_seq", "1");
std::unique_ptr<Database> db =
Manager::instance().make_Database(
"test", scheduler, 2, parent, nodeParams, j);
// Verify database uses the earliest ledger sequence setting
BEAST_EXPECT(db->earliestSeq() == 1);
}
// Create another database that attempts to set the value again
try
{
// Set to default earliest ledger sequence
nodeParams.set("earliest_seq",
std::to_string(XRP_LEDGER_EARLIEST_SEQ));
std::unique_ptr<Database> db2 =
Manager::instance().make_Database(
"test", scheduler, 2, parent, nodeParams, j);
}
catch (std::runtime_error const& e)
{
BEAST_EXPECT(std::strcmp(e.what(),
"earliest_seq set more than once") == 0);
}
} }
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
void runBackendTests (std::int64_t const seedValue) void run ()
{
std::int64_t const seedValue = 50;
testNodeStore ("memory", false, seedValue);
// Persistent backend tests
{ {
testNodeStore ("nudb", true, seedValue); testNodeStore ("nudb", true, seedValue);
@@ -169,9 +227,7 @@ public:
#endif #endif
} }
//-------------------------------------------------------------------------- // Import tests
void runImportTests (std::int64_t const seedValue)
{ {
testImport ("nudb", "nudb", seedValue); testImport ("nudb", "nudb", seedValue);
@@ -183,18 +239,6 @@ public:
testImport ("sqlite", "sqlite", seedValue); testImport ("sqlite", "sqlite", seedValue);
#endif #endif
} }
//--------------------------------------------------------------------------
void run ()
{
std::int64_t const seedValue = 50;
testNodeStore ("memory", false, seedValue);
runBackendTests (seedValue);
runImportTests (seedValue);
} }
}; };

View File

@@ -195,7 +195,7 @@ public:
db.store (object->getType (), db.store (object->getType (),
std::move (data), std::move (data),
object->getHash (), object->getHash (),
NodeStore::genesisSeq); db.earliestSeq());
} }
} }