Add Shard pool management

This commit is contained in:
Miguel Portilla
2020-04-06 15:34:27 -04:00
committed by manojsdoshi
parent d282b0bf85
commit 03c809371a
32 changed files with 2018 additions and 1555 deletions

View File

@@ -321,11 +321,11 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
};
// Try to fetch the ledger header from the DB
if (auto node = srcDB.fetch(mHash, mSeq))
if (auto nodeObject = srcDB.fetchNodeObject(mHash, mSeq))
{
JLOG(m_journal.trace()) << "Ledger header found in local store";
makeLedger(node->getData());
makeLedger(nodeObject->getData());
if (mFailed)
return;
@@ -333,7 +333,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
auto& dstDB{mLedger->stateMap().family().db()};
if (std::addressof(dstDB) != std::addressof(srcDB))
{
Blob blob{node->getData()};
Blob blob{nodeObject->getData()};
dstDB.store(
hotLEDGER, std::move(blob), mHash, mLedger->info().seq);
}

View File

@@ -703,7 +703,7 @@ LedgerMaster::tryFill(Job& job, std::shared_ptr<Ledger const> ledger)
if (it == ledgerHashes.end())
break;
if (!nodeStore.fetch(
if (!nodeStore.fetchNodeObject(
ledgerHashes.begin()->second.first,
ledgerHashes.begin()->first))
{
@@ -1572,10 +1572,11 @@ LedgerMaster::getCloseTimeByHash(
LedgerHash const& ledgerHash,
std::uint32_t index)
{
auto node = app_.getNodeStore().fetch(ledgerHash, index);
if (node && (node->getData().size() >= 120))
auto nodeObject = app_.getNodeStore().fetchNodeObject(ledgerHash, index);
if (nodeObject && (nodeObject->getData().size() >= 120))
{
SerialIter it(node->getData().data(), node->getData().size());
SerialIter it(
nodeObject->getData().data(), nodeObject->getData().size());
if (safe_cast<HashPrefix>(it.get32()) == HashPrefix::ledgerMaster)
{
it.skip(

View File

@@ -1241,8 +1241,7 @@ private:
bool
nodeToShards();
bool
validateShards();
void
startGenesisLedger();
@@ -1476,12 +1475,9 @@ ApplicationImp::setup()
if (!config_->standalone())
{
// validation and node import require the sqlite db
// NodeStore import into the ShardStore requires the SQLite database
if (config_->nodeToShard && !nodeToShards())
return false;
if (config_->validateShards && !validateShards())
return false;
}
validatorSites_->start();
@@ -2173,27 +2169,6 @@ ApplicationImp::nodeToShards()
return true;
}
bool
ApplicationImp::validateShards()
{
assert(overlay_);
assert(!config_->standalone());
if (config_->section(ConfigSection::shardDatabase()).empty())
{
JLOG(m_journal.fatal())
<< "The [shard_db] configuration setting must be set";
return false;
}
if (!shardStore_)
{
JLOG(m_journal.fatal()) << "Invalid [shard_db] configuration";
return false;
}
shardStore_->validate();
return true;
}
void
ApplicationImp::setMaxDisallowedLedger()
{

View File

@@ -353,12 +353,6 @@ run(int argc, char** argv)
importText += ConfigSection::nodeDatabase();
importText += "] configuration file section).";
}
std::string shardsText;
{
shardsText += "Validate an existing shard database (specified in the [";
shardsText += ConfigSection::shardDatabase();
shardsText += "] configuration file section).";
}
// Set up option parsing.
//
@@ -388,8 +382,7 @@ run(int argc, char** argv)
"replay", "Replay a ledger close.")(
"start", "Start from a fresh Ledger.")(
"vacuum", "VACUUM the transaction db.")(
"valid", "Consider the initial ledger a valid network ledger.")(
"validateShards", shardsText.c_str());
"valid", "Consider the initial ledger a valid network ledger.");
po::options_description rpc("RPC Client Options");
rpc.add_options()(
@@ -611,9 +604,6 @@ run(int argc, char** argv)
if (vm.count("nodetoshard"))
config->nodeToShard = true;
if (vm.count("validateShards"))
config->validateShards = true;
if (vm.count("ledger"))
{
config->START_LEDGER = vm["ledger"].as<std::string>();

View File

@@ -80,7 +80,8 @@ NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report)
{
if (report.wentToDisk)
m_jobQueue->addLoadEvents(
report.isAsync ? jtNS_ASYNC_READ : jtNS_SYNC_READ,
report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ
: jtNS_SYNC_READ,
1,
report.elapsed);
}

View File

@@ -302,7 +302,8 @@ SHAMapStoreImp::copyNode(
SHAMapAbstractNode const& node)
{
// Copy a single record from node to dbRotating_
dbRotating_->fetch(node.getNodeHash().as_uint256(), node.getSeq());
dbRotating_->fetchNodeObject(
node.getNodeHash().as_uint256(), node.getSeq());
if (!(++nodeCount % checkHealthInterval_))
{
if (health())

View File

@@ -211,7 +211,7 @@ private:
for (auto const& key : cache.getKeys())
{
dbRotating_->fetch(key, 0);
dbRotating_->fetchNodeObject(key, 0);
if (!(++check % checkHealthInterval_) && health())
return true;
}

View File

@@ -56,7 +56,8 @@ enum class SizedItem : std::size_t {
nodeCacheAge,
hashNodeDBCache,
txnDBCache,
lgrDBCache
lgrDBCache,
openFinalLimit
};
// This entire derived class is deprecated.
@@ -113,7 +114,6 @@ private:
public:
bool doImport = false;
bool nodeToShard = false;
bool validateShards = false;
bool ELB_SUPPORT = false;
std::vector<std::string> IPS; // Peer IPs from rippled.cfg.

View File

@@ -40,14 +40,15 @@
namespace ripple {
inline constexpr std::array<std::pair<SizedItem, std::array<int, 5>>, 11>
sizedItems{{
// FIXME: We should document each of these items, explaining exactly
inline constexpr std::array<std::pair<SizedItem, std::array<int, 5>>, 12>
sizedItems{
{// FIXME: We should document each of these items, explaining exactly
// what
// they control and whether there exists an explicit config
// option that can be used to override the default.
{SizedItem::sweepInterval, {{10, 30, 60, 90, 120}}},
{SizedItem::treeCacheSize, {{128000, 256000, 512000, 768000, 2048000}}},
{SizedItem::treeCacheSize,
{{128000, 256000, 512000, 768000, 2048000}}},
{SizedItem::treeCacheAge, {{30, 60, 90, 120, 900}}},
{SizedItem::ledgerSize, {{32, 128, 256, 384, 768}}},
{SizedItem::ledgerAge, {{30, 90, 180, 240, 900}}},
@@ -57,7 +58,7 @@ inline constexpr std::array<std::pair<SizedItem, std::array<int, 5>>, 11>
{SizedItem::hashNodeDBCache, {{4, 12, 24, 64, 128}}},
{SizedItem::txnDBCache, {{4, 12, 24, 64, 128}}},
{SizedItem::lgrDBCache, {{4, 8, 16, 32, 128}}},
}};
{SizedItem::openFinalLimit, {{8, 16, 32, 64, 128}}}}};
// Ensure that the order of entries in the table corresponds to the
// order of entries in the enum:

View File

@@ -58,6 +58,11 @@ public:
virtual void
open(bool createIfMissing = true) = 0;
/** Returns true is the database is open.
*/
virtual bool
isOpen() = 0;
/** Close the backend.
This allows the caller to catch exceptions.
*/

View File

@@ -103,7 +103,7 @@ public:
@param data The payload of the object. The caller's
variable is overwritten.
@param hash The 256-bit hash of the payload data.
@param seq The sequence of the ledger the object belongs to.
@param ledgerSeq The sequence of the ledger the object belongs to.
@return `true` if the object was stored?
*/
@@ -112,20 +112,24 @@ public:
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t seq) = 0;
std::uint32_t ledgerSeq) = 0;
/** Fetch an object.
/** Fetch a node object.
If the object is known to be not in the database, isn't found in the
database during the fetch, or failed to load correctly during the fetch,
`nullptr` is returned.
@note This can be called concurrently.
@param hash The key of the object to retrieve.
@param seq The sequence of the ledger where the object is stored.
@param ledgerSeq The sequence of the ledger where the object is stored.
@param fetchType the type of fetch, synchronous or asynchronous.
@return The object, or nullptr if it couldn't be retrieved.
*/
virtual std::shared_ptr<NodeObject>
fetch(uint256 const& hash, std::uint32_t seq) = 0;
std::shared_ptr<NodeObject>
fetchNodeObject(
uint256 const& hash,
std::uint32_t ledgerSeq,
FetchType fetchType = FetchType::synchronous);
/** Fetch an object without waiting.
If I/O is required to determine whether or not the object is present,
@@ -135,19 +139,19 @@ public:
@note This can be called concurrently.
@param hash The key of the object to retrieve
@param seq The sequence of the ledger where the object is stored.
@param object The object retrieved
@param ledgerSeq The sequence of the ledger where the object is stored.
@param nodeObject The object retrieved
@return Whether the operation completed
*/
virtual bool
asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<NodeObject>& object) = 0;
std::uint32_t ledgerSeq,
std::shared_ptr<NodeObject>& nodeObject) = 0;
/** Copies a ledger stored in a different database to this one.
/** Store a ledger from a different database.
@param ledger The ledger to copy.
@param srcLedger The ledger to store.
@return true if the operation was successful
*/
virtual bool
@@ -160,12 +164,12 @@ public:
/** Get the maximum number of async reads the node store prefers.
@param seq A ledger sequence specifying a shard to query.
@param ledgerSeq A ledger sequence specifying a shard to query.
@return The number of async reads preferred.
@note The sequence is only used with the shard store.
*/
virtual int
getDesiredAsyncReadCount(std::uint32_t seq) = 0;
getDesiredAsyncReadCount(std::uint32_t ledgerSeq) = 0;
/** Get the positive cache hits to total attempts ratio. */
virtual float
@@ -187,7 +191,7 @@ public:
@return The total read and written bytes.
*/
std::uint32_t
std::uint64_t
getStoreCount() const
{
return storeCount_;
@@ -205,7 +209,7 @@ public:
return fetchHitCount_;
}
std::uint32_t
std::uint64_t
getStoreSize() const
{
return storeSz_;
@@ -243,68 +247,47 @@ protected:
Scheduler& scheduler_;
int fdRequired_{0};
void
stopThreads();
std::atomic<std::uint32_t> fetchHitCount_{0};
std::atomic<std::uint32_t> fetchSz_{0};
void
storeStats(size_t sz)
stopReadThreads();
void
storeStats(std::uint64_t count, std::uint64_t sz)
{
++storeCount_;
assert(count <= sz);
storeCount_ += count;
storeSz_ += sz;
}
// Called by the public asyncFetch function
void
asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache);
// Called by the public fetch function
std::shared_ptr<NodeObject>
fetchInternal(uint256 const& hash, std::shared_ptr<Backend> backend);
asyncFetch(uint256 const& hash, std::uint32_t ledgerSeq);
// Called by the public import function
void
importInternal(Backend& dstBackend, Database& srcDB);
std::shared_ptr<NodeObject>
doFetch(
uint256 const& hash,
std::uint32_t seq,
TaggedCache<uint256, NodeObject>& pCache,
KeyCache<uint256>& nCache,
bool isAsync);
// Called by the public storeLedger function
bool
storeLedger(
Ledger const& srcLedger,
std::shared_ptr<Backend> dstBackend,
std::shared_ptr<TaggedCache<uint256, NodeObject>> dstPCache,
std::shared_ptr<KeyCache<uint256>> dstNCache,
std::shared_ptr<Ledger const> next);
std::shared_ptr<KeyCache<uint256>> dstNCache);
private:
std::atomic<std::uint32_t> storeCount_{0};
std::atomic<std::uint32_t> fetchTotalCount_{0};
std::atomic<std::uint32_t> fetchHitCount_{0};
std::atomic<std::uint32_t> storeSz_{0};
std::atomic<std::uint32_t> fetchSz_{0};
std::atomic<std::uint64_t> storeCount_{0};
std::atomic<std::uint64_t> storeSz_{0};
std::atomic<std::uint64_t> fetchTotalCount_{0};
std::mutex readLock_;
std::condition_variable readCondVar_;
std::condition_variable readGenCondVar_;
// reads to do
std::map<
uint256,
std::tuple<
std::uint32_t,
std::weak_ptr<TaggedCache<uint256, NodeObject>>,
std::weak_ptr<KeyCache<uint256>>>>
read_;
std::map<uint256, std::uint32_t> read_;
// last read
uint256 readLastHash_;
@@ -320,7 +303,10 @@ private:
std::uint32_t const earliestLedgerSeq_;
virtual std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) = 0;
fetchNodeObject(
uint256 const& hash,
std::uint32_t ledgerSeq,
FetchReport& fetchReport) = 0;
/** Visit every object in the database
This is usually called during import.

View File

@@ -66,9 +66,9 @@ public:
/** Prepare to store a new ledger in the shard being acquired
@param validLedgerSeq The index of the maximum valid ledgers
@param validLedgerSeq The sequence of the maximum valid ledgers
@return If a ledger should be fetched and stored, then returns the
ledger index of the ledger to request. Otherwise returns boost::none.
ledger sequence of the ledger to request. Otherwise returns boost::none.
Some reasons this may return boost::none are: all shards are
stored and full, max allowed disk space would be exceeded, or a
ledger was recently requested and not enough time has passed
@@ -136,13 +136,6 @@ public:
virtual std::string
getCompleteShards() = 0;
/** Verifies shard store data is valid.
@param app The application object
*/
virtual void
validate() = 0;
/** @return The maximum number of ledgers stored in a shard
*/
virtual std::uint32_t
@@ -188,10 +181,10 @@ public:
constexpr std::uint32_t
seqToShardIndex(
std::uint32_t seq,
std::uint32_t ledgerSeq,
std::uint32_t ledgersPerShard = DatabaseShard::ledgersPerShardDefault)
{
return (seq - 1) / ledgersPerShard;
return (ledgerSeq - 1) / ledgersPerShard;
}
extern std::unique_ptr<DatabaseShard>

View File

@@ -26,15 +26,19 @@
namespace ripple {
namespace NodeStore {
enum class FetchType { synchronous, async };
/** Contains information about a fetch operation. */
struct FetchReport
{
explicit FetchReport() = default;
explicit FetchReport(FetchType fetchType_) : fetchType(fetchType_)
{
}
std::chrono::milliseconds elapsed;
bool isAsync;
bool wentToDisk;
bool wasFound;
FetchType const fetchType;
bool wentToDisk = false;
bool wasFound = false;
};
/** Contains information about a batch write operation. */

View File

@@ -0,0 +1,43 @@
# Open Shard Management
## Overview
Shard NuDB and SQLite databases consume server resources. This can be unnecessarily taxing on servers with many shards. The open shard management feature aims to improve the situation by managing a limited number of open shard database connections. The feature, which is integrated into the existing DatabaseShardImp and Shard classes, maintains a limited pool of open databases prioritized by their last use time stamp. The following sections describe the feature in greater detail.
### Open Shard Management
The open shard management feature is integrated into the DatabaseShardImp and Shard classes. As the DatabaseShardImp sweep function is periodically called, the number of finalized open shards, which constitutes the open pool, are examined. Upon the pool exceeding a pool limit, an attempt is made to close enough open shards to remain within the limit. Shards to be closed are selected based on their last use time stamp, which is automatically updated on database access. If necessary, shards will automatically open their databases when accessed.
```C++
if (openFinals.size() > openFinalLimit_)
{
// Try to close enough shards to be within the limit.
// Sort on largest elapsed time since last use.
std::sort(
openFinals.begin(),
openFinals.end(),
[&](std::shared_ptr<Shard> const& lhsShard,
std::shared_ptr<Shard> const& rhsShard) {
return lhsShard->getLastUse() > rhsShard->getLastUse();
});
for (auto it{openFinals.cbegin()};
it != openFinals.cend() && openFinals.size() > openFinalLimit_;)
{
if ((*it)->tryClose())
it = openFinals.erase(it);
else
++it;
}
}
```
### Shard
When closing an open shard, DatabaseShardImp will call the Shard 'tryClose' function. This function will only close the shard databases if there are no outstanding references.
DatabaseShardImp will use the Shard 'isOpen' function to determine the state of a shard's database.
### Caveats
The Shard class must check the state of its databases before use. Prior use assumed databases were always open, that is no longer the case with the open shard management feature.

View File

@@ -114,6 +114,12 @@ public:
db_ = &memoryFactory.open(name_);
}
bool
isOpen() override
{
return static_cast<bool>(db_);
}
void
close() override
{

View File

@@ -132,6 +132,12 @@ public:
Throw<std::runtime_error>("nodestore: unknown appnum");
}
bool
isOpen() override
{
return db_.is_open();
}
void
close() override
{

View File

@@ -43,6 +43,12 @@ public:
{
}
bool
isOpen() override
{
return false;
}
void
close() override
{

View File

@@ -226,6 +226,12 @@ public:
m_db.reset(db);
}
bool
isOpen() override
{
return static_cast<bool>(m_db);
}
void
close() override
{

View File

@@ -49,12 +49,12 @@ Database::Database(
Database::~Database()
{
// NOTE!
// Any derived class should call the stopThreads() method in its
// Any derived class should call the stopReadThreads() method in its
// destructor. Otherwise, occasionally, the derived class may
// crash during shutdown when its members are accessed by one of
// these threads after the derived class is destroyed but before
// this base class is destroyed.
stopThreads();
stopReadThreads();
}
void
@@ -80,7 +80,7 @@ Database::onStop()
{
// After stop time we can no longer use the JobQueue for background
// reads. Join the background read threads.
stopThreads();
stopReadThreads();
}
void
@@ -90,7 +90,7 @@ Database::onChildrenStopped()
}
void
Database::stopThreads()
Database::stopReadThreads()
{
{
std::lock_guard lock(readLock_);
@@ -107,123 +107,77 @@ Database::stopThreads()
}
void
Database::asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache)
Database::asyncFetch(uint256 const& hash, std::uint32_t ledgerSeq)
{
// Post a read
std::lock_guard lock(readLock_);
if (read_.emplace(hash, std::make_tuple(seq, pCache, nCache)).second)
if (read_.emplace(hash, ledgerSeq).second)
readCondVar_.notify_one();
}
std::shared_ptr<NodeObject>
Database::fetchInternal(uint256 const& hash, std::shared_ptr<Backend> backend)
{
std::shared_ptr<NodeObject> nObj;
Status status;
try
{
status = backend->fetch(hash.begin(), &nObj);
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "Exception, " << e.what();
Rethrow();
}
switch (status)
{
case ok:
++fetchHitCount_;
if (nObj)
fetchSz_ += nObj->getData().size();
break;
case notFound:
break;
case dataCorrupt:
// VFALCO TODO Deal with encountering corrupt data!
JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash;
break;
default:
JLOG(j_.warn()) << "Unknown status=" << status;
break;
}
return nObj;
}
void
Database::importInternal(Backend& dstBackend, Database& srcDB)
{
Batch b;
b.reserve(batchWritePreallocationSize);
srcDB.for_each([&](std::shared_ptr<NodeObject> nObj) {
assert(nObj);
if (!nObj) // This should never happen
Batch batch;
batch.reserve(batchWritePreallocationSize);
auto storeBatch = [&]() {
try
{
dstBackend.storeBatch(batch);
}
catch (std::exception const& e)
{
JLOG(j_.error()) << "Exception caught in function " << __func__
<< ". Error: " << e.what();
return;
}
std::uint64_t sz{0};
for (auto const& nodeObject : batch)
sz += nodeObject->getData().size();
storeStats(batch.size(), sz);
batch.clear();
};
srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
assert(nodeObject);
if (!nodeObject) // This should never happen
return;
++storeCount_;
storeSz_ += nObj->getData().size();
b.push_back(nObj);
if (b.size() >= batchWritePreallocationSize)
{
dstBackend.storeBatch(b);
b.clear();
b.reserve(batchWritePreallocationSize);
}
batch.emplace_back(std::move(nodeObject));
if (batch.size() >= batchWritePreallocationSize)
storeBatch();
});
if (!b.empty())
dstBackend.storeBatch(b);
if (!batch.empty())
storeBatch();
}
// Perform a fetch and report the time it took
std::shared_ptr<NodeObject>
Database::doFetch(
Database::fetchNodeObject(
uint256 const& hash,
std::uint32_t seq,
TaggedCache<uint256, NodeObject>& pCache,
KeyCache<uint256>& nCache,
bool isAsync)
std::uint32_t ledgerSeq,
FetchType fetchType)
{
FetchReport report;
report.isAsync = isAsync;
report.wentToDisk = false;
FetchReport fetchReport(fetchType);
using namespace std::chrono;
auto const before = steady_clock::now();
auto const begin{steady_clock::now()};
// See if the object already exists in the cache
auto nObj = pCache.fetch(hash);
if (!nObj && !nCache.touch_if_exists(hash))
auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport)};
if (nodeObject)
{
// Try the database(s)
report.wentToDisk = true;
nObj = fetchFrom(hash, seq);
++fetchHitCount_;
fetchSz_ += nodeObject->getData().size();
}
if (fetchReport.wentToDisk)
++fetchTotalCount_;
if (!nObj)
{
// Just in case a write occurred
nObj = pCache.fetch(hash);
if (!nObj)
// We give up
nCache.insert(hash);
}
else
{
// Ensure all threads get the same object
pCache.canonicalize_replace_client(hash, nObj);
// Since this was a 'hard' fetch, we will log it.
JLOG(j_.trace()) << "HOS: " << hash << " fetch: in db";
}
}
report.wasFound = static_cast<bool>(nObj);
report.elapsed = duration_cast<milliseconds>(steady_clock::now() - before);
scheduler_.onFetch(report);
return nObj;
fetchReport.elapsed =
duration_cast<milliseconds>(steady_clock::now() - begin);
scheduler_.onFetch(fetchReport);
return nodeObject;
}
bool
@@ -231,58 +185,52 @@ Database::storeLedger(
Ledger const& srcLedger,
std::shared_ptr<Backend> dstBackend,
std::shared_ptr<TaggedCache<uint256, NodeObject>> dstPCache,
std::shared_ptr<KeyCache<uint256>> dstNCache,
std::shared_ptr<Ledger const> next)
std::shared_ptr<KeyCache<uint256>> dstNCache)
{
assert(static_cast<bool>(dstPCache) == static_cast<bool>(dstNCache));
if (srcLedger.info().hash.isZero() || srcLedger.info().accountHash.isZero())
{
assert(false);
JLOG(j_.error()) << "source ledger seq " << srcLedger.info().seq
<< " is invalid";
auto fail = [&](std::string const& msg) {
JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
<< ". " << msg;
return false;
}
};
if (!dstPCache || !dstNCache)
return fail("Invalid destination cache");
if (srcLedger.info().hash.isZero())
return fail("Invalid hash");
if (srcLedger.info().accountHash.isZero())
return fail("Invalid account hash");
auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
if (&srcDB == this)
{
assert(false);
JLOG(j_.error()) << "source and destination databases are the same";
return false;
}
return fail("Source and destination databases are the same");
Batch batch;
batch.reserve(batchWritePreallocationSize);
auto storeBatch = [&]() {
if (dstPCache && dstNCache)
std::uint64_t sz{0};
for (auto const& nodeObject : batch)
{
for (auto& nObj : batch)
dstPCache->canonicalize_replace_cache(
nodeObject->getHash(), nodeObject);
dstNCache->erase(nodeObject->getHash());
sz += nodeObject->getData().size();
}
try
{
dstPCache->canonicalize_replace_cache(nObj->getHash(), nObj);
dstNCache->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
}
dstBackend->storeBatch(batch);
batch.clear();
batch.reserve(batchWritePreallocationSize);
};
bool error = false;
auto visit = [&](SHAMapAbstractNode& node) {
if (auto nObj = srcDB.fetch(
node.getNodeHash().as_uint256(), srcLedger.info().seq))
}
catch (std::exception const& e)
{
batch.emplace_back(std::move(nObj));
if (batch.size() < batchWritePreallocationSize)
return true;
storeBatch();
if (!isStopping())
return true;
fail(
std::string("Exception caught in function ") + __func__ +
". Error: " + e.what());
return false;
}
error = true;
return false;
storeStats(batch.size(), sz);
batch.clear();
return true;
};
// Store ledger header
@@ -295,43 +243,48 @@ Database::storeLedger(
batch.emplace_back(std::move(nObj));
}
bool error = false;
auto visit = [&](SHAMapAbstractNode& node) {
if (!isStopping())
{
if (auto nodeObject = srcDB.fetchNodeObject(
node.getNodeHash().as_uint256(), srcLedger.info().seq))
{
batch.emplace_back(std::move(nodeObject));
if (batch.size() < batchWritePreallocationSize || storeBatch())
return true;
}
}
error = true;
return false;
};
// Store the state map
if (srcLedger.stateMap().getHash().isNonZero())
{
if (!srcLedger.stateMap().isValid())
{
JLOG(j_.error()) << "source ledger seq " << srcLedger.info().seq
<< " state map invalid";
return false;
}
if (next && next->info().parentHash == srcLedger.info().hash)
{
auto have = next->stateMap().snapShot(false);
srcLedger.stateMap().snapShot(false)->visitDifferences(
&(*have), visit);
}
else
return fail("Invalid state map");
srcLedger.stateMap().snapShot(false)->visitNodes(visit);
if (error)
return false;
return fail("Failed to store state map");
}
// Store the transaction map
if (srcLedger.info().txHash.isNonZero())
{
if (!srcLedger.txMap().isValid())
{
JLOG(j_.error()) << "source ledger seq " << srcLedger.info().seq
<< " transaction map invalid";
return false;
}
return fail("Invalid transaction map");
srcLedger.txMap().snapShot(false)->visitNodes(visit);
if (error)
return false;
return fail("Failed to store transaction map");
}
if (!batch.empty())
storeBatch();
if (!batch.empty() && !storeBatch())
return fail("Failed to store");
return true;
}
@@ -344,8 +297,6 @@ Database::threadEntry()
{
uint256 lastHash;
std::uint32_t lastSeq;
std::shared_ptr<TaggedCache<uint256, NodeObject>> lastPcache;
std::shared_ptr<KeyCache<uint256>> lastNcache;
{
std::unique_lock<std::mutex> lock(readLock_);
while (!readShut_ && read_.empty())
@@ -367,16 +318,13 @@ Database::threadEntry()
readGenCondVar_.notify_all();
}
lastHash = it->first;
lastSeq = std::get<0>(it->second);
lastPcache = std::get<1>(it->second).lock();
lastNcache = std::get<2>(it->second).lock();
lastSeq = it->second;
read_.erase(it);
readLastHash_ = lastHash;
}
// Perform the read
if (lastPcache && lastNcache)
doFetch(lastHash, lastSeq, *lastPcache, *lastNcache, true);
fetchNodeObject(lastHash, lastSeq, FetchType::async);
}
}

