rippled
Loading...
Searching...
No Matches
SHAMapStoreImp.cpp
1#include <xrpld/app/ledger/TransactionMaster.h>
2#include <xrpld/app/misc/NetworkOPs.h>
3#include <xrpld/app/misc/SHAMapStoreImp.h>
4#include <xrpld/app/rdb/State.h>
5#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
6#include <xrpld/core/ConfigSections.h>
7
8#include <xrpl/beast/core/CurrentThreadName.h>
9#include <xrpl/nodestore/Scheduler.h>
10#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
11#include <xrpl/shamap/SHAMapMissingNode.h>
12
13#include <boost/algorithm/string/predicate.hpp>
14
15namespace ripple {
// Initializes the saved-state database by forwarding the sqlDb_ session member,
// `config` and `dbName` to initStateDB().
// NOTE(review): listing lines 17 (qualified function name) and 21 (first line
// of the body) are absent from this extract.
16void
18 BasicConfig const& config,
19 std::string const& dbName)
20{
22 initStateDB(sqlDb_, config, dbName);
23}
24
32
// Body of the setter that records `canDelete` in the state database; it returns
// whatever ripple::setCanDelete() returns for the sqlDb_ session.
// NOTE(review): the signature (listing lines 33-34) and listing line 36 are
// absent from this extract.
35{
37
38 return ripple::setCanDelete(sqlDb_, canDelete);
39}
40
48
49void
55
56void
62
63//------------------------------------------------------------------------------
64
// Constructor (the qualified name on listing line 65 is absent from this
// extract). Stores the application, scheduler and journal, starts with
// working_ set to true and canDelete_ at the largest LedgerIndex value, then
// reads the node database section of the configuration.
//
// Throws std::runtime_error when the node database section is empty, when
// online_delete is smaller than the computed minimum interval, or when
// LEDGER_HISTORY exceeds online_delete.
66 Application& app,
67 NodeStore::Scheduler& scheduler,
68 beast::Journal journal)
69 : app_(app)
70 , scheduler_(scheduler)
71 , journal_(journal)
72 , working_(true)
73 , canDelete_(std::numeric_limits<LedgerIndex>::max())
74{
75 Config& config{app.config()};
76
77 Section& section{config.section(ConfigSection::nodeDatabase())};
78 if (section.empty())
79 {
80 Throw<std::runtime_error>(
81 "Missing [" + ConfigSection::nodeDatabase() +
82 "] entry in configuration file");
83 }
84
85 // RocksDB only. Use sensible defaults if no values specified.
86 if (boost::iequals(get(section, "type"), "RocksDB"))
87 {
88 if (!section.exists("cache_mb"))
89 {
90 section.set(
91 "cache_mb",
92 std::to_string(config.getValueFor(SizedItem::hashNodeDBCache)));
93 }
94
95 if (!section.exists("filter_bits") && (config.NODE_SIZE >= 2))
96 section.set("filter_bits", "10");
97 }
98
// deleteInterval_ keeps its prior value when "online_delete" is not present;
// everything below only applies when it is non-zero.
99 get_if_exists(section, "online_delete", deleteInterval_);
100
101 if (deleteInterval_)
102 {
103 // Configuration that affects the behavior of online delete
104 get_if_exists(section, "delete_batch", deleteBatch_);
105 std::uint32_t temp;
106 if (get_if_exists(section, "back_off_milliseconds", temp) ||
107 // Included for backward compatibility with an undocumented setting
108 get_if_exists(section, "backOff", temp))
109 {
// NOTE(review): listing line 110 is absent; presumably it stores `temp`
// into backOff_ - confirm against the full source.
111 }
// NOTE(review): the statements guarded by the next two conditions (listing
// lines 113 and 115) are absent from this extract.
112 if (get_if_exists(section, "age_threshold_seconds", temp))
114 if (get_if_exists(section, "recovery_wait_seconds", temp))
116
117 get_if_exists(section, "advisory_delete", advisoryDelete_);
118
// NOTE(review): the remainder of this initializer (listing lines 120-121) is
// absent; the minimum evidently depends on standalone mode.
119 auto const minInterval = config.standalone()
122 if (deleteInterval_ < minInterval)
123 {
124 Throw<std::runtime_error>(
125 "online_delete must be at least " +
126 std::to_string(minInterval));
127 }
128
129 if (config.LEDGER_HISTORY > deleteInterval_)
130 {
131 Throw<std::runtime_error>(
132 "online_delete must not be less than ledger_history "
133 "(currently " +
134 std::to_string(config.LEDGER_HISTORY) + ")");
135 }
136
// Open the state database first: dbPaths() reads the saved state from it.
137 state_db_.init(config, dbName_);
138 dbPaths();
139 }
140}
141
// Body of the node store factory (the signature on listing lines 142-143 is
// absent from this extract). Fills in default cache settings on `nscfg`, then
// builds either a rotating database with two backends (when deleteInterval_ is
// non-zero) or a plain database, adds the database's file descriptor needs to
// fdRequired_, and returns `db`.
// NOTE(review): the declarations of `nscfg`, `db` and `dbr` are on listing
// lines that are absent here.
144{
146
147 // Provide default values:
148 if (!nscfg.exists("cache_size"))
149 nscfg.set(
150 "cache_size",
153
154 if (!nscfg.exists("cache_age"))
155 nscfg.set(
156 "cache_age",
159
161
162 if (deleteInterval_)
163 {
164 SavedState state = state_db_.getState();
165 auto writableBackend = makeBackendRotating(state.writableDb);
166 auto archiveBackend = makeBackendRotating(state.archiveDb);
// No writable backend name recorded yet: persist the names of the two backends
// that were just created.
167 if (!state.writableDb.size())
168 {
169 state.writableDb = writableBackend->getName();
170 state.archiveDb = archiveBackend->getName();
171 state_db_.setState(state);
172 }
173
174 // Create NodeStore with two backends to allow online deletion of
175 // data
178 readThreads,
179 std::move(writableBackend),
180 std::move(archiveBackend),
181 nscfg,
183 fdRequired_ += dbr->fdRequired();
// Keep a non-owning pointer to the rotating database, then hand ownership of
// the same object to `db`.
184 dbRotating_ = dbr.get();
185 db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));
186 }
187 else
188 {
190 megabytes(
193 readThreads,
194 nscfg,
196 fdRequired_ += db->fdRequired();
197 }
198 return db;
199}
200
// Publishes `ledger` as newLedger_ and marks the store as working.
// NOTE(review): listing lines 202 (qualified name), 205 and 209 are absent;
// presumably a lock on the inner scope and a notification of the worker
// thread - confirm against the full source.
201void
203{
204 {
206 newLedger_ = ledger;
207 working_ = true;
208 }
210}
211
// Returns immediately when working_ is false; otherwise waits on rendezvous_
// until working_ becomes false.
// NOTE(review): listing lines 213 (qualified name) and 218 (the declaration of
// `lock`) are absent from this extract.
212void
214{
215 if (!working_)
216 return;
217
219 rendezvous_.wait(lock, [&] { return !working_; });
220}
221
// Returns the accumulated fdRequired_ count.
// NOTE(review): the qualified name (listing line 223) is absent from this
// extract.
222int
224{
225 return fdRequired_;
226}
227
// Per-node callback for the ledger copy: handles one node, increments
// `nodeCount`, and every checkHealthInterval_ nodes runs healthWait().
// Returns false when healthWait() reports `stopping`, true otherwise.
// NOTE(review): listing lines 229 (qualified name), 232 and 235 (parts of the
// call that touches the node's hash) are absent from this extract.
228bool
230{
231 // Copy a single record from node to dbRotating_
233 node.getHash().as_uint256(),
234 0,
236 true);
237 if (!(++nodeCount % checkHealthInterval_))
238 {
239 if (healthWait() == stopping)
240 return false;
241 }
242
243 return true;
244}
245
// Worker loop, running on a thread named "SHAMapStore". Each iteration clears
// working_, waits on cond_ for a new ledger, and, when the rotation conditions
// hold, clears old data, copies the ledger's state map, creates a fresh
// backend and rotates. Returns when stop_ is set or healthWait() reports
// `stopping`.
// NOTE(review): listing lines 247 (qualified name), 250, 252-254 and 257 are
// absent; they presumably include the declaration of `lastRotated` and the
// statement guarded by `if (advisoryDelete_)` - confirm against the full source.
246void
248{
249 beast::setCurrentThreadName("SHAMapStore");
251 netOPs_ = &app_.getOPs();
255
256 if (advisoryDelete_)
258
259 while (true)
260 {
261 healthy_ = true;
262 std::shared_ptr<Ledger const> validatedLedger;
263
264 {
266 working_ = false;
268 if (stop_)
269 {
270 return;
271 }
// The wait has no predicate: a wakeup without a new ledger loops back to the
// top, where stop_ is checked again.
272 cond_.wait(lock);
273 if (newLedger_)
274 {
275 validatedLedger = std::move(newLedger_);
276 }
277 else
278 continue;
279 }
280
281 LedgerIndex const validatedSeq = validatedLedger->info().seq;
// First ledger seen with no recorded rotation point: start counting from here.
282 if (!lastRotated)
283 {
284 lastRotated = validatedSeq;
285 state_db_.setLastRotated(lastRotated);
286 }
287
// Rotate only when at least deleteInterval_ ledgers have passed since
// lastRotated, canDelete_ permits it, and healthWait() returns keepGoing.
288 bool const readyToRotate =
289 validatedSeq >= lastRotated + deleteInterval_ &&
290 canDelete_ >= lastRotated - 1 && healthWait() == keepGoing;
291
292 // will delete up to (not including) lastRotated
293 if (readyToRotate)
294 {
295 JLOG(journal_.warn())
296 << "rotating validatedSeq " << validatedSeq << " lastRotated "
297 << lastRotated << " deleteInterval " << deleteInterval_
298 << " canDelete_ " << canDelete_ << " state "
299 << app_.getOPs().strOperatingMode(false) << " age "
301
302 clearPrior(lastRotated);
303 if (healthWait() == stopping)
304 return;
305
306 JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
307 std::uint64_t nodeCount = 0;
308
// Visit every node of a snapshot of the state map, passing `nodeCount` by
// reference to the bound member function (its name, on listing line 313, is
// absent from this extract).
309 try
310 {
311 validatedLedger->stateMap().snapShot(false)->visitNodes(
312 std::bind(
314 this,
315 std::ref(nodeCount),
316 std::placeholders::_1));
317 }
318 catch (SHAMapMissingNode const& e)
319 {
// Abandon this attempt and wait for the next ledger. Note that clearPrior()
// has already run by this point.
320 JLOG(journal_.error())
321 << "Missing node while copying ledger before rotate: "
322 << e.what();
323 continue;
324 }
325
326 if (healthWait() == stopping)
327 return;
328 // Only log if we completed without a "health" abort
329 JLOG(journal_.debug()) << "copied ledger " << validatedSeq
330 << " nodecount " << nodeCount;
331
332 JLOG(journal_.debug()) << "freshening caches";
// NOTE(review): the call on listing line 333 is absent from this extract.
334 if (healthWait() == stopping)
335 return;
336 // Only log if we completed without a "health" abort
337 JLOG(journal_.debug()) << validatedSeq << " freshened caches";
338
339 JLOG(journal_.trace()) << "Making a new backend";
340 auto newBackend = makeBackendRotating();
341 JLOG(journal_.debug())
342 << validatedSeq << " new backend " << newBackend->getName();
343
344 clearCaches(validatedSeq);
345 if (healthWait() == stopping)
346 return;
347
348 lastRotated = validatedSeq;
349
// NOTE(review): the callee on listing line 350 is absent; presumably the
// rotating database's rotate(). The callback persists the new backend names
// together with lastRotated, then clears the caches again.
351 std::move(newBackend),
352 [&](std::string const& writableName,
353 std::string const& archiveName) {
354 SavedState savedState;
355 savedState.writableDb = writableName;
356 savedState.archiveDb = archiveName;
357 savedState.lastRotated = lastRotated;
358 state_db_.setState(savedState);
359
360 clearCaches(validatedSeq);
361 });
362
363 JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
364 }
365 }
366}
367
// Validates the on-disk layout of the node database directory against the
// saved state:
//  - creates the configured "path" directory if it does not exist, and throws
//    std::runtime_error if the path exists but is not a directory;
//  - rewrites the stored backend paths when their parent directory differs
//    from the configured path;
//  - throws std::runtime_error ("state db error") when the recorded backends
//    and the directories actually present disagree;
//  - finally removes every other entry whose stem equals dbPrefix_.
// NOTE(review): listing lines 369 (qualified name), 371 (declaration of
// `section`), 378 and 438 (start of the log statements) and 415 (declaration
// of `pathsToDelete`) are absent from this extract.
368void
370{
372 boost::filesystem::path dbPath = get(section, "path");
373
374 if (boost::filesystem::exists(dbPath))
375 {
376 if (!boost::filesystem::is_directory(dbPath))
377 {
379 << "node db path must be a directory. " << dbPath.string();
380 Throw<std::runtime_error>("node db path must be a directory.");
381 }
382 }
383 else
384 {
385 boost::filesystem::create_directories(dbPath);
386 }
387
388 SavedState state = state_db_.getState();
389
390 {
// Re-roots a stored path under dbPath, keeping its file name. Returns true
// only when the string was changed.
391 auto update = [&dbPath](std::string& sPath) {
392 if (sPath.empty())
393 return false;
394
395 // Check if configured "path" matches stored directory path
396 using namespace boost::filesystem;
397 auto const stored{path(sPath)};
398 if (stored.parent_path() == dbPath)
399 return false;
400
401 sPath = (dbPath / stored.filename()).string();
402 return true;
403 };
404
// The archive path is only revisited when the writable path needed updating.
405 if (update(state.writableDb))
406 {
407 update(state.archiveDb);
408 state_db_.setState(state);
409 }
410 }
411
412 bool writableDbExists = false;
413 bool archiveDbExists = false;
414
// Classify each directory entry: the recorded writable backend, the recorded
// archive backend, or a leftover with the backend prefix to delete later.
416 for (boost::filesystem::directory_iterator it(dbPath);
417 it != boost::filesystem::directory_iterator();
418 ++it)
419 {
420 if (!state.writableDb.compare(it->path().string()))
421 writableDbExists = true;
422 else if (!state.archiveDb.compare(it->path().string()))
423 archiveDbExists = true;
424 else if (!dbPrefix_.compare(it->path().stem().string()))
425 pathsToDelete.push_back(it->path());
426 }
427
// Inconsistent when a recorded backend is missing on disk, when only one of
// the two exists, or when only one of the two names is recorded.
428 if ((!writableDbExists && state.writableDb.size()) ||
429 (!archiveDbExists && state.archiveDb.size()) ||
430 (writableDbExists != archiveDbExists) ||
431 state.writableDb.empty() != state.archiveDb.empty())
432 {
433 boost::filesystem::path stateDbPathName =
434 app_.config().legacy("database_path");
435 stateDbPathName /= dbName_;
436 stateDbPathName += "*";
437
// NOTE(review): in the message below the archiveDb value is preceded by a
// single quote but no closing quote follows it.
439 << "state db error:\n"
440 << " writableDbExists " << writableDbExists << " archiveDbExists "
441 << archiveDbExists << '\n'
442 << " writableDb '" << state.writableDb << "' archiveDb '"
443 << state.archiveDb << "\n\n"
444 << "The existing data is in a corrupted state.\n"
445 << "To resume operation, remove the files matching "
446 << stateDbPathName.string() << " and contents of the directory "
447 << get(section, "path") << '\n'
448 << "Optionally, you can move those files to another\n"
449 << "location if you wish to analyze or back up the data.\n"
450 << "However, there is no guarantee that the data in its\n"
451 << "existing form is usable.";
452
453 Throw<std::runtime_error>("state db error");
454 }
455
456 // The necessary directories exist. Now, remove any others.
457 for (boost::filesystem::path& p : pathsToDelete)
458 boost::filesystem::remove_all(p);
459}
460
// Body of the backend factory (the signature on listing lines 461-462 is
// absent from this extract). Uses `path` when it is non-empty; otherwise
// generates a unique directory name of the form <path>/<dbPrefix_>.XXXX under
// the section's configured "path". The chosen path is written into `section`
// before the backend is created, opened and returned.
// NOTE(review): listing lines 464 (declaration of `section`), 480 and 483-485
// (the backend construction call) are absent; whether `section` is a copy or a
// reference cannot be told from here.
463{
465 boost::filesystem::path newPath;
466
467 if (path.size())
468 {
469 newPath = path;
470 }
471 else
472 {
473 boost::filesystem::path p = get(section, "path");
474 p /= dbPrefix_;
// Each '%' is replaced with a random hexadecimal digit by unique_path().
475 p += ".%%%%";
476 newPath = boost::filesystem::unique_path(p);
477 }
478 section.set("path", newPath.string());
479
481 section,
482 megabytes(
486 backend->open();
487 return backend;
488}
489
// Deletes rows from one SQL table in batches.
//
// lastRotated     - upper bound; deletion advances until `min` reaches it.
// TableName       - used only in log messages.
// getMinSeq       - returns the lowest sequence in the table, or nullopt, in
//                   which case nothing is done.
// deleteBeforeSeq - invoked once per batch with the current bound.
//
// Returns early whenever healthWait() reports `stopping`.
// NOTE(review): listing lines 491 (qualified name), 498 (the asserted
// condition), 500 (declaration of `min`) and 536 (the statement guarded by
// `if (min < lastRotated)`, presumably a back-off sleep) are absent from this
// extract.
490void
492 LedgerIndex lastRotated,
493 std::string const& TableName,
494 std::function<std::optional<LedgerIndex>()> const& getMinSeq,
495 std::function<void(LedgerIndex)> const& deleteBeforeSeq)
496{
497 XRPL_ASSERT(
499 "ripple::SHAMapStoreImp::clearSql : nonzero delete interval");
501
502 {
503 JLOG(journal_.trace())
504 << "Begin: Look up lowest value of: " << TableName;
505 auto m = getMinSeq();
506 JLOG(journal_.trace()) << "End: Look up lowest value of: " << TableName;
507 if (!m)
508 return;
509 min = *m;
510 }
511
512 if (min > lastRotated || healthWait() == stopping)
513 return;
514 if (min == lastRotated)
515 {
516 // Micro-optimization mainly to clarify logs
517 JLOG(journal_.trace()) << "Nothing to delete from " << TableName;
518 return;
519 }
520
521 JLOG(journal_.debug()) << "start deleting in: " << TableName << " from "
522 << min << " to " << lastRotated;
// Advance the bound by at most deleteBatch_ per pass, never past lastRotated.
523 while (min < lastRotated)
524 {
525 min = std::min(lastRotated, min + deleteBatch_);
526 JLOG(journal_.trace())
527 << "Begin: Delete up to " << deleteBatch_
528 << " rows with LedgerSeq < " << min << " from: " << TableName;
529 deleteBeforeSeq(min);
530 JLOG(journal_.trace())
531 << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < "
532 << min << " from: " << TableName;
533 if (healthWait() == stopping)
534 return;
535 if (min < lastRotated)
537 if (healthWait() == stopping)
538 return;
539 }
540 JLOG(journal_.debug()) << "finished deleting from: " << TableName;
541}
542
543void
549
550void
558
// Removes data older than `lastRotated`: raises minimumOnline_, asks
// ledgerMaster_ to clear prior ledgers, then clears the "Ledgers" table and,
// when transaction tables are in use, the "Transactions" and
// "AccountTransactions" tables via clearSql(). Stops early whenever
// healthWait() reports `stopping`.
//
// Throws std::runtime_error if the relational database is not a SQLiteDatabase.
// NOTE(review): listing lines 560 (qualified name), 597, 606 and 609 (bodies
// of three of the lambdas) are absent from this extract.
559void
561{
562 // Do not allow ledgers to be acquired from the network
563 // that are about to be deleted.
564 minimumOnline_ = lastRotated + 1;
565 JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to "
566 << lastRotated;
567 ledgerMaster_->clearPriorLedgers(lastRotated);
568 JLOG(journal_.trace()) << "End: Clear internal ledgers up to "
569 << lastRotated;
570 if (healthWait() == stopping)
571 return;
572
573 SQLiteDatabase* const db =
574 dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase());
575
576 if (!db)
577 Throw<std::runtime_error>("Failed to get relational database");
578
// NOTE(review): these lambdas capture `db` by value while the ones further
// down capture it by reference; both work because clearSql() invokes them
// before returning, but the style is inconsistent.
579 clearSql(
580 lastRotated,
581 "Ledgers",
582 [db]() -> std::optional<LedgerIndex> { return db->getMinLedgerSeq(); },
583 [db](LedgerIndex min) -> void { db->deleteBeforeLedgerSeq(min); });
584 if (healthWait() == stopping)
585 return;
586
587 if (!app_.config().useTxTables())
588 return;
589
590 clearSql(
591 lastRotated,
592 "Transactions",
593 [&db]() -> std::optional<LedgerIndex> {
594 return db->getTransactionsMinLedgerSeq();
595 },
596 [&db](LedgerIndex min) -> void {
598 });
599 if (healthWait() == stopping)
600 return;
601
602 clearSql(
603 lastRotated,
604 "AccountTransactions",
605 [&db]() -> std::optional<LedgerIndex> {
607 },
608 [&db](LedgerIndex min) -> void {
610 });
// The `return` below is redundant because the function ends here; the
// healthWait() call itself still runs.
611 if (healthWait() == stopping)
612 return;
613}
614
// Body of the health check (the signature on listing lines 615-616 is absent
// from this extract). Loops while stop_ is not set and either the operating
// mode is not FULL or `age` exceeds ageThreshold_; each pass releases the lock,
// logs a warning, refreshes `mode` and re-acquires the lock.
// Returns `stopping` when stop_ is set on exit, `keepGoing` otherwise.
// NOTE(review): listing lines 618-620 (initial `age`, `mode` and `lock`) and
// 628-629 are absent; the latter presumably sleep for recoveryWaitTime_ and
// refresh `age` - confirm against the full source.
617{
621 while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_))
622 {
623 lock.unlock();
624 JLOG(journal_.warn()) << "Waiting " << recoveryWaitTime_.count()
625 << "s for node to stabilize. state: "
626 << app_.getOPs().strOperatingMode(mode, false)
627 << ". age " << age.count() << 's';
630 mode = netOPs_->getOperatingMode();
631 lock.lock();
632 }
633
634 return stop_ ? stopping : keepGoing;
635}
636
// When the worker thread is joinable, sets stop_ and then joins the thread;
// otherwise does nothing.
// NOTE(review): listing lines 638 (qualified name), 643 and 645 are absent;
// presumably a lock around the flag and a notification that wakes the worker -
// confirm against the full source.
637void
639{
640 if (thread_.joinable())
641 {
642 {
644 stop_ = true;
646 }
647 thread_.join();
648 }
649}
650
// Body of the accessor for the lowest ledger to keep (the signature on listing
// lines 651-652 is absent from this extract). Returns either the stored
// minimumOnline_ value or the ledger master's minSqlSeq().
// NOTE(review): listing line 656 is absent; it presumably holds the condition
// that guards the first return, which is why two consecutive returns appear
// here.
653{
654 // minimumOnline_ with 0 value is equivalent to unknown/not set.
655 // Don't attempt to acquire ledgers if that value is unknown.
657 return minimumOnline_.load();
658 return app_.getLedgerMaster().minSqlSeq();
659}
660
661//------------------------------------------------------------------------------
662
// Factory: constructs a SHAMapStoreImp from the given application, scheduler
// and journal and returns it through a std::unique_ptr.
// NOTE(review): the return type and function name (listing lines 663-664) are
// absent from this extract.
665 Application& app,
666 NodeStore::Scheduler& scheduler,
667 beast::Journal journal)
668{
669 return std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
670}
671
672} // namespace ripple
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual Config & config()=0
virtual NetworkOPs & getOPs()=0
virtual Family & getNodeFamily()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual TransactionMaster & getMasterTransaction()=0
virtual Logs & logs()=0
Holds unparsed configuration information.
Section & section(std::string const &name)
Returns the section with the given name.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
bool useTxTables() const
Definition Config.h:323
int getValueFor(SizedItem item, std::optional< std::size_t > node=std::nullopt) const
Retrieve the default value for the item at the specified node size.
Definition Config.cpp:1097
virtual std::shared_ptr< FullBelowCache > getFullBelowCache()=0
Return a pointer to the Family Full Below Cache.
virtual std::shared_ptr< TreeNodeCache > getTreeNodeCache()=0
Return a pointer to the Family Tree Node Cache.
void clearLedgerCachePrior(LedgerIndex seq)
void clearPriorLedgers(LedgerIndex seq)
std::optional< LedgerIndex > minSqlSeq()
std::chrono::seconds getValidatedLedgerAge()
beast::Journal journal(std::string const &name)
Definition Log.cpp:141
virtual OperatingMode getOperatingMode() const =0
virtual std::string strOperatingMode(OperatingMode const mode, bool const admin=false) const =0
virtual void rotate(std::unique_ptr< NodeStore::Backend > &&newBackend, std::function< void(std::string const &writableName, std::string const &archiveName)> const &f)=0
Rotates the backends.
Persistency layer for NodeObject.
Definition Database.h:32
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:221
virtual std::unique_ptr< Backend > make_Backend(Section const &parameters, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal)=0
Create a backend.
virtual std::unique_ptr< Database > make_Database(std::size_t burstSize, Scheduler &scheduler, int readThreads, Section const &backendParameters, beast::Journal journal)=0
Construct a NodeStore database.
static Manager & instance()
Returns the instance of the manager singleton.
Scheduling for asynchronous backend activity.
virtual std::optional< LedgerIndex > getMinLedgerSeq()=0
getMinLedgerSeq Returns the minimum ledger sequence in the Ledgers table.
uint256 const & as_uint256() const
Definition SHAMapHash.h:25
LedgerIndex setCanDelete(LedgerIndex canDelete)
void setState(SavedState const &state)
void init(BasicConfig const &config, std::string const &dbName)
std::condition_variable rendezvous_
std::condition_variable cond_
bool freshenCache(CacheInstance &cache)
void rendezvous() const override
NodeStore::DatabaseRotating * dbRotating_
std::chrono::milliseconds backOff_
void clearSql(LedgerIndex lastRotated, std::string const &TableName, std::function< std::optional< LedgerIndex >()> const &getMinSeq, std::function< void(LedgerIndex)> const &deleteBeforeSeq)
delete from sqlite table in batches to not lock the db excessively.
static constexpr auto nodeStoreName_
void clearPrior(LedgerIndex lastRotated)
std::atomic< LedgerIndex > canDelete_
std::unique_ptr< NodeStore::Backend > makeBackendRotating(std::string path=std::string())
TreeNodeCache * treeNodeCache_
std::uint32_t deleteInterval_
LedgerMaster * ledgerMaster_
void onLedgerClosed(std::shared_ptr< Ledger const > const &ledger) override
Called by LedgerMaster every time a ledger validates.
std::chrono::seconds recoveryWaitTime_
If the node is out of sync during an online_delete healthWait() call, sleep the thread for this time,...
std::string const dbPrefix_
std::uint32_t deleteBatch_
std::chrono::seconds ageThreshold_
std::atomic< LedgerIndex > minimumOnline_
std::uint64_t const checkHealthInterval_
std::unique_ptr< NodeStore::Database > makeNodeStore(int readThreads) override
std::string const dbName_
std::atomic< bool > working_
FullBelowCache * fullBelowCache_
std::optional< LedgerIndex > minimumOnline() const override
The minimum ledger to try and maintain in our database.
HealthResult
This is a health check for online deletion that waits until rippled is stable before returning.
beast::Journal const journal_
NodeStore::Scheduler & scheduler_
static std::uint32_t const minimumDeletionIntervalSA_
int fdRequired() const override
Returns the number of file descriptors that are needed.
bool copyNode(std::uint64_t &nodeCount, SHAMapTreeNode const &node)
std::shared_ptr< Ledger const > newLedger_
static std::uint32_t const minimumDeletionInterval_
void clearCaches(LedgerIndex validatedSeq)
SHAMapStoreImp(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
SHAMapHash const & getHash() const
Return the hash of this node.
virtual std::optional< LedgerIndex > getAccountTransactionsMinLedgerSeq()=0
getAccountTransactionsMinLedgerSeq Returns the minimum ledger sequence stored in the AccountTransactions table.
virtual void deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteAccountTransactionsBeforeLedgerSeq Deletes all account transactions with a sequence number less than or equal to the given ledger sequence.
virtual std::optional< LedgerIndex > getTransactionsMinLedgerSeq()=0
getTransactionsMinLedgerSeq Returns the minimum ledger sequence stored in the Transactions table.
virtual void deleteBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteBeforeLedgerSeq Deletes all ledgers with a sequence number less than or equal to the given ledger sequence.
virtual void deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq)=0
deleteTransactionsBeforeLedgerSeq Deletes all transactions with a sequence number less than or equal to the given ledger sequence.
Holds a collection of configuration values.
Definition BasicConfig.h:26
void set(std::string const &key, std::string const &value)
Set a key/value pair.
TaggedCache< uint256, Transaction > & getCache()
T compare(T... args)
T count(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T joinable(T... args)
T max(T... args)
T min(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
LedgerIndex getCanDelete(soci::session &session)
getCanDelete Returns the ledger sequence which can be deleted.
Definition State.cpp:62
constexpr auto megabytes(T value) noexcept
SavedState getSavedState(soci::session &session)
getSavedState Returns the saved state.
Definition State.cpp:80
void setSavedState(soci::session &session, SavedState const &state)
setSavedState Saves the given state.
Definition State.cpp:92
bool get_if_exists(Section const &section, std::string const &name, T &v)
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:49
@ FULL
we have the ledger and can even validate
void initStateDB(soci::session &session, BasicConfig const &config, std::string const &dbName)
initStateDB Opens a session with the State database.
Definition State.cpp:6
LedgerIndex setCanDelete(soci::session &session, LedgerIndex canDelete)
setCanDelete Updates the ledger sequence which can be deleted.
Definition State.cpp:72
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
void setLastRotated(soci::session &session, LedgerIndex seq)
setLastRotated Updates the last rotated ledger sequence.
Definition State.cpp:104
std::unique_ptr< SHAMapStore > make_SHAMapStore(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
STL namespace.
T push_back(T... args)
T ref(T... args)
T reset(T... args)
T size(T... args)
T sleep_for(T... args)
static std::string nodeDatabase()
LedgerIndex lastRotated
Definition State.h:18
std::string writableDb
Definition State.h:16
std::string archiveDb
Definition State.h:17
T to_string(T... args)
T what(T... args)