Move shard store init to Application

This commit is contained in:
Miguel Portilla
2019-05-10 14:16:27 -04:00
committed by Nik Bougalis
parent c5a95f1eb5
commit a02d914093
15 changed files with 342 additions and 411 deletions

View File

@@ -51,6 +51,7 @@
#include <ripple/basics/PerfLog.h>
#include <ripple/json/json_reader.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/make_Overlay.h>
#include <ripple/protocol/BuildInfo.h>
@@ -419,9 +420,11 @@ public:
, m_nodeStoreScheduler (*this)
, m_shaMapStore (make_SHAMapStore (*this, setup_SHAMapStore (*config_),
*this, m_nodeStoreScheduler, logs_->journal("SHAMapStore"),
logs_->journal("NodeObject"), m_txMaster, *config_))
, m_shaMapStore(make_SHAMapStore(
*this,
*this,
m_nodeStoreScheduler,
logs_->journal("SHAMapStore")))
, accountIDCache_(128000)
@@ -443,14 +446,15 @@ public:
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))
//
// Anything which calls addJob must be a descendant of the JobQueue
//
, m_nodeStore (
m_shaMapStore->makeDatabase ("NodeStore.main", 4, *m_jobQueue))
, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, shardStore_ (
m_shaMapStore->makeDatabaseShard ("ShardStore", 4, *m_jobQueue))
// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))
, family_ (*this, *m_nodeStore, *m_collectorManager)
@@ -535,8 +539,13 @@ public:
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this, *shardStore_, *m_collectorManager);
*this,
*shardStore_,
*m_collectorManager);
}
add (m_resourceManager.get ());
//
@@ -853,9 +862,6 @@ public:
setup.startUp == Config::LOAD_FILE ||
setup.startUp == Config::REPLAY)
{
// perform any needed table updates
updateTxnDB();
// Check if AccountTransactions has primary key
std::string cid, name, type;
std::size_t notnull, dflt_value, pk;
@@ -913,14 +919,6 @@ public:
bool
initNodeStoreDBs()
{
if (config_->section(ConfigSection::nodeDatabase()).empty())
{
JLOG(m_journal.fatal()) <<
"The [node_db] configuration setting " <<
"has been updated and must be set";
return false;
}
if (config_->doImport)
{
auto j = logs_->journal("NodeObject");
@@ -1251,7 +1249,6 @@ private:
// and new validations must be greater than this.
std::atomic<LedgerIndex> maxDisallowedLedger_ {0};
void updateTxnDB ();
bool nodeToShards ();
bool validateShards ();
void startGenesisLedger ();
@@ -1632,6 +1629,9 @@ int ApplicationImp::fdlimit() const
// doubled if online delete is enabled).
needed += std::max(5, m_shaMapStore->fdlimit());
if (shardStore_)
needed += shardStore_->fdlimit();
// One fd per incoming connection a port can accept, or
// if no limit is set, assume it'll handle 256 clients.
for(auto const& p : serverHandler_->setup().ports)
@@ -2068,111 +2068,6 @@ ApplicationImp::journal (std::string const& name)
return logs_->journal (name);
}
void
ApplicationImp::updateTxnDB()
{
auto schemaHas = [&](std::string const& column)
{
std::string cid, name;
soci::statement st = (mTxnDB->getSession().prepare <<
("PRAGMA table_info(AccountTransactions);"),
soci::into(cid),
soci::into(name));
st.execute();
while (st.fetch())
{
if (name == column)
return true;
}
return false;
};
assert(schemaHas("TransID"));
assert(!schemaHas("foobar"));
if (schemaHas("TxnSeq"))
return;
JLOG (m_journal.warn()) << "Transaction sequence field is missing";
auto& session = getTxnDB ().getSession ();
std::vector< std::pair<uint256, int> > txIDs;
txIDs.reserve (300000);
JLOG (m_journal.info()) << "Parsing transactions";
int i = 0;
uint256 transID;
boost::optional<std::string> strTransId;
soci::blob sociTxnMetaBlob(session);
soci::indicator tmi;
Blob txnMeta;
soci::statement st =
(session.prepare <<
"SELECT TransID, TxnMeta FROM Transactions;",
soci::into(strTransId),
soci::into(sociTxnMetaBlob, tmi));
st.execute ();
while (st.fetch ())
{
if (soci::i_ok == tmi)
convert (sociTxnMetaBlob, txnMeta);
else
txnMeta.clear ();
std::string tid = strTransId.value_or("");
transID.SetHex (tid, true);
if (txnMeta.size () == 0)
{
txIDs.push_back (std::make_pair (transID, -1));
JLOG (m_journal.info()) << "No metadata for " << transID;
}
else
{
TxMeta m (transID, 0, txnMeta);
txIDs.push_back (std::make_pair (transID, m.getIndex ()));
}
if ((++i % 1000) == 0)
{
JLOG (m_journal.info()) << i << " transactions read";
}
}
JLOG (m_journal.info()) << "All " << i << " transactions read";
soci::transaction tr(session);
JLOG (m_journal.info()) << "Dropping old index";
session << "DROP INDEX AcctTxIndex;";
JLOG (m_journal.info()) << "Altering table";
session << "ALTER TABLE AccountTransactions ADD COLUMN TxnSeq INTEGER;";
boost::format fmt ("UPDATE AccountTransactions SET TxnSeq = %d WHERE TransID = '%s';");
i = 0;
for (auto& t : txIDs)
{
session << boost::str (fmt % t.second % to_string (t.first));
if ((++i % 1000) == 0)
{
JLOG (m_journal.info()) << i << " transactions updated";
}
}
JLOG (m_journal.info()) << "Building new index";
session << "CREATE INDEX AcctTxIndex ON AccountTransactions(Account, LedgerSeq, TxnSeq, TransID);";
tr.commit ();
}
bool ApplicationImp::nodeToShards()
{
assert(m_overlay);

View File

@@ -31,28 +31,12 @@ class TransactionMaster;
/**
* class to create database, launch online delete thread, and
* related sqlite databse
* related SQLite database
*/
class SHAMapStore
: public Stoppable
{
public:
struct Setup
{
explicit Setup() = default;
bool standalone = false;
std::uint32_t deleteInterval = 0;
bool advisoryDelete = false;
std::uint32_t ledgerHistory = 0;
Section nodeDatabase;
std::string databasePath;
std::uint32_t deleteBatch = 100;
std::uint32_t backOff = 100;
std::int32_t ageThreshold = 60;
Section shardDatabase;
};
SHAMapStore (Stoppable& parent) : Stoppable ("SHAMapStore", parent) {}
/** Called by LedgerMaster every time a ledger validates. */
@@ -62,13 +46,9 @@ public:
virtual std::uint32_t clampFetchDepth (std::uint32_t fetch_depth) const = 0;
virtual std::unique_ptr <NodeStore::Database> makeDatabase (
std::string const& name,
std::int32_t readThreads, Stoppable& parent) = 0;
virtual std::unique_ptr <NodeStore::DatabaseShard> makeDatabaseShard(
std::string const& name, std::int32_t readThreads,
Stoppable& parent) = 0;
virtual
std::unique_ptr <NodeStore::Database>
makeNodeStore(std::string const& name, std::int32_t readThreads) = 0;
/** Highest ledger that may be deleted. */
virtual LedgerIndex setCanDelete (LedgerIndex canDelete) = 0;
@@ -88,19 +68,12 @@ public:
//------------------------------------------------------------------------------
SHAMapStore::Setup
setup_SHAMapStore(Config const& c);
std::unique_ptr<SHAMapStore>
make_SHAMapStore(
Application& app,
SHAMapStore::Setup const& s,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal,
beast::Journal nodeStoreJournal,
TransactionMaster& transactionMaster,
BasicConfig const& conf);
beast::Journal journal);
}
#endif

View File

@@ -24,7 +24,8 @@
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/impl/DatabaseRotatingImp.h>
#include <ripple/nodestore/impl/DatabaseShardImp.h>
#include <boost/beast/core/string.hpp>
namespace ripple {
void SHAMapStoreImp::SavedStateDB::init (BasicConfig const& config,
@@ -160,100 +161,78 @@ SHAMapStoreImp::SavedStateDB::setLastRotated (LedgerIndex seq)
//------------------------------------------------------------------------------
SHAMapStoreImp::SHAMapStoreImp (
Application& app,
Setup const& setup,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal,
beast::Journal nodeStoreJournal,
TransactionMaster& transactionMaster,
BasicConfig const& config)
SHAMapStoreImp::SHAMapStoreImp(
Application& app,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal)
: SHAMapStore (parent)
, app_ (app)
, setup_ (setup)
, scheduler_ (scheduler)
, journal_ (journal)
, nodeStoreJournal_ (nodeStoreJournal)
, working_(true)
, transactionMaster_ (transactionMaster)
, canDelete_ (std::numeric_limits <LedgerIndex>::max())
{
if (setup_.deleteInterval)
Config& config {app.config()};
Section& section {config.section(ConfigSection::nodeDatabase())};
if (section.empty())
{
auto const minInterval = setup.standalone ?
minimumDeletionIntervalSA_ : minimumDeletionInterval_;
if (setup_.deleteInterval < minInterval)
Throw<std::runtime_error>(
"Missing [" + ConfigSection::nodeDatabase() +
"] entry in configuration file");
}
// RocksDB only. Use sensible defaults if no values specified.
if (boost::beast::detail::iequals(
get<std::string>(section, "type"), "RocksDB"))
{
if (!section.exists("cache_mb"))
{
Throw<std::runtime_error> ("online_delete must be at least " +
section.set("cache_mb", std::to_string(
config.getSize(siHashNodeDBCache)));
}
if (!section.exists("filter_bits") && (config.NODE_SIZE >= 2))
section.set("filter_bits", "10");
}
get_if_exists(section, "delete_batch", deleteBatch_);
get_if_exists(section, "backOff", backOff_);
get_if_exists(section, "age_threshold", ageThreshold_);
get_if_exists(section, "online_delete", deleteInterval_);
if (deleteInterval_)
{
get_if_exists(section, "advisory_delete", advisoryDelete_);
auto const minInterval = config.standalone() ?
minimumDeletionIntervalSA_ : minimumDeletionInterval_;
if (deleteInterval_ < minInterval)
{
Throw<std::runtime_error>("online_delete must be at least " +
std::to_string (minInterval));
}
if (setup_.ledgerHistory > setup_.deleteInterval)
if (config.LEDGER_HISTORY > deleteInterval_)
{
Throw<std::runtime_error> (
Throw<std::runtime_error>(
"online_delete must not be less than ledger_history (currently " +
std::to_string (setup_.ledgerHistory) + ")");
std::to_string (config.LEDGER_HISTORY) + ")");
}
state_db_.init (config, dbName_);
state_db_.init(config, dbName_);
dbPaths();
}
if (! setup_.shardDatabase.empty())
{
// The node and shard stores must use
// the same earliest ledger sequence
std::array<std::uint32_t, 2> seq;
if (get_if_exists<std::uint32_t>(
setup_.nodeDatabase, "earliest_seq", seq[0]))
{
if (get_if_exists<std::uint32_t>(
setup_.shardDatabase, "earliest_seq", seq[1]) &&
seq[0] != seq[1])
{
Throw<std::runtime_error>("earliest_seq set more than once");
}
}
boost::filesystem::path dbPath =
get<std::string>(setup_.shardDatabase, "path");
if (dbPath.empty())
Throw<std::runtime_error>("shard path missing");
if (boost::filesystem::exists(dbPath))
{
if (! boost::filesystem::is_directory(dbPath))
Throw<std::runtime_error>("shard db path must be a directory.");
}
else
boost::filesystem::create_directories(dbPath);
auto const maxDiskSpace = get<std::uint64_t>(
setup_.shardDatabase, "max_size_gb", 0);
// Must be large enough for one shard
if (maxDiskSpace < 3)
Throw<std::runtime_error>("max_size_gb too small");
if ((maxDiskSpace << 30) < maxDiskSpace)
Throw<std::runtime_error>("overflow max_size_gb");
std::uint32_t lps;
if (get_if_exists<std::uint32_t>(
setup_.shardDatabase, "ledgers_per_shard", lps))
{
// ledgers_per_shard to be set only in standalone for testing
if (! setup_.standalone)
Throw<std::runtime_error>(
"ledgers_per_shard only honored in stand alone");
}
}
}
std::unique_ptr <NodeStore::Database>
SHAMapStoreImp::makeDatabase (std::string const& name,
std::int32_t readThreads, Stoppable& parent)
SHAMapStoreImp::makeNodeStore(std::string const& name, std::int32_t readThreads)
{
// Anything which calls addJob must be a descendant of the JobQueue.
// Therefore Database objects use the JobQueue as Stoppable parent.
std::unique_ptr <NodeStore::Database> db;
if (setup_.deleteInterval)
if (deleteInterval_)
{
SavedState state = state_db_.getState();
auto writableBackend = makeBackendRotating(state.writableDb);
@@ -267,40 +246,32 @@ SHAMapStoreImp::makeDatabase (std::string const& name,
// Create NodeStore with two backends to allow online deletion of data
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
"NodeStore.main", scheduler_, readThreads, parent,
std::move(writableBackend), std::move(archiveBackend),
setup_.nodeDatabase, nodeStoreJournal_);
name,
scheduler_,
readThreads,
app_.getJobQueue(),
std::move(writableBackend),
std::move(archiveBackend),
app_.config().section(ConfigSection::nodeDatabase()),
app_.logs().journal(nodeStoreName_));
fdlimit_ += dbr->fdlimit();
dbRotating_ = dbr.get();
db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));
}
else
{
db = NodeStore::Manager::instance().make_Database (name, scheduler_,
readThreads, parent, setup_.nodeDatabase, nodeStoreJournal_);
db = NodeStore::Manager::instance().make_Database(
name,
scheduler_,
readThreads,
app_.getJobQueue(),
app_.config().section(ConfigSection::nodeDatabase()),
app_.logs().journal(nodeStoreName_));
fdlimit_ += db->fdlimit();
}
return db;
}
std::unique_ptr<NodeStore::DatabaseShard>
SHAMapStoreImp::makeDatabaseShard(std::string const& name,
std::int32_t readThreads, Stoppable& parent)
{
std::unique_ptr<NodeStore::DatabaseShard> db;
if(! setup_.shardDatabase.empty())
{
db = std::make_unique<NodeStore::DatabaseShardImp>(
app_, name, parent, scheduler_, readThreads,
setup_.shardDatabase, app_.journal("ShardStore"));
if (db->init())
fdlimit_ += db->fdlimit();
else
db.reset();
}
return db;
}
void
SHAMapStoreImp::onLedgerClosed(
std::shared_ptr<Ledger const> const& ledger)
@@ -359,7 +330,7 @@ SHAMapStoreImp::run()
transactionDb_ = &app_.getTxnDB();
ledgerDb_ = &app_.getLedgerDB();
if (setup_.advisoryDelete)
if (advisoryDelete_)
canDelete_ = state_db_.getCanDelete ();
while (1)
@@ -393,12 +364,12 @@ SHAMapStoreImp::run()
}
// will delete up to (not including) lastRotated)
if (validatedSeq >= lastRotated + setup_.deleteInterval
if (validatedSeq >= lastRotated + deleteInterval_
&& canDelete_ >= lastRotated - 1)
{
JLOG(journal_.debug()) << "rotating validatedSeq " << validatedSeq
<< " lastRotated " << lastRotated << " deleteInterval "
<< setup_.deleteInterval << " canDelete_ " << canDelete_;
<< deleteInterval_ << " canDelete_ " << canDelete_;
switch (health())
{
@@ -498,8 +469,8 @@ SHAMapStoreImp::run()
void
SHAMapStoreImp::dbPaths()
{
boost::filesystem::path dbPath =
get<std::string>(setup_.nodeDatabase, "path");
Section section {app_.config().section(ConfigSection::nodeDatabase())};
boost::filesystem::path dbPath = get<std::string>(section, "path");
if (boost::filesystem::exists (dbPath))
{
@@ -536,7 +507,8 @@ SHAMapStoreImp::dbPaths()
(writableDbExists != archiveDbExists) ||
state.writableDb.empty() != state.archiveDb.empty())
{
boost::filesystem::path stateDbPathName = setup_.databasePath;
boost::filesystem::path stateDbPathName =
app_.config().legacy("database_path");
stateDbPathName /= dbName_;
stateDbPathName += "*";
@@ -550,7 +522,7 @@ SHAMapStoreImp::dbPaths()
<< "remove the files matching "
<< stateDbPathName.string()
<< " and contents of the directory "
<< get<std::string>(setup_.nodeDatabase, "path")
<< dbPath
<< std::endl;
Throw<std::runtime_error> ("state db error");
@@ -560,8 +532,8 @@ SHAMapStoreImp::dbPaths()
std::unique_ptr <NodeStore::Backend>
SHAMapStoreImp::makeBackendRotating (std::string path)
{
Section section {app_.config().section(ConfigSection::nodeDatabase())};
boost::filesystem::path newPath;
Section parameters = setup_.nodeDatabase;
if (path.size())
{
@@ -569,15 +541,15 @@ SHAMapStoreImp::makeBackendRotating (std::string path)
}
else
{
boost::filesystem::path p = get<std::string>(parameters, "path");
boost::filesystem::path p = get<std::string>(section, "path");
p /= dbPrefix_;
p += ".%%%%";
newPath = boost::filesystem::unique_path (p);
}
parameters.set("path", newPath.string());
section.set("path", newPath.string());
auto backend {NodeStore::Manager::instance().make_Backend(
parameters, scheduler_, nodeStoreJournal_)};
section, scheduler_, app_.logs().journal(nodeStoreName_))};
backend->open();
return backend;
}
@@ -608,7 +580,7 @@ SHAMapStoreImp::clearSql (DatabaseCon& database,
"start: " << deleteQuery << " from " << min << " to " << lastRotated;
while (min < lastRotated)
{
min = std::min(lastRotated, min + setup_.deleteBatch);
min = std::min(lastRotated, min + deleteBatch_);
{
auto db = database.checkoutDb ();
*db << boost::str (formattedDeleteQuery % min);
@@ -617,7 +589,7 @@ SHAMapStoreImp::clearSql (DatabaseCon& database,
return true;
if (min < lastRotated)
std::this_thread::sleep_for (
std::chrono::milliseconds (setup_.backOff));
std::chrono::milliseconds (backOff_));
}
JLOG(journal_.debug()) << "finished: " << deleteQuery;
return true;
@@ -637,7 +609,7 @@ SHAMapStoreImp::freshenCaches()
return;
if (freshenCache (*treeNodeCache_))
return;
if (freshenCache (transactionMaster_.getCache()))
if (freshenCache (app_.getMasterTransaction().getCache()))
return;
}
@@ -684,11 +656,11 @@ SHAMapStoreImp::health()
NetworkOPs::OperatingMode mode = netOPs_->getOperatingMode();
auto age = ledgerMaster_->getValidatedLedgerAge();
if (mode != NetworkOPs::omFULL || age.count() >= setup_.ageThreshold)
if (mode != NetworkOPs::omFULL || age.count() >= ageThreshold_)
{
JLOG(journal_.warn()) << "Not deleting. state: " << mode
<< " age " << age.count()
<< " age threshold " << setup_.ageThreshold;
<< " age threshold " << ageThreshold_;
healthy_ = false;
}
@@ -701,7 +673,7 @@ SHAMapStoreImp::health()
void
SHAMapStoreImp::onStop()
{
if (setup_.deleteInterval)
if (deleteInterval_)
{
{
std::lock_guard <std::mutex> lock (mutex_);
@@ -718,7 +690,7 @@ SHAMapStoreImp::onStop()
void
SHAMapStoreImp::onChildrenStopped()
{
if (setup_.deleteInterval)
if (deleteInterval_)
{
{
std::lock_guard <std::mutex> lock (mutex_);
@@ -733,52 +705,15 @@ SHAMapStoreImp::onChildrenStopped()
}
//------------------------------------------------------------------------------
SHAMapStore::Setup
setup_SHAMapStore (Config const& c)
{
SHAMapStore::Setup setup;
setup.standalone = c.standalone();
// Get existing settings and add some default values if not specified:
setup.nodeDatabase = c.section (ConfigSection::nodeDatabase ());
// These two parameters apply only to RocksDB. We want to give them sensible
// defaults if no values are specified.
if (!setup.nodeDatabase.exists ("cache_mb"))
setup.nodeDatabase.set ("cache_mb", std::to_string (c.getSize (siHashNodeDBCache)));
if (!setup.nodeDatabase.exists ("filter_bits") && (c.NODE_SIZE >= 2))
setup.nodeDatabase.set ("filter_bits", "10");
get_if_exists (setup.nodeDatabase, "online_delete", setup.deleteInterval);
if (setup.deleteInterval)
get_if_exists (setup.nodeDatabase, "advisory_delete", setup.advisoryDelete);
setup.ledgerHistory = c.LEDGER_HISTORY;
setup.databasePath = c.legacy("database_path");
get_if_exists (setup.nodeDatabase, "delete_batch", setup.deleteBatch);
get_if_exists (setup.nodeDatabase, "backOff", setup.backOff);
get_if_exists (setup.nodeDatabase, "age_threshold", setup.ageThreshold);
setup.shardDatabase = c.section(ConfigSection::shardDatabase());
return setup;
}
std::unique_ptr<SHAMapStore>
make_SHAMapStore (Application& app,
SHAMapStore::Setup const& setup,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal,
beast::Journal nodeStoreJournal,
TransactionMaster& transactionMaster,
BasicConfig const& config)
make_SHAMapStore(
Application& app,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal)
{
return std::make_unique<SHAMapStoreImp>(app, setup, parent, scheduler,
journal, nodeStoreJournal, transactionMaster, config);
return std::make_unique<SHAMapStoreImp>(app, parent, scheduler, journal);
}
}

View File

@@ -84,10 +84,8 @@ private:
// minimum # of ledgers required for standalone mode.
static std::uint32_t const minimumDeletionIntervalSA_ = 8;
Setup setup_;
NodeStore::Scheduler& scheduler_;
beast::Journal journal_;
beast::Journal nodeStoreJournal_;
NodeStore::DatabaseRotating* dbRotating_ = nullptr;
SavedStateDB state_db_;
std::thread thread_;
@@ -98,8 +96,15 @@ private:
mutable std::mutex mutex_;
std::shared_ptr<Ledger const> newLedger_;
std::atomic<bool> working_;
TransactionMaster& transactionMaster_;
std::atomic <LedgerIndex> canDelete_;
int fdlimit_ = 0;
std::uint32_t deleteInterval_ = 0;
bool advisoryDelete_ = false;
std::uint32_t deleteBatch_ = 100;
std::uint32_t backOff_ = 100;
std::int32_t ageThreshold_ = 60;
// these do not exist upon SHAMapStore creation, but do exist
// as of onPrepare() or before
NetworkOPs* netOPs_ = nullptr;
@@ -108,17 +113,15 @@ private:
TreeNodeCache* treeNodeCache_ = nullptr;
DatabaseCon* transactionDb_ = nullptr;
DatabaseCon* ledgerDb_ = nullptr;
int fdlimit_ = 0;
static constexpr auto nodeStoreName_ = "NodeStore";
public:
SHAMapStoreImp (Application& app,
Setup const& setup,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal,
beast::Journal nodeStoreJournal,
TransactionMaster& transactionMaster,
BasicConfig const& config);
SHAMapStoreImp(
Application& app,
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal);
~SHAMapStoreImp()
{
@@ -129,22 +132,17 @@ public:
std::uint32_t
clampFetchDepth (std::uint32_t fetch_depth) const override
{
return setup_.deleteInterval ? std::min (fetch_depth,
setup_.deleteInterval) : fetch_depth;
return deleteInterval_ ? std::min (fetch_depth,
deleteInterval_) : fetch_depth;
}
std::unique_ptr <NodeStore::Database> makeDatabase (
std::string const&name,
std::int32_t readThreads, Stoppable& parent) override;
std::unique_ptr <NodeStore::DatabaseShard>
makeDatabaseShard(std::string const& name,
std::int32_t readThreads, Stoppable& parent) override;
std::unique_ptr <NodeStore::Database>
makeNodeStore(std::string const& name, std::int32_t readThreads) override;
LedgerIndex
setCanDelete (LedgerIndex seq) override
{
if (setup_.advisoryDelete)
if (advisoryDelete_)
canDelete_ = seq;
return state_db_.setCanDelete (seq);
}
@@ -152,7 +150,7 @@ public:
bool
advisoryDelete() const override
{
return setup_.advisoryDelete;
return advisoryDelete_;
}
// All ledgers prior to this one are eligible
@@ -230,7 +228,7 @@ private:
void
onStart() override
{
if (setup_.deleteInterval)
if (deleteInterval_)
thread_ = std::thread (&SHAMapStoreImp::run, this);
}

View File

@@ -209,6 +209,16 @@ public:
/** Destroy the Stoppable. */
virtual ~Stoppable ();
RootStoppable& getRoot() {return m_root;}
/** Set the parent of this Stoppable.
@note The Stoppable must not already have a parent.
The parent to be set cannot not be stopping.
Both roots must match.
*/
void setParent(Stoppable& parent);
/** Returns `true` if the stoppable should stop. */
bool isStopping () const;
@@ -318,6 +328,7 @@ private:
std::condition_variable m_cv;
std::mutex m_mut;
bool m_is_stopping = false;
bool hasParent_ {false};
};
//------------------------------------------------------------------------------

View File

@@ -17,7 +17,9 @@
*/
//==============================================================================
#include <ripple/basics/contract.h>
#include <ripple/core/Stoppable.h>
#include <cassert>
namespace ripple {
@@ -34,16 +36,23 @@ Stoppable::Stoppable (std::string name, Stoppable& parent)
, m_root (parent.m_root)
, m_child (this)
{
// Must not have stopping parent.
assert (! parent.isStopping());
parent.m_children.push_front (&m_child);
setParent(parent);
}
Stoppable::~Stoppable ()
{
}
void Stoppable::setParent (Stoppable& parent)
{
assert(!hasParent_);
assert(!parent.isStopping());
assert(std::addressof(m_root) == std::addressof(parent.m_root));
parent.m_children.push_front(&m_child);
hasParent_ = true;
}
bool Stoppable::isStopping() const
{
return m_root.isStopping();

View File

@@ -60,7 +60,8 @@ public:
@param name The Stoppable name for this Database.
@param parent The parent Stoppable.
@param scheduler The scheduler to use for performing asynchronous tasks.
@param readThreads The number of async read threads to create.
@param readThreads The number of asynchronous read threads to create.
@param config The configuration settings
@param journal Destination for logging output.
*/
Database(std::string name, Stoppable& parent, Scheduler& scheduler,
@@ -282,7 +283,7 @@ private:
// The default is 32570 to match the XRP ledger network's earliest
// allowed sequence. Alternate networks may set this value.
std::uint32_t earliestSeq_ {XRP_LEDGER_EARLIEST_SEQ};
std::uint32_t const earliestSeq_;
virtual
std::shared_ptr<NodeObject>

View File

@@ -42,8 +42,8 @@ public:
@param name The Stoppable name for this Database
@param parent The parent Stoppable
@param scheduler The scheduler to use for performing asynchronous tasks
@param readThreads The number of async read threads to create
@param config The configuration for the database
@param readThreads The number of asynchronous read threads to create
@param config The shard configuration section for the database
@param journal Destination for logging output
*/
DatabaseShard(
@@ -89,9 +89,9 @@ public:
bool
prepareShard(std::uint32_t shardIndex) = 0;
/** Remove shard indexes from prepared import
/** Remove a previously prepared shard index for import
@param indexes Shard indexes to be removed from import
@param shardIndex Shard index to be removed from import
*/
virtual
void
@@ -219,6 +219,15 @@ seqToShardIndex(std::uint32_t seq,
return (seq - 1) / ledgersPerShard;
}
extern
std::unique_ptr<DatabaseShard>
make_ShardStore(
Application& app,
Stoppable& parent,
Scheduler& scheduler,
int readThreads,
beast::Journal j);
}
}

View File

@@ -33,17 +33,16 @@ Database::Database(
int readThreads,
Section const& config,
beast::Journal journal)
: Stoppable(name, parent)
: Stoppable(name, parent.getRoot())
, j_(journal)
, scheduler_(scheduler)
, earliestSeq_(get<std::uint32_t>(
config,
"earliest_seq",
XRP_LEDGER_EARLIEST_SEQ))
{
std::uint32_t seq;
if (get_if_exists<std::uint32_t>(config, "earliest_seq", seq))
{
if (seq < 1)
Throw<std::runtime_error>("Invalid earliest_seq");
earliestSeq_ = seq;
}
if (earliestSeq_ < 1)
Throw<std::runtime_error>("Invalid earliest_seq");
while (readThreads-- > 0)
readThreads_.emplace_back(&Database::threadEntry, this);

View File

@@ -49,6 +49,7 @@ public:
, backend_(std::move(backend))
{
assert(backend_);
setParent(parent);
}
~DatabaseNodeImp() override

View File

@@ -45,6 +45,7 @@ DatabaseRotatingImp::DatabaseRotatingImp(
fdLimit_ += writableBackend_->fdlimit();
if (archiveBackend_)
fdLimit_ += archiveBackend_->fdlimit();
setParent(parent);
}
// Make sure to call it already locked!

View File

@@ -17,12 +17,13 @@
*/
//==============================================================================
#include <ripple/nodestore/impl/DatabaseShardImp.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/basics/ByteUtilities.h>
#include <ripple/basics/chrono.h>
#include <ripple/basics/random.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/overlay/Overlay.h>
@@ -32,30 +33,24 @@
namespace ripple {
namespace NodeStore {
constexpr std::uint32_t DatabaseShard::ledgersPerShardDefault;
DatabaseShardImp::DatabaseShardImp(
Application& app,
std::string const& name,
Stoppable& parent,
std::string const& name,
Scheduler& scheduler,
int readThreads,
Section const& config,
beast::Journal j)
: DatabaseShard(name, parent, scheduler, readThreads, config, j)
: DatabaseShard(
name,
parent,
scheduler,
readThreads,
app.config().section(ConfigSection::shardDatabase()),
j)
, app_(app)
, ctx_(std::make_unique<nudb::context>())
, config_(config)
, dir_(get<std::string>(config, "path"))
, backendName_(Manager::instance().find(
get<std::string>(config, "type", "nudb"))->getName())
, maxDiskSpace_(get<std::uint64_t>(config, "max_size_gb") << 30)
, ledgersPerShard_(get<std::uint32_t>(
config, "ledgers_per_shard", ledgersPerShardDefault))
, earliestShardIndex_(seqToShardIndex(earliestSeq()))
, avgShardSz_(ledgersPerShard_ * (192 * 1024))
, avgShardSz_(ledgersPerShard_ * kilobytes(192))
{
ctx_->start();
}
DatabaseShardImp::~DatabaseShardImp()
@@ -85,36 +80,94 @@ DatabaseShardImp::init()
"Already initialized";
return false;
}
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
auto fail = [&](std::string const& msg)
{
JLOG(j_.error()) <<
"ledgers_per_shard must be a multiple of 256";
"[" << ConfigSection::shardDatabase() << "] " << msg;
return false;
};
Config const& config {app_.config()};
Section const& section {config.section(ConfigSection::shardDatabase())};
if (section.empty())
return fail("missing configuration");
{
// Node and shard stores must use same earliest ledger sequence
std::uint32_t seq;
if (get_if_exists<std::uint32_t>(
config.section(ConfigSection::nodeDatabase()),
"earliest_seq",
seq))
{
std::uint32_t seq2;
if (get_if_exists<std::uint32_t>(section, "earliest_seq", seq2) &&
seq != seq2)
{
return fail("and [" + ConfigSection::shardDatabase() +
"] both define 'earliest_seq'");
}
}
}
if (!get_if_exists<boost::filesystem::path>(section, "path", dir_))
return fail("'path' missing");
if (boost::filesystem::exists(dir_))
{
if (!boost::filesystem::is_directory(dir_))
return fail("'path' must be a directory");
}
else
boost::filesystem::create_directories(dir_);
{
std::uint64_t i;
if (!get_if_exists<std::uint64_t>(section, "max_size_gb", i))
return fail("'max_size_gb' missing");
// Minimum disk space required (in gigabytes)
static constexpr auto minDiskSpace = 10;
if (i < minDiskSpace)
{
return fail("'max_size_gb' must be at least " +
std::to_string(minDiskSpace));
}
if ((i << 30) < i)
return fail("'max_size_gb' overflow");
// Convert to bytes
maxDiskSpace_ = i << 30;
}
if (section.exists("ledgers_per_shard"))
{
// To be set only in standalone for testing
if (!config.standalone())
return fail("'ledgers_per_shard' only honored in stand alone");
ledgersPerShard_ = get<std::uint32_t>(section, "ledgers_per_shard");
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
return fail("'ledgers_per_shard' must be a multiple of 256");
}
// NuDB is the default and only supported permanent storage backend
// "Memory" and "none" types are supported for tests
backendName_ = get<std::string>(section, "type", "nudb");
if (!iequals(backendName_, "NuDB") &&
!iequals(backendName_, "Memory") &&
!iequals(backendName_, "none"))
{
JLOG(j_.error()) <<
"Unsupported shard store type: " << backendName_;
return false;
return fail("'type' value unsupported");
}
// Find backend file handle requirement
if (auto factory = Manager::instance().find(backendName_))
{
// Find backend file handle requirement
auto factory {Manager::instance().find(backendName_)};
if (!factory)
{
JLOG(j_.error()) <<
"Failed to create shard store type " << backendName_;
return false;
}
auto backend {factory->createInstance(NodeObject::keyBytes,
config_, scheduler_, *ctx_, j_)};
auto backend {factory->createInstance(
NodeObject::keyBytes, section, scheduler_, j_)};
backed_ = backend->backed();
if (!backed_)
{
@@ -123,9 +176,14 @@ DatabaseShardImp::init()
}
fdLimit_ = backend->fdlimit();
}
else
return fail("'type' value unsupported");
try
{
ctx_ = std::make_unique<nudb::context>();
ctx_->start();
// Find shards
for (auto const& d : directory_iterator(dir_))
{
@@ -166,7 +224,7 @@ DatabaseShardImp::init()
auto shard {std::make_unique<Shard>(
*this, shardIndex, cacheSz_, cacheAge_, j_)};
if (!shard->open(config_, scheduler_, *ctx_))
if (!shard->open(section, scheduler_, *ctx_))
return false;
usedDiskSpace_ += shard->fileSize();
@@ -250,7 +308,10 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
1, static_cast<int>(complete_.size() + 1)))};
incomplete_ = std::make_unique<Shard>(
*this, *shardIndex, sz, cacheAge_, j_);
if (!incomplete_->open(config_, scheduler_, *ctx_))
if (!incomplete_->open(
app_.config().section(ConfigSection::shardDatabase()),
scheduler_,
*ctx_))
{
incomplete_.reset();
return boost::none;
@@ -422,7 +483,7 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
// Create the new shard
auto shard {std::make_unique<Shard>(
*this, shardIndex, cacheSz_, cacheAge_, j_)};
auto fail = [&](std::string msg)
auto fail = [&](std::string const& msg)
{
if (!msg.empty())
{
@@ -434,8 +495,13 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
return false;
};
if (!shard->open(config_, scheduler_, *ctx_))
if (!shard->open(
app_.config().section(ConfigSection::shardDatabase()),
scheduler_,
*ctx_))
{
return fail({});
}
if (!shard->complete())
return fail("incomplete shard");
@@ -750,7 +816,10 @@ DatabaseShardImp::import(Database& source)
auto const shardDir {dir_ / std::to_string(shardIndex)};
auto shard = std::make_unique<Shard>(
*this, shardIndex, shardCacheSz, cacheAge_, j_);
if (!shard->open(config_, scheduler_, *ctx_))
if (!shard->open(
app_.config().section(ConfigSection::shardDatabase()),
scheduler_,
*ctx_))
{
shard.reset();
continue;
@@ -1192,5 +1261,36 @@ DatabaseShardImp::available() const
}
}
//------------------------------------------------------------------------------
std::unique_ptr<DatabaseShard>
make_ShardStore(
Application& app,
Stoppable& parent,
Scheduler& scheduler,
int readThreads,
beast::Journal j)
{
// The shard store is optional. Future changes will require it.
Section const& section {
app.config().section(ConfigSection::shardDatabase())};
if (section.empty())
return nullptr;
auto shardStore = std::make_unique<DatabaseShardImp>(
app,
parent,
"ShardStore",
scheduler,
readThreads,
j);
if (shardStore->init())
shardStore->setParent(parent);
else
shardStore.reset();
return shardStore;
}
} // NodeStore
} // ripple

View File

@@ -37,11 +37,10 @@ public:
DatabaseShardImp(
Application& app,
std::string const& name,
Stoppable& parent,
std::string const& name,
Scheduler& scheduler,
int readThreads,
Section const& config,
beast::Journal j);
~DatabaseShardImp() override;
@@ -180,8 +179,8 @@ private:
// Shards prepared for import
std::map<std::uint32_t, Shard*> preShards_;
Section const config_;
boost::filesystem::path const dir_;
// The shard store root directory
boost::filesystem::path dir_;
// If new shards can be stored
bool canAdd_ {true};
@@ -193,10 +192,10 @@ private:
bool backed_;
// The name associated with the backend used with the shard store
std::string const backendName_;
std::string backendName_;
// Maximum disk space the DB can use (in bytes)
std::uint64_t const maxDiskSpace_;
std::uint64_t maxDiskSpace_;
// Disk space used to store the shards (in bytes)
std::uint64_t usedDiskSpace_ {0};
@@ -204,7 +203,7 @@ private:
// Each shard stores 16384 ledgers. The earliest shard may store
// less if the earliest ledger sequence truncates its beginning.
// The value should only be altered for unit tests.
std::uint32_t const ledgersPerShard_;
std::uint32_t ledgersPerShard_ = ledgersPerShardDefault;
// The earliest shard index
std::uint32_t const earliestShardIndex_;
@@ -218,7 +217,7 @@ private:
// File name used to mark shards being imported from node store
static constexpr auto importMarker_ = "import";
std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) override;

View File

@@ -65,14 +65,7 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx)
return false;
}
boost::system::error_code ec;
auto const preexist {exists(dir_, ec)};
if (ec)
{
JLOG(j_.error()) <<
"shard " << index_ << ": " << ec.message();
return false;
}
auto const preexist {exists(dir_)};
config.set("path", dir_.string());
backend_ = factory->createInstance(
@@ -85,6 +78,8 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx)
JLOG(j_.error()) <<
"shard " << index_ << ": " << msg;
}
if (backend_)
backend_->close();
if (!preexist)
removeAll(dir_, j_);
return false;
@@ -92,6 +87,7 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx)
try
{
// Open/Create the NuDB key/value store for node objects
backend_->open(!preexist);
if (!backend_->backed())

View File

@@ -152,8 +152,12 @@ private:
// Path to control file
boost::filesystem::path const control_;
// Disk space utilized by the shard
std::uint64_t fileSize_ {0};
// NuDB key/value store for node objects
std::shared_ptr<Backend> backend_;
beast::Journal j_;
// True if shard has its entire ledger range stored