View File

@@ -29,27 +29,28 @@ DatabaseNodeImp::store(
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t seq)
std::uint32_t)
{
auto nObj = NodeObject::createObject(type, std::move(data), hash);
pCache_->canonicalize_replace_cache(hash, nObj);
backend_->store(nObj);
nCache_->erase(hash);
storeStats(nObj->getData().size());
storeStats(1, nObj->getData().size());
}
bool
DatabaseNodeImp::asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<NodeObject>& object)
std::uint32_t ledgerSeq,
std::shared_ptr<NodeObject>& nodeObject)
{
// See if the object is in cache
object = pCache_->fetch(hash);
if (object || nCache_->touch_if_exists(hash))
nodeObject = pCache_->fetch(hash);
if (nodeObject || nCache_->touch_if_exists(hash))
return true;
// Otherwise post a read
Database::asyncFetch(hash, seq, pCache_, nCache_);
Database::asyncFetch(hash, ledgerSeq);
return false;
}
@@ -69,5 +70,69 @@ DatabaseNodeImp::sweep()
nCache_->sweep();
}
std::shared_ptr<NodeObject>
DatabaseNodeImp::fetchNodeObject(
uint256 const& hash,
std::uint32_t,
FetchReport& fetchReport)
{
// See if the node object exists in the cache
auto nodeObject{pCache_->fetch(hash)};
if (!nodeObject && !nCache_->touch_if_exists(hash))
{
// Try the backend
fetchReport.wentToDisk = true;
Status status;
try
{
status = backend_->fetch(hash.data(), &nodeObject);
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "Exception, " << e.what();
Rethrow();
}
switch (status)
{
case ok:
++fetchHitCount_;
if (nodeObject)
fetchSz_ += nodeObject->getData().size();
break;
case notFound:
break;
case dataCorrupt:
JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash;
break;
default:
JLOG(j_.warn()) << "Unknown status=" << status;
break;
}
if (!nodeObject)
{
// Just in case a write occurred
nodeObject = pCache_->fetch(hash);
if (!nodeObject)
// We give up
nCache_->insert(hash);
}
else
{
fetchReport.wasFound = true;
// Ensure all threads get the same object
pCache_->canonicalize_replace_client(hash, nodeObject);
// Since this was a 'hard' fetch, we will log it
JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db";
}
}
return nodeObject;
}
} // namespace NodeStore
} // namespace ripple

