Improve NodeStore to ShardStore imports

* Run the import process in a background thread
* Prevent online_delete from removing ledgers pending import
Author: Devon White
Date: 2020-12-18 17:31:15 -05:00
Committed by: manojsdoshi
Parent: 14b2f27c3e
Commit: 38f954fd46
20 changed files with 711 additions and 302 deletions
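For orientation, the overall shape of the change as a minimal standalone sketch (Importer, safeToRotate, and all other names here are illustrative, not rippled's API): the lengthy import runs on a dedicated thread that publishes the first ledger sequence of the shard it is working on, and the deletion path consults that sequence before rotating.

#include <atomic>
#include <cstdint>
#include <mutex>
#include <optional>
#include <thread>

class Importer
{
    mutable std::mutex mutex_;
    std::optional<std::uint32_t> firstSeq_;  // first ledger of the shard in flight
    std::atomic<bool> stopping_{false};
    std::thread worker_;

public:
    // Launch the lengthy import in the background
    // (cf. DatabaseShardImp::importDatabase below)
    void
    start()
    {
        worker_ = std::thread([this] { run(); });
    }

    // Unseated when no import is running (cf. getDatabaseImportSequence)
    std::optional<std::uint32_t>
    importSequence() const
    {
        std::lock_guard lock(mutex_);
        return firstSeq_;
    }

    void
    stop()
    {
        stopping_ = true;
        if (worker_.joinable())
            worker_.join();
    }

private:
    void
    run()
    {
        // Pretend each shard holds 16384 ledgers; walk three shards.
        for (std::uint32_t seq = 1; !stopping_ && seq <= 3 * 16384; seq += 16384)
        {
            {
                std::lock_guard lock(mutex_);
                firstSeq_ = seq;
            }
            // ... copy this shard's ledgers from the node store ...
        }
        std::lock_guard lock(mutex_);
        firstSeq_.reset();
    }
};

// Deletion side: defer rotation while it would erase ledgers still being
// imported (cf. the SHAMapStoreImp::run change below).
bool
safeToRotate(Importer const& imp, std::uint32_t lastRotated)
{
    if (auto const seq = imp.importSequence())
        return *seq > lastRotated - 1;
    return true;
}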

View File

@@ -591,6 +591,7 @@ target_sources (rippled PRIVATE
src/ripple/rpc/handlers/LogLevel.cpp
src/ripple/rpc/handlers/LogRotate.cpp
src/ripple/rpc/handlers/Manifest.cpp
src/ripple/rpc/handlers/NodeToShardStatus.cpp
src/ripple/rpc/handlers/NoRippleCheck.cpp
src/ripple/rpc/handlers/OwnerInfo.cpp
src/ripple/rpc/handlers/PathFind.cpp

View File

@@ -916,7 +916,7 @@ public:
using namespace std::chrono;
auto const start = steady_clock::now();
m_nodeStore->import(*source);
m_nodeStore->importDatabase(*source);
auto const elapsed =
duration_cast<seconds>(steady_clock::now() - start);
@@ -2109,7 +2109,7 @@ ApplicationImp::nodeToShards()
JLOG(m_journal.fatal()) << "Invalid [shard_db] configuration";
return false;
}
shardStore_->import(getNodeStore());
shardStore_->importDatabase(getNodeStore());
return true;
}

View File

@@ -156,7 +156,8 @@ printHelp(const po::options_description& desc)
" ledger_current\n"
" ledger_request <ledger>\n"
" log_level [[<partition>] <severity>]\n"
" logrotate \n"
" logrotate\n"
" nodetoshard_status\n"
" peers\n"
" ping\n"
" random\n"

View File

@@ -301,9 +301,31 @@ SHAMapStoreImp::run()
state_db_.setLastRotated(lastRotated);
}
bool const readyToRotate =
validatedSeq >= lastRotated + deleteInterval_ &&
canDelete_ >= lastRotated - 1 && !health();
// Make sure we don't delete ledgers currently being
// imported into the ShardStore
bool const waitForImport = readyToRotate && [this, lastRotated] {
if (auto shardStore = app_.getShardStore())
{
if (auto sequence = shardStore->getDatabaseImportSequence())
return sequence <= lastRotated - 1;
}
return false;
}();
if (waitForImport)
{
JLOG(journal_.info())
<< "NOT rotating validatedSeq " << validatedSeq
<< " as rotation would interfere with ShardStore import";
}
// will delete up to (not including) lastRotated
if (validatedSeq >= lastRotated + deleteInterval_ &&
canDelete_ >= lastRotated - 1 && !health())
if (readyToRotate && !waitForImport)
{
JLOG(journal_.warn())
<< "rotating validatedSeq " << validatedSeq << " lastRotated "

View File

@@ -1257,6 +1257,7 @@ public:
{"log_level", &RPCParser::parseLogLevel, 0, 2},
{"logrotate", &RPCParser::parseAsIs, 0, 0},
{"manifest", &RPCParser::parseManifest, 1, 1},
{"nodetoshard_status", &RPCParser::parseAsIs, 0, 0},
{"owner_info", &RPCParser::parseAccountItems, 1, 3},
{"peers", &RPCParser::parseAsIs, 0, 0},
{"ping", &RPCParser::parseAsIs, 0, 0},

View File

@@ -82,7 +82,7 @@ public:
/** Import objects from another database. */
virtual void
import(Database& source) = 0;
importDatabase(Database& source) = 0;
/** Retrieve the estimated number of pending write operations.
This is used for diagnostics.

View File

@@ -97,7 +97,9 @@ public:
virtual std::string
getPreShards() = 0;
/** Import a shard into the shard database
/** Import a shard from the shard archive handler into the
shard database. This differs from 'importDatabase', which
imports the contents of the NodeStore.
@param shardIndex Shard index to import
@param srcDir The directory to import from
@@ -268,6 +270,17 @@ public:
virtual boost::filesystem::path const&
getRootDir() const = 0;
virtual Json::Value
getDatabaseImportStatus() const = 0;
/** Returns the first ledger sequence of the shard currently being imported
from the NodeStore
@return The ledger sequence or an unseated value if no import is running
*/
virtual std::optional<std::uint32_t>
getDatabaseImportSequence() const = 0;
/** The number of ledgers in a shard */
static constexpr std::uint32_t ledgersPerShardDefault{16384u};
};
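A caller of these two new virtuals might look like the following sketch (shardStore and journal are assumed to be a valid DatabaseShard* and a beast::Journal; this is not code from the commit):

if (auto const seq = shardStore->getDatabaseImportSequence())
{
    // An import is running; *seq is the first ledger of the shard in flight.
    JLOG(journal.info())
        << "import working at ledger " << *seq << ": "
        << shardStore->getDatabaseImportStatus().toStyledString();
}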

View File

@@ -97,7 +97,7 @@ public:
}
void
import(Database& source) override
importDatabase(Database& source) override
{
importInternal(*backend_.get(), source);
}

View File

@@ -69,7 +69,7 @@ DatabaseRotatingImp::getWriteLoad() const
}
void
DatabaseRotatingImp::import(Database& source)
DatabaseRotatingImp::importDatabase(Database& source)
{
auto const backend = [&] {
std::lock_guard lock(mutex_);

View File

@@ -58,7 +58,7 @@ public:
getWriteLoad() const override;
void
import(Database& source) override;
importDatabase(Database& source) override;
bool isSameDB(std::uint32_t, std::uint32_t) override
{

View File

@@ -151,12 +151,12 @@ DatabaseShardImp::init()
continue;
}
// Check if a previous import failed
if (is_regular_file(shardDir / importMarker_))
// Check if a previous database import failed
if (is_regular_file(shardDir / databaseImportMarker_))
{
JLOG(j_.warn())
<< "shard " << shardIndex
<< " previously failed import, removing";
<< " previously failed database import, removing";
remove_all(shardDir);
continue;
}
@@ -358,7 +358,19 @@ DatabaseShardImp::prepareShards(std::vector<std::uint32_t> const& shardIndexes)
return fail("is already stored", shardIndex);
if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
return fail("is already queued for import", shardIndex);
return fail(
"is already queued for import from the shard archive handler",
shardIndex);
if (databaseImportStatus_)
{
if (auto shard = databaseImportStatus_->currentShard.lock(); shard)
{
if (shard->index() == shardIndex)
return fail(
"is being imported from the nodestore", shardIndex);
}
}
// Any shard earlier than the two most recent shards
// is a historical shard
@@ -710,102 +722,151 @@ DatabaseShardImp::stop()
JLOG(j_.warn()) << " shard " << shard->index() << " unexpired";
}
}
// Notify the shard being imported
// from the node store to stop
if (databaseImportStatus_)
{
// A node store import is in progress
if (auto importShard = databaseImportStatus_->currentShard.lock();
importShard)
importShard->stop();
}
// Wait for the node store import thread
// if necessary
if (databaseImporter_.joinable())
databaseImporter_.join();
}
void
DatabaseShardImp::import(Database& source)
DatabaseShardImp::importDatabase(Database& source)
{
std::lock_guard lock(mutex_);
assert(init_);
// Only the application local node store can be imported
assert(&source == &app_.getNodeStore());
if (databaseImporter_.joinable())
{
assert(false);
JLOG(j_.error()) << "database import already in progress";
return;
}
// Run the lengthy node store import process in the background
// on a dedicated thread.
databaseImporter_ = std::thread([this] { doImportDatabase(); });
}
void
DatabaseShardImp::doImportDatabase()
{
if (isStopping())
return;
auto loadLedger =
[this](char const* const sortOrder) -> std::optional<std::uint32_t> {
std::shared_ptr<Ledger> ledger;
std::uint32_t ledgerSeq{0};
std::optional<LedgerInfo> info;
if (sortOrder == std::string("asc"))
{
info = dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface())
->getLimitedOldestLedgerInfo(earliestLedgerSeq());
}
else
{
info = dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface())
->getLimitedNewestLedgerInfo(earliestLedgerSeq());
}
if (info)
{
ledger = loadLedgerHelper(*info, app_, false);
ledgerSeq = info->seq;
}
if (!ledger || ledgerSeq == 0)
{
JLOG(j_.error()) << "no suitable ledgers were found in"
" the SQLite database to import";
return std::nullopt;
}
return ledgerSeq;
};
// Find earliest ledger sequence stored
auto const earliestLedgerSeq{loadLedger("asc")};
if (!earliestLedgerSeq)
return;
auto const earliestIndex = [&] {
auto earliestIndex = seqToShardIndex(*earliestLedgerSeq);
// Consider only complete shards
if (earliestLedgerSeq != firstLedgerSeq(earliestIndex))
++earliestIndex;
return earliestIndex;
}();
// Find last ledger sequence stored
auto const latestLedgerSeq = loadLedger("desc");
if (!latestLedgerSeq)
return;
auto const latestIndex = [&] {
auto latestIndex = seqToShardIndex(*latestLedgerSeq);
// Consider only complete shards
if (latestLedgerSeq != lastLedgerSeq(latestIndex))
--latestIndex;
return latestIndex;
}();
if (latestIndex < earliestIndex)
{
JLOG(j_.error()) << "no suitable ledgers were found in"
" the SQLite database to import";
return;
}
JLOG(j_.debug()) << "Importing ledgers for shards " << earliestIndex
<< " through " << latestIndex;
{
std::lock_guard lock(mutex_);
assert(init_);
// Only the application local node store can be imported
if (&source != &app_.getNodeStore())
{
assert(false);
JLOG(j_.error()) << "invalid source database";
assert(!databaseImportStatus_);
databaseImportStatus_ = std::make_unique<DatabaseImportStatus>(
earliestIndex, latestIndex, 0);
}
// Import the shards
for (std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
++shardIndex)
{
if (isStopping())
return;
}
std::uint32_t earliestIndex;
std::uint32_t latestIndex;
{
auto loadLedger = [&](bool ascendSort =
true) -> std::optional<std::uint32_t> {
std::shared_ptr<Ledger> ledger;
std::uint32_t ledgerSeq{0};
std::optional<LedgerInfo> info;
if (ascendSort)
{
info =
dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface())
->getLimitedOldestLedgerInfo(earliestLedgerSeq());
}
else
{
info =
dynamic_cast<RelationalDBInterfaceSqlite*>(
&app_.getRelationalDBInterface())
->getLimitedNewestLedgerInfo(earliestLedgerSeq());
}
if (info)
{
ledger = loadLedgerHelper(*info, app_, false);
ledgerSeq = info->seq;
}
if (!ledger || ledgerSeq == 0)
{
JLOG(j_.error()) << "no suitable ledgers were found in"
" the SQLite database to import";
return std::nullopt;
}
return ledgerSeq;
};
auto const pathDesignation = [this, shardIndex] {
std::lock_guard lock(mutex_);
// Find earliest ledger sequence stored
auto ledgerSeq{loadLedger()};
if (!ledgerSeq)
return;
earliestIndex = seqToShardIndex(*ledgerSeq);
// Consider only complete shards
if (ledgerSeq != firstLedgerSeq(earliestIndex))
++earliestIndex;
// Find last ledger sequence stored
ledgerSeq = loadLedger(false);
if (!ledgerSeq)
return;
latestIndex = seqToShardIndex(*ledgerSeq);
// Consider only complete shards
if (ledgerSeq != lastLedgerSeq(latestIndex))
--latestIndex;
if (latestIndex < earliestIndex)
{
JLOG(j_.error()) << "no suitable ledgers were found in"
" the SQLite database to import";
return;
}
}
auto numHistShards = this->numHistoricalShards(lock);
// Import the shards
for (std::uint32_t shardIndex = earliestIndex;
shardIndex <= latestIndex;
++shardIndex)
{
auto const numHistShards = numHistoricalShards(lock);
auto const pathDesignation =
prepareForNewShard(shardIndex, numHistShards, lock);
if (!pathDesignation)
break;
return pathDesignation;
}();
auto const needsHistoricalPath =
*pathDesignation == PathDesignation::historical;
if (!pathDesignation)
break;
{
std::lock_guard lock(mutex_);
// Skip if being acquired
if (shardIndex == acquireIndex_)
@@ -815,7 +876,7 @@ DatabaseShardImp::import(Database& source)
continue;
}
// Skip if being imported
// Skip if being imported from the shard archive handler
if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
{
JLOG(j_.debug())
@@ -829,134 +890,174 @@ DatabaseShardImp::import(Database& source)
JLOG(j_.debug()) << "shard " << shardIndex << " already stored";
continue;
}
}
// Verify SQLite ledgers are in the node store
{
auto const firstSeq{firstLedgerSeq(shardIndex)};
auto const lastSeq{
std::max(firstSeq, lastLedgerSeq(shardIndex))};
auto const numLedgers{
shardIndex == earliestShardIndex() ? lastSeq - firstSeq + 1
: ledgersPerShard_};
auto ledgerHashes{
app_.getRelationalDBInterface().getHashesByIndex(
firstSeq, lastSeq)};
if (ledgerHashes.size() != numLedgers)
continue;
std::uint32_t const firstSeq = firstLedgerSeq(shardIndex);
std::uint32_t const lastSeq =
std::max(firstSeq, lastLedgerSeq(shardIndex));
std::uint32_t const numLedgers = shardIndex == earliestShardIndex()
? lastSeq - firstSeq + 1
: ledgersPerShard_;
bool valid{true};
for (std::uint32_t n = firstSeq; n <= lastSeq; n += 256)
{
if (!source.fetchNodeObject(ledgerHashes[n].ledgerHash, n))
{
JLOG(j_.warn()) << "SQLite ledger sequence " << n
<< " mismatches node store";
valid = false;
break;
}
}
if (!valid)
continue;
}
auto const path =
needsHistoricalPath ? chooseHistoricalPath(lock) : dir_;
// Create the new shard
auto shard{
std::make_unique<Shard>(app_, *this, shardIndex, path, j_)};
if (!shard->init(scheduler_, *ctx_))
// Verify SQLite ledgers are in the node store
{
auto const ledgerHashes{
app_.getRelationalDBInterface().getHashesByIndex(
firstSeq, lastSeq)};
if (ledgerHashes.size() != numLedgers)
continue;
// Create a marker file to signify an import in progress
auto const shardDir{path / std::to_string(shardIndex)};
auto const markerFile{shardDir / importMarker_};
auto& source = app_.getNodeStore();
bool valid{true};
for (std::uint32_t n = firstSeq; n <= lastSeq; ++n)
{
std::ofstream ofs{markerFile.string()};
if (!ofs.is_open())
if (!source.fetchNodeObject(ledgerHashes.at(n).ledgerHash, n))
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to create temp marker file";
shard->removeOnDestroy();
continue;
}
ofs.close();
}
// Copy the ledgers from node store
std::shared_ptr<Ledger> recentStored;
std::optional<uint256> lastLedgerHash;
while (auto const ledgerSeq = shard->prepare())
{
auto ledger{loadByIndex(*ledgerSeq, app_, false)};
if (!ledger || ledger->info().seq != ledgerSeq)
JLOG(j_.warn()) << "SQLite ledger sequence " << n
<< " mismatches node store";
valid = false;
break;
auto const result{shard->storeLedger(ledger, recentStored)};
storeStats(result.count, result.size);
if (result.error)
break;
if (!shard->setLedgerStored(ledger))
break;
if (!lastLedgerHash && ledgerSeq == lastLedgerSeq(shardIndex))
lastLedgerHash = ledger->info().hash;
recentStored = std::move(ledger);
}
using namespace boost::filesystem;
bool success{false};
if (lastLedgerHash && shard->getState() == Shard::complete)
{
// Store shard final key
Serializer s;
s.add32(Shard::version);
s.add32(firstLedgerSeq(shardIndex));
s.add32(lastLedgerSeq(shardIndex));
s.addBitString(*lastLedgerHash);
auto const nodeObject{NodeObject::createObject(
hotUNKNOWN, std::move(s.modData()), Shard::finalKey)};
if (shard->storeNodeObject(nodeObject))
{
try
{
// The import process is complete and the
// marker file is no longer required
remove_all(markerFile);
JLOG(j_.debug()) << "shard " << shardIndex
<< " was successfully imported";
finalizeShard(
shards_.emplace(shardIndex, std::move(shard))
.first->second,
true,
std::nullopt);
success = true;
if (shardIndex < shardBoundaryIndex())
++numHistShards;
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "shard index " << shardIndex
<< ". Exception caught in function "
<< __func__ << ". Error: " << e.what();
}
}
}
if (!valid)
continue;
}
if (!success)
if (isStopping())
return;
bool const needsHistoricalPath =
*pathDesignation == PathDesignation::historical;
auto const path = needsHistoricalPath
? chooseHistoricalPath(std::lock_guard(mutex_))
: dir_;
// Create the new shard
auto shard{std::make_shared<Shard>(app_, *this, shardIndex, path, j_)};
if (!shard->init(scheduler_, *ctx_))
continue;
{
std::lock_guard lock(mutex_);
if (isStopping())
return;
databaseImportStatus_->currentIndex = shardIndex;
databaseImportStatus_->currentShard = shard;
databaseImportStatus_->firstSeq = firstSeq;
databaseImportStatus_->lastSeq = lastSeq;
}
// Create a marker file to signify a database import in progress
auto const shardDir{path / std::to_string(shardIndex)};
auto const markerFile{shardDir / databaseImportMarker_};
{
std::ofstream ofs{markerFile.string()};
if (!ofs.is_open())
{
JLOG(j_.error())
<< "shard " << shardIndex << " failed to import";
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to create temp marker file";
shard->removeOnDestroy();
continue;
}
}
// Copy the ledgers from node store
std::shared_ptr<Ledger> recentStored;
std::optional<uint256> lastLedgerHash;
while (auto const ledgerSeq = shard->prepare())
{
if (isStopping())
return;
auto const ledger{loadByIndex(*ledgerSeq, app_, false)};
if (!ledger || ledger->info().seq != ledgerSeq)
break;
auto const result{shard->storeLedger(ledger, recentStored)};
storeStats(result.count, result.size);
if (result.error)
break;
if (!shard->setLedgerStored(ledger))
break;
if (!lastLedgerHash && ledgerSeq == lastSeq)
lastLedgerHash = ledger->info().hash;
recentStored = std::move(ledger);
}
if (isStopping())
return;
using namespace boost::filesystem;
bool success{false};
if (lastLedgerHash && shard->getState() == Shard::complete)
{
// Store shard final key
Serializer s;
s.add32(Shard::version);
s.add32(firstLedgerSeq(shardIndex));
s.add32(lastLedgerSeq(shardIndex));
s.addBitString(*lastLedgerHash);
auto const nodeObject{NodeObject::createObject(
hotUNKNOWN, std::move(s.modData()), Shard::finalKey)};
if (shard->storeNodeObject(nodeObject))
{
try
{
std::lock_guard lock(mutex_);
// The database import process is complete and the
// marker file is no longer required
remove_all(markerFile);
JLOG(j_.debug()) << "shard " << shardIndex
<< " was successfully imported"
" from the NodeStore";
finalizeShard(
shards_.emplace(shardIndex, std::move(shard))
.first->second,
true,
std::nullopt);
// This variable is meant to capture the success
// of everything up to the point of shard finalization.
// If the shard fails to finalize, this condition will
// be handled by the finalization function itself, and
// not here.
success = true;
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "shard index " << shardIndex
<< ". Exception caught in function "
<< __func__ << ". Error: " << e.what();
}
}
}
if (!success)
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to import from the NodeStore";
shard->removeOnDestroy();
}
}
{
std::lock_guard lock(mutex_);
if (isStopping())
return;
databaseImportStatus_.reset();
updateStatus(lock);
}
@@ -1103,6 +1204,43 @@ DatabaseShardImp::sweep()
}
}
Json::Value
DatabaseShardImp::getDatabaseImportStatus() const
{
Json::Value ret(Json::objectValue);
if (std::lock_guard lock(mutex_); databaseImportStatus_)
{
ret[jss::firstShardIndex] = databaseImportStatus_->earliestIndex;
ret[jss::lastShardIndex] = databaseImportStatus_->latestIndex;
ret[jss::currentShardIndex] = databaseImportStatus_->currentIndex;
Json::Value currentShard(Json::objectValue);
currentShard[jss::firstSequence] = databaseImportStatus_->firstSeq;
currentShard[jss::lastSequence] = databaseImportStatus_->lastSeq;
if (auto shard = databaseImportStatus_->currentShard.lock(); shard)
currentShard[jss::storedSeqs] = shard->getStoredSeqs();
ret[jss::currentShard] = currentShard;
}
else
ret = "Database import not running";
return ret;
}
std::optional<std::uint32_t>
DatabaseShardImp::getDatabaseImportSequence() const
{
std::lock_guard lock(mutex_);
if (!databaseImportStatus_)
return {};
return databaseImportStatus_->firstSeq;
}
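With an import in flight, getDatabaseImportStatus returns JSON along these lines (field names from the jss additions in this commit; the values are illustrative):

{
    "firstShardIndex" : 1,
    "lastShardIndex" : 4,
    "currentShardIndex" : 2,
    "currentShard" : {
        "firstSequence" : 16385,
        "lastSequence" : 32768,
        "storedSeqs" : "16385-20000"
    }
}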
bool
DatabaseShardImp::initConfig(std::lock_guard<std::mutex> const&)
{
@@ -1279,7 +1417,7 @@ DatabaseShardImp::findAcquireIndex(
void
DatabaseShardImp::finalizeShard(
std::shared_ptr<Shard>& shard,
bool writeSQLite,
bool const writeSQLite,
std::optional<uint256> const& expectedHash)
{
taskQueue_.addTask([this,
@@ -1738,11 +1876,11 @@ DatabaseShardImp::relocateOutdatedShards(
}
}
std::optional<DatabaseShardImp::PathDesignation>
auto
DatabaseShardImp::prepareForNewShard(
std::uint32_t shardIndex,
std::uint32_t numHistoricalShards,
std::lock_guard<std::mutex> const& lock)
std::lock_guard<std::mutex> const& lock) -> std::optional<PathDesignation>
{
// Any shard earlier than the two most recent shards is a historical shard
auto const boundaryIndex{shardBoundaryIndex()};

View File

@@ -134,7 +134,10 @@ public:
@param source The application node store.
*/
void
import(Database& source) override;
importDatabase(Database& source) override;
void
doImportDatabase();
std::int32_t
getWriteLoad() const override;
@@ -161,6 +164,12 @@ public:
void
sweep() override;
Json::Value
getDatabaseImportStatus() const override;
std::optional<std::uint32_t>
getDatabaseImportSequence() const override;
bool
callForLedgerSQL(
LedgerIndex ledgerSeq,
@@ -203,6 +212,37 @@ private:
historical // Needs a historical path
};
struct DatabaseImportStatus
{
DatabaseImportStatus(
std::uint32_t const earliestIndex,
std::uint32_t const latestIndex,
std::uint32_t const currentIndex)
: earliestIndex(earliestIndex)
, latestIndex(latestIndex)
, currentIndex(currentIndex)
{
}
// Index of the first shard to be imported
std::uint32_t earliestIndex{0};
// Index of the last shard to be imported
std::uint32_t latestIndex{0};
// Index of the shard currently being imported
std::uint32_t currentIndex{0};
// First ledger sequence of the current shard
std::uint32_t firstSeq{0};
// Last ledger sequence of the current shard
std::uint32_t lastSeq{0};
// The shard currently being imported
std::weak_ptr<Shard> currentShard;
};
Application& app_;
mutable std::mutex mutex_;
bool init_{false};
@@ -216,7 +256,7 @@ private:
// Shards held by this server
std::map<std::uint32_t, std::shared_ptr<Shard>> shards_;
// Shard indexes being imported
// Shard indexes being imported from the shard archive handler
std::set<std::uint32_t> preparedIndexes_;
// Shard index being acquired from the peer network
@@ -258,7 +298,7 @@ private:
std::uint32_t const openFinalLimit_;
// File name used to mark shards being imported from node store
static constexpr auto importMarker_ = "import";
static constexpr auto databaseImportMarker_ = "database_import";
// latestShardIndex_ and secondLatestShardIndex hold the indexes
// of the shards most recently confirmed by the network. These
@@ -270,6 +310,12 @@ private:
std::optional<std::uint32_t> latestShardIndex_;
std::optional<std::uint32_t> secondLatestShardIndex_;
// Struct used for node store import progress
std::unique_ptr<DatabaseImportStatus> databaseImportStatus_;
// Thread for running node store import
std::thread databaseImporter_;
// Initialize settings from the configuration file
// Lock must be held
bool
@@ -284,7 +330,7 @@ private:
void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
{
Throw<std::runtime_error>("Shard store import not supported");
Throw<std::runtime_error>("Import from shard store not supported");
}
// Randomly select a shard index not stored

View File

@@ -197,6 +197,15 @@ public:
removeOnDestroy_ = true;
}
std::string
getStoredSeqs()
{
if (!acquireInfo_)
return "";
return to_string(acquireInfo_->storedSeqs);
}
/**
* @brief callForLedgerSQL Checks out ledger database for the shard and
* calls given callback function passing shard index and session

View File

@@ -179,6 +179,8 @@ JSS(converge_time); // out: NetworkOPs
JSS(converge_time_s); // out: NetworkOPs
JSS(count); // in: AccountTx*, ValidatorList
JSS(counters); // in/out: retrieve counters
JSS(currentShard); // out: NodeToShardStatus
JSS(currentShardIndex); // out: NodeToShardStatus
JSS(currency); // in: paths/PathRequest, STAmount
// out: STPathSet, STAmount,
// AccountLines
@@ -245,6 +247,8 @@ JSS(fee_mult_max); // in: TransactionSign
JSS(fee_ref); // out: NetworkOPs
JSS(fetch_pack); // out: NetworkOPs
JSS(first); // out: rpc/Version
JSS(firstSequence); // out: NodeToShardStatus
JSS(firstShardIndex); // out: NodeToShardStatus
JSS(finished);
JSS(fix_txns); // in: LedgerCleaner
JSS(flags); // out: AccountOffers,
@@ -295,6 +299,8 @@ JSS(key); // out
JSS(key_type); // in/out: WalletPropose, TransactionSign
JSS(latency); // out: PeerImp
JSS(last); // out: RPCVersion
JSS(lastSequence); // out: NodeToShardStatus
JSS(lastShardIndex); // out: NodeToShardStatus
JSS(last_close); // out: NetworkOPs
JSS(last_refresh_time); // out: ValidatorSite
JSS(last_refresh_status); // out: ValidatorSite
@@ -513,6 +519,7 @@ JSS(state_accounting); // out: NetworkOPs
JSS(state_now); // in: Subscribe
JSS(status); // error
JSS(stop); // in: LedgerCleaner
JSS(storedSeqs); // out: NodeToShardStatus
JSS(streams); // in: Subscribe, Unsubscribe
JSS(strict); // in: AccountCurrencies, AccountInfo
JSS(sub_index); // in: LedgerEntry

View File

@@ -89,6 +89,8 @@ doLogRotate(RPC::JsonContext&);
Json::Value
doManifest(RPC::JsonContext&);
Json::Value
doNodeToShardStatus(RPC::JsonContext&);
Json::Value
doNoRippleCheck(RPC::JsonContext&);
Json::Value
doOwnerInfo(RPC::JsonContext&);

View File

@@ -0,0 +1,43 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/json/json_value.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/Context.h>
namespace ripple {
Json::Value
doNodeToShardStatus(RPC::JsonContext& context)
{
Json::Value ret(Json::objectValue);
if (auto const shardStore = context.app.getShardStore())
ret[jss::info] = shardStore->getDatabaseImportStatus();
else
ret = RPC::make_error(rpcINTERNAL, "No shard store");
return ret;
}
} // namespace ripple
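Once registered (see the handler table below), the method can be exercised from the rippled command line; when no import is running the handler returns the fallback string. A sketch of a session, with the response shape assumed from the handler above:

$ ./rippled nodetoshard_status
{
   "result" : {
      "info" : "Database import not running",
      "status" : "success"
   }
}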

View File

@@ -107,6 +107,10 @@ Handler const handlerArray[]{
{"log_level", byRef(&doLogLevel), Role::ADMIN, NO_CONDITION},
{"logrotate", byRef(&doLogRotate), Role::ADMIN, NO_CONDITION},
{"manifest", byRef(&doManifest), Role::USER, NO_CONDITION},
{"nodetoshard_status",
byRef(&doNodeToShardStatus),
Role::USER,
NO_CONDITION},
{"noripple_check", byRef(&doNoRippleCheck), Role::USER, NO_CONDITION},
{"owner_info", byRef(&doOwnerInfo), Role::USER, NEEDS_CURRENT_LEDGER},
{"peers", byRef(&doPeers), Role::ADMIN, NO_CONDITION},

View File

@@ -19,6 +19,7 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/misc/SHAMapStore.h>
#include <ripple/beast/hash/hash_append.h>
#include <ripple/beast/utility/temp_dir.h>
#include <ripple/core/ConfigSections.h>
@@ -34,6 +35,7 @@
#include <numeric>
#include <openssl/ripemd.h>
#include <test/jtx.h>
#include <test/jtx/CaptureLogs.h>
#include <test/nodestore/TestBase.h>
namespace ripple {
@@ -1043,53 +1045,6 @@ class DatabaseShard_test : public TestBase
}
}
void
testImport(std::uint64_t const seedValue)
{
testcase("Import node store");
using namespace test::jtx;
beast::temp_dir shardDir;
{
beast::temp_dir nodeDir;
Env env{*this, testConfig(shardDir.path(), nodeDir.path())};
DatabaseShard* db = env.app().getShardStore();
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
TestData data(seedValue, 4, 2);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i)
BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i]));
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0));
db->import(ndb);
for (std::uint32_t i = 1; i <= 2; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6));
}
{
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
TestData data(seedValue, 4, 2);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 1; i <= 2; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6));
for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i)
checkLedger(data, *db, *data.ledgers_[i]);
}
}
std::string
ripemd160File(std::string filename)
{
@@ -1169,6 +1124,137 @@ class DatabaseShard_test : public TestBase
}
}
void
testImport(std::uint64_t const seedValue)
{
testcase("Import node store");
using namespace test::jtx;
beast::temp_dir shardDir;
{
beast::temp_dir nodeDir;
Env env{*this, testConfig(shardDir.path(), nodeDir.path())};
DatabaseShard* db = env.app().getShardStore();
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
TestData data(seedValue, 4, 2);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i)
BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i]));
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0));
db->importDatabase(ndb);
for (std::uint32_t i = 1; i <= 2; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6));
}
{
Env env{*this, testConfig(shardDir.path())};
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
TestData data(seedValue, 4, 2);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 1; i <= 2; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6));
for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i)
checkLedger(data, *db, *data.ledgers_[i]);
}
}
void
testImportWithOnlineDelete(std::uint64_t const seedValue)
{
testcase("Import node store with online delete");
using namespace test::jtx;
using test::CaptureLogs;
beast::temp_dir shardDir;
beast::temp_dir nodeDir;
std::string capturedLogs;
{
auto c = testConfig(shardDir.path(), nodeDir.path());
auto& section = c->section(ConfigSection::nodeDatabase());
section.set("online_delete", "550");
// Adjust the log level to capture relevant output
c->section(SECTION_RPC_STARTUP)
.append(
"{ \"command\": \"log_level\", \"severity\": \"trace\" "
"}");
std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
Env env{*this, std::move(c), std::move(logs)};
DatabaseShard* db = env.app().getShardStore();
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
auto const shardCount = 4;
TestData data(seedValue, 4, shardCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
// Start the import
db->importDatabase(ndb);
while (!db->getDatabaseImportSequence())
{
// Wait until the import starts
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
auto& store = env.app().getSHAMapStore();
auto lastRotated = store.getLastRotated();
auto pauseVerifier = std::thread([lastRotated, &store, db, this] {
while (true)
{
// Make sure database rotations don't interfere
// with the import
if (store.getLastRotated() != lastRotated)
{
// A rotation occurred during shard import. Not
// necessarily an error
auto sequence = db->getDatabaseImportSequence();
BEAST_EXPECT(!sequence || sequence >= lastRotated);
break;
}
}
});
data = TestData(seedValue * 2, 4, shardCount);
if (!BEAST_EXPECT(data.makeLedgers(env, shardCount)))
{
pauseVerifier.join();
return;
}
pauseVerifier.join();
BEAST_EXPECT(store.getLastRotated() != lastRotated);
}
// Database rotation should have been postponed at some
// point during the import
auto const expectedLogMessage =
"rotation would interfere with ShardStore import";
BEAST_EXPECT(
capturedLogs.find(expectedLogMessage) != std::string::npos);
}
void
testImportWithHistoricalPaths(std::uint64_t const seedValue)
{
@@ -1204,19 +1290,19 @@ class DatabaseShard_test : public TestBase
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
auto const ledgerCount = 4;
auto const shardCount = 4;
TestData data(seedValue, 4, ledgerCount);
TestData data(seedValue, 4, shardCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i)
for (std::uint32_t i = 0; i < shardCount * ledgersPerShard; ++i)
BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i]));
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0));
db->import(ndb);
for (std::uint32_t i = 1; i <= ledgerCount; ++i)
db->importDatabase(ndb);
for (std::uint32_t i = 1; i <= shardCount; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110));
@@ -1242,7 +1328,7 @@ class DatabaseShard_test : public TestBase
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == ledgerCount - 2);
BEAST_EXPECT(historicalPathCount == shardCount - 2);
}
// Test importing with a single historical
@@ -1262,19 +1348,19 @@ class DatabaseShard_test : public TestBase
Database& ndb = env.app().getNodeStore();
BEAST_EXPECT(db);
auto const ledgerCount = 4;
auto const shardCount = 4;
TestData data(seedValue * 2, 4, ledgerCount);
TestData data(seedValue * 2, 4, shardCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i)
for (std::uint32_t i = 0; i < shardCount * ledgersPerShard; ++i)
BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i]));
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0));
db->import(ndb);
for (std::uint32_t i = 1; i <= ledgerCount; ++i)
db->importDatabase(ndb);
for (std::uint32_t i = 1; i <= shardCount; ++i)
waitShard(*db, i);
BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110));
@@ -1293,7 +1379,7 @@ class DatabaseShard_test : public TestBase
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == ledgerCount - 2);
BEAST_EXPECT(historicalPathCount == shardCount - 2);
}
}
@@ -1331,9 +1417,9 @@ class DatabaseShard_test : public TestBase
DatabaseShard* db = env.app().getShardStore();
BEAST_EXPECT(db);
auto const ledgerCount = 4;
auto const shardCount = 4;
TestData data(seedValue, 4, ledgerCount);
TestData data(seedValue, 4, shardCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
@@ -1341,10 +1427,10 @@ class DatabaseShard_test : public TestBase
std::uint64_t bitMask = 0;
// Add shardCount shards to the Shard Database
for (std::uint32_t i = 0; i < ledgerCount; ++i)
for (std::uint32_t i = 0; i < shardCount; ++i)
{
auto n = createShard(data, *db, ledgerCount);
if (!BEAST_EXPECT(n && *n >= 1 && *n <= ledgerCount))
auto n = createShard(data, *db, shardCount);
if (!BEAST_EXPECT(n && *n >= 1 && *n <= shardCount))
return;
bitMask |= 1ll << *n;
BEAST_EXPECT(
@@ -1405,19 +1491,19 @@ class DatabaseShard_test : public TestBase
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == ledgerCount - 2);
BEAST_EXPECT(historicalPathCount == shardCount - 2);
data = TestData(seedValue * 2, 4, ledgerCount);
if (!BEAST_EXPECT(data.makeLedgers(env, ledgerCount)))
data = TestData(seedValue * 2, 4, shardCount);
if (!BEAST_EXPECT(data.makeLedgers(env, shardCount)))
return;
// Add shardCount more shards to the Shard Database
// to exercise recent shard rotation
for (std::uint32_t i = 0; i < ledgerCount; ++i)
for (std::uint32_t i = 0; i < shardCount; ++i)
{
auto n = createShard(data, *db, ledgerCount * 2, ledgerCount);
auto n = createShard(data, *db, shardCount * 2, shardCount);
if (!BEAST_EXPECT(
n && *n >= 1 + ledgerCount && *n <= ledgerCount * 2))
n && *n >= 1 + shardCount && *n <= shardCount * 2))
return;
bitMask |= 1ll << *n;
BEAST_EXPECT(
@@ -1467,7 +1553,7 @@ class DatabaseShard_test : public TestBase
// All historical shards should be stored
// at historical paths
BEAST_EXPECT(historicalPathCount == (ledgerCount * 2) - 2);
BEAST_EXPECT(historicalPathCount == (shardCount * 2) - 2);
}
}
@@ -1529,21 +1615,26 @@ public:
void
run() override
{
std::uint64_t const seedValue = 51;
auto seedValue = [] {
static std::uint64_t seedValue = 41;
seedValue += 10;
return seedValue;
};
testStandalone();
testCreateShard(seedValue);
testReopenDatabase(seedValue + 10);
testGetCompleteShards(seedValue + 20);
testPrepareShards(seedValue + 30);
testImportShard(seedValue + 40);
testCorruptedDatabase(seedValue + 50);
testIllegalFinalKey(seedValue + 60);
testImport(seedValue + 70);
testDeterministicShard(seedValue + 80);
testImportWithHistoricalPaths(seedValue + 90);
testPrepareWithHistoricalPaths(seedValue + 100);
testOpenShardManagement(seedValue + 110);
testCreateShard(seedValue());
testReopenDatabase(seedValue());
testGetCompleteShards(seedValue());
testPrepareShards(seedValue());
testImportShard(seedValue());
testCorruptedDatabase(seedValue());
testIllegalFinalKey(seedValue());
testDeterministicShard(seedValue());
testImport(seedValue());
testImportWithOnlineDelete(seedValue());
testImportWithHistoricalPaths(seedValue());
testPrepareWithHistoricalPaths(seedValue());
testOpenShardManagement(seedValue());
}
};

View File

@@ -487,7 +487,7 @@ public:
srcBackendType + "'");
// Do the import
dest->import(*src);
dest->importDatabase(*src);
// Get the results of the import
fetchCopyOfBatch(*dest, &copy, batch);

View File

@@ -4301,6 +4301,37 @@ static RPCCallTestData const rpcCallTestArray[] = {
]
})"},
// nodetoshard_status
// -------------------------------------------------------------------
{"nodetoshard_status: minimal.",
__LINE__,
{
"nodetoshard_status",
},
RPCCallTestData::no_exception,
R"({
"method" : "nodetoshard_status",
"params" : [
{
"api_version" : %MAX_API_VER%,
}
]
})"},
{"nodetoshard_status: too many arguments.",
__LINE__,
{"nodetoshard_status", "extra"},
RPCCallTestData::no_exception,
R"({
"method" : "nodetoshard_status",
"params" : [
{
"error" : "badSyntax",
"error_code" : 1,
"error_message" : "Syntax error."
}
]
})"},
// owner_info
// ------------------------------------------------------------------
{"owner_info: minimal.",