20 #include <ripple/app/ledger/TransactionMaster.h>
21 #include <ripple/app/misc/NetworkOPs.h>
22 #include <ripple/app/misc/SHAMapStoreImp.h>
23 #include <ripple/app/rdb/RelationalDBInterface_global.h>
24 #include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
25 #include <ripple/beast/core/CurrentThreadName.h>
26 #include <ripple/core/ConfigSections.h>
27 #include <ripple/core/Pg.h>
28 #include <ripple/nodestore/impl/DatabaseRotatingImp.h>
30 #include <boost/algorithm/string/predicate.hpp>
97 Throw<std::runtime_error>(
99 "] entry in configuration file");
103 if (boost::iequals(
get(section,
"type"),
"RocksDB"))
105 if (!section.exists(
"cache_mb"))
112 if (!section.exists(
"filter_bits") && (config.NODE_SIZE >= 2))
113 section.set(
"filter_bits",
"10");
122 Throw<std::runtime_error>(
123 "Reporting does not support online_delete. Remove "
124 "online_delete info from config");
143 auto const minInterval = config.standalone()
148 Throw<std::runtime_error>(
149 "online_delete must be at least " +
155 Throw<std::runtime_error>(
156 "online_delete must not be less than ledger_history "
174 Throw<std::runtime_error>(
175 "Reporting does not support online_delete. Remove "
176 "online_delete info from config");
183 state.
writableDb = writableBackend->getName();
184 state.
archiveDb = archiveBackend->getName();
189 auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
192 std::move(writableBackend),
193 std::move(archiveBackend),
261 Throw<std::runtime_error>(
262 "Reporting does not support online_delete. Remove "
263 "online_delete info from config");
297 LedgerIndex const validatedSeq = validatedLedger->info().seq;
300 lastRotated = validatedSeq;
304 bool const readyToRotate =
310 bool const waitForImport = readyToRotate && [
this, lastRotated] {
313 if (
auto sequence = shardStore->getDatabaseImportSequence())
314 return sequence <= lastRotated - 1;
323 <<
"NOT rotating validatedSeq " << validatedSeq
324 <<
" as rotation would interfere with ShardStore import";
328 if (readyToRotate && !waitForImport)
331 <<
"rotating validatedSeq " << validatedSeq <<
" lastRotated "
340 case Health::stopping:
342 case Health::unhealthy:
350 validatedLedger->stateMap().snapShot(
false)->visitNodes(
std::bind(
354 std::placeholders::_1));
357 case Health::stopping:
359 case Health::unhealthy:
366 <<
" nodecount " << nodeCount;
372 case Health::stopping:
374 case Health::unhealthy:
380 JLOG(
journal_.
debug()) << validatedSeq <<
" freshened caches";
385 << validatedSeq <<
" new backend " << newBackend->getName();
390 case Health::stopping:
392 case Health::unhealthy:
398 lastRotated = validatedSeq;
403 savedState.
writableDb = newBackend->getName();
404 savedState.
archiveDb = writableBackendName;
410 return std::move(newBackend);
413 JLOG(
journal_.
warn()) <<
"finished rotation " << validatedSeq;
422 boost::filesystem::path dbPath =
get(section,
"path");
424 if (boost::filesystem::exists(dbPath))
426 if (!boost::filesystem::is_directory(dbPath))
429 <<
"node db path must be a directory. " << dbPath.string();
430 Throw<std::runtime_error>(
"node db path must be a directory.");
435 boost::filesystem::create_directories(dbPath);
446 using namespace boost::filesystem;
447 auto const stored{path(sPath)};
448 if (stored.parent_path() == dbPath)
451 sPath = (dbPath / stored.filename()).
string();
462 bool writableDbExists =
false;
463 bool archiveDbExists =
false;
465 for (boost::filesystem::directory_iterator it(dbPath);
466 it != boost::filesystem::directory_iterator();
470 writableDbExists =
true;
472 archiveDbExists =
true;
474 boost::filesystem::remove_all(it->path());
479 (writableDbExists != archiveDbExists) ||
482 boost::filesystem::path stateDbPathName =
485 stateDbPathName +=
"*";
488 <<
"state db error:\n"
489 <<
" writableDbExists " << writableDbExists <<
" archiveDbExists "
490 << archiveDbExists <<
'\n'
491 <<
" writableDb '" << state.
writableDb <<
"' archiveDb '"
493 <<
"The existing data is in a corrupted state.\n"
494 <<
"To resume operation, remove the files matching "
495 << stateDbPathName.
string() <<
" and contents of the directory "
496 <<
get(section,
"path") <<
'\n'
497 <<
"Optionally, you can move those files to another\n"
498 <<
"location if you wish to analyze or back up the data.\n"
499 <<
"However, there is no guarantee that the data in its\n"
500 <<
"existing form is usable.";
502 Throw<std::runtime_error>(
"state db error");
510 boost::filesystem::path newPath;
518 boost::filesystem::path p =
get(section,
"path");
521 newPath = boost::filesystem::unique_path(p);
523 section.set(
"path", newPath.string());
547 <<
"Begin: Look up lowest value of: " << TableName;
548 auto m = getMinSeq();
549 JLOG(
journal_.
trace()) <<
"End: Look up lowest value of: " << TableName;
555 if (min > lastRotated ||
health() != Health::ok)
557 if (min == lastRotated)
560 JLOG(
journal_.
trace()) <<
"Nothing to delete from " << TableName;
564 JLOG(
journal_.
debug()) <<
"start deleting in: " << TableName <<
" from "
565 << min <<
" to " << lastRotated;
566 while (min < lastRotated)
571 <<
" rows with LedgerSeq < " << min <<
" from: " << TableName;
572 deleteBeforeSeq(min);
574 <<
"End: Delete up to " <<
deleteBatch_ <<
" rows with LedgerSeq < "
575 << min <<
" from: " << TableName;
578 if (min < lastRotated)
583 JLOG(
journal_.
debug()) <<
"finished deleting from: " << TableName;
608 Throw<std::runtime_error>(
609 "Reporting does not support online_delete. Remove "
610 "online_delete info from config");
615 JLOG(
journal_.
trace()) <<
"Begin: Clear internal ledgers up to "
618 JLOG(
journal_.
trace()) <<
"End: Clear internal ledgers up to "
656 "AccountTransactions",
673 return Health::stopping;
688 <<
"s for node to get back into sync with network. state: "
690 << age.count() <<
's';
700 <<
". age " << age.count() <<
's';
708 return Health::unhealthy;
743 return std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
Holds a collection of configuration values.
LedgerIndex setCanDelete(LedgerIndex canDelete)
LedgerIndex setCanDelete(soci::session &session, LedgerIndex canDelete)
setCanDelete Updates ledger sequence which can be deleted.
virtual Family & getNodeFamily()=0
virtual std::shared_ptr< TreeNodeCache > getTreeNodeCache(std::uint32_t ledgerSeq)=0
Return a pointer to the Family Tree Node Cache.
NodeStore::Scheduler & scheduler_
SHAMapStoreImp(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
Persistency layer for NodeObject.
void clearLedgerCachePrior(LedgerIndex seq)
Stream trace() const
Severity stream access functions.
std::shared_ptr< Ledger const > newLedger_
void onLedgerClosed(std::shared_ptr< Ledger const > const &ledger) override
Called by LedgerMaster every time a ledger validates.
static const std::uint32_t minimumDeletionInterval_
const std::string dbPrefix_
LedgerMaster * ledgerMaster_
void setSavedState(soci::session &session, SavedState const &state)
setSavedState Saves given state.
virtual RelationalDBInterface & getRelationalDBInterface()=0
std::optional< std::chrono::seconds > recoveryWaitTime_
If set, and the node is out of sync during an online_delete health check, sleep the thread for this t...
virtual void deleteBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteBeforeLedgerSeq Deletes all ledgers with given sequence and all sequences below it.
virtual NodeStore::DatabaseShard * getShardStore()=0
virtual OperatingMode getOperatingMode() const =0
const beast::Journal journal_
std::condition_variable cond_
virtual std::shared_ptr< FullBelowCache > getFullBelowCache(std::uint32_t ledgerSeq)=0
Return a pointer to the Family Full Below Cache.
LedgerIndex getCanDelete()
virtual std::optional< LedgerIndex > getTransactionsMinLedgerSeq()=0
getTransactionsMinLedgerSeq Returns minimum ledger sequence among records in the Transactions table.
static constexpr auto nodeStoreName_
virtual void deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteTransactionsBeforeLedgerSeq Deletes all transactions with given ledger sequence and all sequenc...
virtual NetworkOPs & getOPs()=0
bool copyNode(std::uint64_t &nodeCount, SHAMapTreeNode const &node)
NodeStore::DatabaseRotating * dbRotating_
virtual std::optional< LedgerIndex > getMinLedgerSeq()=0
getMinLedgerSeq Returns minimum ledger sequence in Ledgers table.
bool get_if_exists(Section const &section, std::string const &name, T &v)
@ SYNCING
fallen slightly behind
void clearSql(LedgerIndex lastRotated, std::string const &TableName, std::function< std::optional< LedgerIndex >()> const &getMinSeq, std::function< void(LedgerIndex)> const &deleteBeforeSeq)
delete from sqlite table in batches to not lock the db excessively.
int getValueFor(SizedItem item, std::optional< std::size_t > node=std::nullopt) const
Retrieve the default value for the item at the specified node size.
virtual LedgerMaster & getLedgerMaster()=0
virtual Config & config()=0
constexpr auto megabytes(T value) noexcept
std::atomic< LedgerIndex > minimumOnline_
void setState(SavedState const &state)
TreeNodeCache * treeNodeCache_
std::atomic< bool > working_
std::uint32_t deleteInterval_
std::chrono::seconds ageThreshold_
std::optional< LedgerIndex > minSqlSeq()
void initStateDB(soci::session &session, BasicConfig const &config, std::string const &dbName)
initStateDB Opens DB session with State DB.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
TaggedCache< uint256, Transaction > & getCache()
void clearPrior(LedgerIndex lastRotated)
A generic endpoint for log messages.
Scheduling for asynchronous backend activity.
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous)
Fetch a node object.
SavedState getSavedState(soci::session &session)
getSavedState Returns saved state.
const std::uint64_t checkHealthInterval_
SHAMapHash const & getHash() const
Return the hash of this node.
std::chrono::milliseconds backOff_
virtual void deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteAccountTransactionsBeforeLedgerSeq Deletes all account transactions with given ledger sequence ...
void init(BasicConfig const &config, std::string const &dbName)
std::chrono::seconds getValidatedLedgerAge()
const std::string dbName_
void setLastRotated(soci::session &session, LedgerIndex seq)
setLastRotated Updates last rotated ledger sequence.
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
int fdRequired() const override
Returns the number of file descriptors that are needed.
virtual std::unique_ptr< Backend > make_Backend(Section const &parameters, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal)=0
Create a backend.
beast::Journal journal(std::string const &name)
LedgerIndex getCanDelete(soci::session &session)
getCanDelete Returns ledger sequence which can be deleted.
void setLastRotated(LedgerIndex seq)
std::atomic< LedgerIndex > canDelete_
static const std::uint32_t minimumDeletionIntervalSA_
std::uint32_t deleteBatch_
FullBelowCache * fullBelowCache_
std::optional< LedgerIndex > minimumOnline() const override
The minimum ledger to try and maintain in our database.
void clearCaches(LedgerIndex validatedSeq)
virtual std::string strOperatingMode(OperatingMode const mode, bool const admin=false) const =0
OperatingMode
Specifies the mode under which the server believes it's operating.
std::unique_ptr< NodeStore::Backend > makeBackendRotating(std::string path=std::string())
uint256 const & as_uint256() const
static Manager & instance()
Returns the instance of the manager singleton.
virtual void rotateWithLock(std::function< std::unique_ptr< NodeStore::Backend >(std::string const &writableBackendName)> const &f)=0
Rotates the backends.
std::unique_ptr< NodeStore::Database > makeNodeStore(std::int32_t readThreads) override
virtual std::unique_ptr< Database > make_Database(std::size_t burstSize, Scheduler &scheduler, int readThreads, Section const &backendParameters, beast::Journal journal)=0
Construct a NodeStore database.
virtual std::optional< LedgerIndex > getAccountTransactionsMinLedgerSeq()=0
getAccountTransactionsMinLedgerSeq Returns minimum ledger sequence among records in the AccountTransa...
std::unique_ptr< SHAMapStore > make_SHAMapStore(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
bool freshenCache(CacheInstance &cache)
void clearPriorLedgers(LedgerIndex seq)
Holds unparsed configuration information.
static std::string nodeDatabase()
void rendezvous() const override
T & get(EitherAmount &amt)
Section & section(std::string const &name)
Returns the section with the given name.
std::condition_variable rendezvous_
virtual TransactionMaster & getMasterTransaction()=0
@ FULL
we have the ledger and can even validate