View File

@@ -62,8 +62,8 @@ public:
~DatabaseNodeImp() override
{
// Stop threads before data members are destroyed.
stopThreads();
// Stop read threads in base before data members are destroyed
stopReadThreads();
}
std::string
@@ -85,33 +85,22 @@ public:
}
void
store(
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t seq) override;
std::shared_ptr<NodeObject>
fetch(uint256 const& hash, std::uint32_t seq) override
{
return doFetch(hash, seq, *pCache_, *nCache_, false);
}
store(NodeObjectType type, Blob&& data, uint256 const& hash, std::uint32_t)
override;
bool
asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<NodeObject>& object) override;
std::uint32_t ledgerSeq,
std::shared_ptr<NodeObject>& nodeObject) override;
bool
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override
{
return Database::storeLedger(
*srcLedger, backend_, pCache_, nCache_, nullptr);
return Database::storeLedger(*srcLedger, backend_, pCache_, nCache_);
}
int
getDesiredAsyncReadCount(std::uint32_t seq) override
int getDesiredAsyncReadCount(std::uint32_t) override
{
// We prefer a client not fill our cache
// We don't want to push data out of the cache
@@ -142,10 +131,10 @@ private:
std::shared_ptr<Backend> backend_;
std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) override
{
return fetchInternal(hash, backend_);
}
fetchNodeObject(
uint256 const& hash,
std::uint32_t,
FetchReport& fetchReport) override;
void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override

