rippled
Loading...
Searching...
No Matches
SHAMapStoreImp.cpp
1#include <xrpld/app/ledger/TransactionMaster.h>
2#include <xrpld/app/misc/NetworkOPs.h>
3#include <xrpld/app/misc/SHAMapStoreImp.h>
4#include <xrpld/app/rdb/State.h>
5#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
6#include <xrpld/core/ConfigSections.h>
7
8#include <xrpl/beast/core/CurrentThreadName.h>
9#include <xrpl/nodestore/Scheduler.h>
10#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
11#include <xrpl/shamap/SHAMapMissingNode.h>
12
13#include <boost/algorithm/string/predicate.hpp>
14
15namespace xrpl {
// Initializes the saved-state database by forwarding to initStateDB() with
// the member session sqlDb_, the configuration and the database name.
// NOTE(review): listing lines 17 and 19 (the function name/parameters and the
// first body statement) are absent from this excerpt; presumably this is the
// init(BasicConfig const&, std::string const&) member listed in the symbol
// index — confirm against the full source.
16void
18{
20 initStateDB(sqlDb_, config, dbName);
21}
22
30
// Passes canDelete to the free function xrpl::setCanDelete() together with the
// member session sqlDb_ and returns whatever that call returns.
// NOTE(review): listing lines 31-32 (return type and signature) and line 34
// are absent from this excerpt; presumably line 34 takes a lock — confirm.
33{
35
36 return xrpl::setCanDelete(sqlDb_, canDelete);
37}
38
46
47void
53
54void
60
61//------------------------------------------------------------------------------
62
// SHAMapStoreImp constructor: reads the node database section of the
// configuration, fills in RocksDB defaults, reads the online-delete settings,
// validates them, and then initializes the state database and checks the
// on-disk backend directories.
//
// Throws std::runtime_error when the node database section is empty, when
// online_delete is below the applicable minimum interval, or when
// ledger_history exceeds online_delete.
//
// NOTE(review): listing line 63 (the constructor signature) is absent from
// this excerpt; the symbol index lists it as
// SHAMapStoreImp(Application&, NodeStore::Scheduler&, beast::Journal).
64 : app_(app)
65 , scheduler_(scheduler)
66 , journal_(journal)
67 , working_(true)
68 , canDelete_(std::numeric_limits<LedgerIndex>::max())
69{
70 Config& config{app.config()};
71
// The node database section is mandatory; refuse to continue without it.
72 Section& section{config.section(ConfigSection::nodeDatabase())};
73 if (section.empty())
74 {
75 Throw<std::runtime_error>("Missing [" + ConfigSection::nodeDatabase() + "] entry in configuration file");
76 }
77
78 // RocksDB only. Use sensible defaults if no values specified.
79 if (boost::iequals(get(section, "type"), "RocksDB"))
80 {
81 if (!section.exists("cache_mb"))
82 {
83 section.set("cache_mb", std::to_string(config.getValueFor(SizedItem::hashNodeDBCache)));
84 }
85
// filter_bits only gets a default ("10") when NODE_SIZE is at least 2.
86 if (!section.exists("filter_bits") && (config.NODE_SIZE >= 2))
87 section.set("filter_bits", "10");
88 }
89
90 get_if_exists(section, "online_delete", deleteInterval_);
91
// NOTE(review): listing line 92, which introduces the block below, is missing;
// presumably it tests deleteInterval_ so that the settings and validation here
// apply only when online deletion is configured — confirm.
93 {
94 // Configuration that affects the behavior of online delete
95 get_if_exists(section, "delete_batch", deleteBatch_);
// temp is only read after a get_if_exists() call that reported success.
96 std::uint32_t temp;
97 if (get_if_exists(section, "back_off_milliseconds", temp) ||
98 // Included for backward compatibility with an undocumented setting
99 get_if_exists(section, "backOff", temp))
100 {
// NOTE(review): listing line 101 is missing; presumably it stores temp in backOff_.
102 }
// NOTE(review): listing lines 104 and 106 (the statements controlled by the
// next two ifs) are missing; presumably they store temp in ageThreshold_ and
// recoveryWaitTime_ respectively — confirm.
103 if (get_if_exists(section, "age_threshold_seconds", temp))
105 if (get_if_exists(section, "recovery_wait_seconds", temp))
107
108 get_if_exists(section, "advisory_delete", advisoryDelete_);
109
// Standalone mode is checked against its own minimum deletion interval.
110 auto const minInterval = config.standalone() ? minimumDeletionIntervalSA_ : minimumDeletionInterval_;
111 if (deleteInterval_ < minInterval)
112 {
113 Throw<std::runtime_error>("online_delete must be at least " + std::to_string(minInterval));
114 }
115
// The configured ledger history must fit inside the deletion interval.
116 if (config.LEDGER_HISTORY > deleteInterval_)
117 {
118 Throw<std::runtime_error>(
119 "online_delete must not be less than ledger_history "
120 "(currently " +
121 std::to_string(config.LEDGER_HISTORY) + ")");
122 }
123
// Open the state database, then validate the backend directories on disk.
124 state_db_.init(config, dbName_);
125 dbPaths();
126 }
127}
128
// Builds and returns the NodeStore database (db). When deleteInterval_ is
// nonzero the store is created from two rotating backends (writable and
// archive) taken from the saved state; otherwise the else branch is used.
// In both branches the store's file descriptor requirement is added to
// fdRequired_.
//
// NOTE(review): the signature (listing lines 129-130) and several statements
// (lines 132, 136, 139, 141, 157-158, 163, 170-172, 175) are absent from this
// excerpt, including the declarations of nscfg, db and dbr and the calls that
// construct the databases. The symbol index lists the signature as
// std::unique_ptr<NodeStore::Database> makeNodeStore(int readThreads).
131{
133
134 // Provide default values:
// NOTE(review): the statements controlled by the two ifs below (listing lines
// 136 and 139) are missing; presumably each sets the corresponding default.
135 if (!nscfg.exists("cache_size"))
137
138 if (!nscfg.exists("cache_age"))
140
142
143 if (deleteInterval_)
144 {
145 SavedState state = state_db_.getState();
146 auto writableBackend = makeBackendRotating(state.writableDb);
147 auto archiveBackend = makeBackendRotating(state.archiveDb);
// No writable backend name saved yet: record the names of the two backends
// just created and persist them.
148 if (!state.writableDb.size())
149 {
150 state.writableDb = writableBackend->getName();
151 state.archiveDb = archiveBackend->getName();
152 state_db_.setState(state);
153 }
154
155 // Create NodeStore with two backends to allow online deletion of
156 // data
159 readThreads,
160 std::move(writableBackend),
161 std::move(archiveBackend),
162 nscfg,
164 fdRequired_ += dbr->fdRequired();
// Keep a non-owning pointer to the rotating database before ownership is
// transferred from dbr into db.
165 dbRotating_ = dbr.get();
166 db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));
167 }
168 else
169 {
173 readThreads,
174 nscfg,
176 fdRequired_ += db->fdRequired();
177 }
178 return db;
179}
180
// Called by LedgerMaster every time a ledger validates (per the symbol index).
// Records the ledger in newLedger_ and sets working_ to true.
// NOTE(review): listing lines 182 (signature), 185 and 189 are absent from
// this excerpt; presumably 185 acquires a lock for the inner scope and 189
// notifies the worker waiting on cond_ — confirm.
181void
183{
184 {
186 newLedger_ = ledger;
187 working_ = true;
188 }
190}
191
// Returns immediately when working_ is false; otherwise waits on the
// rendezvous_ condition variable until working_ becomes false.
// NOTE(review): listing lines 193 (signature) and 198 (the declaration of
// `lock`) are absent from this excerpt.
192void
194{
195 if (!working_)
196 return;
197
199 rendezvous_.wait(lock, [&] { return !working_; });
200}
201
// Returns the number of file descriptors that are needed, as accumulated in
// fdRequired_.
// NOTE(review): listing line 203 (the signature) is absent from this excerpt.
202int
204{
205 return fdRequired_;
206}
207
// Per-node callback bound in run() and handed to visitNodes() while the
// validated ledger's state map is copied.
//
// nodeCount is incremented on every call; each time it reaches a multiple of
// checkHealthInterval_ the function calls healthWait() and returns false if
// that reports `stopping`. In every other case it returns true.
// NOTE(review): listing lines 209 (signature) and 212 (the statement that
// performs the copy described by the comment below) are absent from this
// excerpt. Presumably a false return ends the traversal — confirm against
// visitNodes().
208bool
210{
211 // Copy a single record from node to dbRotating_
213 if (!(++nodeCount % checkHealthInterval_))
214 {
215 if (healthWait() == stopping)
216 return false;
217 }
218
219 return true;
220}
221
// Worker loop of the store, running on the thread it names "SHAMapStore".
// Each iteration waits on cond_ for a newly validated ledger and, when the
// rotation conditions hold, performs one rotation: clear prior data, copy the
// validated ledger's state map, freshen caches, create a new backend, clear
// caches, and rotate the backends while persisting the new state.
// The function returns when stop_ is observed or whenever healthWait()
// reports `stopping`.
//
// NOTE(review): listing lines 223 (signature), 226, 228-230, 233, 241, 243,
// 273, 299 and 315 are absent from this excerpt. Among them are the
// declaration of lastRotated, the lock used with cond_, and the call that
// receives the rotate callback.
222void
224{
225 beast::setCurrentThreadName("SHAMapStore");
227 netOPs_ = &app_.getOPs();
231
// NOTE(review): the statement controlled by this if (listing line 233) is
// missing; the while loop below is not its body in the full source.
232 if (advisoryDelete_)
234
235 while (true)
236 {
237 healthy_ = true;
238 std::shared_ptr<Ledger const> validatedLedger;
239
// Idle phase: mark the worker as not working, exit if asked to stop,
// otherwise wait on cond_. If no ledger was posted when the wait returns,
// go around again.
240 {
242 working_ = false;
244 if (stop_)
245 {
246 return;
247 }
248 cond_.wait(lock);
249 if (newLedger_)
250 {
251 validatedLedger = std::move(newLedger_);
252 }
253 else
254 continue;
255 }
256
257 LedgerIndex const validatedSeq = validatedLedger->header().seq;
// lastRotated of zero means nothing recorded yet: seed it with this ledger
// and persist it.
258 if (!lastRotated)
259 {
260 lastRotated = validatedSeq;
261 state_db_.setLastRotated(lastRotated);
262 }
263
// Rotate only when at least deleteInterval_ ledgers have validated since the
// last rotation, canDelete_ permits it, and the health check says keepGoing.
// healthWait() is evaluated last, so it only runs if the first two hold.
264 bool const readyToRotate =
265 validatedSeq >= lastRotated + deleteInterval_ && canDelete_ >= lastRotated - 1 && healthWait() == keepGoing;
266
267 // will delete up to (not including) lastRotated
268 if (readyToRotate)
269 {
// NOTE(review): the tail of this log statement (listing line 273) is missing.
270 JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq << " lastRotated " << lastRotated
271 << " deleteInterval " << deleteInterval_ << " canDelete_ " << canDelete_ << " state "
272 << app_.getOPs().strOperatingMode(false) << " age "
274
275 clearPrior(lastRotated);
276 if (healthWait() == stopping)
277 return;
278
279 JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
280 std::uint64_t nodeCount = 0;
281
// Visit every node of a snapshot of the state map through copyNode. A missing
// node is logged and abandons this rotation attempt; the loop then returns to
// the idle phase.
282 try
283 {
284 validatedLedger->stateMap().snapShot(false)->visitNodes(
285 std::bind(&SHAMapStoreImp::copyNode, this, std::ref(nodeCount), std::placeholders::_1));
286 }
287 catch (SHAMapMissingNode const& e)
288 {
289 JLOG(journal_.error()) << "Missing node while copying ledger before rotate: " << e.what();
290 continue;
291 }
292
293 if (healthWait() == stopping)
294 return;
295 // Only log if we completed without a "health" abort
296 JLOG(journal_.debug()) << "copied ledger " << validatedSeq << " nodecount " << nodeCount;
297
// NOTE(review): listing line 299, presumably the call that freshens the
// caches, is missing between the two log statements.
298 JLOG(journal_.debug()) << "freshening caches";
300 if (healthWait() == stopping)
301 return;
302 // Only log if we completed without a "health" abort
303 JLOG(journal_.debug()) << validatedSeq << " freshened caches";
304
305 JLOG(journal_.trace()) << "Making a new backend";
306 auto newBackend = makeBackendRotating();
307 JLOG(journal_.debug()) << validatedSeq << " new backend " << newBackend->getName();
308
309 clearCaches(validatedSeq);
310 if (healthWait() == stopping)
311 return;
312
313 lastRotated = validatedSeq;
314
// NOTE(review): listing line 315, the start of the call receiving these
// arguments, is missing; presumably it is dbRotating_->rotate( — confirm.
// The callback persists the new writable/archive names together with
// lastRotated and then clears the caches again.
316 std::move(newBackend), [&](std::string const& writableName, std::string const& archiveName) {
317 SavedState savedState;
318 savedState.writableDb = writableName;
319 savedState.archiveDb = archiveName;
320 savedState.lastRotated = lastRotated;
321 state_db_.setState(savedState);
322
323 clearCaches(validatedSeq);
324 });
325
326 JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
327 }
328 }
329}
330
// Validates the node database directory and reconciles it with the saved
// state:
//   1. The configured "path" must be a directory; it is created if absent.
//   2. If the saved writable backend path lives under a different parent
//      directory than the configured one, the saved writable and archive paths
//      are re-based onto the configured directory and persisted.
//   3. The directory is scanned: entries equal to the saved writable/archive
//      paths are marked as existing, and other entries whose stem equals
//      dbPrefix_ are queued for deletion.
//   4. If the saved names and what exists on disk disagree, an error is logged
//      and std::runtime_error("state db error") is thrown.
//   5. Otherwise the queued leftover entries are removed.
//
// Throws std::runtime_error when the path exists but is not a directory, or
// when the state database and the directory contents are inconsistent.
//
// NOTE(review): listing lines 332 (signature), 334 (declaration of `section`)
// and 377 (declaration of `pathsToDelete`) are absent from this excerpt.
331void
333{
335 boost::filesystem::path dbPath = get(section, "path");
336
337 if (boost::filesystem::exists(dbPath))
338 {
339 if (!boost::filesystem::is_directory(dbPath))
340 {
341 journal_.error() << "node db path must be a directory. " << dbPath.string();
342 Throw<std::runtime_error>("node db path must be a directory.");
343 }
344 }
345 else
346 {
347 boost::filesystem::create_directories(dbPath);
348 }
349
350 SavedState state = state_db_.getState();
351
352 {
// Re-bases a stored backend path onto dbPath. Returns false (and leaves sPath
// untouched) when sPath is empty or already lives directly under dbPath;
// returns true after rewriting it.
353 auto update = [&dbPath](std::string& sPath) {
354 if (sPath.empty())
355 return false;
356
357 // Check if configured "path" matches stored directory path
358 using namespace boost::filesystem;
359 auto const stored{path(sPath)};
360 if (stored.parent_path() == dbPath)
361 return false;
362
363 sPath = (dbPath / stored.filename()).string();
364 return true;
365 };
366
// The archive path is only re-based, and the state only saved, when the
// writable path actually changed.
367 if (update(state.writableDb))
368 {
369 update(state.archiveDb);
370 state_db_.setState(state);
371 }
372 }
373
374 bool writableDbExists = false;
375 bool archiveDbExists = false;
376
// compare() returns 0 on equality, hence the negations below.
378 for (boost::filesystem::directory_iterator it(dbPath); it != boost::filesystem::directory_iterator(); ++it)
379 {
380 if (!state.writableDb.compare(it->path().string()))
381 writableDbExists = true;
382 else if (!state.archiveDb.compare(it->path().string()))
383 archiveDbExists = true;
384 else if (!dbPrefix_.compare(it->path().stem().string()))
385 pathsToDelete.push_back(it->path());
386 }
387
// Inconsistent when a saved name has no matching entry on disk, when only one
// of the two backends exists, or when only one of the two names is saved.
388 if ((!writableDbExists && state.writableDb.size()) || (!archiveDbExists && state.archiveDb.size()) ||
389 (writableDbExists != archiveDbExists) || state.writableDb.empty() != state.archiveDb.empty())
390 {
391 boost::filesystem::path stateDbPathName = app_.config().legacy("database_path");
392 stateDbPathName /= dbName_;
393 stateDbPathName += "*";
394
// The archiveDb value is closed with a quote, matching writableDb, so an
// empty or whitespace-bearing path is unambiguous in the log.
395 journal_.error() << "state db error:\n"
396 << " writableDbExists " << writableDbExists << " archiveDbExists " << archiveDbExists << '\n'
397 << " writableDb '" << state.writableDb << "' archiveDb '" << state.archiveDb << "'\n\n"
398 << "The existing data is in a corrupted state.\n"
399 << "To resume operation, remove the files matching " << stateDbPathName.string()
400 << " and contents of the directory " << get(section, "path") << '\n'
401 << "Optionally, you can move those files to another\n"
402 << "location if you wish to analyze or back up the data.\n"
403 << "However, there is no guarantee that the data in its\n"
404 << "existing form is usable.";
405
406 Throw<std::runtime_error>("state db error");
407 }
408
409 // The necessary directories exist. Now, remove any others.
410 for (boost::filesystem::path& p : pathsToDelete)
411 boost::filesystem::remove_all(p);
412}
413
// Creates, opens and returns a backend for use with the rotating node store.
// When `path` is non-empty it is used as the backend path as-is. Otherwise a
// new path is generated inside the section's "path" directory from dbPrefix_
// plus a ".%%%%" pattern, which boost::filesystem::unique_path() fills in.
// The chosen path is written back into `section` before the backend is made.
//
// NOTE(review): listing lines 414-415 (signature), 417 (declaration of
// `section`) and 433, 435-437 (the call that creates `backend`) are absent
// from this excerpt. The symbol index lists the signature as
// std::unique_ptr<NodeStore::Backend> makeBackendRotating(std::string path = std::string()).
416{
418 boost::filesystem::path newPath;
419
420 if (path.size())
421 {
422 newPath = path;
423 }
424 else
425 {
426 boost::filesystem::path p = get(section, "path");
427 p /= dbPrefix_;
428 p += ".%%%%";
429 newPath = boost::filesystem::unique_path(p);
430 }
431 section.set("path", newPath.string());
432
434 section,
438 backend->open();
439 return backend;
440}
441
// Deletes rows from one SQLite table in batches so the database is not locked
// excessively (per the symbol index).
//
// Parameters:
//   lastRotated      upper bound; batches advance until `min` reaches it.
//   TableName        used only in the log messages here.
//   getMinSeq        returns the lowest ledger sequence in the table, or
//                    nullopt, in which case nothing is done.
//   deleteBeforeSeq  called once per batch with that batch's upper bound.
//
// Returns early, without finishing, whenever healthWait() reports `stopping`.
//
// NOTE(review): listing lines 443 (function name), 450 (declaration of `min`)
// and 482 (the statement controlled by `if (min < lastRotated)`) are absent
// from this excerpt; presumably 482 sleeps for the back-off interval between
// batches — confirm.
442void
444 LedgerIndex lastRotated,
445 std::string const& TableName,
446 std::function<std::optional<LedgerIndex>()> const& getMinSeq,
447 std::function<void(LedgerIndex)> const& deleteBeforeSeq)
448{
449 XRPL_ASSERT(deleteInterval_, "xrpl::SHAMapStoreImp::clearSql : nonzero delete interval");
451
452 {
453 JLOG(journal_.trace()) << "Begin: Look up lowest value of: " << TableName;
454 auto m = getMinSeq();
455 JLOG(journal_.trace()) << "End: Look up lowest value of: " << TableName;
456 if (!m)
457 return;
458 min = *m;
459 }
460
// Nothing below lastRotated to delete, or the server is stopping.
461 if (min > lastRotated || healthWait() == stopping)
462 return;
463 if (min == lastRotated)
464 {
465 // Micro-optimization mainly to clarify logs
466 JLOG(journal_.trace()) << "Nothing to delete from " << TableName;
467 return;
468 }
469
470 JLOG(journal_.debug()) << "start deleting in: " << TableName << " from " << min << " to " << lastRotated;
471 while (min < lastRotated)
472 {
// Advance the bound by at most deleteBatch_, never past lastRotated.
473 min = std::min(lastRotated, min + deleteBatch_);
474 JLOG(journal_.trace()) << "Begin: Delete up to " << deleteBatch_ << " rows with LedgerSeq < " << min
475 << " from: " << TableName;
476 deleteBeforeSeq(min);
477 JLOG(journal_.trace()) << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < " << min
478 << " from: " << TableName;
479 if (healthWait() == stopping)
480 return;
481 if (min < lastRotated)
483 if (healthWait() == stopping)
484 return;
485 }
486 JLOG(journal_.debug()) << "finished deleting from: " << TableName;
487}
488
489void
495
496void
504
// Removes data for ledgers prior to lastRotated: first from LedgerMaster via
// clearPriorLedgers(), then from the SQLite tables "Ledgers", "Transactions"
// and "AccountTransactions" via clearSql(). The transaction tables are skipped
// when the configuration reports useTxTables() as false.
//
// After every step healthWait() is consulted and the function returns early
// if it reports `stopping`.
//
// Throws std::runtime_error when the relational database is not an
// SQLiteDatabase.
//
// NOTE(review): listing lines 506 (signature) and 544 (the getMinSeq lambda
// for "AccountTransactions") are absent from this excerpt.
505void
507{
508 // Do not allow ledgers to be acquired from the network
509 // that are about to be deleted.
510 minimumOnline_ = lastRotated + 1;
511 JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to " << lastRotated;
512 ledgerMaster_->clearPriorLedgers(lastRotated);
513 JLOG(journal_.trace()) << "End: Clear internal ledgers up to " << lastRotated;
514 if (healthWait() == stopping)
515 return;
516
// dynamic_cast yields nullptr if the relational database is of another type.
517 SQLiteDatabase* const db = dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase());
518
519 if (!db)
520 Throw<std::runtime_error>("Failed to get relational database");
521
522 clearSql(
523 lastRotated,
524 "Ledgers",
525 [db]() -> std::optional<LedgerIndex> { return db->getMinLedgerSeq(); },
526 [db](LedgerIndex min) -> void { db->deleteBeforeLedgerSeq(min); });
527 if (healthWait() == stopping)
528 return;
529
530 if (!app_.config().useTxTables())
531 return;
532
533 clearSql(
534 lastRotated,
535 "Transactions",
536 [&db]() -> std::optional<LedgerIndex> { return db->getTransactionsMinLedgerSeq(); },
537 [&db](LedgerIndex min) -> void { db->deleteTransactionsBeforeLedgerSeq(min); });
538 if (healthWait() == stopping)
539 return;
540
541 clearSql(
542 lastRotated,
543 "AccountTransactions",
545 [&db](LedgerIndex min) -> void { db->deleteAccountTransactionsBeforeLedgerSeq(min); });
546 if (healthWait() == stopping)
547 return;
548}
549
// Health check for online deletion that waits until rippled is stable before
// returning (per the symbol index). The loop keeps going while stop_ is not
// set and either the operating mode is not FULL or `age` exceeds
// ageThreshold_. Each pass releases the lock, logs a warning, refreshes the
// operating mode and re-takes the lock.
//
// Returns `stopping` if stop_ is set when the loop ends, `keepGoing` otherwise.
//
// NOTE(review): listing lines 550-551 (signature), 553-555 (declarations of
// `age`, `mode` and `lock`) and 562-563 are absent from this excerpt;
// presumably 562-563 sleep for recoveryWaitTime_ and refresh `age` — confirm.
552{
556 while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_))
557 {
558 lock.unlock();
559 JLOG(journal_.warn()) << "Waiting " << recoveryWaitTime_.count()
560 << "s for node to stabilize. state: " << app_.getOPs().strOperatingMode(mode, false)
561 << ". age " << age.count() << 's';
564 mode = netOPs_->getOperatingMode();
565 lock.lock();
566 }
567
568 return stop_ ? stopping : keepGoing;
569}
570
// Stops the worker thread: if thread_ is joinable, sets stop_ and then joins
// the thread. Does nothing when the thread is not joinable.
// NOTE(review): listing lines 572 (signature), 577 and 579 are absent from
// this excerpt; presumably 577 takes a lock and 579 notifies cond_ so the
// worker wakes up and observes stop_ — confirm.
571void
573{
574 if (thread_.joinable())
575 {
576 {
578 stop_ = true;
580 }
581 thread_.join();
582 }
583}
584
// Returns the minimum ledger to try and maintain in the database (per the
// symbol index): either the value of minimumOnline_ or, as the fallback,
// LedgerMaster's minSqlSeq().
// NOTE(review): listing lines 585-586 (signature) and 590 are absent from this
// excerpt. Line 590 is presumably the condition guarding the first return;
// as shown here the second return would be unreachable — confirm against the
// full source.
587{
588 // minimumOnline_ with 0 value is equivalent to unknown/not set.
589 // Don't attempt to acquire ledgers if that value is unknown.
591 return minimumOnline_.load();
592 return app_.getLedgerMaster().minSqlSeq();
593}
594
595//------------------------------------------------------------------------------
596
// Factory: constructs a SHAMapStoreImp from app, scheduler and journal and
// returns it through std::make_unique.
// NOTE(review): listing lines 597-598 (return type and signature) are absent
// from this excerpt; the symbol index lists the function as
// std::unique_ptr<SHAMapStore> make_SHAMapStore(Application&, NodeStore::Scheduler&, beast::Journal).
599{
600 return std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
601}
602
603} // namespace xrpl
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual Config & config()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual Family & getNodeFamily()=0
virtual Logs & logs()=0
virtual TransactionMaster & getMasterTransaction()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual NetworkOPs & getOPs()=0
Holds unparsed configuration information.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
Section & section(std::string const &name)
Returns the section with the given name.
bool useTxTables() const
Definition Config.h:318
int getValueFor(SizedItem item, std::optional< std::size_t > node=std::nullopt) const
Retrieve the default value for the item at the specified node size.
Definition Config.cpp:1013
virtual std::shared_ptr< TreeNodeCache > getTreeNodeCache()=0
Return a pointer to the Family Tree Node Cache.
virtual std::shared_ptr< FullBelowCache > getFullBelowCache()=0
Return a pointer to the Family Full Below Cache.
std::optional< LedgerIndex > minSqlSeq()
std::chrono::seconds getValidatedLedgerAge()
void clearPriorLedgers(LedgerIndex seq)
void clearLedgerCachePrior(LedgerIndex seq)
beast::Journal journal(std::string const &name)
Definition Log.cpp:134
virtual OperatingMode getOperatingMode() const =0
virtual std::string strOperatingMode(OperatingMode const mode, bool const admin=false) const =0
virtual void rotate(std::unique_ptr< NodeStore::Backend > &&newBackend, std::function< void(std::string const &writableName, std::string const &archiveName)> const &f)=0
Rotates the backends.
Persistency layer for NodeObject.
Definition Database.h:32
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:202
static Manager & instance()
Returns the instance of the manager singleton.
virtual std::unique_ptr< Database > make_Database(std::size_t burstSize, Scheduler &scheduler, int readThreads, Section const &backendParameters, beast::Journal journal)=0
Construct a NodeStore database.
virtual std::unique_ptr< Backend > make_Backend(Section const &parameters, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal)=0
Create a backend.
Scheduling for asynchronous backend activity.
virtual std::optional< LedgerIndex > getMinLedgerSeq()=0
getMinLedgerSeq Returns the minimum ledger sequence in the Ledgers table.
uint256 const & as_uint256() const
Definition SHAMapHash.h:25
void setState(SavedState const &state)
void init(BasicConfig const &config, std::string const &dbName)
LedgerIndex setCanDelete(LedgerIndex canDelete)
void clearSql(LedgerIndex lastRotated, std::string const &TableName, std::function< std::optional< LedgerIndex >()> const &getMinSeq, std::function< void(LedgerIndex)> const &deleteBeforeSeq)
delete from sqlite table in batches to not lock the db excessively.
bool copyNode(std::uint64_t &nodeCount, SHAMapTreeNode const &node)
std::unique_ptr< NodeStore::Backend > makeBackendRotating(std::string path=std::string())
std::atomic< bool > working_
std::condition_variable cond_
std::uint32_t deleteBatch_
std::atomic< LedgerIndex > minimumOnline_
std::chrono::seconds recoveryWaitTime_
If the node is out of sync during an online_delete healthWait() call, sleep the thread for this time,...
FullBelowCache * fullBelowCache_
std::optional< LedgerIndex > minimumOnline() const override
The minimum ledger to try and maintain in our database.
TreeNodeCache * treeNodeCache_
static constexpr auto nodeStoreName_
std::uint32_t deleteInterval_
int fdRequired() const override
Returns the number of file descriptors that are needed.
static std::uint32_t const minimumDeletionInterval_
std::unique_ptr< NodeStore::Database > makeNodeStore(int readThreads) override
std::chrono::milliseconds backOff_
std::atomic< LedgerIndex > canDelete_
NodeStore::DatabaseRotating * dbRotating_
std::shared_ptr< Ledger const > newLedger_
std::string const dbName_
std::condition_variable rendezvous_
static std::uint32_t const minimumDeletionIntervalSA_
std::uint64_t const checkHealthInterval_
HealthResult healthWait()
void clearCaches(LedgerIndex validatedSeq)
std::chrono::seconds ageThreshold_
void rendezvous() const override
LedgerMaster * ledgerMaster_
beast::Journal const journal_
std::string const dbPrefix_
NodeStore::Scheduler & scheduler_
HealthResult
This is a health check for online deletion that waits until rippled is stable before returning.
void onLedgerClosed(std::shared_ptr< Ledger const > const &ledger) override
Called by LedgerMaster every time a ledger validates.
bool freshenCache(CacheInstance &cache)
void clearPrior(LedgerIndex lastRotated)
SHAMapStoreImp(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
SHAMapHash const & getHash() const
Return the hash of this node.
virtual std::optional< LedgerIndex > getTransactionsMinLedgerSeq()=0
getTransactionsMinLedgerSeq Returns the minimum ledger sequence stored in the Transactions table.
virtual void deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteAccountTransactionsBeforeLedgerSeq Deletes all account transactions with a sequence number less...
virtual void deleteBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteBeforeLedgerSeq Deletes all ledgers with a sequence number less than or equal to the given ledg...
virtual std::optional< LedgerIndex > getAccountTransactionsMinLedgerSeq()=0
getAccountTransactionsMinLedgerSeq Returns the minimum ledger sequence stored in the AccountTransacti...
virtual void deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteTransactionsBeforeLedgerSeq Deletes all transactions with a sequence number less than or equal ...
Holds a collection of configuration values.
Definition BasicConfig.h:25
void set(std::string const &key, std::string const &value)
Set a key/value pair.
TaggedCache< uint256, Transaction > & getCache()
T compare(T... args)
T count(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T joinable(T... args)
T max(T... args)
T min(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
void setLastRotated(soci::session &session, LedgerIndex seq)
setLastRotated Updates the last rotated ledger sequence.
Definition State.cpp:93
void setSavedState(soci::session &session, SavedState const &state)
setSavedState Saves the given state.
Definition State.cpp:82
void initStateDB(soci::session &session, BasicConfig const &config, std::string const &dbName)
initStateDB Opens a session with the State database.
Definition State.cpp:6
std::unique_ptr< SHAMapStore > make_SHAMapStore(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
SavedState getSavedState(soci::session &session)
getSavedState Returns the saved state.
Definition State.cpp:71
constexpr auto megabytes(T value) noexcept
bool get_if_exists(Section const &section, std::string const &name, T &v)
LedgerIndex setCanDelete(soci::session &session, LedgerIndex canDelete)
setCanDelete Updates the ledger sequence which can be deleted.
Definition State.cpp:64
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:49
@ FULL
we have the ledger and can even validate
LedgerIndex getCanDelete(soci::session &session)
getCanDelete Returns the ledger sequence which can be deleted.
Definition State.cpp:55
T push_back(T... args)
T ref(T... args)
T reset(T... args)
T size(T... args)
T sleep_for(T... args)
static std::string nodeDatabase()
std::string writableDb
Definition State.h:16
LedgerIndex lastRotated
Definition State.h:18
std::string archiveDb
Definition State.h:17
T to_string(T... args)
T what(T... args)