Extend peer shard info

This commit is contained in:
Miguel Portilla
2020-10-09 17:14:51 -04:00
committed by manojsdoshi
parent 38f954fd46
commit 80c2302fd3
35 changed files with 1828 additions and 1260 deletions

View File

@@ -520,6 +520,7 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/ShardInfo.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
main sources:

View File

@@ -8,7 +8,7 @@ Loop: ripple.app ripple.net
ripple.app > ripple.net
Loop: ripple.app ripple.nodestore
ripple.app > ripple.nodestore
ripple.nodestore ~= ripple.app
Loop: ripple.app ripple.overlay
ripple.overlay ~= ripple.app
@@ -41,7 +41,7 @@ Loop: ripple.net ripple.rpc
ripple.rpc > ripple.net
Loop: ripple.nodestore ripple.overlay
ripple.overlay == ripple.nodestore
ripple.overlay ~= ripple.nodestore
Loop: ripple.overlay ripple.rpc
ripple.rpc ~= ripple.overlay

View File

@@ -98,18 +98,15 @@ template <class T>
std::string
to_string(RangeSet<T> const& rs)
{
using ripple::to_string;
if (rs.empty())
return "empty";
std::string res = "";
std::string s;
for (auto const& interval : rs)
{
if (!res.empty())
res += ",";
res += to_string(interval);
}
return res;
s += ripple::to_string(interval) + ",";
s.pop_back();
return s;
}
/** Convert the given styled string to a RangeSet.
@@ -122,13 +119,14 @@ to_string(RangeSet<T> const& rs)
@return True on successfully converting styled string
*/
template <class T>
bool
[[nodiscard]] bool
from_string(RangeSet<T>& rs, std::string const& s)
{
std::vector<std::string> intervals;
std::vector<std::string> tokens;
bool result{true};
rs.clear();
boost::split(tokens, s, boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{

View File

@@ -224,14 +224,79 @@ public:
bool
isStopping() const;
/** @return The maximum number of ledgers stored in a shard
*/
[[nodiscard]] std::uint32_t
ledgersPerShard() const noexcept
{
return ledgersPerShard_;
}
/** @return The earliest ledger sequence allowed
*/
std::uint32_t
earliestLedgerSeq() const
[[nodiscard]] std::uint32_t
earliestLedgerSeq() const noexcept
{
return earliestLedgerSeq_;
}
/** @return The earliest shard index
*/
[[nodiscard]] std::uint32_t
earliestShardIndex() const noexcept
{
return earliestShardIndex_;
}
/** Calculates the first ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The first ledger sequence pertaining to the shard index
*/
[[nodiscard]] std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const noexcept
{
assert(shardIndex >= earliestShardIndex_);
if (shardIndex <= earliestShardIndex_)
return earliestLedgerSeq_;
return 1 + (shardIndex * ledgersPerShard_);
}
/** Calculates the last ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The last ledger sequence pertaining to the shard index
*/
[[nodiscard]] std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const noexcept
{
assert(shardIndex >= earliestShardIndex_);
return (shardIndex + 1) * ledgersPerShard_;
}
/** Calculates the shard index for a given ledger sequence
@param ledgerSeq ledger sequence
@return The shard index of the ledger sequence
*/
[[nodiscard]] std::uint32_t
seqToShardIndex(std::uint32_t ledgerSeq) const noexcept
{
assert(ledgerSeq >= earliestLedgerSeq_);
return (ledgerSeq - 1) / ledgersPerShard_;
}
/** Calculates the maximum ledgers for a given shard index
@param shardIndex The shard index considered
@return The maximum ledgers pertaining to the shard index
@note The earliest shard may store less if the earliest ledger
sequence truncates its beginning
*/
[[nodiscard]] std::uint32_t
maxLedgers(std::uint32_t shardIndex) const noexcept;
protected:
beast::Journal const j_;
Scheduler& scheduler_;
@@ -240,6 +305,25 @@ protected:
std::atomic<std::uint32_t> fetchHitCount_{0};
std::atomic<std::uint32_t> fetchSz_{0};
// The default is DEFAULT_LEDGERS_PER_SHARD (16384) to match the XRP ledger
// network. Can be set through the configuration file using the
// 'ledgers_per_shard' field under the 'node_db' and 'shard_db' stanzas.
// If specified, the value must be a multiple of 256 and equally assigned
// in both stanzas. Only unit tests or alternate networks should change
// this value.
std::uint32_t const ledgersPerShard_;
// The default is XRP_LEDGER_EARLIEST_SEQ (32570) to match the XRP ledger
// network's earliest allowed ledger sequence. Can be set through the
// configuration file using the 'earliest_seq' field under the 'node_db'
// and 'shard_db' stanzas. If specified, the value must be greater than zero
// and equally assigned in both stanzas. Only unit tests or alternate
// networks should change this value.
std::uint32_t const earliestLedgerSeq_;
// The earliest shard index
std::uint32_t const earliestShardIndex_;
void
storeStats(std::uint64_t count, std::uint64_t sz)
{
@@ -288,10 +372,6 @@ private:
std::vector<std::thread> readThreads_;
bool readStopping_{false};
// The default is 32570 to match the XRP ledger network's earliest
// allowed sequence. Alternate networks may set this value.
std::uint32_t const earliestLedgerSeq_;
virtual std::shared_ptr<NodeObject>
fetchNodeObject(
uint256 const& hash,

View File

@@ -21,9 +21,9 @@
#define RIPPLE_NODESTORE_DATABASESHARD_H_INCLUDED
#include <ripple/app/ledger/Ledger.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/SociDB.h>
#include <ripple/nodestore/Database.h>
#include <ripple/nodestore/ShardInfo.h>
#include <ripple/nodestore/Types.h>
#include <memory>
@@ -57,7 +57,7 @@ public:
@return `true` if the database initialized without error
*/
virtual bool
[[nodiscard]] virtual bool
init() = 0;
/** Prepare to store a new ledger in the shard being acquired
@@ -72,7 +72,7 @@ public:
between requests.
@implNote adds a new writable shard if necessary
*/
virtual std::optional<std::uint32_t>
[[nodiscard]] virtual std::optional<std::uint32_t>
prepareLedger(std::uint32_t validLedgerSeq) = 0;
/** Prepare one or more shard indexes to be imported into the database
@@ -80,7 +80,7 @@ public:
@param shardIndexes Shard indexes to be prepared for import
@return true if all shard indexes successfully prepared for import
*/
virtual bool
[[nodiscard]] virtual bool
prepareShards(std::vector<std::uint32_t> const& shardIndexes) = 0;
/** Remove a previously prepared shard index for import
@@ -94,7 +94,7 @@ public:
@return a string representing the shards prepared for import
*/
virtual std::string
[[nodiscard]] virtual std::string
getPreShards() = 0;
/** Import a shard from the shard archive handler into the
@@ -106,7 +106,7 @@ public:
@return true If the shard was successfully imported
@implNote if successful, srcDir is moved to the database directory
*/
virtual bool
[[nodiscard]] virtual bool
importShard(
std::uint32_t shardIndex,
boost::filesystem::path const& srcDir) = 0;
@@ -117,7 +117,7 @@ public:
@param seq The sequence of the ledger
@return The ledger if found, nullptr otherwise
*/
virtual std::shared_ptr<Ledger>
[[nodiscard]] virtual std::shared_ptr<Ledger>
fetchLedger(uint256 const& hash, std::uint32_t seq) = 0;
/** Notifies the database that the given ledger has been
@@ -128,13 +128,6 @@ public:
virtual void
setStored(std::shared_ptr<Ledger const> const& ledger) = 0;
/** Query which complete shards are stored
@return the indexes of complete shards
*/
virtual std::string
getCompleteShards() = 0;
/**
* @brief callForLedgerSQL Checkouts ledger database for shard
* containing given ledger and calls given callback function passing
@@ -231,43 +224,16 @@ public:
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback) = 0;
/** @return The maximum number of ledgers stored in a shard
*/
virtual std::uint32_t
ledgersPerShard() const = 0;
/** Query information about shards held
/** @return The earliest shard index
*/
virtual std::uint32_t
earliestShardIndex() const = 0;
/** Calculates the shard index for a given ledger sequence
@param seq ledger sequence
@return The shard index of the ledger sequence
@return Information about shards held by this node
*/
virtual std::uint32_t
seqToShardIndex(std::uint32_t seq) const = 0;
/** Calculates the first ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The first ledger sequence pertaining to the shard index
*/
virtual std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const = 0;
/** Calculates the last ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The last ledger sequence pertaining to the shard index
*/
virtual std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const = 0;
[[nodiscard]] virtual std::unique_ptr<ShardInfo>
getShardInfo() const = 0;
/** Returns the root database directory
*/
virtual boost::filesystem::path const&
[[nodiscard]] virtual boost::filesystem::path const&
getRootDir() const = 0;
virtual Json::Value
@@ -281,18 +247,12 @@ public:
virtual std::optional<std::uint32_t>
getDatabaseImportSequence() const = 0;
/** The number of ledgers in a shard */
static constexpr std::uint32_t ledgersPerShardDefault{16384u};
/** Returns the number of queued tasks
*/
[[nodiscard]] virtual size_t
getNumTasks() const = 0;
};
constexpr std::uint32_t
seqToShardIndex(
std::uint32_t ledgerSeq,
std::uint32_t ledgersPerShard = DatabaseShard::ledgersPerShardDefault)
{
return (ledgerSeq - 1) / ledgersPerShard;
}
extern std::unique_ptr<DatabaseShard>
make_ShardStore(
Application& app,

View File

@@ -0,0 +1,122 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED
#define RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED
#include <ripple/basics/RangeSet.h>
#include <ripple/nodestore/Types.h>
#include <ripple/protocol/messages.h>
namespace ripple {
namespace NodeStore {
/* Contains information on the status of shards for a node
*/
class ShardInfo
{
private:
class Incomplete
{
public:
Incomplete() = delete;
Incomplete(ShardState state, std::uint32_t percentProgress)
: state_(state), percentProgress_(percentProgress)
{
}
[[nodiscard]] ShardState
state() const noexcept
{
return state_;
}
[[nodiscard]] std::uint32_t
percentProgress() const noexcept
{
return percentProgress_;
}
private:
ShardState state_;
std::uint32_t percentProgress_;
};
public:
[[nodiscard]] NetClock::time_point const&
msgTimestamp() const
{
return msgTimestamp_;
}
void
setMsgTimestamp(NetClock::time_point const& timestamp)
{
msgTimestamp_ = timestamp;
}
[[nodiscard]] std::string
finalizedToString() const;
[[nodiscard]] bool
setFinalizedFromString(std::string const& str)
{
return from_string(finalized_, str);
}
[[nodiscard]] RangeSet<std::uint32_t> const&
finalized() const
{
return finalized_;
}
[[nodiscard]] std::string
incompleteToString() const;
[[nodiscard]] std::map<std::uint32_t, Incomplete> const&
incomplete() const
{
return incomplete_;
}
// Returns true if successful or false because of a duplicate index
bool
update(
std::uint32_t shardIndex,
ShardState state,
std::uint32_t percentProgress);
[[nodiscard]] protocol::TMPeerShardInfoV2
makeMessage(Application& app);
private:
// Finalized immutable shards
RangeSet<std::uint32_t> finalized_;
// Incomplete shards being acquired or finalized
std::map<std::uint32_t, Incomplete> incomplete_;
// Message creation time
NetClock::time_point msgTimestamp_;
};
} // namespace NodeStore
} // namespace ripple
#endif

View File

@@ -55,6 +55,16 @@ enum Status {
using Batch = std::vector<std::shared_ptr<NodeObject>>;
} // namespace NodeStore
/** Shard states. */
enum class ShardState : std::uint32_t {
acquire, // Acquiring ledgers
complete, // Backend is ledger complete, database is unverified
finalizing, // Verifying database
finalized, // Database verified, shard is immutable
queued // Queued to be finalized
};
} // namespace ripple
#endif

View File

@@ -36,9 +36,17 @@ Database::Database(
beast::Journal journal)
: j_(journal)
, scheduler_(scheduler)
, ledgersPerShard_(get<std::uint32_t>(
config,
"ledgers_per_shard",
DEFAULT_LEDGERS_PER_SHARD))
, earliestLedgerSeq_(
get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
, earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
{
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
Throw<std::runtime_error>("Invalid ledgers_per_shard");
if (earliestLedgerSeq_ < 1)
Throw<std::runtime_error>("Invalid earliest_seq");
@@ -64,6 +72,19 @@ Database::isStopping() const
return readStopping_;
}
std::uint32_t
Database::maxLedgers(std::uint32_t shardIndex) const noexcept
{
if (shardIndex > earliestShardIndex_)
return ledgersPerShard_;
if (shardIndex == earliestShardIndex_)
return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
assert(!"Invalid shard index");
return 0;
}
void
Database::stop()
{

View File

@@ -22,6 +22,7 @@
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
#include <ripple/basics/ByteUtilities.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/basics/chrono.h>
#include <ripple/basics/random.h>
#include <ripple/core/ConfigSections.h>
@@ -30,6 +31,7 @@
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/HashPrefix.h>
#include <ripple/protocol/digest.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -52,7 +54,6 @@ DatabaseShardImp::DatabaseShardImp(
app.config().section(ConfigSection::shardDatabase()),
j)
, app_(app)
, earliestShardIndex_(seqToShardIndex(earliestLedgerSeq()))
, avgShardFileSz_(ledgersPerShard_ * kilobytes(192ull))
, openFinalLimit_(
app.config().getValueFor(SizedItem::openFinalLimit, std::nullopt))
@@ -112,7 +113,7 @@ DatabaseShardImp::init()
if (!app_.config().standalone() && !historicalPaths_.empty())
{
// Check historical paths for duplicated file systems
if (!checkHistoricalPaths())
if (!checkHistoricalPaths(lock))
return false;
}
@@ -142,12 +143,12 @@ DatabaseShardImp::init()
// Ignore values below the earliest shard index
auto const shardIndex{std::stoul(dirName)};
if (shardIndex < earliestShardIndex())
if (shardIndex < earliestShardIndex_)
{
JLOG(j_.debug())
<< "shard " << shardIndex
<< " ignored, comes before earliest shard index "
<< earliestShardIndex();
<< earliestShardIndex_;
continue;
}
@@ -176,13 +177,13 @@ DatabaseShardImp::init()
switch (shard->getState())
{
case Shard::final:
case ShardState::finalized:
if (++openFinals > openFinalLimit_)
shard->tryClose();
shards_.emplace(shardIndex, std::move(shard));
break;
case Shard::complete:
case ShardState::complete:
finalizeShard(
shards_.emplace(shardIndex, std::move(shard))
.first->second,
@@ -190,7 +191,7 @@ DatabaseShardImp::init()
std::nullopt);
break;
case Shard::acquire:
case ShardState::acquire:
if (acquireIndex_ != 0)
{
JLOG(j_.error())
@@ -217,11 +218,10 @@ DatabaseShardImp::init()
return false;
}
updateStatus(lock);
init_ = true;
}
setFileStats();
updateFileStats();
return true;
}
@@ -289,7 +289,9 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
std::lock_guard lock(mutex_);
shards_.emplace(*shardIndex, std::move(shard));
acquireIndex_ = *shardIndex;
updatePeers(lock);
}
return ledgerSeq;
}
@@ -312,14 +314,15 @@ DatabaseShardImp::prepareShards(std::vector<std::uint32_t> const& shardIndexes)
boost::algorithm::join(indexesAsString, ", ");
};
std::string const prequel = shardIndex
? "shard " + std::to_string(*shardIndex)
: multipleIndexPrequel();
JLOG(j.error()) << prequel << " " << msg;
JLOG(j.error()) << (shardIndex ? "shard " + std::to_string(*shardIndex)
: multipleIndexPrequel())
<< " " << msg;
return false;
};
if (shardIndexes.empty())
return fail("invalid shard indexes");
std::lock_guard lock(mutex_);
assert(init_);
@@ -330,18 +333,18 @@ DatabaseShardImp::prepareShards(std::vector<std::uint32_t> const& shardIndexes)
for (auto const shardIndex : shardIndexes)
{
if (shardIndex < earliestShardIndex())
if (shardIndex < earliestShardIndex_)
{
return fail(
"comes before earliest shard index " +
std::to_string(earliestShardIndex()),
std::to_string(earliestShardIndex_),
shardIndex);
}
// If we are synced to the network, check if the shard index is
// greater or equal to the current or validated shard index.
auto seqCheck = [&](std::uint32_t ledgerSeq) {
if (ledgerSeq >= earliestLedgerSeq() &&
if (ledgerSeq >= earliestLedgerSeq_ &&
shardIndex >= seqToShardIndex(ledgerSeq))
{
return fail("invalid index", shardIndex);
@@ -403,14 +406,9 @@ DatabaseShardImp::prepareShards(std::vector<std::uint32_t> const& shardIndexes)
}
for (auto const shardIndex : shardIndexes)
{
auto const prepareSuccessful =
preparedIndexes_.emplace(shardIndex).second;
(void)prepareSuccessful;
assert(prepareSuccessful);
}
preparedIndexes_.emplace(shardIndex);
updatePeers(lock);
return true;
}
@@ -420,7 +418,8 @@ DatabaseShardImp::removePreShard(std::uint32_t shardIndex)
std::lock_guard lock(mutex_);
assert(init_);
preparedIndexes_.erase(shardIndex);
if (preparedIndexes_.erase(shardIndex))
updatePeers(lock);
}
std::string
@@ -438,7 +437,7 @@ DatabaseShardImp::getPreShards()
if (rs.empty())
return {};
return to_string(rs);
return ripple::to_string(rs);
};
bool
@@ -452,6 +451,7 @@ DatabaseShardImp::importShard(
// Remove the failed import shard index so it can be retried
preparedIndexes_.erase(shardIndex);
updatePeers(lock);
return false;
};
@@ -523,7 +523,8 @@ DatabaseShardImp::importShard(
auto shard{std::make_unique<Shard>(
app_, *this, shardIndex, dstDir.parent_path(), j_)};
if (!shard->init(scheduler_, *ctx_) || shard->getState() != Shard::complete)
if (!shard->init(scheduler_, *ctx_) ||
shard->getState() != ShardState::complete)
{
shard.reset();
renameDir(dstDir, srcDir);
@@ -566,9 +567,9 @@ DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t ledgerSeq)
// Ledger must be stored in a final or acquiring shard
switch (shard->getState())
{
case Shard::final:
case ShardState::finalized:
break;
case Shard::acquire:
case ShardState::acquire:
if (shard->containsLedger(ledgerSeq))
break;
[[fallthrough]];
@@ -687,13 +688,11 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
setStoredInShard(shard, ledger);
}
std::string
DatabaseShardImp::getCompleteShards()
std::unique_ptr<ShardInfo>
DatabaseShardImp::getShardInfo() const
{
std::lock_guard lock(mutex_);
assert(init_);
return status_;
return getShardInfo(lock);
}
void
@@ -895,17 +894,14 @@ DatabaseShardImp::doImportDatabase()
std::uint32_t const firstSeq = firstLedgerSeq(shardIndex);
std::uint32_t const lastSeq =
std::max(firstSeq, lastLedgerSeq(shardIndex));
std::uint32_t const numLedgers = shardIndex == earliestShardIndex()
? lastSeq - firstSeq + 1
: ledgersPerShard_;
// Verify SQLite ledgers are in the node store
{
auto const ledgerHashes{
app_.getRelationalDBInterface().getHashesByIndex(
firstSeq, lastSeq)};
if (ledgerHashes.size() != numLedgers)
continue;
// Verify SQLite ledgers are in the node store
{
auto const ledgerHashes{
app_.getRelationalDBInterface().getHashesByIndex(
firstSeq, lastSeq)};
if (ledgerHashes.size() != maxLedgers(shardIndex))
continue;
auto& source = app_.getNodeStore();
bool valid{true};
@@ -997,7 +993,7 @@ DatabaseShardImp::doImportDatabase()
using namespace boost::filesystem;
bool success{false};
if (lastLedgerHash && shard->getState() == Shard::complete)
if (lastLedgerHash && shard->getState() == ShardState::complete)
{
// Store shard final key
Serializer s;
@@ -1053,15 +1049,13 @@ DatabaseShardImp::doImportDatabase()
{
std::lock_guard lock(mutex_);
if (isStopping())
return;
databaseImportStatus_.reset();
updateStatus(lock);
}
setFileStats();
updateFileStats();
}
std::int32_t
@@ -1166,13 +1160,11 @@ DatabaseShardImp::sweep()
std::vector<std::shared_ptr<Shard>> openFinals;
openFinals.reserve(openFinalLimit_);
for (auto const& e : shards)
for (auto const& weak : shards)
{
if (auto const shard{e.lock()}; shard && shard->isOpen())
if (auto const shard{weak.lock()}; shard && shard->isOpen())
{
shard->sweep();
if (shard->getState() == Shard::final)
if (shard->getState() == ShardState::finalized)
openFinals.emplace_back(std::move(shard));
}
}
@@ -1252,30 +1244,30 @@ DatabaseShardImp::initConfig(std::lock_guard<std::mutex> const&)
Config const& config{app_.config()};
Section const& section{config.section(ConfigSection::shardDatabase())};
auto compare = [&](std::string const& name, std::uint32_t defaultValue) {
std::uint32_t shardDBValue{defaultValue};
get_if_exists<std::uint32_t>(section, name, shardDBValue);
std::uint32_t nodeDBValue{defaultValue};
get_if_exists<std::uint32_t>(
config.section(ConfigSection::nodeDatabase()), name, nodeDBValue);
return shardDBValue == nodeDBValue;
};
// If ledgers_per_shard or earliest_seq are specified,
// they must be equally assigned in 'node_db'
if (!compare("ledgers_per_shard", DEFAULT_LEDGERS_PER_SHARD))
{
// The earliest ledger sequence defaults to XRP_LEDGER_EARLIEST_SEQ.
// A custom earliest ledger sequence can be set through the
// configuration file using the 'earliest_seq' field under the
// 'node_db' and 'shard_db' stanzas. If specified, this field must
// have a value greater than zero and be equally assigned in
// both stanzas.
std::uint32_t shardDBEarliestSeq{0};
get_if_exists<std::uint32_t>(
section, "earliest_seq", shardDBEarliestSeq);
std::uint32_t nodeDBEarliestSeq{0};
get_if_exists<std::uint32_t>(
config.section(ConfigSection::nodeDatabase()),
"earliest_seq",
nodeDBEarliestSeq);
if (shardDBEarliestSeq != nodeDBEarliestSeq)
{
return fail(
"and [" + ConfigSection::nodeDatabase() +
"] define different 'earliest_seq' values");
}
return fail(
"and [" + ConfigSection::nodeDatabase() + "] define different '" +
"ledgers_per_shard" + "' values");
}
if (!compare("earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
{
return fail(
"and [" + ConfigSection::nodeDatabase() + "] define different '" +
"earliest_seq" + "' values");
}
using namespace boost::filesystem;
@@ -1307,20 +1299,6 @@ DatabaseShardImp::initConfig(std::lock_guard<std::mutex> const&)
}
}
if (section.exists("ledgers_per_shard"))
{
// To be set only in standalone for testing
if (!config.standalone())
return fail("'ledgers_per_shard' only honored in stand alone");
ledgersPerShard_ = get<std::uint32_t>(section, "ledgers_per_shard");
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
return fail("'ledgers_per_shard' must be a multiple of 256");
earliestShardIndex_ = seqToShardIndex(earliestLedgerSeq());
avgShardFileSz_ = ledgersPerShard_ * kilobytes(192);
}
// NuDB is the default and only supported permanent storage backend
backendName_ = get<std::string>(section, "type", "nudb");
if (!boost::iequals(backendName_, "NuDB"))
@@ -1353,7 +1331,7 @@ DatabaseShardImp::findAcquireIndex(
std::uint32_t validLedgerSeq,
std::lock_guard<std::mutex> const&)
{
if (validLedgerSeq < earliestLedgerSeq())
if (validLedgerSeq < earliestLedgerSeq_)
return std::nullopt;
auto const maxShardIndex{[this, validLedgerSeq]() {
@@ -1362,7 +1340,7 @@ DatabaseShardImp::findAcquireIndex(
--shardIndex;
return shardIndex;
}()};
auto const maxNumShards{maxShardIndex - earliestShardIndex() + 1};
auto const maxNumShards{maxShardIndex - earliestShardIndex_ + 1};
// Check if the shard store has all shards
if (shards_.size() >= maxNumShards)
@@ -1376,8 +1354,7 @@ DatabaseShardImp::findAcquireIndex(
std::vector<std::uint32_t> available;
available.reserve(maxNumShards - shards_.size());
for (auto shardIndex = earliestShardIndex();
shardIndex <= maxShardIndex;
for (auto shardIndex = earliestShardIndex_; shardIndex <= maxShardIndex;
++shardIndex)
{
if (shards_.find(shardIndex) == shards_.end() &&
@@ -1402,7 +1379,7 @@ DatabaseShardImp::findAcquireIndex(
// chances of running more than 30 times is less than 1 in a billion
for (int i = 0; i < 40; ++i)
{
auto const shardIndex{rand_int(earliestShardIndex(), maxShardIndex)};
auto const shardIndex{rand_int(earliestShardIndex_, maxShardIndex)};
if (shards_.find(shardIndex) == shards_.end() &&
preparedIndexes_.find(shardIndex) == preparedIndexes_.end())
{
@@ -1449,9 +1426,7 @@ DatabaseShardImp::finalizeShard(
{
auto const boundaryIndex{shardBoundaryIndex()};
std::lock_guard lock(mutex_);
updateStatus(lock);
if (shard->index() < boundaryIndex)
{
@@ -1464,19 +1439,17 @@ DatabaseShardImp::finalizeShard(
<< " is not stored at a historical path";
}
}
else
{
// Not a historical shard. Shift recent shards if necessary
relocateOutdatedShards(lock);
assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
auto& recentShard = shard->index() == boundaryIndex
? secondLatestShardIndex_
: latestShardIndex_;
relocateOutdatedShards(lock);
// Set the appropriate recent shard index
recentShard = shard->index();
if (shard->index() == boundaryIndex)
secondLatestShardIndex_ = shard->index();
else
latestShardIndex_ = shard->index();
if (shard->getDir().parent_path() != dir_)
{
@@ -1484,26 +1457,16 @@ DatabaseShardImp::finalizeShard(
<< " is not stored at the path";
}
}
updatePeers(lock);
}
setFileStats();
// Update peers with new shard index
if (!app_.config().standalone() &&
app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED)
{
protocol::TMPeerShardInfo message;
PublicKey const& publicKey{app_.nodeIdentity().first};
message.set_nodepubkey(publicKey.data(), publicKey.size());
message.set_shardindexes(std::to_string(shard->index()));
app_.overlay().foreach(send_always(std::make_shared<Message>(
message, protocol::mtPEER_SHARD_INFO)));
}
updateFileStats();
});
}
void
DatabaseShardImp::setFileStats()
DatabaseShardImp::updateFileStats()
{
std::vector<std::weak_ptr<Shard>> shards;
{
@@ -1519,9 +1482,9 @@ DatabaseShardImp::setFileStats()
std::uint64_t sumSz{0};
std::uint32_t sumFd{0};
std::uint32_t numShards{0};
for (auto const& e : shards)
for (auto const& weak : shards)
{
if (auto const shard{e.lock()}; shard)
if (auto const shard{weak.lock()}; shard)
{
auto const [sz, fd] = shard->getFileInfo();
sumSz += sz;
@@ -1563,21 +1526,6 @@ DatabaseShardImp::setFileStats()
}
}
void
DatabaseShardImp::updateStatus(std::lock_guard<std::mutex> const&)
{
if (!shards_.empty())
{
RangeSet<std::uint32_t> rs;
for (auto const& e : shards_)
if (e.second->getState() == Shard::final)
rs.insert(e.second->index());
status_ = to_string(rs);
}
else
status_.clear();
}
bool
DatabaseShardImp::sufficientStorage(
std::uint32_t numShards,
@@ -1645,7 +1593,7 @@ DatabaseShardImp::setStoredInShard(
return false;
}
if (shard->getState() == Shard::complete)
if (shard->getState() == ShardState::complete)
{
std::lock_guard lock(mutex_);
if (auto const it{shards_.find(shard->index())}; it != shards_.end())
@@ -1662,7 +1610,7 @@ DatabaseShardImp::setStoredInShard(
}
}
setFileStats();
updateFileStats();
return true;
}
@@ -1680,12 +1628,6 @@ DatabaseShardImp::removeFailedShard(std::shared_ptr<Shard>& shard)
if (shard->index() == secondLatestShardIndex_)
secondLatestShardIndex_ = std::nullopt;
if ((shards_.erase(shard->index()) > 0) &&
shard->getState() == Shard::final)
{
updateStatus(lock);
}
}
shard->removeOnDestroy();
@@ -1693,7 +1635,7 @@ DatabaseShardImp::removeFailedShard(std::shared_ptr<Shard>& shard)
// Reset the shared_ptr to invoke the shard's
// destructor and remove it from the server
shard.reset();
setFileStats();
updateFileStats();
}
std::uint32_t
@@ -1701,7 +1643,7 @@ DatabaseShardImp::shardBoundaryIndex() const
{
auto const validIndex = app_.getLedgerMaster().getValidLedgerIndex();
if (validIndex < earliestLedgerSeq())
if (validIndex < earliestLedgerSeq_)
return 0;
// Shards with an index earlier than the recent shard boundary index
@@ -1727,151 +1669,135 @@ void
DatabaseShardImp::relocateOutdatedShards(
std::lock_guard<std::mutex> const& lock)
{
if (auto& cur = latestShardIndex_, &prev = secondLatestShardIndex_;
cur || prev)
{
auto const latestShardIndex =
seqToShardIndex(app_.getLedgerMaster().getValidLedgerIndex());
auto& cur{latestShardIndex_};
auto& prev{secondLatestShardIndex_};
if (!cur && !prev)
return;
auto const separateHistoricalPath = !historicalPaths_.empty();
auto const latestShardIndex =
seqToShardIndex(app_.getLedgerMaster().getValidLedgerIndex());
auto const separateHistoricalPath = !historicalPaths_.empty();
auto const removeShard =
[this](std::uint32_t const shardIndex) -> void {
canAdd_ = false;
auto const removeShard = [this](std::uint32_t const shardIndex) -> void {
canAdd_ = false;
if (auto it = shards_.find(shardIndex); it != shards_.end())
{
if (it->second)
removeFailedShard(it->second);
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
}
if (auto it = shards_.find(shardIndex); it != shards_.end())
{
if (it->second)
removeFailedShard(it->second);
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
};
}
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
};
auto const keepShard =
[this, &lock, removeShard, separateHistoricalPath](
std::uint32_t const shardIndex) -> bool {
if (numHistoricalShards(lock) >= maxHistoricalShards_)
{
JLOG(j_.error())
<< "maximum number of historical shards reached";
auto const keepShard = [this, &lock, removeShard, separateHistoricalPath](
std::uint32_t const shardIndex) -> bool {
if (numHistoricalShards(lock) >= maxHistoricalShards_)
{
JLOG(j_.error()) << "maximum number of historical shards reached";
removeShard(shardIndex);
return false;
}
if (separateHistoricalPath &&
!sufficientStorage(1, PathDesignation::historical, lock))
{
JLOG(j_.error()) << "insufficient storage space available";
removeShard(shardIndex);
return false;
}
removeShard(shardIndex);
return false;
}
if (separateHistoricalPath &&
!sufficientStorage(1, PathDesignation::historical, lock))
{
JLOG(j_.error()) << "insufficient storage space available";
return true;
};
removeShard(shardIndex);
return false;
}
// Move a shard from the main shard path to a historical shard
// path by copying the contents, and creating a new shard.
auto const moveShard = [this,
&lock](std::uint32_t const shardIndex) -> void {
auto it{shards_.find(shardIndex)};
if (it == shards_.end())
{
JLOG(j_.warn()) << "can't find shard to move to historical path";
return;
}
return true;
};
auto& shard{it->second};
// Move a shard from the main shard path to a historical shard
// path by copying the contents, and creating a new shard.
auto const moveShard = [this,
&lock](std::uint32_t const shardIndex) -> void {
auto const dst = chooseHistoricalPath(lock);
// Close any open file descriptors before moving the shard
// directory. Don't call removeOnDestroy since that would
// attempt to close the fds after the directory has been moved.
if (!shard->tryClose())
{
JLOG(j_.warn()) << "can't close shard to move to historical path";
return;
}
if (auto it = shards_.find(shardIndex); it != shards_.end())
{
auto& shard{it->second};
auto const dst{chooseHistoricalPath(lock)};
try
{
// Move the shard directory to the new path
boost::filesystem::rename(
shard->getDir().string(), dst / std::to_string(shardIndex));
}
catch (...)
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to move to historical storage";
return;
}
// Close any open file descriptors before moving the shard
// directory. Don't call removeOnDestroy since that would
// attempt to close the fds after the directory has been moved.
if (!shard->tryClose())
{
JLOG(j_.warn())
<< "can't close shard to move to historical path";
return;
}
// Create a shard instance at the new location
shard = std::make_shared<Shard>(app_, *this, shardIndex, dst, j_);
try
{
// Move the shard directory to the new path
boost::filesystem::rename(
shard->getDir().string(),
dst / std::to_string(shardIndex));
}
catch (...)
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to move to historical storage";
return;
}
// Open the new shard
if (!shard->init(scheduler_, *ctx_))
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to open in historical storage";
shard->removeOnDestroy();
shard.reset();
}
};
// Create a shard instance at the new location
shard =
std::make_shared<Shard>(app_, *this, shardIndex, dst, j_);
// See if either of the recent shards needs to be updated
bool const curNotSynched =
latestShardIndex_ && *latestShardIndex_ != latestShardIndex;
bool const prevNotSynched = secondLatestShardIndex_ &&
*secondLatestShardIndex_ != latestShardIndex - 1;
// Open the new shard
if (!shard->init(scheduler_, *ctx_))
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to open in historical storage";
shard->removeOnDestroy();
shard.reset();
}
}
// A new shard has been published. Move outdated
// shards to historical storage as needed
if (curNotSynched || prevNotSynched)
{
if (prev)
{
// Move the formerly second latest shard to historical storage
if (keepShard(*prev) && separateHistoricalPath)
moveShard(*prev);
prev = std::nullopt;
}
if (cur)
{
// The formerly latest shard is now the second latest
if (cur == latestShardIndex - 1)
prev = cur;
// The formerly latest shard is no longer a 'recent' shard
else
{
JLOG(j_.warn())
<< "can't find shard to move to historical path";
}
};
// See if either of the recent shards needs to be updated
bool const curNotSynched =
latestShardIndex_ && *latestShardIndex_ != latestShardIndex;
bool const prevNotSynched = secondLatestShardIndex_ &&
*secondLatestShardIndex_ != latestShardIndex - 1;
// A new shard has been published. Move outdated
// shards to historical storage as needed
if (curNotSynched || prevNotSynched)
{
if (prev)
{
// Move the formerly second latest shard to historical storage
if (keepShard(*prev) && separateHistoricalPath)
{
moveShard(*prev);
}
prev = std::nullopt;
// Move the formerly latest shard to historical storage
if (keepShard(*cur) && separateHistoricalPath)
moveShard(*cur);
}
if (cur)
{
// The formerly latest shard is now the second latest
if (cur == latestShardIndex - 1)
{
prev = cur;
}
// The formerly latest shard is no longer a 'recent' shard
else
{
// Move the formerly latest shard to historical storage
if (keepShard(*cur) && separateHistoricalPath)
{
moveShard(*cur);
}
}
cur = std::nullopt;
}
cur = std::nullopt;
}
}
}
@@ -1941,7 +1867,7 @@ DatabaseShardImp::chooseHistoricalPath(std::lock_guard<std::mutex> const&) const
}
bool
DatabaseShardImp::checkHistoricalPaths() const
DatabaseShardImp::checkHistoricalPaths(std::lock_guard<std::mutex> const&) const
{
#if BOOST_OS_LINUX
// Each historical shard path must correspond
@@ -2031,7 +1957,7 @@ DatabaseShardImp::callForLedgerSQL(
auto shardIndex = seqToShardIndex(ledgerSeq);
if (shards_.count(shardIndex) &&
shards_[shardIndex]->getState() == Shard::State::final)
shards_[shardIndex]->getState() == ShardState::finalized)
{
return shards_[shardIndex]->callForLedgerSQL(callback);
}
@@ -2049,7 +1975,7 @@ DatabaseShardImp::callForTransactionSQL(
auto shardIndex = seqToShardIndex(ledgerSeq);
if (shards_.count(shardIndex) &&
shards_[shardIndex]->getState() == Shard::State::final)
shards_[shardIndex]->getState() == ShardState::finalized)
{
return shards_[shardIndex]->callForTransactionSQL(callback);
}
@@ -2075,7 +2001,7 @@ DatabaseShardImp::iterateShardsForward(
for (; it != eit; it++)
{
if (it->second->getState() == Shard::State::final)
if (it->second->getState() == ShardState::finalized)
{
if (!visit(*it->second))
return false;
@@ -2126,7 +2052,7 @@ DatabaseShardImp::iterateShardsBack(
for (; it != eit; it++)
{
if (it->second->getState() == Shard::State::final &&
if (it->second->getState() == ShardState::finalized &&
(!maxShardIndex || it->first <= *maxShardIndex))
{
if (!visit(*it->second))
@@ -2159,6 +2085,41 @@ DatabaseShardImp::iterateTransactionSQLsBack(
});
}
std::unique_ptr<ShardInfo>
DatabaseShardImp::getShardInfo(std::lock_guard<std::mutex> const&) const
{
auto shardInfo{std::make_unique<ShardInfo>()};
for (auto const& [_, shard] : shards_)
{
shardInfo->update(
shard->index(), shard->getState(), shard->getPercentProgress());
}
for (auto const shardIndex : preparedIndexes_)
shardInfo->update(shardIndex, ShardState::queued, 0);
return shardInfo;
}
size_t
DatabaseShardImp::getNumTasks() const
{
std::lock_guard lock(mutex_);
return taskQueue_.size();
}
void
DatabaseShardImp::updatePeers(std::lock_guard<std::mutex> const& lock) const
{
if (!app_.config().standalone() &&
app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED)
{
auto const message{getShardInfo(lock)->makeMessage(app_)};
app_.overlay().foreach(send_always(std::make_shared<Message>(
message, protocol::mtPEER_SHARD_INFO_V2)));
}
}
//------------------------------------------------------------------------------
std::unique_ptr<DatabaseShard>

View File

@@ -54,7 +54,7 @@ public:
[[nodiscard]] bool
init() override;
[[nodiscard]] std::optional<std::uint32_t>
std::optional<std::uint32_t>
prepareLedger(std::uint32_t validLedgerSeq) override;
bool
@@ -76,43 +76,11 @@ public:
void
setStored(std::shared_ptr<Ledger const> const& ledger) override;
std::string
getCompleteShards() override;
std::unique_ptr<ShardInfo>
getShardInfo() const override;
std::uint32_t
ledgersPerShard() const override
{
return ledgersPerShard_;
}
std::uint32_t
earliestShardIndex() const override
{
return earliestShardIndex_;
}
std::uint32_t
seqToShardIndex(std::uint32_t ledgerSeq) const override
{
assert(ledgerSeq >= earliestLedgerSeq());
return NodeStore::seqToShardIndex(ledgerSeq, ledgersPerShard_);
}
std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const override
{
assert(shardIndex >= earliestShardIndex_);
if (shardIndex <= earliestShardIndex_)
return earliestLedgerSeq();
return 1 + (shardIndex * ledgersPerShard_);
}
std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const override
{
assert(shardIndex >= earliestShardIndex_);
return (shardIndex + 1) * ledgersPerShard_;
}
size_t
getNumTasks() const override;
boost::filesystem::path const&
getRootDir() const override
@@ -268,9 +236,6 @@ private:
// If new shards can be stored
bool canAdd_{true};
// Complete shard indexes
std::string status_;
// The name associated with the backend used with the shard store
std::string backendName_;
@@ -283,14 +248,6 @@ private:
// Storage space utilized by the shard store (in bytes)
std::uint64_t fileSz_{0};
// Each shard stores 16384 ledgers. The earliest shard may store
// less if the earliest ledger sequence truncates its beginning.
// The value should only be altered for unit tests.
std::uint32_t ledgersPerShard_ = ledgersPerShardDefault;
// The earliest shard index
std::uint32_t earliestShardIndex_;
// Average storage space required by a shard (in bytes)
std::uint64_t avgShardFileSz_;
@@ -348,16 +305,11 @@ private:
bool writeSQLite,
std::optional<uint256> const& expectedHash);
// Set storage and file descriptor usage stats
// Update storage and file descriptor usage stats
void
setFileStats();
updateFileStats();
// Update status string
// Lock must be held
void
updateStatus(std::lock_guard<std::mutex> const&);
// Returns true if the filesystem has enough storage
// Returns true if the file system has enough storage
// available to hold the specified number of shards.
// The value of pathDesignation determines whether
// the shard(s) in question are historical and thus
@@ -406,9 +358,6 @@ private:
boost::filesystem::path
chooseHistoricalPath(std::lock_guard<std::mutex> const&) const;
bool
checkHistoricalPaths() const;
/**
* @brief iterateShardsForward Visits all shards starting from given
* in ascending order and calls given callback function to each
@@ -436,6 +385,16 @@ private:
iterateShardsBack(
std::optional<std::uint32_t> maxShardIndex,
std::function<bool(Shard& shard)> const& visit);
bool
checkHistoricalPaths(std::lock_guard<std::mutex> const&) const;
std::unique_ptr<ShardInfo>
getShardInfo(std::lock_guard<std::mutex> const&) const;
// Update peers with the status of every complete and incomplete shard
void
updatePeers(std::lock_guard<std::mutex> const& lock) const;
};
} // namespace NodeStore

View File

@@ -56,9 +56,7 @@ Shard::Shard(
, index_(index)
, firstSeq_(db.firstLedgerSeq(index))
, lastSeq_(std::max(firstSeq_, db.lastLedgerSeq(index)))
, maxLedgers_(
index == db.earliestShardIndex() ? lastSeq_ - firstSeq_ + 1
: db.ledgersPerShard())
, maxLedgers_(db.maxLedgers(index))
, dir_((dir.empty() ? db.getRootDir() : dir) / std::to_string(index_))
{
}
@@ -146,7 +144,7 @@ bool
Shard::tryClose()
{
// Keep database open if being acquired or finalized
if (state_ != final)
if (state_ != ShardState::finalized)
return false;
std::lock_guard lock(mutex_);
@@ -189,7 +187,7 @@ Shard::tryClose()
std::optional<std::uint32_t>
Shard::prepare()
{
if (state_ != acquire)
if (state_ != ShardState::acquire)
{
JLOG(j_.warn()) << "shard " << index_
<< " prepare called when not acquiring";
@@ -212,7 +210,7 @@ Shard::prepare()
bool
Shard::storeNodeObject(std::shared_ptr<NodeObject> const& nodeObject)
{
if (state_ != acquire)
if (state_ != ShardState::acquire)
{
// The import node store case is an exception
if (nodeObject->getHash() != finalKey)
@@ -296,7 +294,7 @@ Shard::storeLedger(
std::shared_ptr<Ledger const> const& next)
{
StoreLedgerResult result;
if (state_ != acquire)
if (state_ != ShardState::acquire)
{
// Ignore residual calls from InboundLedgers
JLOG(j_.trace()) << "shard " << index_ << ". Not acquiring";
@@ -418,20 +416,21 @@ Shard::storeLedger(
bool
Shard::setLedgerStored(std::shared_ptr<Ledger const> const& ledger)
{
if (state_ != acquire)
if (state_ != ShardState::acquire)
{
// Ignore residual calls from InboundLedgers
JLOG(j_.trace()) << "shard " << index_ << " not acquiring";
return false;
}
auto fail = [&](std::string const& msg) {
JLOG(j_.error()) << "shard " << index_ << ". " << msg;
return false;
};
auto const ledgerSeq{ledger->info().seq};
if (ledgerSeq < firstSeq_ || ledgerSeq > lastSeq_)
{
JLOG(j_.error()) << "shard " << index_ << " invalid ledger sequence "
<< ledgerSeq;
return false;
}
return fail("Invalid ledger sequence " + std::to_string(ledgerSeq));
auto const scopedCount{makeBackendCount()};
if (!scopedCount)
@@ -444,11 +443,8 @@ Shard::setLedgerStored(std::shared_ptr<Ledger const> const& ledger)
{
std::lock_guard lock(mutex_);
if (!acquireInfo_)
{
JLOG(j_.error())
<< "shard " << index_ << " missing acquire SQLite database";
return false;
}
return fail("Missing acquire SQLite database");
if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))
{
// Ignore redundant calls
@@ -459,7 +455,7 @@ Shard::setLedgerStored(std::shared_ptr<Ledger const> const& ledger)
}
if (!storeSQLite(ledger))
return false;
return fail("Failed to store ledger");
std::lock_guard lock(mutex_);
@@ -491,15 +487,16 @@ Shard::setLedgerStored(std::shared_ptr<Ledger const> const& ledger)
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "shard " << index_
<< ". Exception caught in function " << __func__
<< ". Error: " << e.what();
acquireInfo_->storedSeqs.erase(ledgerSeq);
return false;
return fail(
std::string(". Exception caught in function ") + __func__ +
". Error: " + e.what());
}
if (boost::icl::length(acquireInfo_->storedSeqs) >= maxLedgers_)
state_ = complete;
// Update progress
progress_ = boost::icl::length(acquireInfo_->storedSeqs);
if (progress_ == maxLedgers_)
state_ = ShardState::complete;
setFileStats(lock);
JLOG(j_.trace()) << "shard " << index_ << " stored ledger sequence "
@@ -512,7 +509,7 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const
{
if (ledgerSeq < firstSeq_ || ledgerSeq > lastSeq_)
return false;
if (state_ != acquire)
if (state_ != ShardState::acquire)
return true;
std::lock_guard lock(mutex_);
@@ -525,12 +522,6 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const
return boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq);
}
void
Shard::sweep()
{
// nothing to do
}
std::chrono::steady_clock::time_point
Shard::getLastUse() const
{
@@ -568,8 +559,6 @@ Shard::finalize(bool writeSQLite, std::optional<uint256> const& referenceHash)
if (!scopedCount)
return false;
state_ = finalizing;
uint256 hash{0};
std::uint32_t ledgerSeq{0};
auto fail = [&](std::string const& msg) {
@@ -579,12 +568,17 @@ Shard::finalize(bool writeSQLite, std::optional<uint256> const& referenceHash)
<< (ledgerSeq == 0 ? ""
: ". Ledger sequence " +
std::to_string(ledgerSeq));
state_ = finalizing;
state_ = ShardState::finalizing;
progress_ = 0;
busy_ = false;
return false;
};
try
{
state_ = ShardState::finalizing;
progress_ = 0;
// Check if a final key has been stored
if (std::shared_ptr<NodeObject> nodeObject;
backend_->fetch(finalKey.data(), &nodeObject) == Status::ok)
@@ -716,6 +710,10 @@ Shard::finalize(bool writeSQLite, std::optional<uint256> const& referenceHash)
hash = ledger->info().parentHash;
next = std::move(ledger);
// Update progress
progress_ = maxLedgers_ - (ledgerSeq - firstSeq_);
--ledgerSeq;
fullBelowCache->reset();
@@ -802,7 +800,9 @@ Shard::finalize(bool writeSQLite, std::optional<uint256> const& referenceHash)
// Re-open deterministic shard
if (!open(lock))
return false;
return fail("failed to open");
assert(state_ == ShardState::finalized);
// Allow all other threads work with the shard
busy_ = false;
@@ -829,7 +829,8 @@ Shard::open(std::lock_guard<std::mutex> const& lock)
txSQLiteDB_.reset();
acquireInfo_.reset();
state_ = acquire;
state_ = ShardState::acquire;
progress_ = 0;
if (!preexist)
remove_all(dir_);
@@ -841,18 +842,19 @@ Shard::open(std::lock_guard<std::mutex> const& lock)
return false;
};
auto createAcquireInfo = [this, &config]() {
acquireInfo_ = std::make_unique<AcquireInfo>();
DatabaseCon::Setup setup;
setup.startUp = config.standalone() ? config.LOAD : config.START_UP;
setup.standAlone = config.standalone();
setup.dataDir = dir_;
setup.useGlobalPragma = true;
acquireInfo_ = std::make_unique<AcquireInfo>();
acquireInfo_->SQLiteDB = makeAcquireDB(
setup,
DatabaseCon::CheckpointerSetup{&app_.getJobQueue(), &app_.logs()});
state_ = acquire;
state_ = ShardState::acquire;
progress_ = 0;
};
try
@@ -890,14 +892,14 @@ Shard::open(std::lock_guard<std::mutex> const& lock)
}
// Check if backend is complete
if (boost::icl::length(storedSeqs) == maxLedgers_)
state_ = complete;
progress_ = boost::icl::length(storedSeqs);
if (progress_ == maxLedgers_)
state_ = ShardState::complete;
}
}
else
{
// A shard that is final or its backend is complete
// and ready to be finalized
// A shard with a finalized or complete state
std::shared_ptr<NodeObject> nodeObject;
if (backend_->fetch(finalKey.data(), &nodeObject) != Status::ok)
{
@@ -920,10 +922,12 @@ Shard::open(std::lock_guard<std::mutex> const& lock)
if (exists(dir_ / LgrDBName) && exists(dir_ / TxDBName))
{
lastAccess_ = std::chrono::steady_clock::now();
state_ = final;
state_ = ShardState::finalized;
}
else
state_ = complete;
state_ = ShardState::complete;
progress_ = maxLedgers_;
}
}
catch (std::exception const& e)
@@ -949,7 +953,7 @@ Shard::initSQLite(std::lock_guard<std::mutex> const&)
setup.startUp = config.standalone() ? config.LOAD : config.START_UP;
setup.standAlone = config.standalone();
setup.dataDir = dir_;
setup.useGlobalPragma = (state_ != complete);
setup.useGlobalPragma = (state_ != ShardState::complete);
return setup;
}();
@@ -961,21 +965,48 @@ Shard::initSQLite(std::lock_guard<std::mutex> const&)
if (txSQLiteDB_)
txSQLiteDB_.reset();
if (state_ == final)
switch (state_)
{
auto [lgr, tx] = makeShardCompleteLedgerDBs(config, setup);
txSQLiteDB_ = std::move(tx);
lgrSQLiteDB_ = std::move(lgr);
}
else
{
auto [lgr, tx] = makeShardIncompleteLedgerDBs(
config,
setup,
DatabaseCon::CheckpointerSetup{
&app_.getJobQueue(), &app_.logs()});
txSQLiteDB_ = std::move(tx);
lgrSQLiteDB_ = std::move(lgr);
case ShardState::complete:
case ShardState::finalizing:
case ShardState::finalized: {
auto [lgr, tx] = makeShardCompleteLedgerDBs(config, setup);
lgrSQLiteDB_ = std::move(lgr);
lgrSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(
SizedItem::lgrDBCache, std::nullopt)));
txSQLiteDB_ = std::move(tx);
txSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(
SizedItem::txnDBCache, std::nullopt)));
break;
}
// case ShardState::acquire:
// case ShardState::queued:
default: {
// Incomplete shards use a Write Ahead Log for performance
auto [lgr, tx] = makeShardIncompleteLedgerDBs(
config,
setup,
DatabaseCon::CheckpointerSetup{
&app_.getJobQueue(), &app_.logs()});
lgrSQLiteDB_ = std::move(lgr);
lgrSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(SizedItem::lgrDBCache)));
txSQLiteDB_ = std::move(tx);
txSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(SizedItem::txnDBCache)));
break;
}
}
}
catch (std::exception const& e)
@@ -1200,7 +1231,7 @@ Shard::makeBackendCount()
if (!open(lock))
return {nullptr};
}
else if (state_ == final)
else if (state_ == ShardState::finalized)
lastAccess_ = std::chrono::steady_clock::now();
return Shard::Count(&backendCount_);

View File

@@ -23,6 +23,7 @@
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/rdb/RelationalDBInterface.h>
#include <ripple/basics/BasicConfig.h>
#include <ripple/basics/MathUtilities.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/nodestore/NodeObject.h>
@@ -52,17 +53,19 @@ class DatabaseShard;
class Shard final
{
public:
enum class State {
acquire, // Being acquired
complete, // Backend contains all ledgers but is not yet final
finalizing, // Being finalized
final // Database verified, shard is immutable
};
/// Copy constructor (disallowed)
Shard(Shard const&) = delete;
static constexpr State acquire = State::acquire;
static constexpr State complete = State::complete;
static constexpr State finalizing = State::finalizing;
static constexpr State final = State::final;
/// Move constructor (disallowed)
Shard(Shard&&) = delete;
// Copy assignment (disallowed)
Shard&
operator=(Shard const&) = delete;
// Move assignment (disallowed)
Shard&
operator=(Shard&&) = delete;
Shard(
Application& app,
@@ -102,7 +105,7 @@ public:
/** Notify shard to prepare for shutdown.
*/
void
stop()
stop() noexcept
{
stop_ = true;
}
@@ -140,17 +143,14 @@ public:
[[nodiscard]] bool
containsLedger(std::uint32_t ledgerSeq) const;
void
sweep();
[[nodiscard]] std::uint32_t
index() const
index() const noexcept
{
return index_;
}
[[nodiscard]] boost::filesystem::path const&
getDir() const
getDir() const noexcept
{
return dir_;
}
@@ -164,12 +164,21 @@ public:
[[nodiscard]] std::pair<std::uint64_t, std::uint32_t>
getFileInfo() const;
[[nodiscard]] State
getState() const
[[nodiscard]] ShardState
getState() const noexcept
{
return state_;
}
/** Returns a percent signifying how complete
the current state of the shard is.
*/
[[nodiscard]] std::uint32_t
getPercentProgress() const noexcept
{
return calculatePercent(progress_, maxLedgers_);
}
[[nodiscard]] std::int32_t
getWriteLoad();
@@ -192,7 +201,7 @@ public:
/** Enables removal of the shard directory on destruction.
*/
void
removeOnDestroy()
removeOnDestroy() noexcept
{
removeOnDestroy_ = true;
}
@@ -244,28 +253,28 @@ private:
public:
Count(Count const&) = delete;
Count&
operator=(Count&&) = delete;
Count&
operator=(Count const&) = delete;
Count&
operator=(Count&&) = delete;
Count(Count&& other) : counter_(other.counter_)
Count(Count&& other) noexcept : counter_(other.counter_)
{
other.counter_ = nullptr;
}
Count(std::atomic<std::uint32_t>* counter) : counter_(counter)
Count(std::atomic<std::uint32_t>* counter) noexcept : counter_(counter)
{
if (counter_)
++(*counter_);
}
~Count()
~Count() noexcept
{
if (counter_)
--(*counter_);
}
operator bool() const
operator bool() const noexcept
{
return counter_ != nullptr;
}
@@ -322,7 +331,7 @@ private:
std::unique_ptr<DatabaseCon> txSQLiteDB_;
// Tracking information used only when acquiring a shard from the network.
// If the shard is final, this member will be null.
// If the shard is finalized, this member will be null.
std::unique_ptr<AcquireInfo> acquireInfo_;
// Older shard without an acquire database or final key
@@ -335,12 +344,16 @@ private:
// Determines if the shard busy with replacing by deterministic one
std::atomic<bool> busy_{false};
std::atomic<State> state_{State::acquire};
// State of the shard
std::atomic<ShardState> state_{ShardState::acquire};
// Number of ledgers processed for the current shard state
std::atomic<std::uint32_t> progress_{0};
// Determines if the shard directory should be removed in the destructor
std::atomic<bool> removeOnDestroy_{false};
// The time of the last access of a shard that has a final state
// The time of the last access of a shard with a finalized state
std::chrono::steady_clock::time_point lastAccess_;
// Open shard databases

View File

@@ -0,0 +1,138 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/core/TimeKeeper.h>
#include <ripple/nodestore/ShardInfo.h>
#include <ripple/protocol/SecretKey.h>
#include <ripple/protocol/digest.h>
#include <boost/algorithm/clamp.hpp>
namespace ripple {
namespace NodeStore {
std::string
ShardInfo::finalizedToString() const
{
if (!finalized_.empty())
return ripple::to_string(finalized_);
return {};
}
std::string
ShardInfo::incompleteToString() const
{
std::string result;
if (!incomplete_.empty())
{
for (auto const& [shardIndex, incomplete] : incomplete_)
{
result += std::to_string(shardIndex) + ":" +
std::to_string(incomplete.percentProgress()) + ",";
}
result.pop_back();
}
return result;
}
bool
ShardInfo::update(
std::uint32_t shardIndex,
ShardState state,
std::uint32_t percentProgress)
{
if (state == ShardState::finalized)
{
if (boost::icl::contains(finalized_, shardIndex))
return false;
finalized_.insert(shardIndex);
return true;
}
return incomplete_.emplace(shardIndex, Incomplete(state, percentProgress))
.second;
}
protocol::TMPeerShardInfoV2
ShardInfo::makeMessage(Application& app)
{
protocol::TMPeerShardInfoV2 message;
Serializer s;
s.add32(HashPrefix::shardInfo);
// Set the message creation time
msgTimestamp_ = app.timeKeeper().now();
{
auto const timestamp{msgTimestamp_.time_since_epoch().count()};
message.set_timestamp(timestamp);
s.add32(timestamp);
}
if (!incomplete_.empty())
{
message.mutable_incomplete()->Reserve(incomplete_.size());
for (auto const& [shardIndex, incomplete] : incomplete_)
{
auto tmIncomplete{message.add_incomplete()};
tmIncomplete->set_shardindex(shardIndex);
s.add32(shardIndex);
static_assert(std::is_same_v<
std::underlying_type_t<decltype(incomplete.state())>,
std::uint32_t>);
auto const state{static_cast<std::uint32_t>(incomplete.state())};
tmIncomplete->set_state(state);
s.add32(state);
// Set progress if greater than zero
auto const percentProgress{incomplete.percentProgress()};
if (percentProgress > 0)
{
tmIncomplete->set_progress(percentProgress);
s.add32(percentProgress);
}
}
}
if (!finalized_.empty())
{
auto const str{ripple::to_string(finalized_)};
message.set_finalized(str);
s.addRaw(str.data(), str.size());
}
// Set the public key
auto const& publicKey{app.nodeIdentity().first};
message.set_publickey(publicKey.data(), publicKey.size());
// Create a digital signature using the node private key
auto const signature{sign(publicKey, app.nodeIdentity().second, s.slice())};
// Set the digital signature
message.set_signature(signature.data(), signature.size());
return message;
}
} // namespace NodeStore
} // namespace ripple

View File

@@ -44,6 +44,13 @@ TaskQueue::addTask(std::function<void()> task)
workers_.addTask();
}
size_t
TaskQueue::size() const
{
std::lock_guard lock{mutex_};
return tasks_.size() + processing_;
}
void
TaskQueue::processTask(int instance)
{
@@ -51,13 +58,18 @@ TaskQueue::processTask(int instance)
{
std::lock_guard lock{mutex_};
assert(!tasks_.empty());
assert(!tasks_.empty());
task = std::move(tasks_.front());
tasks_.pop();
++processing_;
}
task();
std::lock_guard lock{mutex_};
--processing_;
}
} // namespace NodeStore

View File

@@ -43,10 +43,16 @@ public:
void
addTask(std::function<void()> task);
/** Return the queue size
*/
[[nodiscard]] size_t
size() const;
private:
std::mutex mutex_;
mutable std::mutex mutex_;
Workers workers_;
std::queue<std::function<void()>> tasks_;
std::uint64_t processing_{0};
void
processTask(int instance) override;

View File

@@ -208,11 +208,12 @@ public:
/** Returns information reported to the crawl shard RPC command.
@param includePublicKey include peer public keys in the result.
@param hops the maximum jumps the crawler will attempt.
The number of hops achieved is not guaranteed.
*/
virtual Json::Value
crawlShards(bool pubKey, std::uint32_t hops) = 0;
crawlShards(bool includePublicKey, std::uint32_t hops) = 0;
/** Returns the ID of the network this server is configured for, if any.

View File

@@ -32,8 +32,8 @@ namespace Resource {
class Charge;
}
// Maximum hops to attempt when crawling shards. cs = crawl shards
static constexpr std::uint32_t csHopLimit = 3;
// Maximum hops to relay the peer shard info request
static constexpr std::uint32_t relayLimit = 3;
enum class ProtocolFeature {
ValidatorListPropagation,
@@ -113,8 +113,6 @@ public:
virtual void
ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0;
virtual bool
hasShard(std::uint32_t shardIndex) const = 0;
virtual bool
hasTxSet(uint256 const& hash) const = 0;
virtual void
cycleStatus() = 0;

View File

@@ -93,13 +93,13 @@ Message::compress()
case protocol::mtSTATUS_CHANGE:
case protocol::mtHAVE_SET:
case protocol::mtVALIDATION:
case protocol::mtGET_SHARD_INFO:
case protocol::mtSHARD_INFO:
case protocol::mtGET_PEER_SHARD_INFO:
case protocol::mtPEER_SHARD_INFO:
case protocol::mtPROOF_PATH_REQ:
case protocol::mtPROOF_PATH_RESPONSE:
case protocol::mtREPLAY_DELTA_REQ:
case protocol::mtGET_PEER_SHARD_INFO_V2:
case protocol::mtPEER_SHARD_INFO_V2:
break;
}
return false;

View File

@@ -681,98 +681,99 @@ OverlayImpl::reportTraffic(
}
Json::Value
OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays)
{
using namespace std::chrono;
using namespace std::chrono_literals;
Json::Value jv(Json::objectValue);
auto const numPeers{size()};
if (numPeers == 0)
// Add shard info from this server to json result
if (auto shardStore = app_.getShardStore())
{
if (includePublicKey)
jv[jss::public_key] =
toBase58(TokenType::NodePublic, app_.nodeIdentity().first);
auto const shardInfo{shardStore->getShardInfo()};
if (!shardInfo->finalized().empty())
jv[jss::complete_shards] = shardInfo->finalizedToString();
if (!shardInfo->incomplete().empty())
jv[jss::incomplete_shards] = shardInfo->incompleteToString();
}
if (relays == 0 || size() == 0)
return jv;
// If greater than a hop away, we may need to gather or freshen data
if (hops > 0)
{
// Prevent crawl spamming
clock_type::time_point const last(csLast_.load());
if ((clock_type::now() - last) > 60s)
protocol::TMGetPeerShardInfoV2 tmGPS;
tmGPS.set_relays(relays);
// Wait if a request is in progress
std::unique_lock<std::mutex> csLock{csMutex_};
if (!csIDs_.empty())
csCV_.wait(csLock);
{
auto const timeout(seconds((hops * hops) * 10));
std::unique_lock<std::mutex> l{csMutex_};
std::lock_guard lock{mutex_};
for (auto const& id : ids_)
csIDs_.emplace(id.first);
}
// Check if already requested
if (csIDs_.empty())
{
{
std::lock_guard lock{mutex_};
for (auto& id : ids_)
csIDs_.emplace(id.first);
}
// Request peer shard info
foreach(send_always(std::make_shared<Message>(
tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)));
// Relay request to active peers
protocol::TMGetPeerShardInfo tmGPS;
tmGPS.set_hops(hops);
foreach(send_always(std::make_shared<Message>(
tmGPS, protocol::mtGET_PEER_SHARD_INFO)));
if (csCV_.wait_for(l, timeout) == std::cv_status::timeout)
{
csIDs_.clear();
csCV_.notify_all();
}
csLast_ = duration_cast<seconds>(
clock_type::now().time_since_epoch());
}
else
csCV_.wait_for(l, timeout);
if (csCV_.wait_for(csLock, seconds(60)) == std::cv_status::timeout)
{
csIDs_.clear();
csCV_.notify_all();
}
}
// Combine the shard info from peers and their sub peers
hash_map<PublicKey, PeerImp::ShardInfo> peerShardInfo;
for_each([&](std::shared_ptr<PeerImp> const& peer) {
if (auto psi = peer->getPeerShardInfo())
// Combine shard info from peers
hash_map<PublicKey, NodeStore::ShardInfo> peerShardInfo;
for_each([&](std::shared_ptr<PeerImp>&& peer) {
auto const psi{peer->getPeerShardInfos()};
for (auto const& [publicKey, shardInfo] : psi)
{
// e is non-const so it may be moved from
for (auto& e : *psi)
{
auto it{peerShardInfo.find(e.first)};
if (it != peerShardInfo.end())
// The key exists so join the shard indexes.
it->second.shardIndexes += e.second.shardIndexes;
else
peerShardInfo.emplace(std::move(e));
}
auto const it{peerShardInfo.find(publicKey)};
if (it == peerShardInfo.end())
peerShardInfo.emplace(publicKey, shardInfo);
else if (shardInfo.msgTimestamp() > it->second.msgTimestamp())
it->second = shardInfo;
}
});
// Prepare json reply
auto& av = jv[jss::peers] = Json::Value(Json::arrayValue);
for (auto const& e : peerShardInfo)
// Add shard info to json result
if (!peerShardInfo.empty())
{
auto& pv{av.append(Json::Value(Json::objectValue))};
if (pubKey)
pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first);
auto& av = jv[jss::peers] = Json::Value(Json::arrayValue);
for (auto const& [publicKey, shardInfo] : peerShardInfo)
{
auto& pv{av.append(Json::Value(Json::objectValue))};
if (includePublicKey)
{
pv[jss::public_key] =
toBase58(TokenType::NodePublic, publicKey);
}
auto const& address{e.second.endpoint.address()};
if (!address.is_unspecified())
pv[jss::ip] = address.to_string();
pv[jss::complete_shards] = to_string(e.second.shardIndexes);
if (!shardInfo.finalized().empty())
pv[jss::complete_shards] = shardInfo.finalizedToString();
if (!shardInfo.incomplete().empty())
pv[jss::incomplete_shards] = shardInfo.incompleteToString();
}
}
return jv;
}
void
OverlayImpl::lastLink(std::uint32_t id)
OverlayImpl::endOfPeerChain(std::uint32_t id)
{
// Notify threads when every peer has received a last link.
// This doesn't account for every node that might reply but
// it is adequate.
std::lock_guard l{csMutex_};
if (csIDs_.erase(id) && csIDs_.empty())
// Notify threads if all peers have received a reply from all peer chains
std::lock_guard csLock{csMutex_};
csIDs_.erase(id);
if (csIDs_.empty())
csCV_.notify_all();
}
@@ -834,8 +835,16 @@ OverlayImpl::getOverlayInfo()
pv[jss::complete_ledgers] =
std::to_string(minSeq) + "-" + std::to_string(maxSeq);
if (auto shardIndexes = sp->getShardIndexes())
pv[jss::complete_shards] = to_string(*shardIndexes);
auto const peerShardInfos{sp->getPeerShardInfos()};
auto const it{peerShardInfos.find(sp->getNodePublic())};
if (it != peerShardInfos.end())
{
auto const& shardInfo{it->second};
if (!shardInfo.finalized().empty())
pv[jss::complete_shards] = shardInfo.finalizedToString();
if (!shardInfo.incomplete().empty())
pv[jss::incomplete_shards] = shardInfo.incompleteToString();
}
});
return jv;

View File

@@ -118,8 +118,7 @@ private:
std::atomic<uint64_t> peerDisconnects_{0};
std::atomic<uint64_t> peerDisconnectsCharges_{0};
// Last time we crawled peers for shard info. 'cs' = crawl shards
std::atomic<std::chrono::seconds> csLast_{std::chrono::seconds{0}};
// 'cs' = crawl shards
std::mutex csMutex_;
std::condition_variable csCV_;
// Peer IDs expecting to receive a last link notification
@@ -370,14 +369,14 @@ public:
}
Json::Value
crawlShards(bool pubKey, std::uint32_t hops) override;
crawlShards(bool includePublicKey, std::uint32_t relays) override;
/** Called when the last link from a peer chain is received.
/** Called when the reply from the last peer in a peer chain is received.
@param id peer id that received the shard info.
*/
void
lastLink(std::uint32_t id);
endOfPeerChain(std::uint32_t id);
/** Updates message count for validator/peer. Sends TMSquelch if the number
* of messages for N peers reaches threshold T. A message is counted

View File

@@ -470,8 +470,17 @@ PeerImp::hasLedger(uint256 const& hash, std::uint32_t seq) const
return true;
}
return seq >= app_.getNodeStore().earliestLedgerSeq() &&
hasShard(NodeStore::seqToShardIndex(seq));
if (seq >= app_.getNodeStore().earliestLedgerSeq())
{
std::lock_guard lock{shardInfoMutex_};
auto const it{shardInfos_.find(publicKey_)};
if (it != shardInfos_.end())
{
auto const shardIndex{app_.getNodeStore().seqToShardIndex(seq)};
return boost::icl::contains(it->second.finalized(), shardIndex);
}
}
return false;
}
void
@@ -483,16 +492,6 @@ PeerImp::ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const
maxSeq = maxLedger_;
}
bool
PeerImp::hasShard(std::uint32_t shardIndex) const
{
std::lock_guard l{shardInfoMutex_};
auto const it{shardInfo_.find(publicKey_)};
if (it != shardInfo_.end())
return boost::icl::contains(it->second.shardIndexes, shardIndex);
return false;
}
bool
PeerImp::hasTxSet(uint256 const& hash) const
{
@@ -575,23 +574,11 @@ PeerImp::fail(std::string const& name, error_code ec)
close();
}
std::optional<RangeSet<std::uint32_t>>
PeerImp::getShardIndexes() const
hash_map<PublicKey, NodeStore::ShardInfo> const
PeerImp::getPeerShardInfos() const
{
std::lock_guard l{shardInfoMutex_};
auto it{shardInfo_.find(publicKey_)};
if (it != shardInfo_.end())
return it->second.shardIndexes;
return std::nullopt;
}
std::optional<hash_map<PublicKey, PeerImp::ShardInfo>>
PeerImp::getPeerShardInfo() const
{
std::lock_guard l{shardInfoMutex_};
if (!shardInfo_.empty())
return shardInfo_;
return std::nullopt;
return shardInfos_;
}
void
@@ -845,9 +832,9 @@ PeerImp::doProtocolStart()
send(m);
// Request shard info from peer
protocol::TMGetPeerShardInfo tmGPS;
tmGPS.set_hops(0);
send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO));
protocol::TMGetPeerShardInfoV2 tmGPS;
tmGPS.set_relays(0);
send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
setTimer();
}
@@ -1126,222 +1113,292 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
app_.getFeeTrack().setClusterFee(clusterFee);
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetShardInfo> const& m)
{
// DEPRECATED
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMShardInfo> const& m)
{
// DEPRECATED
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetPeerShardInfo> const& m)
{
auto badData = [&](std::string msg) {
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) << msg;
};
if (m->hops() > csHopLimit)
return badData("Invalid hops: " + std::to_string(m->hops()));
if (m->peerchain_size() > csHopLimit)
return badData("Invalid peer chain");
// Reply with shard info we may have
if (auto shardStore = app_.getShardStore())
{
fee_ = Resource::feeLightPeer;
auto shards{shardStore->getCompleteShards()};
if (!shards.empty())
{
protocol::TMPeerShardInfo reply;
reply.set_shardindexes(shards);
if (m->has_lastlink())
reply.set_lastlink(true);
if (m->peerchain_size() > 0)
{
for (int i = 0; i < m->peerchain_size(); ++i)
{
if (!publicKeyType(makeSlice(m->peerchain(i).nodepubkey())))
return badData("Invalid peer chain public key");
}
*reply.mutable_peerchain() = m->peerchain();
}
send(std::make_shared<Message>(reply, protocol::mtPEER_SHARD_INFO));
JLOG(p_journal_.trace()) << "Sent shard indexes " << shards;
}
}
// Relay request to peers
if (m->hops() > 0)
{
fee_ = Resource::feeMediumBurdenPeer;
m->set_hops(m->hops() - 1);
if (m->hops() == 0)
m->set_lastlink(true);
m->add_peerchain()->set_nodepubkey(
publicKey_.data(), publicKey_.size());
overlay_.foreach(send_if_not(
std::make_shared<Message>(*m, protocol::mtGET_PEER_SHARD_INFO),
match_peer(this)));
}
// DEPRECATED
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfo> const& m)
{
// DEPRECATED
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetPeerShardInfoV2> const& m)
{
auto badData = [&](std::string msg) {
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) << msg;
};
if (m->shardindexes().empty())
return badData("Missing shard indexes");
if (m->peerchain_size() > csHopLimit)
return badData("Invalid peer chain");
if (m->has_nodepubkey() && !publicKeyType(makeSlice(m->nodepubkey())))
return badData("Invalid public key");
// Verify relays
if (m->relays() > relayLimit)
return badData("Invalid relays");
// Check if the message should be forwarded to another peer
if (m->peerchain_size() > 0)
// Verify peer chain
// The peer chain should not contain this node's public key
// nor the public key of the sending peer
std::set<PublicKey> pubKeyChain;
pubKeyChain.insert(app_.nodeIdentity().first);
pubKeyChain.insert(publicKey_);
auto const peerChainSz{m->peerchain_size()};
if (peerChainSz > 0)
{
// Get the Public key of the last link in the peer chain
auto const s{
makeSlice(m->peerchain(m->peerchain_size() - 1).nodepubkey())};
if (!publicKeyType(s))
return badData("Invalid pubKey");
PublicKey peerPubKey(s);
if (peerChainSz > relayLimit)
return badData("Invalid peer chain size");
if (auto peer = overlay_.findPeerByPublicKey(peerPubKey))
if (peerChainSz + m->relays() > relayLimit)
return badData("Invalid relays and peer chain size");
for (int i = 0; i < peerChainSz; ++i)
{
if (!m->has_nodepubkey())
m->set_nodepubkey(publicKey_.data(), publicKey_.size());
auto const slice{makeSlice(m->peerchain(i).publickey())};
if (!m->has_endpoint())
// Verify peer public key
if (!publicKeyType(slice))
return badData("Invalid peer public key");
// Verify peer public key is unique in the peer chain
if (!pubKeyChain.emplace(slice).second)
return badData("Invalid peer public key");
}
}
// Reply with shard info this node may have
if (auto shardStore = app_.getShardStore())
{
auto reply{shardStore->getShardInfo()->makeMessage(app_)};
if (peerChainSz > 0)
*(reply.mutable_peerchain()) = m->peerchain();
send(std::make_shared<Message>(reply, protocol::mtPEER_SHARD_INFO_V2));
}
if (m->relays() == 0)
return;
// Charge originating peer a fee for requesting relays
if (peerChainSz == 0)
fee_ = Resource::feeMediumBurdenPeer;
// Add peer to the peer chain
m->add_peerchain()->set_publickey(publicKey_.data(), publicKey_.size());
// Relay the request to peers, exclude the peer chain
m->set_relays(m->relays() - 1);
overlay_.foreach(send_if_not(
std::make_shared<Message>(*m, protocol::mtGET_PEER_SHARD_INFO_V2),
[&](std::shared_ptr<Peer> const& peer) {
return pubKeyChain.find(peer->getNodePublic()) != pubKeyChain.end();
}));
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m)
{
// Find the earliest and latest shard indexes
auto const& db{app_.getNodeStore()};
auto const earliestShardIndex{db.earliestShardIndex()};
auto const latestShardIndex{[&]() -> std::optional<std::uint32_t> {
auto const curLedgerSeq{app_.getLedgerMaster().getCurrentLedgerIndex()};
if (curLedgerSeq >= db.earliestLedgerSeq())
return db.seqToShardIndex(curLedgerSeq);
return std::nullopt;
}()};
auto badData = [&](std::string msg) {
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) << msg;
};
// Used to create a digest and verify the message signature
Serializer s;
s.add32(HashPrefix::shardInfo);
// Verify message creation time
NodeStore::ShardInfo shardInfo;
{
auto const timestamp{
NetClock::time_point{std::chrono::seconds{m->timestamp()}}};
auto const now{app_.timeKeeper().now()};
if (timestamp > (now + 5s))
return badData("Invalid timestamp");
// Check if stale
using namespace std::chrono_literals;
if (timestamp < (now - 5min))
return badData("Stale timestamp");
s.add32(m->timestamp());
shardInfo.setMsgTimestamp(timestamp);
}
// Verify incomplete shards
auto const numIncomplete{m->incomplete_size()};
if (numIncomplete > 0)
{
if (latestShardIndex && numIncomplete > *latestShardIndex)
return badData("Invalid number of incomplete shards");
// Verify each incomplete shard
for (int i = 0; i < numIncomplete; ++i)
{
auto const& incomplete{m->incomplete(i)};
auto const shardIndex{incomplete.shardindex()};
// Verify shard index
if (shardIndex < earliestShardIndex ||
(latestShardIndex && shardIndex > latestShardIndex))
{
// Check if peer will share IP publicly
if (crawl())
m->set_endpoint(remote_address_.address().to_string());
else
m->set_endpoint("0");
return badData("Invalid incomplete shard index");
}
s.add32(shardIndex);
// Verify state
auto const state{static_cast<ShardState>(incomplete.state())};
switch (state)
{
// Incomplete states
case ShardState::acquire:
case ShardState::complete:
case ShardState::finalizing:
case ShardState::queued:
break;
// case ShardState::finalized:
default:
return badData("Invalid incomplete shard state");
};
s.add32(incomplete.state());
// Verify progress
std::uint32_t progress{0};
if (incomplete.has_progress())
{
progress = incomplete.progress();
if (progress < 1 || progress > 100)
return badData("Invalid incomplete shard progress");
s.add32(progress);
}
// Verify each incomplete shard is unique
if (!shardInfo.update(shardIndex, state, progress))
return badData("Invalid duplicate incomplete shards");
}
}
// Verify finalized shards
if (m->has_finalized())
{
auto const& str{m->finalized()};
if (str.empty())
return badData("Invalid finalized shards");
if (!shardInfo.setFinalizedFromString(str))
return badData("Invalid finalized shard indexes");
auto const& finalized{shardInfo.finalized()};
auto const numFinalized{boost::icl::length(finalized)};
if (numFinalized == 0 ||
boost::icl::first(finalized) < earliestShardIndex ||
(latestShardIndex &&
boost::icl::last(finalized) > latestShardIndex))
{
return badData("Invalid finalized shard indexes");
}
if (latestShardIndex &&
(numFinalized + numIncomplete) > *latestShardIndex)
{
return badData("Invalid number of finalized and incomplete shards");
}
s.addRaw(str.data(), str.size());
}
// Verify public key
auto slice{makeSlice(m->publickey())};
if (!publicKeyType(slice))
return badData("Invalid public key");
// Verify peer public key isn't this nodes's public key
PublicKey const publicKey(slice);
if (publicKey == app_.nodeIdentity().first)
return badData("Invalid public key");
// Verify signature
if (!verify(publicKey, s.slice(), makeSlice(m->signature()), false))
return badData("Invalid signature");
// Forward the message if a peer chain exists
auto const peerChainSz{m->peerchain_size()};
if (peerChainSz > 0)
{
// Verify peer chain
if (peerChainSz > relayLimit)
return badData("Invalid peer chain size");
// The peer chain should not contain this node's public key
// nor the public key of the sending peer
std::set<PublicKey> pubKeyChain;
pubKeyChain.insert(app_.nodeIdentity().first);
pubKeyChain.insert(publicKey_);
for (int i = 0; i < peerChainSz; ++i)
{
// Verify peer public key
slice = makeSlice(m->peerchain(i).publickey());
if (!publicKeyType(slice))
return badData("Invalid peer public key");
// Verify peer public key is unique in the peer chain
if (!pubKeyChain.emplace(slice).second)
return badData("Invalid peer public key");
}
// If last peer in the chain is connected, relay the message
PublicKey const peerPubKey(
makeSlice(m->peerchain(peerChainSz - 1).publickey()));
if (auto peer = overlay_.findPeerByPublicKey(peerPubKey))
{
m->mutable_peerchain()->RemoveLast();
peer->send(
std::make_shared<Message>(*m, protocol::mtPEER_SHARD_INFO));
std::make_shared<Message>(*m, protocol::mtPEER_SHARD_INFO_V2));
JLOG(p_journal_.trace())
<< "Relayed TMPeerShardInfo to peer with IP "
<< remote_address_.address().to_string();
<< "Relayed TMPeerShardInfoV2 from peer IP "
<< remote_address_.address().to_string() << " to peer IP "
<< peer->getRemoteAddress().to_string();
}
else
{
// Peer is no longer available so the relay ends
fee_ = Resource::feeUnwantedData;
JLOG(p_journal_.info()) << "Unable to route shard info";
}
return;
}
// Parse the shard indexes received in the shard info
RangeSet<std::uint32_t> shardIndexes;
{
if (!from_string(shardIndexes, m->shardindexes()))
return badData("Invalid shard indexes");
std::uint32_t earliestShard;
std::optional<std::uint32_t> latestShard;
{
auto const curLedgerSeq{
app_.getLedgerMaster().getCurrentLedgerIndex()};
if (auto shardStore = app_.getShardStore())
{
earliestShard = shardStore->earliestShardIndex();
if (curLedgerSeq >= shardStore->earliestLedgerSeq())
latestShard = shardStore->seqToShardIndex(curLedgerSeq);
}
else
{
auto const earliestLedgerSeq{
app_.getNodeStore().earliestLedgerSeq()};
earliestShard = NodeStore::seqToShardIndex(earliestLedgerSeq);
if (curLedgerSeq >= earliestLedgerSeq)
latestShard = NodeStore::seqToShardIndex(curLedgerSeq);
}
}
if (boost::icl::first(shardIndexes) < earliestShard ||
(latestShard && boost::icl::last(shardIndexes) > latestShard))
{
return badData("Invalid shard indexes");
}
}
// Get the IP of the node reporting the shard info
beast::IP::Endpoint endpoint;
if (m->has_endpoint())
{
if (m->endpoint() != "0")
{
auto result =
beast::IP::Endpoint::from_string_checked(m->endpoint());
if (!result)
return badData("Invalid incoming endpoint: " + m->endpoint());
endpoint = std::move(*result);
}
}
else if (crawl()) // Check if peer will share IP publicly
{
endpoint = remote_address_;
}
// Get the Public key of the node reporting the shard info
PublicKey publicKey;
if (m->has_nodepubkey())
publicKey = PublicKey(makeSlice(m->nodepubkey()));
else
publicKey = publicKey_;
{
std::lock_guard l{shardInfoMutex_};
auto it{shardInfo_.find(publicKey)};
if (it != shardInfo_.end())
{
// Update the IP address for the node
it->second.endpoint = std::move(endpoint);
// Join the shard index range set
it->second.shardIndexes += shardIndexes;
}
else
{
// Add a new node
ShardInfo shardInfo;
shardInfo.endpoint = std::move(endpoint);
shardInfo.shardIndexes = std::move(shardIndexes);
shardInfo_.emplace(publicKey, std::move(shardInfo));
JLOG(p_journal_.info()) << "Unable to relay peer shard info";
}
}
JLOG(p_journal_.trace())
<< "Consumed TMPeerShardInfo originating from public key "
<< toBase58(TokenType::NodePublic, publicKey) << " shard indexes "
<< m->shardindexes();
<< "Consumed TMPeerShardInfoV2 originating from public key "
<< toBase58(TokenType::NodePublic, publicKey) << " finalized shards["
<< ripple::to_string(shardInfo.finalized()) << "] incomplete shards["
<< (shardInfo.incomplete().empty() ? "empty"
: shardInfo.incompleteToString())
<< "]";
if (m->has_lastlink())
overlay_.lastLink(id_);
// Consume the message
{
std::lock_guard lock{shardInfoMutex_};
auto const it{shardInfos_.find(publicKey_)};
if (it == shardInfos_.end())
shardInfos_.emplace(publicKey, std::move(shardInfo));
else if (shardInfo.msgTimestamp() > it->second.msgTimestamp())
it->second = std::move(shardInfo);
}
// Notify overlay a reply was received from the last peer in this chain
if (peerChainSz == 0)
overlay_.endOfPeerChain(id_);
}
void

View File

@@ -25,6 +25,7 @@
#include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <ripple/nodestore/ShardInfo.h>
#include <ripple/overlay/Squelch.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/ProtocolMessage.h>
@@ -54,12 +55,6 @@ public:
/** Whether the peer's view of the ledger converges or diverges from ours */
enum class Tracking { diverged, unknown, converged };
struct ShardInfo
{
beast::IP::Endpoint endpoint;
RangeSet<std::uint32_t> shardIndexes;
};
private:
using clock_type = std::chrono::steady_clock;
using error_code = boost::system::error_code;
@@ -166,8 +161,9 @@ private:
// been sent to or received from this peer.
hash_map<PublicKey, std::size_t> publisherListSequences_;
// Any known shard info from this peer and its sub peers
hash_map<PublicKey, NodeStore::ShardInfo> shardInfos_;
std::mutex mutable shardInfoMutex_;
hash_map<PublicKey, ShardInfo> shardInfo_;
Compressed compressionEnabled_ = Compressed::Off;
// true if validation/proposal reduce-relay feature is enabled
@@ -375,9 +371,6 @@ public:
void
ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const override;
bool
hasShard(std::uint32_t shardIndex) const override;
bool
hasTxSet(uint256 const& hash) const override;
@@ -397,13 +390,9 @@ public:
void
fail(std::string const& reason);
/** Return a range set of known shard indexes from this peer. */
std::optional<RangeSet<std::uint32_t>>
getShardIndexes() const;
/** Return any known shard info from this peer and its sub peers. */
std::optional<hash_map<PublicKey, ShardInfo>>
getPeerShardInfo() const;
// Return any known shard info from this peer and its sub peers
[[nodiscard]] hash_map<PublicKey, NodeStore::ShardInfo> const
getPeerShardInfos() const;
bool
compressionEnabled() const override
@@ -498,14 +487,14 @@ public:
void
onMessage(std::shared_ptr<protocol::TMCluster> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetShardInfo> const& m);
void
onMessage(std::shared_ptr<protocol::TMShardInfo> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetPeerShardInfo> const& m);
void
onMessage(std::shared_ptr<protocol::TMPeerShardInfo> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetPeerShardInfoV2> const& m);
void
onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m);
void
onMessage(std::shared_ptr<protocol::TMEndpoints> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransaction> const& m);

View File

@@ -69,14 +69,6 @@ protocolMessageName(int type)
return "ping";
case protocol::mtCLUSTER:
return "cluster";
case protocol::mtGET_SHARD_INFO:
return "get_shard_info";
case protocol::mtSHARD_INFO:
return "shard_info";
case protocol::mtGET_PEER_SHARD_INFO:
return "get_peer_shard_info";
case protocol::mtPEER_SHARD_INFO:
return "peer_shard_info";
case protocol::mtENDPOINTS:
return "endpoints";
case protocol::mtTRANSACTION:
@@ -97,6 +89,10 @@ protocolMessageName(int type)
return "validator_list_collection";
case protocol::mtVALIDATION:
return "validation";
case protocol::mtGET_PEER_SHARD_INFO:
return "get_peer_shard_info";
case protocol::mtPEER_SHARD_INFO:
return "peer_shard_info";
case protocol::mtGET_OBJECTS:
return "get_objects";
case protocol::mtSQUELCH:
@@ -109,6 +105,10 @@ protocolMessageName(int type)
return "replay_delta_request";
case protocol::mtREPLAY_DELTA_RESPONSE:
return "replay_delta_response";
case protocol::mtGET_PEER_SHARD_INFO_V2:
return "get_peer_shard_info_v2";
case protocol::mtPEER_SHARD_INFO_V2:
return "peer_shard_info_v2";
default:
break;
}
@@ -401,22 +401,6 @@ invokeProtocolMessage(
success =
detail::invoke<protocol::TMCluster>(*header, buffers, handler);
break;
case protocol::mtGET_SHARD_INFO:
success = detail::invoke<protocol::TMGetShardInfo>(
*header, buffers, handler);
break;
case protocol::mtSHARD_INFO:
success = detail::invoke<protocol::TMShardInfo>(
*header, buffers, handler);
break;
case protocol::mtGET_PEER_SHARD_INFO:
success = detail::invoke<protocol::TMGetPeerShardInfo>(
*header, buffers, handler);
break;
case protocol::mtPEER_SHARD_INFO:
success = detail::invoke<protocol::TMPeerShardInfo>(
*header, buffers, handler);
break;
case protocol::mtENDPOINTS:
success = detail::invoke<protocol::TMEndpoints>(
*header, buffers, handler);
@@ -449,6 +433,14 @@ invokeProtocolMessage(
success = detail::invoke<protocol::TMValidation>(
*header, buffers, handler);
break;
case protocol::mtGET_PEER_SHARD_INFO:
success = detail::invoke<protocol::TMGetPeerShardInfo>(
*header, buffers, handler);
break;
case protocol::mtPEER_SHARD_INFO:
success = detail::invoke<protocol::TMPeerShardInfo>(
*header, buffers, handler);
break;
case protocol::mtVALIDATORLIST:
success = detail::invoke<protocol::TMValidatorList>(
*header, buffers, handler);
@@ -481,6 +473,14 @@ invokeProtocolMessage(
success = detail::invoke<protocol::TMReplayDeltaResponse>(
*header, buffers, handler);
break;
case protocol::mtGET_PEER_SHARD_INFO_V2:
success = detail::invoke<protocol::TMGetPeerShardInfoV2>(
*header, buffers, handler);
break;
case protocol::mtPEER_SHARD_INFO_V2:
success = detail::invoke<protocol::TMPeerShardInfoV2>(
*header, buffers, handler);
break;
default:
handler.onMessageUnknown(header->message_type);
success = true;

View File

@@ -39,10 +39,10 @@ TrafficCount::categorize(
if (type == protocol::mtENDPOINTS)
return TrafficCount::category::overlay;
if ((type == protocol::mtGET_SHARD_INFO) ||
(type == protocol::mtSHARD_INFO) ||
(type == protocol::mtGET_PEER_SHARD_INFO) ||
(type == protocol::mtPEER_SHARD_INFO))
if ((type == protocol::mtGET_PEER_SHARD_INFO) ||
(type == protocol::mtPEER_SHARD_INFO) ||
(type == protocol::mtGET_PEER_SHARD_INFO_V2) ||
(type == protocol::mtPEER_SHARD_INFO_V2))
return TrafficCount::category::shards;
if (type == protocol::mtTRANSACTION)

View File

@@ -6,29 +6,31 @@ package protocol;
// conflict. Even if you're sure, it's probably best to assign a new type.
enum MessageType
{
mtMANIFESTS = 2;
mtPING = 3;
mtCLUSTER = 5;
mtENDPOINTS = 15;
mtTRANSACTION = 30;
mtGET_LEDGER = 31;
mtLEDGER_DATA = 32;
mtPROPOSE_LEDGER = 33;
mtSTATUS_CHANGE = 34;
mtHAVE_SET = 35;
mtVALIDATION = 41;
mtGET_OBJECTS = 42;
mtGET_SHARD_INFO = 50;
mtSHARD_INFO = 51;
mtGET_PEER_SHARD_INFO = 52;
mtPEER_SHARD_INFO = 53;
mtVALIDATORLIST = 54;
mtSQUELCH = 55;
mtVALIDATORLISTCOLLECTION = 56;
mtPROOF_PATH_REQ = 57;
mtPROOF_PATH_RESPONSE = 58;
mtREPLAY_DELTA_REQ = 59;
mtREPLAY_DELTA_RESPONSE = 60;
mtMANIFESTS = 2;
mtPING = 3;
mtCLUSTER = 5;
mtENDPOINTS = 15;
mtTRANSACTION = 30;
mtGET_LEDGER = 31;
mtLEDGER_DATA = 32;
mtPROPOSE_LEDGER = 33;
mtSTATUS_CHANGE = 34;
mtHAVE_SET = 35;
mtVALIDATION = 41;
mtGET_OBJECTS = 42;
mtGET_SHARD_INFO = 50;
mtSHARD_INFO = 51;
mtGET_PEER_SHARD_INFO = 52;
mtPEER_SHARD_INFO = 53;
mtVALIDATORLIST = 54;
mtSQUELCH = 55;
mtVALIDATORLISTCOLLECTION = 56;
mtPROOF_PATH_REQ = 57;
mtPROOF_PATH_RESPONSE = 58;
mtREPLAY_DELTA_REQ = 59;
mtREPLAY_DELTA_RESPONSE = 60;
mtGET_PEER_SHARD_INFO_V2 = 61;
mtPEER_SHARD_INFO_V2 = 62;
}
// token, iterations, target, challenge = issue demand for proof of work
@@ -79,46 +81,75 @@ message TMCluster
repeated TMLoadSource loadSources = 2;
}
// Request info on shards held
message TMGetShardInfo
{
required uint32 hops = 1 [deprecated=true]; // number of hops to travel
optional bool lastLink = 2 [deprecated=true]; // true if last link in the peer chain
repeated uint32 peerchain = 3 [deprecated=true]; // IDs used to route messages
}
// Info about shards held
message TMShardInfo
{
required string shardIndexes = 1 [deprecated=true]; // rangeSet of shard indexes
optional bytes nodePubKey = 2 [deprecated=true]; // The node's public key
optional string endpoint = 3 [deprecated=true]; // ipv6 or ipv4 address
optional bool lastLink = 4 [deprecated=true]; // true if last link in the peer chain
repeated uint32 peerchain = 5 [deprecated=true]; // IDs used to route messages
}
// Node public key
message TMLink
{
required bytes nodePubKey = 1; // node public key
required bytes nodePubKey = 1 [deprecated=true]; // node public key
}
// Request info on shards held
message TMGetPeerShardInfo
{
required uint32 hops = 1; // number of hops to travel
optional bool lastLink = 2; // true if last link in the peer chain
repeated TMLink peerChain = 3; // public keys used to route messages
required uint32 hops = 1 [deprecated=true]; // number of hops to travel
optional bool lastLink = 2 [deprecated=true]; // true if last link in the peer chain
repeated TMLink peerChain = 3 [deprecated=true]; // public keys used to route messages
}
// Info about shards held
message TMPeerShardInfo
{
required string shardIndexes = 1; // rangeSet of shard indexes
optional bytes nodePubKey = 2; // node public key
optional string endpoint = 3; // ipv6 or ipv4 address
optional bool lastLink = 4; // true if last link in the peer chain
repeated TMLink peerChain = 5; // public keys used to route messages
required string shardIndexes = 1 [deprecated=true]; // rangeSet of shard indexes
optional bytes nodePubKey = 2 [deprecated=true]; // node public key
optional string endpoint = 3 [deprecated=true]; // ipv6 or ipv4 address
optional bool lastLink = 4 [deprecated=true]; // true if last link in the peer chain
repeated TMLink peerChain = 5 [deprecated=true]; // public keys used to route messages
}
// Peer public key
message TMPublicKey
{
required bytes publicKey = 1;
}
// Request peer shard information
message TMGetPeerShardInfoV2
{
// Peer public keys used to route messages
repeated TMPublicKey peerChain = 1;
// Remaining times to relay
required uint32 relays = 2;
}
// Peer shard information
message TMPeerShardInfoV2
{
message TMIncomplete
{
required uint32 shardIndex = 1;
required uint32 state = 2;
// State completion percent, 1 - 100
optional uint32 progress = 3;
}
// Message creation time
required uint32 timestamp = 1;
// Incomplete shards being acquired or verified
repeated TMIncomplete incomplete = 2;
// Verified immutable shards (RangeSet)
optional string finalized = 3;
// Public key of node that authored the shard info
required bytes publicKey = 4;
// Digital signature of node that authored the shard info
required bytes signature = 5;
// Peer public keys used to route messages
repeated TMPublicKey peerChain = 6;
}
// A transaction can have only one input and one output.

View File

@@ -84,6 +84,9 @@ enum class HashPrefix : std::uint32_t {
/** Payment Channel Claim */
paymentChannelClaim = detail::make_hash_prefix('C', 'L', 'M'),
/** shard info for signing */
shardInfo = detail::make_hash_prefix('S', 'H', 'D'),
};
template <class Hasher>

View File

@@ -58,7 +58,10 @@ systemCurrencyCode()
}
/** The XRP ledger network's earliest allowed sequence */
static std::uint32_t constexpr XRP_LEDGER_EARLIEST_SEQ{32570};
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ{32570u};
/** The number of ledgers in a shard */
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD{16384u};
/** The minimum amount of support an amendment should have.

View File

@@ -65,6 +65,7 @@ JSS(EscrowFinish); // transaction type.
JSS(Fee); // in/out: TransactionSign; field.
JSS(FeeSettings); // ledger type.
JSS(Flags); // in/out: TransactionSign; field.
JSS(incomplete_shards); // out: OverlayImpl, PeerImp
JSS(Invalid); //
JSS(LastLedgerSequence); // in: TransactionSign; field
JSS(LedgerHashes); // ledger type.

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/net/RPCErr.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Overlay.h>
@@ -48,37 +49,23 @@ doCrawlShards(RPC::JsonContext& context)
if (context.role != Role::ADMIN)
return rpcError(rpcNO_PERMISSION);
std::uint32_t hops{0};
std::uint32_t relays{0};
if (auto const& jv = context.params[jss::limit])
{
if (!(jv.isUInt() || (jv.isInt() && jv.asInt() >= 0)))
{
return RPC::expected_field_error(jss::limit, "unsigned integer");
}
hops = std::min(jv.asUInt(), csHopLimit);
relays = std::min(jv.asUInt(), relayLimit);
context.loadType = Resource::feeHighBurdenRPC;
}
else
context.loadType = Resource::feeMediumBurdenRPC;
bool const pubKey{
// Collect shard info from server and peers
bool const includePublicKey{
context.params.isMember(jss::public_key) &&
context.params[jss::public_key].asBool()};
// Collect shard info from peers connected to this server
Json::Value jvResult{context.app.overlay().crawlShards(pubKey, hops)};
// Collect shard info from this server
if (auto shardStore = context.app.getShardStore())
{
if (pubKey)
jvResult[jss::public_key] = toBase58(
TokenType::NodePublic, context.app.nodeIdentity().first);
jvResult[jss::complete_shards] = shardStore->getCompleteShards();
}
if (hops == 0)
context.loadType = Resource::feeMediumBurdenRPC;
else
context.loadType = Resource::feeHighBurdenRPC;
Json::Value jvResult{
context.app.overlay().crawlShards(includePublicKey, relays)};
return jvResult;
}

View File

@@ -271,11 +271,6 @@ public:
{
}
bool
hasShard(std::uint32_t shardIndex) const override
{
return false;
}
bool
hasTxSet(uint256 const& hash) const override
{
return false;

View File

@@ -96,22 +96,18 @@ public:
BEAST_EXPECT(!from_string(set, "1,,2"));
BEAST_EXPECT(boost::icl::length(set) == 0);
set.clear();
BEAST_EXPECT(from_string(set, "1"));
BEAST_EXPECT(boost::icl::length(set) == 1);
BEAST_EXPECT(boost::icl::first(set) == 1);
set.clear();
BEAST_EXPECT(from_string(set, "1,1"));
BEAST_EXPECT(boost::icl::length(set) == 1);
BEAST_EXPECT(boost::icl::first(set) == 1);
set.clear();
BEAST_EXPECT(from_string(set, "1-1"));
BEAST_EXPECT(boost::icl::length(set) == 1);
BEAST_EXPECT(boost::icl::first(set) == 1);
set.clear();
BEAST_EXPECT(from_string(set, "1,4-6"));
BEAST_EXPECT(boost::icl::length(set) == 4);
BEAST_EXPECT(boost::icl::first(set) == 1);
@@ -121,7 +117,6 @@ public:
BEAST_EXPECT(boost::icl::contains(set, 5));
BEAST_EXPECT(boost::icl::last(set) == 6);
set.clear();
BEAST_EXPECT(from_string(set, "1-2,4-6"));
BEAST_EXPECT(boost::icl::length(set) == 5);
BEAST_EXPECT(boost::icl::first(set) == 1);
@@ -129,7 +124,6 @@ public:
BEAST_EXPECT(boost::icl::contains(set, 4));
BEAST_EXPECT(boost::icl::last(set) == 6);
set.clear();
BEAST_EXPECT(from_string(set, "1-2,6"));
BEAST_EXPECT(boost::icl::length(set) == 3);
BEAST_EXPECT(boost::icl::first(set) == 1);

File diff suppressed because it is too large Load Diff

View File

@@ -566,9 +566,8 @@ public:
if (type == "memory")
{
// Earliest ledger sequence tests
// Verify default earliest ledger sequence
{
// Verify default earliest ledger sequence
std::unique_ptr<Database> db =
Manager::instance().make_Database(
megabytes(4), scheduler, 2, nodeParams, journal_);
@@ -617,6 +616,49 @@ public:
std::strcmp(e.what(), "earliest_seq set more than once") ==
0);
}
// Verify default ledgers per shard
{
std::unique_ptr<Database> db =
Manager::instance().make_Database(
megabytes(4),
scheduler,
2,
nodeParams,
journal_);
BEAST_EXPECT(
db->ledgersPerShard() == DEFAULT_LEDGERS_PER_SHARD);
}
// Set an invalid ledgers per shard
try
{
nodeParams.set("ledgers_per_shard", "100");
std::unique_ptr<Database> db =
Manager::instance().make_Database(
megabytes(4),
scheduler,
2,
nodeParams,
journal_);
}
catch (std::runtime_error const& e)
{
BEAST_EXPECT(
std::strcmp(e.what(), "Invalid ledgers_per_shard") == 0);
}
// Set a valid ledgers per shard
nodeParams.set("ledgers_per_shard", "256");
std::unique_ptr<Database> db = Manager::instance().make_Database(
megabytes(4),
scheduler,
2,
nodeParams,
journal_);
// Verify database uses the ledgers per shard
BEAST_EXPECT(db->ledgersPerShard() == 256);
}
}

View File

@@ -141,11 +141,6 @@ public:
{
}
bool
hasShard(std::uint32_t shardIndex) const override
{
return false;
}
bool
hasTxSet(uint256 const& hash) const override
{
return false;

View File

@@ -159,13 +159,18 @@ public:
beast::temp_dir tempDir;
auto c = jtx::envconfig();
auto& section = c->section(ConfigSection::shardDatabase());
section.set("path", tempDir.path());
section.set("max_historical_shards", "20");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("earliest_seq", "257");
{
auto& section{c->section(ConfigSection::shardDatabase())};
section.set("path", tempDir.path());
section.set("max_historical_shards", "20");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
{
auto& section{c->section(ConfigSection::nodeDatabase())};
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
c->setupControl(true, true, true);
jtx::Env env(*this, std::move(c));
@@ -257,13 +262,18 @@ public:
{
auto c = jtx::envconfig();
auto& section = c->section(ConfigSection::shardDatabase());
section.set("path", tempDir.path());
section.set("max_historical_shards", "20");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("earliest_seq", "257");
{
auto& section{c->section(ConfigSection::shardDatabase())};
section.set("path", tempDir.path());
section.set("max_historical_shards", "20");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
{
auto& section{c->section(ConfigSection::nodeDatabase())};
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
c->setupControl(true, true, true);
jtx::Env env(*this, std::move(c));
@@ -354,19 +364,23 @@ public:
}
auto c = jtx::envconfig();
auto& section = c->section(ConfigSection::shardDatabase());
section.set("path", tempDir.path());
section.set("max_historical_shards", "20");
section.set("ledgers_per_shard", "256");
section.set("shard_verification_retry_interval", "1");
section.set("shard_verification_max_attempts", "10000");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("earliest_seq", "257");
{
auto& section{c->section(ConfigSection::shardDatabase())};
section.set("path", tempDir.path());
section.set("max_historical_shards", "20");
section.set("shard_verification_retry_interval", "1");
section.set("shard_verification_max_attempts", "10000");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
{
auto& section{c->section(ConfigSection::nodeDatabase())};
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
c->setupControl(true, true, true);
jtx::Env env(*this, std::move(c));
std::uint8_t const numberOfDownloads = 10;
// Create some ledgers so that the ShardArchiveHandler
@@ -422,13 +436,18 @@ public:
beast::temp_dir tempDir;
auto c = jtx::envconfig();
auto& section = c->section(ConfigSection::shardDatabase());
section.set("path", tempDir.path());
section.set("max_historical_shards", "1");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("earliest_seq", "257");
{
auto& section{c->section(ConfigSection::shardDatabase())};
section.set("path", tempDir.path());
section.set("max_historical_shards", "1");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
{
auto& section{c->section(ConfigSection::nodeDatabase())};
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
c->setupControl(true, true, true);
std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
@@ -496,13 +515,18 @@ public:
beast::temp_dir tempDir;
auto c = jtx::envconfig();
auto& section = c->section(ConfigSection::shardDatabase());
section.set("path", tempDir.path());
section.set("max_historical_shards", "0");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("earliest_seq", "257");
{
auto& section{c->section(ConfigSection::shardDatabase())};
section.set("path", tempDir.path());
section.set("max_historical_shards", "0");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
{
auto& section{c->section(ConfigSection::nodeDatabase())};
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
c->setupControl(true, true, true);
std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
@@ -579,13 +603,18 @@ public:
beast::temp_dir tempDir;
auto c = jtx::envconfig();
auto& section = c->section(ConfigSection::shardDatabase());
section.set("path", tempDir.path());
section.set("max_historical_shards", "1");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("earliest_seq", "257");
{
auto& section{c->section(ConfigSection::shardDatabase())};
section.set("path", tempDir.path());
section.set("max_historical_shards", "1");
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
{
auto& section{c->section(ConfigSection::nodeDatabase())};
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
}
c->setupControl(true, true, true);
std::unique_ptr<Logs> logs(new CaptureLogs(&capturedLogs));
@@ -607,7 +636,7 @@ public:
env.close();
}
env.app().getShardStore()->prepareShards({1});
BEAST_EXPECT(env.app().getShardStore()->prepareShards({1}));
auto handler = env.app().getShardArchiveHandler();
BEAST_EXPECT(handler);