View File

@@ -101,8 +101,7 @@ DatabaseRotatingImp::storeLedger(std::shared_ptr<Ledger const> const& srcLedger)
return writableBackend_;
}();
return Database::storeLedger(
*srcLedger, backend, pCache_, nCache_, nullptr);
return Database::storeLedger(*srcLedger, backend, pCache_, nCache_);
}
void
@@ -110,7 +109,7 @@ DatabaseRotatingImp::store(
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t seq)
std::uint32_t)
{
auto nObj = NodeObject::createObject(type, std::move(data), hash);
pCache_->canonicalize_replace_cache(hash, nObj);
@@ -122,22 +121,22 @@ DatabaseRotatingImp::store(
backend->store(nObj);
nCache_->erase(hash);
storeStats(nObj->getData().size());
storeStats(1, nObj->getData().size());
}
bool
DatabaseRotatingImp::asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<NodeObject>& object)
std::uint32_t ledgerSeq,
std::shared_ptr<NodeObject>& nodeObject)
{
// See if the object is in cache
object = pCache_->fetch(hash);
if (object || nCache_->touch_if_exists(hash))
nodeObject = pCache_->fetch(hash);
if (nodeObject || nCache_->touch_if_exists(hash))
return true;
// Otherwise post a read
Database::asyncFetch(hash, seq, pCache_, nCache_);
Database::asyncFetch(hash, ledgerSeq);
return false;
}
@@ -158,20 +157,62 @@ DatabaseRotatingImp::sweep()
}
std::shared_ptr<NodeObject>
DatabaseRotatingImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
DatabaseRotatingImp::fetchNodeObject(
uint256 const& hash,
std::uint32_t,
FetchReport& fetchReport)
{
auto fetch = [&](std::shared_ptr<Backend> const& backend) {
Status status;
std::shared_ptr<NodeObject> nodeObject;
try
{
status = backend->fetch(hash.data(), &nodeObject);
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "Exception, " << e.what();
Rethrow();
}
switch (status)
{
case ok:
++fetchHitCount_;
if (nodeObject)
fetchSz_ += nodeObject->getData().size();
break;
case notFound:
break;
case dataCorrupt:
JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash;
break;
default:
JLOG(j_.warn()) << "Unknown status=" << status;
break;
}
return nodeObject;
};
// See if the node object exists in the cache
auto nodeObject{pCache_->fetch(hash)};
if (!nodeObject && !nCache_->touch_if_exists(hash))
{
auto [writable, archive] = [&] {
std::lock_guard lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();
fetchReport.wentToDisk = true;
// Try to fetch from the writable backend
auto nObj = fetchInternal(hash, writable);
if (!nObj)
nodeObject = fetch(writable);
if (!nodeObject)
{
// Otherwise try to fetch from the archive backend
nObj = fetchInternal(hash, archive);
if (nObj)
nodeObject = fetch(archive);
if (nodeObject)
{
{
// Refresh the writable backend pointer
@@ -180,11 +221,32 @@ DatabaseRotatingImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
}
// Update writable backend with data from the archive backend
writable->store(nObj);
writable->store(nodeObject);
nCache_->erase(hash);
}
}
return nObj;
if (!nodeObject)
{
// Just in case a write occurred
nodeObject = pCache_->fetch(hash);
if (!nodeObject)
// We give up
nCache_->insert(hash);
}
else
{
fetchReport.wasFound = true;
// Ensure all threads get the same object
pCache_->canonicalize_replace_client(hash, nodeObject);
// Since this was a 'hard' fetch, we will log it
JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db";
}
}
return nodeObject;
}
void

View File

@@ -45,8 +45,8 @@ public:
~DatabaseRotatingImp() override
{
// Stop threads before data members are destroyed.
stopThreads();
// Stop read threads in base before data members are destroyed
stopReadThreads();
}
void
@@ -64,29 +64,19 @@ public:
import(Database& source) override;
void
store(
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t seq) override;
std::shared_ptr<NodeObject>
fetch(uint256 const& hash, std::uint32_t seq) override
{
return doFetch(hash, seq, *pCache_, *nCache_, false);
}
store(NodeObjectType type, Blob&& data, uint256 const& hash, std::uint32_t)
override;
bool
asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<NodeObject>& object) override;
std::uint32_t ledgerSeq,
std::shared_ptr<NodeObject>& nodeObject) override;
bool
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override;
int
getDesiredAsyncReadCount(std::uint32_t seq) override
int getDesiredAsyncReadCount(std::uint32_t) override
{
// We prefer a client not fill our cache
// We don't want to push data out of the cache
@@ -124,7 +114,10 @@ private:
mutable std::mutex mutex_;
std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) override;
fetchNodeObject(
uint256 const& hash,
std::uint32_t,
FetchReport& fetchReport) override;
void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override;

