Mirror of https://github.com/XRPLF/rippled.git
Improve online_delete configuration and DB tuning:

* Document delete_batch, back_off_milliseconds, age_threshold_seconds.
* Convert those time values to chrono types.
* Fix a bug that ignored age_threshold_seconds.
* Add a "recovery buffer" to the config that gives the node a chance to recover before aborting online delete.
* Add begin/end log messages around the SQL queries.
* Add a new configuration section, [sqlite], to allow tuning the SQLite database operations. Ignored on full/large history servers.
* Update documentation of [node_db] and [sqlite] in the rippled-example.cfg file.
* Resolves #3321
@@ -869,18 +869,62 @@
#
# These keys are possible for any type of backend:
#
# earliest_seq          The default is 32570 to match the XRP ledger
#                       network's earliest allowed sequence. Alternate
#                       networks may set this value. Minimum value of 1.
#                       If a [shard_db] section is defined, and this
#                       value is present either [node_db] or [shard_db],
#                       it must be defined with the same value in both
#                       sections.
#
# online_delete         Minimum value of 256. Enable automatic purging
#                       of older ledger information. Maintain at least this
#                       number of ledger records online. Must be greater
#                       than or equal to ledger_history.
#
# advisory_delete       0 for disabled, 1 for enabled. If set, then
#                       require administrative RPC call "can_delete"
#                       to enable online deletion of ledger records.
# These keys modify the behavior of online_delete, and thus are only
# relevant if online_delete is defined and non-zero:
#
# earliest_seq          The default is 32570 to match the XRP ledger
#                       network's earliest allowed sequence. Alternate
#                       networks may set this value. Minimum value of 1.
# advisory_delete       0 for disabled, 1 for enabled. If set, the
#                       administrative RPC call "can_delete" is required
#                       to enable online deletion of ledger records.
#                       Online deletion does not run automatically if
#                       non-zero and the last deletion was on a ledger
#                       greater than the current "can_delete" setting.
#                       Default is 0.
#
# delete_batch          When automatically purging, SQLite database
#                       records are deleted in batches. This value
#                       controls the maximum size of each batch. Larger
#                       batches keep the databases locked for more time,
#                       which may cause other functions to fall behind,
#                       and thus cause the node to lose sync.
#                       Default is 100.
#
# back_off_milliseconds
#                       Number of milliseconds to wait between
#                       online_delete batches to allow other functions
#                       to catch up.
#                       Default is 100.
#
# age_threshold_seconds
#                       The online delete process will only run if the
#                       latest validated ledger is younger than this
#                       number of seconds.
#                       Default is 60.
#
# recovery_buffer_seconds
#                       The online delete process checks periodically
#                       that rippled is still in sync with the network,
#                       and that the validated ledger is less than
#                       *age_threshold_seconds* old. By default, if it
#                       is not the online delete process aborts and
#                       tries again later. If *recovery_buffer_seconds*
#                       is set, online delete will wait this number of
#                       seconds for rippled to recover before it aborts.
#                       Set this value if the node is otherwise staying
#                       in sync, or recovering quickly.
#                       Default is unset.
#
# Notes:
# The 'node_db' entry configures the primary, persistent storage.
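As a quick illustration of how the purge-tuning keys documented above fit together, here is a sketch of a [node_db] stanza. The numbers are placeholders chosen for the example (most simply repeat the documented defaults), not recommendations made by this commit:

[node_db]
type=NuDB
path=/var/lib/rippled/db/nudb
online_delete=512
advisory_delete=0
delete_batch=100
back_off_milliseconds=100
age_threshold_seconds=60
recovery_buffer_seconds=10

With these values the server keeps at least 512 ledgers online, deletes SQLite rows 100 at a time with a 100 millisecond pause between batches, skips a purge pass whenever the last validated ledger is more than 60 seconds old, and gives a node that has fallen out of sync roughly 10 seconds to recover before abandoning that pass.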
@@ -892,6 +936,12 @@
# [import_db]           Settings for performing a one-time import (optional)
# [database_path]       Path to the book-keeping databases.
#
# There are 4 or 5 bookkeeping SQLite database that the server creates and
# maintains. If you omit this configuration setting, it will default to
# creating a directory called "db" located in the same place as your
# rippled.cfg file. Partial pathnames will be considered relative to
# the location of the rippled executable.
#
# [shard_db]            Settings for the Shard Database (optional)
#
# Format (without spaces):
@@ -907,12 +957,64 @@
#
# max_size_gb           Maximum disk space the database will utilize (in gigabytes)
#
# [sqlite]              Tuning settings for the SQLite databases (optional)
#
# There are 4 bookkeeping SQLite database that the server creates and
# maintains. If you omit this configuration setting, it will default to
# creating a directory called "db" located in the same place as your
# rippled.cfg file. Partial pathnames will be considered relative to
# the location of the rippled executable.
# Format (without spaces):
# One or more lines of case-insensitive key / value pairs:
# <key> '=' <value>
# ...
#
# Example:
# sync_level=low
# journal_mode=off
#
# WARNING: These settings can have significant effects on data integrity,
# particularly in failure scenarios. It is strongly recommended that they
# be left at their defaults unless the server is having performance issues
# during normal operation or during automatic purging (online_delete)
# operations.
#
# Optional keys:
#
# safety_level          Valid values: high, low
#                       The default is "high", and tunes the SQLite
#                       databases in the most reliable mode. "low"
#                       is equivalent to
#                           journal_mode=memory
#                           synchronous=off
#                           temp_store=memory
#                       These settings trade speed and reduced I/O
#                       for a higher risk of data loss. See the
#                       individual settings below for more information.
#
# journal_mode          Valid values: delete, truncate, persist, memory, wal, off
#                       The default is "wal", which uses a write-ahead
#                       log to implement database transactions.
#                       Alternately, "memory" saves disk I/O, but if
#                       rippled crashes during a transaction, the
#                       database is likely to be corrupted.
#                       See https://www.sqlite.org/pragma.html#pragma_journal_mode
#                       for more details about the available options.
#
# synchronous           Valid values: off, normal, full, extra
#                       The default is "normal", which works well with
#                       the "wal" journal mode. Alternatively, "off"
#                       allows rippled to continue as soon as data is
#                       passed to the OS, which can significantly
#                       increase speed, but risks data corruption if
#                       the host computer crashes before writing that
#                       data to disk.
#                       See https://www.sqlite.org/pragma.html#pragma_synchronous
#                       for more details about the available options.
#
# temp_store            Valid values: default, file, memory
#                       The default is "file", which will use files
#                       for temporary database tables and indices.
#                       Alternatively, "memory" may save I/O, but
#                       rippled does not currently use many, if any,
#                       of these temporary objects.
#                       See https://www.sqlite.org/pragma.html#pragma_temp_store
#                       for more details about the available options.
#
#
#
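To make the [sqlite] tuning keys above concrete, here is a sketch of a stanza that simply spells out the documented defaults; treat it as an illustration rather than a recommendation, since the defaults are also what the server uses when the section is omitted:

[sqlite]
safety_level=high
journal_mode=wal
synchronous=normal
temp_store=file

Setting safety_level=low instead is shorthand for journal_mode=memory, synchronous=off, and temp_store=memory, and, per the commit message, the whole section is ignored on full/large history servers.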
@@ -1212,23 +1314,24 @@ medium

# This is primary persistent datastore for rippled. This includes transaction
# metadata, account states, and ledger headers. Helpful information can be
# found here: https://ripple.com/wiki/NodeBackEnd
# delete old ledgers while maintaining at least 2000. Do not require an
# external administrative command to initiate deletion.
# found at https://xrpl.org/capacity-planning.html#node-db-type
# type=NuDB is recommended for non-validators with fast SSDs. Validators or
# slow / spinning disks should use RocksDB.
# online_delete=512 is recommended to delete old ledgers while maintaining at
# least 512.
# advisory_delete=0 allows the online delete process to run automatically
# when the node has approximately two times the "online_delete" value of
# ledgers. No external administrative command is required to initiate
# deletion.
[node_db]
type=RocksDB
path=/var/lib/rippled/db/rocksdb
open_files=2000
filter_bits=12
cache_mb=256
file_size_mb=8
file_size_mult=2
online_delete=2000
type=NuDB
path=/var/lib/rippled/db/nudb
online_delete=512
advisory_delete=0

# This is the persistent datastore for shards. It is important for the health
# of the ripple network that rippled operators shard as much as practical.
# NuDB requires SSD storage. Helpful information can be found here
# NuDB requires SSD storage. Helpful information can be found at
# https://ripple.com/build/history-sharding
#[shard_db]
#path=/var/lib/rippled/db/shards/nudb
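For reference, reading the flattened old and new lines above, the updated [node_db] example stanza in rippled-example.cfg appears to be:

[node_db]
type=NuDB
path=/var/lib/rippled/db/nudb
online_delete=512
advisory_delete=0

The RocksDB-specific keys shown above (open_files, filter_bits, cache_mb, file_size_mb, file_size_mult) belong to the example that this commit replaces.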
@@ -1001,7 +1001,9 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
    if (!positions && app_.getOPs().isFull())
    {
        app_.getOPs().setMode(OperatingMode::CONNECTED);
    }
}

void
@@ -228,14 +228,14 @@ Ledger::Ledger(
        !txMap_->fetchRoot(SHAMapHash{info_.txHash}, nullptr))
    {
        loaded = false;
        JLOG(j.warn()) << "Don't have TX root for ledger";
        JLOG(j.warn()) << "Don't have TX root for ledger" << info_.seq;
    }

    if (info_.accountHash.isNonZero() &&
        !stateMap_->fetchRoot(SHAMapHash{info_.accountHash}, nullptr))
    {
        loaded = false;
        JLOG(j.warn()) << "Don't have AS root for ledger";
        JLOG(j.warn()) << "Don't have AS root for ledger" << info_.seq;
    }

    txMap_->setImmutable();
@@ -1026,7 +1026,7 @@ public:

        // transaction database
        mTxnDB = std::make_unique<DatabaseCon>(
            setup, TxDBName, TxDBPragma, TxDBInit);
            setup, TxDBName, true, TxDBPragma, TxDBInit);
        mTxnDB->getSession() << boost::str(
            boost::format("PRAGMA cache_size=-%d;") %
            kilobytes(config_->getValueFor(SizedItem::txnDBCache)));
@@ -1065,7 +1065,7 @@ public:

        // ledger database
        mLedgerDB = std::make_unique<DatabaseCon>(
            setup, LgrDBName, LgrDBPragma, LgrDBInit);
            setup, LgrDBName, true, LgrDBPragma, LgrDBInit);
        mLedgerDB->getSession() << boost::str(
            boost::format("PRAGMA cache_size=-%d;") %
            kilobytes(config_->getValueFor(SizedItem::lgrDBCache)));
@@ -1075,6 +1075,7 @@ public:
        mWalletDB = std::make_unique<DatabaseCon>(
            setup,
            WalletDBName,
            false,
            std::array<char const*, 0>(),
            WalletDBInit);
    }
@@ -26,13 +26,23 @@ namespace ripple {

////////////////////////////////////////////////////////////////////////////////

// These pragmas are built at startup and applied to all database
// connections, unless otherwise noted.
inline constexpr char const* CommonDBPragmaJournal{"PRAGMA journal_mode=%s;"};
inline constexpr char const* CommonDBPragmaSync{"PRAGMA synchronous=%s;"};
inline constexpr char const* CommonDBPragmaTemp{"PRAGMA temp_store=%s;"};
// Default values will always be used for the common pragmas if
// at least this much ledger history is configured. This includes
// full history nodes. This is because such a large amount of data will
// be more difficult to recover if a rare failure occurs, which are
// more likely with some of the other available tuning settings.
inline constexpr std::uint32_t SQLITE_TUNING_CUTOFF = 100'000'000;

// Ledger database holds ledgers and ledger confirmations
inline constexpr auto LgrDBName{"ledger.db"};

inline constexpr std::array<char const*, 3> LgrDBPragma{
    {"PRAGMA synchronous=NORMAL;",
     "PRAGMA journal_mode=WAL;",
     "PRAGMA journal_size_limit=1582080;"}};
inline constexpr std::array<char const*, 1> LgrDBPragma{
    {"PRAGMA journal_size_limit=1582080;"}};

inline constexpr std::array<char const*, 5> LgrDBInit{
    {"BEGIN TRANSACTION;",
@@ -63,15 +73,14 @@ inline constexpr auto TxDBName{"transaction.db"};

inline constexpr
#if (ULONG_MAX > UINT_MAX) && !defined(NO_SQLITE_MMAP)
    std::array<char const*, 6>
    std::array<char const*, 4>
        TxDBPragma
    {
        {
#else
    std::array<char const*, 5> TxDBPragma {{
    std::array<char const*, 3> TxDBPragma {{
#endif
            "PRAGMA page_size=4096;", "PRAGMA synchronous=NORMAL;",
            "PRAGMA journal_mode=WAL;", "PRAGMA journal_size_limit=1582080;",
            "PRAGMA page_size=4096;", "PRAGMA journal_size_limit=1582080;",
            "PRAGMA max_page_count=2147483646;",
#if (ULONG_MAX > UINT_MAX) && !defined(NO_SQLITE_MMAP)
            "PRAGMA mmap_size=17179869184;"
@@ -115,10 +124,8 @@ inline constexpr std::array<char const*, 8> TxDBInit{
// Temporary database used with an incomplete shard that is being acquired
inline constexpr auto AcquireShardDBName{"acquire.db"};

inline constexpr std::array<char const*, 3> AcquireShardDBPragma{
    {"PRAGMA synchronous=NORMAL;",
     "PRAGMA journal_mode=WAL;",
     "PRAGMA journal_size_limit=1582080;"}};
inline constexpr std::array<char const*, 1> AcquireShardDBPragma{
    {"PRAGMA journal_size_limit=1582080;"}};

inline constexpr std::array<char const*, 1> AcquireShardDBInit{
    {"CREATE TABLE IF NOT EXISTS Shard ( \
@@ -130,6 +137,7 @@ inline constexpr std::array<char const*, 1> AcquireShardDBInit{
////////////////////////////////////////////////////////////////////////////////

// Pragma for Ledger and Transaction databases with complete shards
// These override the CommonDBPragma values defined above.
inline constexpr std::array<char const*, 2> CompleteShardDBPragma{
    {"PRAGMA synchronous=OFF;", "PRAGMA journal_mode=OFF;"}};

@@ -172,6 +180,7 @@ inline constexpr std::array<char const*, 6> WalletDBInit{

static constexpr auto stateDBName{"state.db"};

// These override the CommonDBPragma values defined above.
static constexpr std::array<char const*, 2> DownloaderDBPragma{
    {"PRAGMA synchronous=FULL;", "PRAGMA journal_mode=DELETE;"}};

@@ -542,7 +542,7 @@ run(int argc, char** argv)
        }

        auto txnDB = std::make_unique<DatabaseCon>(
            dbSetup, TxDBName, TxDBPragma, TxDBInit);
            dbSetup, TxDBName, false, TxDBPragma, TxDBInit);
        auto& session = txnDB->getSession();
        std::uint32_t pageSize;

@@ -555,7 +555,9 @@ run(int argc, char** argv)
        session << "PRAGMA temp_store_directory=\"" << tmpPath.string()
                << "\";";
        session << "VACUUM;";
        session << "PRAGMA journal_mode=WAL;";
        assert(dbSetup.CommonPragma);
        for (auto const& p : *dbSetup.CommonPragma)
            session << p;
        session << "PRAGMA page_size;", soci::into(pageSize);

        std::cout << "VACUUM finished. page_size: " << pageSize
@@ -180,13 +180,24 @@ SHAMapStoreImp::SHAMapStoreImp(
        section.set("filter_bits", "10");
    }

    get_if_exists(section, "delete_batch", deleteBatch_);
    get_if_exists(section, "backOff", backOff_);
    get_if_exists(section, "age_threshold", ageThreshold_);
    get_if_exists(section, "online_delete", deleteInterval_);

    if (deleteInterval_)
    {
        // Configuration that affects the behavior of online delete
        get_if_exists(section, "delete_batch", deleteBatch_);
        std::uint32_t temp;
        if (get_if_exists(section, "back_off_milliseconds", temp) ||
            // Included for backward compaibility with an undocumented setting
            get_if_exists(section, "backOff", temp))
        {
            backOff_ = std::chrono::milliseconds{temp};
        }
        if (get_if_exists(section, "age_threshold_seconds", temp))
            ageThreshold_ = std::chrono::seconds{temp};
        if (get_if_exists(section, "recovery_buffer_seconds", temp))
            recoveryBuffer_.emplace(std::chrono::seconds{temp});

        get_if_exists(section, "advisory_delete", advisoryDelete_);

        auto const minInterval = config.standalone()
@@ -348,23 +359,14 @@ SHAMapStoreImp::run()

        // will delete up to (not including) lastRotated
        if (validatedSeq >= lastRotated + deleteInterval_ &&
            canDelete_ >= lastRotated - 1)
            canDelete_ >= lastRotated - 1 && !health())
        {
            JLOG(journal_.warn())
                << "rotating validatedSeq " << validatedSeq << " lastRotated "
                << lastRotated << " deleteInterval " << deleteInterval_
                << " canDelete_ " << canDelete_;

            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:;
            }
                << " canDelete_ " << canDelete_ << " state "
                << app_.getOPs().strOperatingMode(false) << " age "
                << ledgerMaster_->getValidatedLedgerAge().count() << 's';

            clearPrior(lastRotated);
            switch (health())
@@ -378,27 +380,29 @@ SHAMapStoreImp::run()
                default:;
            }

            JLOG(journal_.trace()) << "copying ledger " << validatedSeq;
            std::uint64_t nodeCount = 0;
            validatedLedger->stateMap().snapShot(false)->visitNodes(std::bind(
                &SHAMapStoreImp::copyNode,
                this,
                std::ref(nodeCount),
                std::placeholders::_1));
            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:;
            }
            // Only log if we completed without a "health" abort
            JLOG(journal_.debug()) << "copied ledger " << validatedSeq
                                   << " nodecount " << nodeCount;
            switch (health())
            {
                case Health::stopping:
                    stopped();
                    return;
                case Health::unhealthy:
                    continue;
                case Health::ok:
                default:;
            }

            JLOG(journal_.trace()) << "freshening caches";
            freshenCaches();
            JLOG(journal_.debug()) << validatedSeq << " freshened caches";
            switch (health())
            {
                case Health::stopping:
@@ -409,7 +413,10 @@ SHAMapStoreImp::run()
                case Health::ok:
                default:;
            }
            // Only log if we completed without a "health" abort
            JLOG(journal_.debug()) << validatedSeq << " freshened caches";

            JLOG(journal_.trace()) << "Making a new backend";
            auto newBackend = makeBackendRotating();
            JLOG(journal_.debug())
                << validatedSeq << " new backend " << newBackend->getName();
@@ -566,12 +573,21 @@ SHAMapStoreImp::clearSql(
    std::string const& minQuery,
    std::string const& deleteQuery)
{
    assert(deleteInterval_);
    LedgerIndex min = std::numeric_limits<LedgerIndex>::max();

    {
        auto db = database.checkoutDb();
        boost::optional<std::uint64_t> m;
        *db << minQuery, soci::into(m);
        JLOG(journal_.trace())
            << "Begin: Look up lowest value of: " << minQuery;
        if (auto db = database.checkoutDb())
            *db << minQuery, soci::into(m);
        else
        {
            assert(false);
            return false;
        }
        JLOG(journal_.trace()) << "End: Look up lowest value of: " << minQuery;
        if (!m)
            return false;
        min = *m;
@@ -579,6 +595,12 @@ SHAMapStoreImp::clearSql(

    if (min > lastRotated || health() != Health::ok)
        return false;
    if (min == lastRotated)
    {
        // Micro-optimization mainly to clarify logs
        JLOG(journal_.trace()) << "Nothing to delete from " << deleteQuery;
        return true;
    }

    boost::format formattedDeleteQuery(deleteQuery);

@@ -587,14 +609,27 @@ SHAMapStoreImp::clearSql(
    while (min < lastRotated)
    {
        min = std::min(lastRotated, min + deleteBatch_);
        JLOG(journal_.trace()) << "Begin: Delete up to " << deleteBatch_
                               << " rows with LedgerSeq < " << min
                               << " using query: " << deleteQuery;
        if (auto db = database.checkoutDb())
        {
            auto db = database.checkoutDb();
            *db << boost::str(formattedDeleteQuery % min);
        }
        else
        {
            assert(false);
            return false;
        }
        JLOG(journal_.trace())
            << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < "
            << min << " using query: " << deleteQuery;
        if (health())
            return true;
        if (min < lastRotated)
            std::this_thread::sleep_for(std::chrono::milliseconds(backOff_));
            std::this_thread::sleep_for(backOff_);
        if (health())
            return true;
    }
    JLOG(journal_.debug()) << "finished: " << deleteQuery;
    return true;
@@ -627,7 +662,11 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
    // Do not allow ledgers to be acquired from the network
    // that are about to be deleted.
    minimumOnline_ = lastRotated + 1;
    JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to "
                           << lastRotated;
    ledgerMaster_->clearPriorLedgers(lastRotated);
    JLOG(journal_.trace()) << "End: Clear internal ledgers up to "
                           << lastRotated;
    if (health())
        return;

@@ -666,16 +705,32 @@ SHAMapStoreImp::health()
    }
    if (!netOPs_)
        return Health::ok;
    assert(deleteInterval_);

    constexpr static std::chrono::seconds age_threshold(60);
    auto age = ledgerMaster_->getValidatedLedgerAge();
    OperatingMode mode = netOPs_->getOperatingMode();
    if (mode != OperatingMode::FULL || age > age_threshold)
    if (healthy_)
    {
        JLOG(journal_.warn()) << "Not deleting. state: "
                              << app_.getOPs().strOperatingMode(mode, false)
                              << ". age " << age.count() << 's';
        healthy_ = false;
        auto age = ledgerMaster_->getValidatedLedgerAge();
        OperatingMode mode = netOPs_->getOperatingMode();
        if (recoveryBuffer_ && mode == OperatingMode::SYNCING &&
            age < ageThreshold_)
        {
            JLOG(journal_.warn())
                << "Waiting " << recoveryBuffer_->count()
                << "s for node to get back into sync with network. state: "
                << app_.getOPs().strOperatingMode(mode, false) << ". age "
                << age.count() << 's';
            std::this_thread::sleep_for(*recoveryBuffer_);

            age = ledgerMaster_->getValidatedLedgerAge();
            mode = netOPs_->getOperatingMode();
        }
        if (mode != OperatingMode::FULL || age > ageThreshold_)
        {
            JLOG(journal_.warn()) << "Not deleting. state: "
                                  << app_.getOPs().strOperatingMode(mode, false)
                                  << ". age " << age.count() << 's';
            healthy_ = false;
        }
    }

    if (healthy_)
@@ -25,6 +25,7 @@
#include <ripple/core/DatabaseCon.h>
#include <ripple/nodestore/DatabaseRotating.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <thread>

@@ -106,8 +107,9 @@ private:
    std::uint32_t deleteInterval_ = 0;
    bool advisoryDelete_ = false;
    std::uint32_t deleteBatch_ = 100;
    std::uint32_t backOff_ = 100;
    std::int32_t ageThreshold_ = 60;
    std::chrono::milliseconds backOff_{100};
    std::chrono::seconds ageThreshold_{60};
    boost::optional<std::chrono::seconds> recoveryBuffer_{};

    // these do not exist upon SHAMapStore creation, but do exist
    // as of onPrepare() or before
@@ -89,12 +89,17 @@ public:
        Config::StartUpType startUp = Config::NORMAL;
        bool standAlone = false;
        boost::filesystem::path dataDir;
        static std::unique_ptr<std::vector<std::string> const> CommonPragma;
        /// Shortcut used by the database connections that ignore the common
        /// pragma strings
        static const std::vector<std::string> NoCommonPragma;
    };

    template <std::size_t N, std::size_t M>
    DatabaseCon(
        Setup const& setup,
        std::string const& DBName,
        bool useCommonPragma,
        std::array<char const*, N> const& pragma,
        std::array<char const*, M> const& initSQL)
    {
@@ -106,7 +111,12 @@ public:
        boost::filesystem::path pPath =
            useTempFiles ? "" : (setup.dataDir / DBName);

        init(pPath, pragma, initSQL);
        assert(!useCommonPragma || setup.CommonPragma);
        init(
            pPath,
            useCommonPragma ? *setup.CommonPragma : setup.NoCommonPragma,
            pragma,
            initSQL);
    }

    template <std::size_t N, std::size_t M>
@@ -116,7 +126,7 @@ public:
        std::array<char const*, N> const& pragma,
        std::array<char const*, M> const& initSQL)
    {
        init((dataDir / DBName), pragma, initSQL);
        init((dataDir / DBName), {}, pragma, initSQL);
    }

    soci::session&

@@ -139,11 +149,17 @@ private:
    void
    init(
        boost::filesystem::path const& pPath,
        std::vector<std::string> const& commonPragma,
        std::array<char const*, N> const& pragma,
        std::array<char const*, M> const& initSQL)
    {
        open(session_, "sqlite", pPath.string());

        for (auto const& p : commonPragma)
        {
            soci::statement st = session_.prepare << p;
            st.execute(true);
        }
        for (auto const& p : pragma)
        {
            soci::statement st = session_.prepare << p;

@@ -442,7 +442,7 @@ Config::loadFromString(std::string const& fileContents)
    if (getSingleSection(secConfig, SECTION_LEDGER_HISTORY, strTemp, j_))
    {
        if (boost::iequals(strTemp, "full"))
            LEDGER_HISTORY = 1000000000u;
            LEDGER_HISTORY = 1'000'000'000u;
        else if (boost::iequals(strTemp, "none"))
            LEDGER_HISTORY = 0;
        else
@@ -454,7 +454,7 @@ Config::loadFromString(std::string const& fileContents)
        if (boost::iequals(strTemp, "none"))
            FETCH_DEPTH = 0;
        else if (boost::iequals(strTemp, "full"))
            FETCH_DEPTH = 1000000000u;
            FETCH_DEPTH = 1'000'000'000u;
        else
            FETCH_DEPTH = beast::lexicalCastThrow<std::uint32_t>(strTemp);


@@ -21,6 +21,8 @@
#include <ripple/basics/contract.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/SociDB.h>
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
#include <memory>

namespace ripple {

@@ -38,9 +40,101 @@ setup_DatabaseCon(Config const& c)
        Throw<std::runtime_error>("database_path must be set.");
    }

    if (!setup.CommonPragma)
    {
        setup.CommonPragma = [&c]() {
            auto const& sqlite = c.section("sqlite");
            auto result = std::make_unique<std::vector<std::string>>();
            result->reserve(3);

            // defaults
            std::string safety_level = "high";
            std::string journal_mode = "wal";
            std::string synchronous = "normal";
            std::string temp_store = "file";

            if (c.LEDGER_HISTORY < SQLITE_TUNING_CUTOFF &&
                set(safety_level, "safety_level", sqlite))
            {
                if (boost::iequals(safety_level, "low"))
                {
                    // low safety defaults
                    journal_mode = "memory";
                    synchronous = "off";
                    temp_store = "memory";
                }
                else if (!boost::iequals(safety_level, "high"))
                {
                    Throw<std::runtime_error>(
                        "Invalid safety_level value: " + safety_level);
                }
            }

            // #journal_mode Valid values : delete, truncate, persist, memory,
            // wal, off
            if (c.LEDGER_HISTORY < SQLITE_TUNING_CUTOFF)
                set(journal_mode, "journal_mode", sqlite);
            if (boost::iequals(journal_mode, "delete") ||
                boost::iequals(journal_mode, "truncate") ||
                boost::iequals(journal_mode, "persist") ||
                boost::iequals(journal_mode, "memory") ||
                boost::iequals(journal_mode, "wal") ||
                boost::iequals(journal_mode, "off"))
            {
                result->emplace_back(boost::str(
                    boost::format(CommonDBPragmaJournal) % journal_mode));
            }
            else
            {
                Throw<std::runtime_error>(
                    "Invalid journal_mode value: " + journal_mode);
            }

            //#synchronous Valid values : off, normal, full, extra
            if (c.LEDGER_HISTORY < SQLITE_TUNING_CUTOFF)
                set(synchronous, "synchronous", sqlite);
            if (boost::iequals(synchronous, "off") ||
                boost::iequals(synchronous, "normal") ||
                boost::iequals(synchronous, "full") ||
                boost::iequals(synchronous, "extra"))
            {
                result->emplace_back(boost::str(
                    boost::format(CommonDBPragmaSync) % synchronous));
            }
            else
            {
                Throw<std::runtime_error>(
                    "Invalid synchronous value: " + synchronous);
            }

            // #temp_store Valid values : default, file, memory
            if (c.LEDGER_HISTORY < SQLITE_TUNING_CUTOFF)
                set(temp_store, "temp_store", sqlite);
            if (boost::iequals(temp_store, "default") ||
                boost::iequals(temp_store, "file") ||
                boost::iequals(temp_store, "memory"))
            {
                result->emplace_back(
                    boost::str(boost::format(CommonDBPragmaTemp) % temp_store));
            }
            else
            {
                Throw<std::runtime_error>(
                    "Invalid temp_store value: " + temp_store);
            }

            assert(result->size() == 3);
            return result;
        }();
    }

    return setup;
}

std::unique_ptr<std::vector<std::string> const>
    DatabaseCon::Setup::CommonPragma;
const std::vector<std::string> DatabaseCon::Setup::NoCommonPragma;

void
DatabaseCon::setupCheckpointing(JobQueue* q, Logs& l)
{

@@ -51,8 +51,9 @@ DatabaseBody::value_type::open(
    auto setup = setup_DatabaseCon(config);
    setup.dataDir = path.parent_path();

    // Downloader ignores the "CommonPragma"
    conn_ = std::make_unique<DatabaseCon>(
        setup, "Download", DownloaderDBPragma, DatabaseBodyDBInit);
        setup, "Download", false, DownloaderDBPragma, DatabaseBodyDBInit);

    path_ = path;


@@ -128,6 +128,7 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx)
    acquireInfo_->SQLiteDB = std::make_unique<DatabaseCon>(
        setup,
        AcquireShardDBName,
        true,
        AcquireShardDBPragma,
        AcquireShardDBInit);
    acquireInfo_->SQLiteDB->setupCheckpointing(
@@ -684,14 +685,14 @@ Shard::initSQLite(std::lock_guard<std::recursive_mutex> const&)
    if (backendComplete_)
    {
        lgrSQLiteDB_ = std::make_unique<DatabaseCon>(
            setup, LgrDBName, CompleteShardDBPragma, LgrDBInit);
            setup, LgrDBName, false, CompleteShardDBPragma, LgrDBInit);
        lgrSQLiteDB_->getSession() << boost::str(
            boost::format("PRAGMA cache_size=-%d;") %
            kilobytes(
                config.getValueFor(SizedItem::lgrDBCache, boost::none)));

        txSQLiteDB_ = std::make_unique<DatabaseCon>(
            setup, TxDBName, CompleteShardDBPragma, TxDBInit);
            setup, TxDBName, false, CompleteShardDBPragma, TxDBInit);
        txSQLiteDB_->getSession() << boost::str(
            boost::format("PRAGMA cache_size=-%d;") %
            kilobytes(
@@ -701,14 +702,14 @@ Shard::initSQLite(std::lock_guard<std::recursive_mutex> const&)
    {
        // The incomplete shard uses a Write Ahead Log for performance
        lgrSQLiteDB_ = std::make_unique<DatabaseCon>(
            setup, LgrDBName, LgrDBPragma, LgrDBInit);
            setup, LgrDBName, true, LgrDBPragma, LgrDBInit);
        lgrSQLiteDB_->getSession() << boost::str(
            boost::format("PRAGMA cache_size=-%d;") %
            kilobytes(config.getValueFor(SizedItem::lgrDBCache)));
        lgrSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs());

        txSQLiteDB_ = std::make_unique<DatabaseCon>(
            setup, TxDBName, TxDBPragma, TxDBInit);
            setup, TxDBName, true, TxDBPragma, TxDBInit);
        txSQLiteDB_->getSession() << boost::str(
            boost::format("PRAGMA cache_size=-%d;") %
            kilobytes(config.getValueFor(SizedItem::txnDBCache)));

@@ -259,6 +259,7 @@ public:
    DatabaseCon dbCon(
        setup,
        dbName.data(),
        false,
        std::array<char const*, 0>(),
        WalletDBInit);


@@ -18,8 +18,11 @@
//==============================================================================

#include <ripple/beast/utility/temp_dir.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/Manager.h>
#include <test/jtx.h>
#include <test/jtx/envconfig.h>
#include <test/nodestore/TestBase.h>
#include <test/unit_test/SuiteJournal.h>

@@ -35,6 +38,194 @@ public:
    {
    }

    void
    testConfig()
    {
        testcase("Config");

        using namespace ripple::test::jtx;

        {
            // defaults
            Env env(*this);

            auto const s = setup_DatabaseCon(env.app().config());

            if (BEAST_EXPECT(s.CommonPragma->size() == 3))
            {
                BEAST_EXPECT(
                    s.CommonPragma->at(0) == "PRAGMA journal_mode=wal;");
                BEAST_EXPECT(
                    s.CommonPragma->at(1) == "PRAGMA synchronous=normal;");
                BEAST_EXPECT(
                    s.CommonPragma->at(2) == "PRAGMA temp_store=file;");
            }
        }
        {
            // Low safety level
            DatabaseCon::Setup::CommonPragma.reset();

            Env env = [&]() {
                auto p = test::jtx::envconfig();
                {
                    auto& section = p->section("sqlite");
                    section.set("safety_level", "low");
                }

                return Env(*this, std::move(p));
            }();

            auto const s = setup_DatabaseCon(env.app().config());
            if (BEAST_EXPECT(s.CommonPragma->size() == 3))
            {
                BEAST_EXPECT(
                    s.CommonPragma->at(0) == "PRAGMA journal_mode=memory;");
                BEAST_EXPECT(
                    s.CommonPragma->at(1) == "PRAGMA synchronous=off;");
                BEAST_EXPECT(
                    s.CommonPragma->at(2) == "PRAGMA temp_store=memory;");
            }
        }
        {
            // Override individual settings
            DatabaseCon::Setup::CommonPragma.reset();

            Env env = [&]() {
                auto p = test::jtx::envconfig();
                {
                    auto& section = p->section("sqlite");
                    section.set("journal_mode", "off");
                    section.set("synchronous", "extra");
                    section.set("temp_store", "default");
                }

                return Env(*this, std::move(p));
            }();

            auto const s = setup_DatabaseCon(env.app().config());
            if (BEAST_EXPECT(s.CommonPragma->size() == 3))
            {
                BEAST_EXPECT(
                    s.CommonPragma->at(0) == "PRAGMA journal_mode=off;");
                BEAST_EXPECT(
                    s.CommonPragma->at(1) == "PRAGMA synchronous=extra;");
                BEAST_EXPECT(
                    s.CommonPragma->at(2) == "PRAGMA temp_store=default;");
            }
        }
        {
            // Override individual settings with low safety level
            // (Low doesn't force the other settings)
            DatabaseCon::Setup::CommonPragma.reset();

            Env env = [&]() {
                auto p = test::jtx::envconfig();
                {
                    auto& section = p->section("sqlite");
                    section.set("safety_level", "low");
                    section.set("journal_mode", "off");
                    section.set("synchronous", "extra");
                    section.set("temp_store", "default");
                }

                return Env(*this, std::move(p));
            }();

            auto const s = setup_DatabaseCon(env.app().config());
            if (BEAST_EXPECT(s.CommonPragma->size() == 3))
            {
                BEAST_EXPECT(
                    s.CommonPragma->at(0) == "PRAGMA journal_mode=off;");
                BEAST_EXPECT(
                    s.CommonPragma->at(1) == "PRAGMA synchronous=extra;");
                BEAST_EXPECT(
                    s.CommonPragma->at(2) == "PRAGMA temp_store=default;");
            }
        }
        {
            // Errors
            DatabaseCon::Setup::CommonPragma.reset();

            auto p = test::jtx::envconfig();
            {
                auto& section = p->section("sqlite");
                section.set("safety_level", "slow");
            }

            try
            {
                Env env(*this, std::move(p));
                fail();
            }
            catch (std::exception const& e)
            {
                pass();
            }
        }
        {
            // Errors
            DatabaseCon::Setup::CommonPragma.reset();

            auto p = test::jtx::envconfig();
            {
                auto& section = p->section("sqlite");
                section.set("journal_mode", "fast");
            }

            try
            {
                Env env(*this, std::move(p));
                fail();
            }
            catch (std::exception const& e)
            {
                pass();
            }
        }
        {
            // Errors
            DatabaseCon::Setup::CommonPragma.reset();

            auto p = test::jtx::envconfig();
            {
                auto& section = p->section("sqlite");
                section.set("synchronous", "instant");
            }

            try
            {
                Env env(*this, std::move(p));
                fail();
            }
            catch (std::exception const& e)
            {
                pass();
            }
        }
        {
            // Errors
            DatabaseCon::Setup::CommonPragma.reset();

            auto p = test::jtx::envconfig();
            {
                auto& section = p->section("sqlite");
                section.set("temp_store", "network");
            }

            try
            {
                Env env(*this, std::move(p));
                fail();
            }
            catch (std::exception const& e)
            {
                pass();
            }
        }
    }

    //--------------------------------------------------------------------------

    void
    testImport(
        std::string const& destBackendType,
@@ -221,6 +412,8 @@
    {
        std::int64_t const seedValue = 50;

        testConfig();

        testNodeStore("memory", false, seedValue);

        // Persistent backend tests