File diff suppressed because it is too large Load Diff

View File

@@ -48,12 +48,10 @@ public:
int readThreads,
beast::Journal j);
~DatabaseShardImp() override;
bool
[[nodiscard]] bool
init() override;
boost::optional<std::uint32_t>
[[nodiscard]] boost::optional<std::uint32_t>
prepareLedger(std::uint32_t validLedgerSeq) override;
bool
@@ -70,7 +68,7 @@ public:
override;
std::shared_ptr<Ledger>
fetchLedger(uint256 const& hash, std::uint32_t seq) override;
fetchLedger(uint256 const& hash, std::uint32_t ledgerSeq) override;
void
setStored(std::shared_ptr<Ledger const> const& ledger) override;
@@ -78,9 +76,6 @@ public:
std::string
getCompleteShards() override;
void
validate() override;
std::uint32_t
ledgersPerShard() const override
{
@@ -94,10 +89,10 @@ public:
}
std::uint32_t
seqToShardIndex(std::uint32_t seq) const override
seqToShardIndex(std::uint32_t ledgerSeq) const override
{
assert(seq >= earliestLedgerSeq());
return NodeStore::seqToShardIndex(seq, ledgersPerShard_);
assert(ledgerSeq >= earliestLedgerSeq());
return NodeStore::seqToShardIndex(ledgerSeq, ledgersPerShard_);
}
std::uint32_t
@@ -131,6 +126,9 @@ public:
void
onStop() override;
void
onChildrenStopped() override;
/** Import the application local node store
@param source The application node store.
@@ -146,22 +144,19 @@ public:
NodeObjectType type,
Blob&& data,
uint256 const& hash,
std::uint32_t seq) override;
std::shared_ptr<NodeObject>
fetch(uint256 const& hash, std::uint32_t seq) override;
std::uint32_t ledgerSeq) override;
bool
asyncFetch(
uint256 const& hash,
std::uint32_t seq,
std::shared_ptr<NodeObject>& object) override;
std::uint32_t ledgerSeq,
std::shared_ptr<NodeObject>& nodeObject) override;
bool
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override;
int
getDesiredAsyncReadCount(std::uint32_t seq) override;
getDesiredAsyncReadCount(std::uint32_t ledgerSeq) override;
float
getCacheHitRate() override;
@@ -173,26 +168,6 @@ public:
sweep() override;
private:
struct ShardInfo
{
enum class State {
none,
final, // Immutable, complete and validated
acquire, // Being acquired
import, // Being imported
finalize // Being finalized
};
ShardInfo() = default;
ShardInfo(std::shared_ptr<Shard> shard_, State state_)
: shard(std::move(shard_)), state(state_)
{
}
std::shared_ptr<Shard> shard;
State state{State::none};
};
enum class PathDesignation : uint8_t {
none, // No path specified
historical // Needs a historical path
@@ -210,7 +185,10 @@ private:
std::unique_ptr<TaskQueue> taskQueue_;
// Shards held by this server
std::map<std::uint32_t, ShardInfo> shards_;
std::unordered_map<std::uint32_t, std::shared_ptr<Shard>> shards_;
// Shard indexes being imported
std::set<std::uint32_t> preparedIndexes_;
// Shard index being acquired from the peer network
std::uint32_t acquireIndex_{0};
@@ -247,6 +225,9 @@ private:
// Average storage space required by a shard (in bytes)
std::uint64_t avgShardFileSz_;
// The limit of final shards with open databases at any time
std::uint32_t const openFinalLimit_;
// File name used to mark shards being imported from node store
static constexpr auto importMarker_ = "import";
@@ -263,10 +244,13 @@ private:
// Initialize settings from the configuration file
// Lock must be held
bool
initConfig(std::lock_guard<std::mutex>&);
initConfig(std::lock_guard<std::mutex> const&);
std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) override;
fetchNodeObject(
uint256 const& hash,
std::uint32_t ledgerSeq,
FetchReport& fetchReport) override;
void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
@@ -279,30 +263,24 @@ private:
boost::optional<std::uint32_t>
findAcquireIndex(
std::uint32_t validLedgerSeq,
std::lock_guard<std::mutex>&);
std::lock_guard<std::mutex> const&);
private:
// Queue a task to finalize a shard by validating its databases
// Queue a task to finalize a shard by verifying its databases
// Lock must be held
void
finalizeShard(
ShardInfo& shardInfo,
std::shared_ptr<Shard>& shard,
bool writeSQLite,
std::lock_guard<std::mutex>&,
boost::optional<uint256> const& expectedHash);
// Set storage and file descriptor usage stats
// Lock must NOT be held
void
setFileStats();
// Update status string
// Lock must be held
void
updateStatus(std::lock_guard<std::mutex>&);
std::pair<std::shared_ptr<PCache>, std::shared_ptr<NCache>>
getCache(std::uint32_t seq);
updateStatus(std::lock_guard<std::mutex> const&);
// Returns true if the filesystem has enough storage
// available to hold the specified number of shards.
@@ -317,7 +295,7 @@ private:
std::lock_guard<std::mutex> const&) const;
bool
storeLedgerInShard(
setStoredInShard(
std::shared_ptr<Shard>& shard,
std::shared_ptr<Ledger const> const& ledger);
@@ -327,7 +305,7 @@ private:
// Returns the index that represents the logical
// partition between historical and recent shards
std::uint32_t
shardBoundaryIndex(std::lock_guard<std::mutex> const&) const;
shardBoundaryIndex() const;
std::uint32_t
numHistoricalShards(std::lock_guard<std::mutex> const& lock) const;

File diff suppressed because it is too large Load Diff

View File

@@ -51,6 +51,18 @@ class DatabaseShard;
class Shard final
{
public:
enum class State {
acquire, // Being acquired
complete, // Backend contains all ledgers but is not yet final
finalizing, // Being finalized
final // Database verified, shard is immutable
};
static constexpr State acquire = State::acquire;
static constexpr State complete = State::complete;
static constexpr State finalizing = State::finalizing;
static constexpr State final = State::final;
Shard(
Application& app,
DatabaseShard const& db,
@@ -66,70 +78,114 @@ public:
~Shard();
/** Initialize shard.
@param scheduler The scheduler to use for performing asynchronous tasks.
@param context The context to use for the backend.
*/
[[nodiscard]] bool
init(Scheduler& scheduler, nudb::context& context);
/** Returns true if the database are open.
*/
[[nodiscard]] bool
isOpen() const;
/** Try to close databases if not in use.
@return true if databases were closed.
*/
bool
open(Scheduler& scheduler, nudb::context& ctx);
tryClose();
/** Notify shard to prepare for shutdown.
*/
void
closeAll();
stop()
{
stop_ = true;
}
boost::optional<std::uint32_t>
[[nodiscard]] boost::optional<std::uint32_t>
prepare();
bool
store(std::shared_ptr<Ledger const> const& ledger);
[[nodiscard]] bool
storeNodeObject(std::shared_ptr<NodeObject> const& nodeObject);
bool
containsLedger(std::uint32_t seq) const;
[[nodiscard]] std::shared_ptr<NodeObject>
fetchNodeObject(uint256 const& hash, FetchReport& fetchReport);
[[nodiscard]] bool
fetchNodeObjectFromCache(
uint256 const& hash,
std::shared_ptr<NodeObject>& nodeObject);
/** Store a ledger.
@param srcLedger The ledger to store.
@param next The ledger that immediately follows srcLedger, can be null.
@return StoreLedgerResult containing data about the store.
*/
struct StoreLedgerResult
{
std::uint64_t count{0}; // Number of storage calls
std::uint64_t size{0}; // Number of bytes stored
bool error{false};
};
[[nodiscard]] StoreLedgerResult
storeLedger(
std::shared_ptr<Ledger const> const& srcLedger,
std::shared_ptr<Ledger const> const& next);
[[nodiscard]] bool
setLedgerStored(std::shared_ptr<Ledger const> const& ledger);
[[nodiscard]] bool
containsLedger(std::uint32_t ledgerSeq) const;
void
sweep();
std::uint32_t
[[nodiscard]] std::uint32_t
index() const
{
return index_;
}
boost::filesystem::path const&
[[nodiscard]] boost::filesystem::path const&
getDir() const
{
return dir_;
}
std::tuple<
std::shared_ptr<Backend>,
std::shared_ptr<PCache>,
std::shared_ptr<NCache>>
getBackendAll() const;
[[nodiscard]] int
getDesiredAsyncReadCount();
std::shared_ptr<Backend>
getBackend() const;
[[nodiscard]] float
getCacheHitRate();
/** Returns `true` if all shard ledgers have been stored in the backend
*/
bool
isBackendComplete() const;
std::shared_ptr<PCache>
pCache() const;
std::shared_ptr<NCache>
nCache() const;
[[nodiscard]] std::chrono::steady_clock::time_point
getLastUse() const;
/** Returns a pair where the first item describes the storage space
utilized and the second item is the number of file descriptors required.
*/
std::pair<std::uint64_t, std::uint32_t>
fileInfo() const;
[[nodiscard]] std::pair<std::uint64_t, std::uint32_t>
getFileInfo() const;
/** Returns `true` if the shard is complete, validated, and immutable.
*/
bool
isFinal() const;
[[nodiscard]] State
getState() const
{
return state_;
}
/** Returns `true` if the shard is older, without final key data
[[nodiscard]] std::int32_t
getWriteLoad();
/** Returns `true` if shard is older, without final key data
*/
bool
[[nodiscard]] bool
isLegacy() const;
/** Finalize shard by walking its ledgers and verifying each Merkle tree.
@@ -139,19 +195,12 @@ public:
@param referenceHash If present, this hash must match the hash
of the last ledger in the shard.
*/
bool
[[nodiscard]] bool
finalize(
bool const writeSQLite,
boost::optional<uint256> const& referenceHash);
void
stop()
{
stop_ = true;
}
/** If called, the shard directory will be removed when
the shard is destroyed.
/** Enables removal of the shard directory on destruction.
*/
void
removeOnDestroy()
@@ -168,6 +217,41 @@ public:
static uint256 const finalKey;
private:
class Count final
{
public:
Count(Count const&) = delete;
Count&
operator=(Count&&) = delete;
Count&
operator=(Count const&) = delete;
Count(Count&& other) : counter_(other.counter_)
{
other.counter_ = nullptr;
}
Count(std::atomic<std::uint32_t>* counter) : counter_(counter)
{
if (counter_)
++(*counter_);
}
~Count()
{
if (counter_)
--(*counter_);
}
operator bool() const
{
return counter_ != nullptr;
}
private:
std::atomic<std::uint32_t>* counter_;
};
struct AcquireInfo
{
// SQLite database to track information about what has been acquired
@@ -178,7 +262,8 @@ private:
};
Application& app_;
mutable std::recursive_mutex mutex_;
beast::Journal const j_;
mutable std::mutex mutex_;
// Shard Index
std::uint32_t const index_;
@@ -194,10 +279,10 @@ private:
std::uint32_t const maxLedgers_;
// Database positive cache
std::shared_ptr<PCache> pCache_;
std::unique_ptr<PCache> pCache_;
// Database negative cache
std::shared_ptr<NCache> nCache_;
std::unique_ptr<NCache> nCache_;
// Path to database files
boost::filesystem::path const dir_;
@@ -209,7 +294,9 @@ private:
std::uint32_t fdRequired_{0};
// NuDB key/value store for node objects
std::shared_ptr<Backend> backend_;
std::unique_ptr<Backend> backend_;
std::atomic<std::uint32_t> backendCount_{0};
// Ledger SQLite database used for indexes
std::unique_ptr<DatabaseCon> lgrSQLiteDB_;
@@ -221,50 +308,55 @@ private:
// If the shard is final, this member will be null.
std::unique_ptr<AcquireInfo> acquireInfo_;
beast::Journal const j_;
// True if backend has stored all ledgers pertaining to the shard
bool backendComplete_{false};
// Older shard without an acquire database or final key
// Eventually there will be no need for this and should be removed
bool legacy_{false};
// True if the backend has a final key stored
bool final_{false};
// Determines if the shard needs to stop processing for shutdown
std::atomic<bool> stop_{false};
std::atomic<State> state_{State::acquire};
// Determines if the shard directory should be removed in the destructor
std::atomic<bool> removeOnDestroy_{false};
// The time of the last access of a shard that has a final state
std::chrono::steady_clock::time_point lastAccess_;
// Open shard databases
[[nodiscard]] bool
open(std::lock_guard<std::mutex> const& lock);
// Open/Create SQLite databases
// Lock over mutex_ required
bool
initSQLite(std::lock_guard<std::recursive_mutex> const& lock);
[[nodiscard]] bool
initSQLite(std::lock_guard<std::mutex> const&);
// Write SQLite entries for this ledger
// Lock over mutex_ required
bool
[[nodiscard]] bool
storeSQLite(
std::shared_ptr<Ledger const> const& ledger,
std::lock_guard<std::recursive_mutex> const& lock);
std::lock_guard<std::mutex> const&);
// Set storage and file descriptor usage stats
// Lock over mutex_ required
void
setFileStats(std::lock_guard<std::recursive_mutex> const& lock);
setFileStats(std::lock_guard<std::mutex> const&);
// Validate this ledger by walking its SHAMaps and verifying Merkle trees
bool
valLedger(
[[nodiscard]] bool
verifyLedger(
std::shared_ptr<Ledger const> const& ledger,
std::shared_ptr<Ledger const> const& next) const;
// Fetches from backend and log errors based on status codes
std::shared_ptr<NodeObject>
valFetch(uint256 const& hash) const;
[[nodiscard]] std::shared_ptr<NodeObject>
verifyFetch(uint256 const& hash) const;
// Open databases if they are closed
[[nodiscard]] Shard::Count
makeBackendCount();
};
} // namespace NodeStore

View File

@@ -2221,21 +2221,22 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
// VFALCO TODO Move this someplace more sensible so we dont
// need to inject the NodeStore interfaces.
std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
auto hObj{app_.getNodeStore().fetch(hash, seq)};
if (!hObj)
auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
if (!nodeObject)
{
if (auto shardStore = app_.getShardStore())
{
if (seq >= shardStore->earliestLedgerSeq())
hObj = shardStore->fetch(hash, seq);
nodeObject = shardStore->fetchNodeObject(hash, seq);
}
}
if (hObj)
if (nodeObject)
{
protocol::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(hash.begin(), hash.size());
newObj.set_data(
&hObj->getData().front(), hObj->getData().size());
&nodeObject->getData().front(),
nodeObject->getData().size());
if (obj.has_nodeid())
newObj.set_index(obj.nodeid());

View File

@@ -120,10 +120,11 @@ getCountsJson(Application& app, int minObjectCount)
textTime(uptime, s, "second", 1s);
ret[jss::uptime] = uptime;
ret[jss::node_writes] = app.getNodeStore().getStoreCount();
ret[jss::node_writes] = std::to_string(app.getNodeStore().getStoreCount());
ret[jss::node_reads_total] = app.getNodeStore().getFetchTotalCount();
ret[jss::node_reads_hit] = app.getNodeStore().getFetchHitCount();
ret[jss::node_written_bytes] = app.getNodeStore().getStoreSize();
ret[jss::node_written_bytes] =
std::to_string(app.getNodeStore().getStoreSize());
ret[jss::node_read_bytes] = app.getNodeStore().getFetchSize();
if (auto shardStore = app.getShardStore())
@@ -137,10 +138,11 @@ getCountsJson(Application& app, int minObjectCount)
jv[jss::treenode_track_size] = trackSz;
ret[jss::write_load] = shardStore->getWriteLoad();
ret[jss::node_hit_rate] = shardStore->getCacheHitRate();
jv[jss::node_writes] = shardStore->getStoreCount();
jv[jss::node_writes] = std::to_string(shardStore->getStoreCount());
jv[jss::node_reads_total] = shardStore->getFetchTotalCount();
jv[jss::node_reads_hit] = shardStore->getFetchHitCount();
jv[jss::node_written_bytes] = shardStore->getStoreSize();
jv[jss::node_written_bytes] =
std::to_string(shardStore->getStoreSize());
jv[jss::node_read_bytes] = shardStore->getFetchSize();
}

View File

@@ -151,12 +151,13 @@ SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const
if (backed_)
{
if (auto obj = f_.db().fetch(hash.as_uint256(), ledgerSeq_))
if (auto nodeObject =
f_.db().fetchNodeObject(hash.as_uint256(), ledgerSeq_))
{
try
{
node = SHAMapAbstractNode::makeFromPrefix(
makeSlice(obj->getData()), hash);
makeSlice(nodeObject->getData()), hash);
if (node)
canonicalize(hash, node);
}

View File

@@ -272,7 +272,7 @@ class DatabaseShard_test : public TestBase
{
// Store header
{
Serializer s(128);
Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
s.add32(HashPrefix::ledgerMaster);
addRaw(ledger.info(), s);
db.store(
@@ -369,8 +369,8 @@ class DatabaseShard_test : public TestBase
if (!BEAST_EXPECT(nSrc))
return false;
auto nDst =
db.fetch(node.getNodeHash().as_uint256(), ledger.info().seq);
auto nDst = db.fetchNodeObject(
node.getNodeHash().as_uint256(), ledger.info().seq);
if (!BEAST_EXPECT(nDst))
return false;
@@ -393,8 +393,8 @@ class DatabaseShard_test : public TestBase
if (!BEAST_EXPECT(nSrc))
return false;
auto nDst =
db.fetch(node.getNodeHash().as_uint256(), ledger.info().seq);
auto nDst = db.fetchNodeObject(
node.getNodeHash().as_uint256(), ledger.info().seq);
if (!BEAST_EXPECT(nDst))
return false;
@@ -432,22 +432,13 @@ class DatabaseShard_test : public TestBase
std::unique_ptr<Config>
testConfig(
std::string const& testName,
std::string const& backendType,
std::string const& shardDir,
std::string const& nodeDir = std::string())
{
using namespace test::jtx;
if (testName != "")
{
std::string caseName =
"DatabaseShard " + testName + " with backend " + backendType;
testcase(caseName);
}
return envconfig([&](std::unique_ptr<Config> cfg) {
cfg->overwrite(ConfigSection::shardDatabase(), "type", backendType);
// Shard store configuration
cfg->overwrite(ConfigSection::shardDatabase(), "path", shardDir);
cfg->overwrite(
ConfigSection::shardDatabase(),
@@ -461,20 +452,16 @@ class DatabaseShard_test : public TestBase
ConfigSection::shardDatabase(),
"earliest_seq",
std::to_string(earliestSeq));
cfg->overwrite(ConfigSection::nodeDatabase(), "type", backendType);
cfg->overwrite(
ConfigSection::nodeDatabase(),
"max_size_gb",
std::to_string(maxSizeGb));
// Node store configuration
cfg->overwrite(
ConfigSection::nodeDatabase(),
"earliest_seq",
std::to_string(earliestSeq));
if (nodeDir.empty())
cfg->overwrite(
ConfigSection::nodeDatabase(), "path", defNodeDir.path());
else
cfg->overwrite(ConfigSection::nodeDatabase(), "path", nodeDir);
ConfigSection::nodeDatabase(),
"path",
nodeDir.empty() ? defNodeDir.path() : nodeDir);
return cfg;
});
}
@@ -482,21 +469,21 @@ class DatabaseShard_test : public TestBase
std::optional<int>
waitShard(
DatabaseShard& db,
int shardNumber,
int shardIndex,
std::chrono::seconds timeout = shardStoreTimeout)
{
RangeSet<std::uint32_t> rs;
auto start = std::chrono::system_clock::now();
auto end = start + timeout;
while (!from_string(rs, db.getCompleteShards()) ||
!boost::icl::contains(rs, shardNumber))
!boost::icl::contains(rs, shardIndex))
{
if (!BEAST_EXPECT(std::chrono::system_clock::now() < end))
return {};
std::this_thread::yield();
}
return shardNumber;
return shardIndex;
}
std::optional<int>
@@ -506,16 +493,19 @@ class DatabaseShard_test : public TestBase
int maxShardNumber = 1,
int ledgerOffset = 0)
{
int shardNumber = -1;
int shardIndex{-1};
for (std::uint32_t i = 0; i < ledgersPerShard; ++i)
{
auto ind = db.prepareLedger((maxShardNumber + 1) * ledgersPerShard);
if (!BEAST_EXPECT(ind != boost::none))
auto const ledgerSeq{
db.prepareLedger((maxShardNumber + 1) * ledgersPerShard)};
if (!BEAST_EXPECT(ledgerSeq != boost::none))
return {};
shardNumber = db.seqToShardIndex(*ind);
int arrInd =
*ind - (ledgersPerShard * ledgerOffset) - ledgersPerShard - 1;
shardIndex = db.seqToShardIndex(*ledgerSeq);
int const arrInd = *ledgerSeq - (ledgersPerShard * ledgerOffset) -
ledgersPerShard - 1;
BEAST_EXPECT(
arrInd >= 0 && arrInd < maxShardNumber * ledgersPerShard);
BEAST_EXPECT(saveLedger(db, *data.ledgers_[arrInd]));
@@ -524,24 +514,27 @@ class DatabaseShard_test : public TestBase
uint256 const finalKey_{0};
Serializer s;
s.add32(Shard::version);
s.add32(db.firstLedgerSeq(shardNumber));
s.add32(db.lastLedgerSeq(shardNumber));
s.add32(db.firstLedgerSeq(shardIndex));
s.add32(db.lastLedgerSeq(shardIndex));
s.addRaw(data.ledgers_[arrInd]->info().hash.data(), 256 / 8);
db.store(hotUNKNOWN, std::move(s.modData()), finalKey_, *ind);
db.store(
hotUNKNOWN, std::move(s.modData()), finalKey_, *ledgerSeq);
}
db.setStored(data.ledgers_[arrInd]);
}
return waitShard(db, shardNumber);
return waitShard(db, shardIndex);
}
void
testStandalone(std::string const& backendType)
testStandalone()
{
testcase("Standalone");
using namespace test::jtx;
beast::temp_dir shardDir;
Env env{*this, testConfig("standalone", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DummyScheduler scheduler;
RootStoppable parent("TestRootStoppable");
@@ -563,14 +556,14 @@ class DatabaseShard_test : public TestBase
}
void
testCreateShard(
std::string const& backendType,
std::uint64_t const seedValue)
testCreateShard(std::uint64_t const seedValue)
{
testcase("Create shard");
using namespace test::jtx;
beast::temp_dir shardDir;
Env env{*this, testConfig("createShard", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -586,17 +579,15 @@ class DatabaseShard_test : public TestBase
}
void
testReopenDatabase(
std::string const& backendType,
std::uint64_t const seedValue)
testReopenDatabase(std::uint64_t const seedValue)
{
testcase("Reopen shard store");
using namespace test::jtx;
beast::temp_dir shardDir;
{
Env env{
*this,
testConfig("reopenDatabase", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -609,7 +600,7 @@ class DatabaseShard_test : public TestBase
return;
}
{
Env env{*this, testConfig("", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -626,16 +617,14 @@ class DatabaseShard_test : public TestBase
}
void
testGetCompleteShards(
std::string const& backendType,
std::uint64_t const seedValue)
testGetCompleteShards(std::uint64_t const seedValue)
{
testcase("Get complete shards");
using namespace test::jtx;
beast::temp_dir shardDir;
Env env{
*this,
testConfig("getCompleteShards", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -658,15 +647,14 @@ class DatabaseShard_test : public TestBase
}
void
testPrepareShard(
std::string const& backendType,
std::uint64_t const seedValue)
testPrepareShard(std::uint64_t const seedValue)
{
testcase("Prepare shard");
using namespace test::jtx;
beast::temp_dir shardDir;
Env env{
*this, testConfig("prepareShard", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -727,19 +715,17 @@ class DatabaseShard_test : public TestBase
}
void
testImportShard(
std::string const& backendType,
std::uint64_t const seedValue)
testImportShard(std::uint64_t const seedValue)
{
testcase("Import shard");
using namespace test::jtx;
beast::temp_dir importDir;
TestData data(seedValue, 2);
{
Env env{
*this,
testConfig("importShard", backendType, importDir.path())};
Env env{*this, testConfig(importDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -760,7 +746,7 @@ class DatabaseShard_test : public TestBase
{
beast::temp_dir shardDir;
Env env{*this, testConfig("", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -769,8 +755,14 @@ class DatabaseShard_test : public TestBase
db->prepareShard(1);
BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(2));
using namespace boost::filesystem;
remove_all(importPath / LgrDBName);
remove_all(importPath / TxDBName);
if (!BEAST_EXPECT(db->importShard(1, importPath)))
return;
BEAST_EXPECT(db->getPreShards() == "");
auto n = waitShard(*db, 1);
@@ -783,20 +775,17 @@ class DatabaseShard_test : public TestBase
}
void
testCorruptedDatabase(
std::string const& backendType,
std::uint64_t const seedValue)
testCorruptedDatabase(std::uint64_t const seedValue)
{
testcase("Corrupted shard store");
using namespace test::jtx;
beast::temp_dir shardDir;
{
TestData data(seedValue, 4, 2);
{
Env env{
*this,
testConfig(
"corruptedDatabase", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -810,7 +799,7 @@ class DatabaseShard_test : public TestBase
boost::filesystem::path path = shardDir.path();
path /= std::string("2");
path /= backendType + ".dat";
path /= "nudb.dat";
FILE* f = fopen(path.string().c_str(), "r+b");
if (!BEAST_EXPECT(f))
@@ -820,8 +809,8 @@ class DatabaseShard_test : public TestBase
BEAST_EXPECT(fwrite(buf, 1, 256, f) == 256);
fclose(f);
}
{
Env env{*this, testConfig("", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -837,25 +826,19 @@ class DatabaseShard_test : public TestBase
for (std::uint32_t i = 0; i < 1 * ledgersPerShard; ++i)
checkLedger(data, *db, *data.ledgers_[i]);
}
}
void
testIllegalFinalKey(
std::string const& backendType,
std::uint64_t const seedValue)
testIllegalFinalKey(std::uint64_t const seedValue)
{
testcase("Illegal finalKey");
using namespace test::jtx;
for (int i = 0; i < 5; ++i)
{
beast::temp_dir shardDir;
{
Env env{
*this,
testConfig(
(i == 0 ? "illegalFinalKey" : ""),
backendType,
shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -863,14 +846,16 @@ class DatabaseShard_test : public TestBase
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
int shardNumber = -1;
int shardIndex{-1};
for (std::uint32_t j = 0; j < ledgersPerShard; ++j)
{
auto ind = db->prepareLedger(2 * ledgersPerShard);
if (!BEAST_EXPECT(ind != boost::none))
auto const ledgerSeq{
db->prepareLedger(2 * ledgersPerShard)};
if (!BEAST_EXPECT(ledgerSeq != boost::none))
return;
shardNumber = db->seqToShardIndex(*ind);
int arrInd = *ind - ledgersPerShard - 1;
shardIndex = db->seqToShardIndex(*ledgerSeq);
int arrInd = *ledgerSeq - ledgersPerShard - 1;
BEAST_EXPECT(arrInd >= 0 && arrInd < ledgersPerShard);
BEAST_EXPECT(saveLedger(*db, *data.ledgers_[arrInd]));
if (arrInd % ledgersPerShard == (ledgersPerShard - 1))
@@ -878,8 +863,8 @@ class DatabaseShard_test : public TestBase
uint256 const finalKey_{0};
Serializer s;
s.add32(Shard::version + (i == 0));
s.add32(db->firstLedgerSeq(shardNumber) + (i == 1));
s.add32(db->lastLedgerSeq(shardNumber) - (i == 3));
s.add32(db->firstLedgerSeq(shardIndex) + (i == 1));
s.add32(db->lastLedgerSeq(shardIndex) - (i == 3));
s.addRaw(
data.ledgers_[arrInd - (i == 4)]
->info()
@@ -889,13 +874,13 @@ class DatabaseShard_test : public TestBase
hotUNKNOWN,
std::move(s.modData()),
finalKey_,
*ind);
*ledgerSeq);
}
db->setStored(data.ledgers_[arrInd]);
}
if (i == 2)
waitShard(*db, shardNumber);
waitShard(*db, shardIndex);
else
{
boost::filesystem::path path(shardDir.path());
@@ -916,7 +901,7 @@ class DatabaseShard_test : public TestBase
}
{
Env env{*this, testConfig("", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -941,17 +926,16 @@ class DatabaseShard_test : public TestBase
}
void
testImport(std::string const& backendType, std::uint64_t const seedValue)
testImport(std::uint64_t const seedValue)
{
testcase("Import node store");
using namespace test::jtx;
beast::temp_dir shardDir;
{
beast::temp_dir nodeDir;
Env env{
*this,
testConfig(
"import", backendType, shardDir.path(), nodeDir.path())};
Env env{*this, testConfig(shardDir.path(), nodeDir.path())};
DatabaseShard* db = env.app().getShardStore();
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
@@ -970,7 +954,7 @@ class DatabaseShard_test : public TestBase
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6));
}
{
Env env{*this, testConfig("", backendType, shardDir.path())};
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
@@ -989,10 +973,10 @@ class DatabaseShard_test : public TestBase
}
void
testImportWithHistoricalPaths(
std::string const& backendType,
std::uint64_t const seedValue)
testImportWithHistoricalPaths(std::uint64_t const seedValue)
{
testcase("Import with historical paths");
using namespace test::jtx;
// Test importing with multiple historical
@@ -1009,11 +993,7 @@ class DatabaseShard_test : public TestBase
[](const beast::temp_dir& dir) { return dir.path(); });
beast::temp_dir nodeDir;
auto c = testConfig(
"importWithHistoricalPaths",
backendType,
shardDir.path(),
nodeDir.path());
auto c = testConfig(shardDir.path(), nodeDir.path());
auto& historyPaths = c->section(SECTION_HISTORICAL_SHARD_PATHS);
historyPaths.append(
@@ -1075,11 +1055,7 @@ class DatabaseShard_test : public TestBase
beast::temp_dir historicalDir;
beast::temp_dir nodeDir;
auto c = testConfig(
"importWithSingleHistoricalPath",
backendType,
shardDir.path(),
nodeDir.path());
auto c = testConfig(shardDir.path(), nodeDir.path());
auto& historyPaths = c->section(SECTION_HISTORICAL_SHARD_PATHS);
historyPaths.append({historicalDir.path()});
@@ -1125,10 +1101,10 @@ class DatabaseShard_test : public TestBase
}
void
testPrepareWithHistoricalPaths(
std::string const& backendType,
std::uint64_t const seedValue)
testPrepareWithHistoricalPaths(std::uint64_t const seedValue)
{
testcase("Prepare with historical paths");
using namespace test::jtx;
// Test importing with multiple historical
@@ -1145,8 +1121,7 @@ class DatabaseShard_test : public TestBase
[](const beast::temp_dir& dir) { return dir.path(); });
beast::temp_dir nodeDir;
auto c = testConfig(
"prepareWithHistoricalPaths", backendType, shardDir.path());
auto c = testConfig(shardDir.path());
auto& historyPaths = c->section(SECTION_HISTORICAL_SHARD_PATHS);
historyPaths.append(
@@ -1300,20 +1275,53 @@ class DatabaseShard_test : public TestBase
}
void
testAll(std::string const& backendType)
testOpenShardManagement(std::uint64_t const seedValue)
{
std::uint64_t const seedValue = 51;
testStandalone(backendType);
testCreateShard(backendType, seedValue);
testReopenDatabase(backendType, seedValue + 5);
testGetCompleteShards(backendType, seedValue + 10);
testPrepareShard(backendType, seedValue + 20);
testImportShard(backendType, seedValue + 30);
testCorruptedDatabase(backendType, seedValue + 40);
testIllegalFinalKey(backendType, seedValue + 50);
testImport(backendType, seedValue + 60);
testImportWithHistoricalPaths(backendType, seedValue + 80);
testPrepareWithHistoricalPaths(backendType, seedValue + 90);
testcase("Open shard management");
using namespace test::jtx;
beast::temp_dir shardDir;
Env env{*this, testConfig(shardDir.path())};
auto shardStore{env.app().getShardStore()};
BEAST_EXPECT(shardStore);
// Create one shard more than the open final limit
auto const openFinalLimit{env.app().config().getValueFor(
SizedItem::openFinalLimit, boost::none)};
auto const numShards{openFinalLimit + 1};
TestData data(seedValue, 2, numShards);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
BEAST_EXPECT(shardStore->getCompleteShards().empty());
int oldestShardIndex{-1};
std::uint64_t bitMask{0};
for (auto i = 0; i < numShards; ++i)
{
auto shardIndex{createShard(data, *shardStore, numShards)};
if (!BEAST_EXPECT(
shardIndex && *shardIndex >= 1 && *shardIndex <= numShards))
return;
bitMask |= (1ll << *shardIndex);
if (oldestShardIndex == -1)
oldestShardIndex = *shardIndex;
}
// The number of open shards exceeds the open limit by one.
// A sweep will close enough shards to be within the limit.
shardStore->sweep();
// Read from the closed shard and automatically open it
auto const ledgerSeq{shardStore->lastLedgerSeq(oldestShardIndex)};
auto const index{ledgerSeq - ledgersPerShard - 1};
BEAST_EXPECT(shardStore->fetchNodeObject(
data.ledgers_[index]->info().hash, ledgerSeq));
}
public:
@@ -1324,19 +1332,24 @@ public:
void
run() override
{
testAll("nudb");
std::uint64_t const seedValue = 51;
#if RIPPLE_ROCKSDB_AVAILABLE
// testAll ("rocksdb");
#endif
#if RIPPLE_ENABLE_SQLITE_BACKEND_TESTS
testAll("sqlite");
#endif
testStandalone();
testCreateShard(seedValue);
testReopenDatabase(seedValue + 10);
testGetCompleteShards(seedValue + 20);
testPrepareShard(seedValue + 30);
testImportShard(seedValue + 40);
testCorruptedDatabase(seedValue + 50);
testIllegalFinalKey(seedValue + 60);
testImport(seedValue + 70);
testImportWithHistoricalPaths(seedValue + 80);
testPrepareWithHistoricalPaths(seedValue + 90);
testOpenShardManagement(seedValue + 100);
}
};
BEAST_DEFINE_TESTSUITE(DatabaseShard, NodeStore, ripple);
BEAST_DEFINE_TESTSUITE_MANUAL(DatabaseShard, NodeStore, ripple);
} // namespace NodeStore
} // namespace ripple

View File

@@ -217,7 +217,7 @@ public:
for (int i = 0; i < batch.size(); ++i)
{
std::shared_ptr<NodeObject> object =
db.fetch(batch[i]->getHash(), 0);
db.fetchNodeObject(batch[i]->getHash(), 0);
if (object != nullptr)
pCopy->push_back(object);