rippled
DatabaseShardImp.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2017 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 13  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
24 #include <ripple/basics/ByteUtilities.h>
25 #include <ripple/basics/RangeSet.h>
26 #include <ripple/basics/chrono.h>
27 #include <ripple/basics/random.h>
28 #include <ripple/core/ConfigSections.h>
29 #include <ripple/nodestore/DummyScheduler.h>
30 #include <ripple/nodestore/impl/DatabaseShardImp.h>
31 #include <ripple/overlay/Overlay.h>
32 #include <ripple/overlay/predicates.h>
33 #include <ripple/protocol/HashPrefix.h>
34 #include <ripple/protocol/digest.h>
35 
36 #include <boost/algorithm/string/predicate.hpp>
37 
38 #if BOOST_OS_LINUX
39 #include <sys/statvfs.h>
40 #endif
41 
42 namespace ripple {
43 
44 namespace NodeStore {
45 
47  Application& app,
48  Scheduler& scheduler,
49  int readThreads,
51  : DatabaseShard(
52  scheduler,
53  readThreads,
54  app.config().section(ConfigSection::shardDatabase()),
55  j)
56  , app_(app)
57  , avgShardFileSz_(ledgersPerShard_ * kilobytes(192ull))
58  , openFinalLimit_(
59  app.config().getValueFor(SizedItem::openFinalLimit, std::nullopt))
60 {
61  if (app.config().reporting())
62  {
63  Throw<std::runtime_error>(
64  "Attempted to create DatabaseShardImp in reporting mode. Reporting "
65  "does not support shards. Remove shards info from config");
66  }
67 }
68 
69 bool
71 {
72  {
73  std::lock_guard lock(mutex_);
74  if (init_)
75  {
76  JLOG(j_.error()) << "already initialized";
77  return false;
78  }
79 
80  if (!initConfig(lock))
81  {
82  JLOG(j_.error()) << "invalid configuration file settings";
83  return false;
84  }
85 
86  try
87  {
88  using namespace boost::filesystem;
89 
90  // Consolidate the main storage path and all historical paths
91  std::vector<path> paths{dir_};
92  paths.insert(
93  paths.end(), historicalPaths_.begin(), historicalPaths_.end());
94 
95  for (auto const& path : paths)
96  {
97  if (exists(path))
98  {
99  if (!is_directory(path))
100  {
101  JLOG(j_.error()) << path << " must be a directory";
102  return false;
103  }
104  }
105  else if (!create_directories(path))
106  {
107  JLOG(j_.error())
108  << "failed to create path: " + path.string();
109  return false;
110  }
111  }
112 
114  {
115  // Check historical paths for duplicated file systems
116  if (!checkHistoricalPaths(lock))
117  return false;
118  }
119 
120  ctx_ = std::make_unique<nudb::context>();
121  ctx_->start();
122 
123  // Find shards
124  std::uint32_t openFinals{0};
125  for (auto const& path : paths)
126  {
127  for (auto const& it : directory_iterator(path))
128  {
129  // Ignore files
130  if (!is_directory(it))
131  continue;
132 
133  // Ignore nonnumerical directory names
134  auto const shardDir{it.path()};
135  auto dirName{shardDir.stem().string()};
136  if (!std::all_of(
137  dirName.begin(), dirName.end(), [](auto c) {
138  return ::isdigit(static_cast<unsigned char>(c));
139  }))
140  {
141  continue;
142  }
143 
144  // Ignore values below the earliest shard index
145  auto const shardIndex{std::stoul(dirName)};
146  if (shardIndex < earliestShardIndex_)
147  {
148  JLOG(j_.debug())
149  << "shard " << shardIndex
150  << " ignored, comes before earliest shard index "
152  continue;
153  }
154 
155  // Check if a previous database import failed
156  if (is_regular_file(shardDir / databaseImportMarker_))
157  {
158  JLOG(j_.warn())
159  << "shard " << shardIndex
160  << " previously failed database import, removing";
161  remove_all(shardDir);
162  continue;
163  }
164 
165  auto shard{std::make_shared<Shard>(
166  app_, *this, shardIndex, shardDir.parent_path(), j_)};
167  if (!shard->init(scheduler_, *ctx_))
168  {
169  // Remove corrupted or legacy shard
170  shard->removeOnDestroy();
171  JLOG(j_.warn())
172  << "shard " << shardIndex << " removed, "
173  << (shard->isLegacy() ? "legacy" : "corrupted")
174  << " shard";
175  continue;
176  }
177 
178  switch (shard->getState())
179  {
181  if (++openFinals > openFinalLimit_)
182  shard->tryClose();
183  shards_.emplace(shardIndex, std::move(shard));
184  break;
185 
188  shards_.emplace(shardIndex, std::move(shard))
189  .first->second,
190  true,
191  std::nullopt);
192  break;
193 
194  case ShardState::acquire:
195  if (acquireIndex_ != 0)
196  {
197  JLOG(j_.error())
198  << "more than one shard being acquired";
199  return false;
200  }
201 
202  shards_.emplace(shardIndex, std::move(shard));
203  acquireIndex_ = shardIndex;
204  break;
205 
206  default:
207  JLOG(j_.error())
208  << "shard " << shardIndex << " invalid state";
209  return false;
210  }
211  }
212  }
213  }
214  catch (std::exception const& e)
215  {
216  JLOG(j_.fatal()) << "Exception caught in function " << __func__
217  << ". Error: " << e.what();
218  return false;
219  }
220 
221  init_ = true;
222  }
223 
224  updateFileStats();
225  return true;
226 }
227 
230 {
231  std::optional<std::uint32_t> shardIndex;
232 
233  {
234  std::lock_guard lock(mutex_);
235  assert(init_);
236 
237  if (acquireIndex_ != 0)
238  {
239  if (auto const it{shards_.find(acquireIndex_)}; it != shards_.end())
240  return it->second->prepare();
241 
242  // Should never get here
243  assert(false);
244  return std::nullopt;
245  }
246 
247  if (!canAdd_)
248  return std::nullopt;
249 
250  shardIndex = findAcquireIndex(validLedgerSeq, lock);
251  }
252 
253  if (!shardIndex)
254  {
255  JLOG(j_.debug()) << "no new shards to add";
256  {
257  std::lock_guard lock(mutex_);
258  canAdd_ = false;
259  }
260  return std::nullopt;
261  }
262 
263  auto const pathDesignation = [this, shardIndex = *shardIndex]() {
264  std::lock_guard lock(mutex_);
265  return prepareForNewShard(shardIndex, numHistoricalShards(lock), lock);
266  }();
267 
268  if (!pathDesignation)
269  return std::nullopt;
270 
271  auto const needsHistoricalPath =
272  *pathDesignation == PathDesignation::historical;
273 
274  auto shard = [this, shardIndex, needsHistoricalPath] {
275  std::lock_guard lock(mutex_);
276  return std::make_unique<Shard>(
277  app_,
278  *this,
279  *shardIndex,
280  (needsHistoricalPath ? chooseHistoricalPath(lock) : ""),
281  j_);
282  }();
283 
284  if (!shard->init(scheduler_, *ctx_))
285  return std::nullopt;
286 
287  auto const ledgerSeq{shard->prepare()};
288  {
289  std::lock_guard lock(mutex_);
290  shards_.emplace(*shardIndex, std::move(shard));
291  acquireIndex_ = *shardIndex;
292  updatePeers(lock);
293  }
294 
295  return ledgerSeq;
296 }
297 
298 bool
300 {
301  auto fail = [j = j_, &shardIndexes](
302  std::string const& msg,
303  std::optional<std::uint32_t> shardIndex = std::nullopt) {
304  auto multipleIndexPrequel = [&shardIndexes] {
305  std::vector<std::string> indexesAsString(shardIndexes.size());
307  shardIndexes.begin(),
308  shardIndexes.end(),
309  indexesAsString.begin(),
310  [](uint32_t const index) { return std::to_string(index); });
311 
312  return std::string("shard") +
313  (shardIndexes.size() > 1 ? "s " : " ") +
314  boost::algorithm::join(indexesAsString, ", ");
315  };
316 
317  JLOG(j.error()) << (shardIndex ? "shard " + std::to_string(*shardIndex)
318  : multipleIndexPrequel())
319  << " " << msg;
320  return false;
321  };
322 
323  if (shardIndexes.empty())
324  return fail("invalid shard indexes");
325 
326  std::lock_guard lock(mutex_);
327  assert(init_);
328 
329  if (!canAdd_)
330  return fail("cannot be stored at this time");
331 
332  auto historicalShardsToPrepare = 0;
333 
334  for (auto const shardIndex : shardIndexes)
335  {
336  if (shardIndex < earliestShardIndex_)
337  {
338  return fail(
339  "comes before earliest shard index " +
341  shardIndex);
342  }
343 
344  // If we are synced to the network, check if the shard index is
345  // greater or equal to the current or validated shard index.
346  auto seqCheck = [&](std::uint32_t ledgerSeq) {
347  if (ledgerSeq >= earliestLedgerSeq_ &&
348  shardIndex >= seqToShardIndex(ledgerSeq))
349  {
350  return fail("invalid index", shardIndex);
351  }
352  return true;
353  };
354  if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex() + 1) ||
356  {
357  return fail("invalid index", shardIndex);
358  }
359 
360  if (shards_.find(shardIndex) != shards_.end())
361  return fail("is already stored", shardIndex);
362 
363  if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
364  return fail(
365  "is already queued for import from the shard archive handler",
366  shardIndex);
367 
369  {
370  if (auto shard = databaseImportStatus_->currentShard.lock(); shard)
371  {
372  if (shard->index() == shardIndex)
373  return fail(
374  "is being imported from the nodestore", shardIndex);
375  }
376  }
377 
378  // Any shard earlier than the two most recent shards
379  // is a historical shard
380  if (shardIndex < shardBoundaryIndex())
381  ++historicalShardsToPrepare;
382  }
383 
384  auto const numHistShards = numHistoricalShards(lock);
385 
386  // Check shard count and available storage space
387  if (numHistShards + historicalShardsToPrepare > maxHistoricalShards_)
388  return fail("maximum number of historical shards reached");
389 
390  if (historicalShardsToPrepare)
391  {
392  // Check available storage space for historical shards
393  if (!sufficientStorage(
394  historicalShardsToPrepare, PathDesignation::historical, lock))
395  return fail("insufficient storage space available");
396  }
397 
398  if (auto const recentShardsToPrepare =
399  shardIndexes.size() - historicalShardsToPrepare;
400  recentShardsToPrepare)
401  {
402  // Check available storage space for recent shards
403  if (!sufficientStorage(
404  recentShardsToPrepare, PathDesignation::none, lock))
405  return fail("insufficient storage space available");
406  }
407 
408  for (auto const shardIndex : shardIndexes)
409  preparedIndexes_.emplace(shardIndex);
410 
411  updatePeers(lock);
412  return true;
413 }
414 
415 void
417 {
418  std::lock_guard lock(mutex_);
419  assert(init_);
420 
421  if (preparedIndexes_.erase(shardIndex))
422  updatePeers(lock);
423 }
424 
427 {
429  {
430  std::lock_guard lock(mutex_);
431  assert(init_);
432 
433  for (auto const& shardIndex : preparedIndexes_)
434  rs.insert(shardIndex);
435  }
436 
437  if (rs.empty())
438  return {};
439 
440  return ripple::to_string(rs);
441 };
442 
443 bool
445  std::uint32_t shardIndex,
446  boost::filesystem::path const& srcDir)
447 {
448  auto fail = [&](std::string const& msg,
449  std::lock_guard<std::mutex> const& lock) {
450  JLOG(j_.error()) << "shard " << shardIndex << " " << msg;
451 
452  // Remove the failed import shard index so it can be retried
453  preparedIndexes_.erase(shardIndex);
454  updatePeers(lock);
455  return false;
456  };
457 
458  using namespace boost::filesystem;
459  try
460  {
461  if (!is_directory(srcDir) || is_empty(srcDir))
462  {
463  return fail(
464  "invalid source directory " + srcDir.string(),
466  }
467  }
468  catch (std::exception const& e)
469  {
470  return fail(
471  std::string(". Exception caught in function ") + __func__ +
472  ". Error: " + e.what(),
474  }
475 
476  auto const expectedHash{app_.getLedgerMaster().walkHashBySeq(
478  if (!expectedHash)
479  return fail("expected hash not found", std::lock_guard(mutex_));
480 
481  path dstDir;
482  {
483  std::lock_guard lock(mutex_);
484  if (shards_.find(shardIndex) != shards_.end())
485  return fail("already exists", lock);
486 
487  // Check shard was prepared for import
488  if (preparedIndexes_.find(shardIndex) == preparedIndexes_.end())
489  return fail("was not prepared for import", lock);
490 
491  auto const pathDesignation{
492  prepareForNewShard(shardIndex, numHistoricalShards(lock), lock)};
493  if (!pathDesignation)
494  return fail("failed to import", lock);
495 
496  if (*pathDesignation == PathDesignation::historical)
497  dstDir = chooseHistoricalPath(lock);
498  else
499  dstDir = dir_;
500  }
501  dstDir /= std::to_string(shardIndex);
502 
503  auto renameDir = [&, fname = __func__](path const& src, path const& dst) {
504  try
505  {
506  rename(src, dst);
507  }
508  catch (std::exception const& e)
509  {
510  return fail(
511  std::string(". Exception caught in function ") + fname +
512  ". Error: " + e.what(),
514  }
515  return true;
516  };
517 
518  // Rename source directory to the shard database directory
519  if (!renameDir(srcDir, dstDir))
520  return false;
521 
522  // Create the new shard
523  auto shard{std::make_unique<Shard>(
524  app_, *this, shardIndex, dstDir.parent_path(), j_)};
525 
526  if (!shard->init(scheduler_, *ctx_) ||
527  shard->getState() != ShardState::complete)
528  {
529  shard.reset();
530  renameDir(dstDir, srcDir);
531  return fail("failed to import", std::lock_guard(mutex_));
532  }
533 
534  auto const [it, inserted] = [&]() {
535  std::lock_guard lock(mutex_);
536  preparedIndexes_.erase(shardIndex);
537  return shards_.emplace(shardIndex, std::move(shard));
538  }();
539 
540  if (!inserted)
541  {
542  shard.reset();
543  renameDir(dstDir, srcDir);
544  return fail("failed to import", std::lock_guard(mutex_));
545  }
546 
547  finalizeShard(it->second, true, expectedHash);
548  return true;
549 }
550 
553 {
554  auto const shardIndex{seqToShardIndex(ledgerSeq)};
555  {
557  {
558  std::lock_guard lock(mutex_);
559  assert(init_);
560 
561  auto const it{shards_.find(shardIndex)};
562  if (it == shards_.end())
563  return nullptr;
564  shard = it->second;
565  }
566 
567  // Ledger must be stored in a final or acquiring shard
568  switch (shard->getState())
569  {
571  break;
572  case ShardState::acquire:
573  if (shard->containsLedger(ledgerSeq))
574  break;
575  [[fallthrough]];
576  default:
577  return nullptr;
578  }
579  }
580 
581  auto const nodeObject{Database::fetchNodeObject(hash, ledgerSeq)};
582  if (!nodeObject)
583  return nullptr;
584 
585  auto fail = [&](std::string const& msg) -> std::shared_ptr<Ledger> {
586  JLOG(j_.error()) << "shard " << shardIndex << " " << msg;
587  return nullptr;
588  };
589 
590  auto ledger{std::make_shared<Ledger>(
591  deserializePrefixedHeader(makeSlice(nodeObject->getData())),
592  app_.config(),
593  *app_.getShardFamily())};
594 
595  if (ledger->info().seq != ledgerSeq)
596  {
597  return fail(
598  "encountered invalid ledger sequence " + std::to_string(ledgerSeq));
599  }
600  if (ledger->info().hash != hash)
601  {
602  return fail(
603  "encountered invalid ledger hash " + to_string(hash) +
604  " on sequence " + std::to_string(ledgerSeq));
605  }
606 
607  ledger->setFull();
608  if (!ledger->stateMap().fetchRoot(
609  SHAMapHash{ledger->info().accountHash}, nullptr))
610  {
611  return fail(
612  "is missing root STATE node on hash " + to_string(hash) +
613  " on sequence " + std::to_string(ledgerSeq));
614  }
615 
616  if (ledger->info().txHash.isNonZero())
617  {
618  if (!ledger->txMap().fetchRoot(
619  SHAMapHash{ledger->info().txHash}, nullptr))
620  {
621  return fail(
622  "is missing root TXN node on hash " + to_string(hash) +
623  " on sequence " + std::to_string(ledgerSeq));
624  }
625  }
626  return ledger;
627 }
628 
629 void
631 {
632  auto const ledgerSeq{ledger->info().seq};
633  if (ledger->info().hash.isZero())
634  {
635  JLOG(j_.error()) << "zero ledger hash for ledger sequence "
636  << ledgerSeq;
637  return;
638  }
639  if (ledger->info().accountHash.isZero())
640  {
641  JLOG(j_.error()) << "zero account hash for ledger sequence "
642  << ledgerSeq;
643  return;
644  }
645  if (ledger->stateMap().getHash().isNonZero() &&
646  !ledger->stateMap().isValid())
647  {
648  JLOG(j_.error()) << "invalid state map for ledger sequence "
649  << ledgerSeq;
650  return;
651  }
652  if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
653  {
654  JLOG(j_.error()) << "invalid transaction map for ledger sequence "
655  << ledgerSeq;
656  return;
657  }
658 
659  auto const shardIndex{seqToShardIndex(ledgerSeq)};
661  {
662  std::lock_guard lock(mutex_);
663  assert(init_);
664 
665  if (shardIndex != acquireIndex_)
666  {
667  JLOG(j_.trace())
668  << "shard " << shardIndex << " is not being acquired";
669  return;
670  }
671 
672  auto const it{shards_.find(shardIndex)};
673  if (it == shards_.end())
674  {
675  JLOG(j_.error())
676  << "shard " << shardIndex << " is not being acquired";
677  return;
678  }
679  shard = it->second;
680  }
681 
682  if (shard->containsLedger(ledgerSeq))
683  {
684  JLOG(j_.trace()) << "shard " << shardIndex << " ledger already stored";
685  return;
686  }
687 
688  setStoredInShard(shard, ledger);
689 }
690 
693 {
694  std::lock_guard lock(mutex_);
695  return getShardInfo(lock);
696 }
697 
698 void
700 {
701  // Stop read threads in base before data members are destroyed
702  Database::stop();
704  {
705  std::lock_guard lock(mutex_);
706  shards.reserve(shards_.size());
707  for (auto const& [_, shard] : shards_)
708  {
709  shards.push_back(shard);
710  shard->stop();
711  }
712  shards_.clear();
713  }
714  taskQueue_.stop();
715 
716  // All shards should be expired at this point
717  for (auto const& wptr : shards)
718  {
719  if (auto const shard{wptr.lock()})
720  {
721  JLOG(j_.warn()) << " shard " << shard->index() << " unexpired";
722  }
723  }
724 
725  std::unique_lock lock(mutex_);
726 
727  // Notify the shard being imported
728  // from the node store to stop
730  {
731  // A node store import is in progress
732  if (auto importShard = databaseImportStatus_->currentShard.lock();
733  importShard)
734  importShard->stop();
735  }
736 
737  // Wait for the node store import thread
738  // if necessary
740  {
741  // Tells the import function to halt
742  haltDatabaseImport_ = true;
743 
744  // Wait for the function to exit
745  while (databaseImportStatus_)
746  {
747  // Unlock just in case the import
748  // function is waiting on the mutex
749  lock.unlock();
750 
752  lock.lock();
753  }
754 
755  // Calling join while holding the mutex_ without
756  // first making sure that doImportDatabase has
757  // exited could lead to deadlock via the mutex
758  // acquisition that occurs in that function
761  }
762 }
763 
764 void
766 {
767  std::lock_guard lock(mutex_);
768  assert(init_);
769 
770  // Only the application local node store can be imported
771  assert(&source == &app_.getNodeStore());
772 
774  {
775  assert(false);
776  JLOG(j_.error()) << "database import already in progress";
777  return;
778  }
779 
781 }
782 
783 void
785 {
786  auto shouldHalt = [this] {
787  bool expected = true;
788  return haltDatabaseImport_.compare_exchange_strong(expected, false) ||
789  isStopping();
790  };
791 
792  if (shouldHalt())
793  return;
794 
795  auto loadLedger =
796  [this](char const* const sortOrder) -> std::optional<std::uint32_t> {
798  std::uint32_t ledgerSeq{0};
800  if (sortOrder == std::string("asc"))
801  {
802  info = dynamic_cast<RelationalDBInterfaceSqlite*>(
805  }
806  else
807  {
808  info = dynamic_cast<RelationalDBInterfaceSqlite*>(
811  }
812  if (info)
813  {
814  ledger = loadLedgerHelper(*info, app_, false);
815  ledgerSeq = info->seq;
816  }
817  if (!ledger || ledgerSeq == 0)
818  {
819  JLOG(j_.error()) << "no suitable ledgers were found in"
820  " the SQLite database to import";
821  return std::nullopt;
822  }
823  return ledgerSeq;
824  };
825 
826  // Find earliest ledger sequence stored
827  auto const earliestLedgerSeq{loadLedger("asc")};
828  if (!earliestLedgerSeq)
829  return;
830 
831  auto const earliestIndex = [&] {
832  auto earliestIndex = seqToShardIndex(*earliestLedgerSeq);
833 
834  // Consider only complete shards
835  if (earliestLedgerSeq != firstLedgerSeq(earliestIndex))
836  ++earliestIndex;
837 
838  return earliestIndex;
839  }();
840 
841  // Find last ledger sequence stored
842  auto const latestLedgerSeq = loadLedger("desc");
843  if (!latestLedgerSeq)
844  return;
845 
846  auto const latestIndex = [&] {
847  auto latestIndex = seqToShardIndex(*latestLedgerSeq);
848 
849  // Consider only complete shards
850  if (latestLedgerSeq != lastLedgerSeq(latestIndex))
851  --latestIndex;
852 
853  return latestIndex;
854  }();
855 
856  if (latestIndex < earliestIndex)
857  {
858  JLOG(j_.error()) << "no suitable ledgers were found in"
859  " the SQLite database to import";
860  return;
861  }
862 
863  JLOG(j_.debug()) << "Importing ledgers for shards " << earliestIndex
864  << " through " << latestIndex;
865 
866  {
867  std::lock_guard lock(mutex_);
868 
869  assert(!databaseImportStatus_);
870  databaseImportStatus_ = std::make_unique<DatabaseImportStatus>(
871  earliestIndex, latestIndex, 0);
872  }
873 
874  // Import the shards
875  for (std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
876  ++shardIndex)
877  {
878  if (shouldHalt())
879  return;
880 
881  auto const pathDesignation = [this, shardIndex] {
882  std::lock_guard lock(mutex_);
883 
884  auto const numHistShards = numHistoricalShards(lock);
885  auto const pathDesignation =
886  prepareForNewShard(shardIndex, numHistShards, lock);
887 
888  return pathDesignation;
889  }();
890 
891  if (!pathDesignation)
892  break;
893 
894  {
895  std::lock_guard lock(mutex_);
896 
897  // Skip if being acquired
898  if (shardIndex == acquireIndex_)
899  {
900  JLOG(j_.debug())
901  << "shard " << shardIndex << " already being acquired";
902  continue;
903  }
904 
905  // Skip if being imported from the shard archive handler
906  if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
907  {
908  JLOG(j_.debug())
909  << "shard " << shardIndex << " already being imported";
910  continue;
911  }
912 
913  // Skip if stored
914  if (shards_.find(shardIndex) != shards_.end())
915  {
916  JLOG(j_.debug()) << "shard " << shardIndex << " already stored";
917  continue;
918  }
919  }
920 
921  std::uint32_t const firstSeq = firstLedgerSeq(shardIndex);
922  std::uint32_t const lastSeq =
923  std::max(firstSeq, lastLedgerSeq(shardIndex));
924 
925  // Verify SQLite ledgers are in the node store
926  {
927  auto const ledgerHashes{
929  firstSeq, lastSeq)};
930  if (ledgerHashes.size() != maxLedgers(shardIndex))
931  continue;
932 
933  auto& source = app_.getNodeStore();
934  bool valid{true};
935 
936  for (std::uint32_t n = firstSeq; n <= lastSeq; ++n)
937  {
938  if (!source.fetchNodeObject(ledgerHashes.at(n).ledgerHash, n))
939  {
940  JLOG(j_.warn()) << "SQLite ledger sequence " << n
941  << " mismatches node store";
942  valid = false;
943  break;
944  }
945  }
946  if (!valid)
947  continue;
948  }
949 
950  if (shouldHalt())
951  return;
952 
953  bool const needsHistoricalPath =
954  *pathDesignation == PathDesignation::historical;
955 
956  auto const path = needsHistoricalPath
958  : dir_;
959 
960  // Create the new shard
961  auto shard{std::make_shared<Shard>(app_, *this, shardIndex, path, j_)};
962  if (!shard->init(scheduler_, *ctx_))
963  continue;
964 
965  {
966  std::lock_guard lock(mutex_);
967 
968  if (shouldHalt())
969  return;
970 
971  databaseImportStatus_->currentIndex = shardIndex;
972  databaseImportStatus_->currentShard = shard;
973  databaseImportStatus_->firstSeq = firstSeq;
974  databaseImportStatus_->lastSeq = lastSeq;
975  }
976 
977  // Create a marker file to signify a database import in progress
978  auto const shardDir{path / std::to_string(shardIndex)};
979  auto const markerFile{shardDir / databaseImportMarker_};
980  {
981  std::ofstream ofs{markerFile.string()};
982  if (!ofs.is_open())
983  {
984  JLOG(j_.error()) << "shard " << shardIndex
985  << " failed to create temp marker file";
986  shard->removeOnDestroy();
987  continue;
988  }
989  }
990 
991  // Copy the ledgers from node store
992  std::shared_ptr<Ledger> recentStored;
993  std::optional<uint256> lastLedgerHash;
994 
995  while (auto const ledgerSeq = shard->prepare())
996  {
997  if (shouldHalt())
998  return;
999 
1000  // Not const so it may be moved later
1001  auto ledger{loadByIndex(*ledgerSeq, app_, false)};
1002  if (!ledger || ledger->info().seq != ledgerSeq)
1003  break;
1004 
1005  auto const result{shard->storeLedger(ledger, recentStored)};
1006  storeStats(result.count, result.size);
1007  if (result.error)
1008  break;
1009 
1010  if (!shard->setLedgerStored(ledger))
1011  break;
1012 
1013  if (!lastLedgerHash && ledgerSeq == lastSeq)
1014  lastLedgerHash = ledger->info().hash;
1015 
1016  recentStored = std::move(ledger);
1017  }
1018 
1019  if (shouldHalt())
1020  return;
1021 
1022  using namespace boost::filesystem;
1023  bool success{false};
1024  if (lastLedgerHash && shard->getState() == ShardState::complete)
1025  {
1026  // Store shard final key
1027  Serializer s;
1028  s.add32(Shard::version);
1029  s.add32(firstLedgerSeq(shardIndex));
1030  s.add32(lastLedgerSeq(shardIndex));
1031  s.addBitString(*lastLedgerHash);
1032  auto const nodeObject{NodeObject::createObject(
1033  hotUNKNOWN, std::move(s.modData()), Shard::finalKey)};
1034 
1035  if (shard->storeNodeObject(nodeObject))
1036  {
1037  try
1038  {
1039  std::lock_guard lock(mutex_);
1040 
1041  // The database import process is complete and the
1042  // marker file is no longer required
1043  remove_all(markerFile);
1044 
1045  JLOG(j_.debug()) << "shard " << shardIndex
1046  << " was successfully imported"
1047  " from the NodeStore";
1048  finalizeShard(
1049  shards_.emplace(shardIndex, std::move(shard))
1050  .first->second,
1051  true,
1052  std::nullopt);
1053 
1054  // This variable is meant to capture the success
1055  // of everything up to the point of shard finalization.
1056  // If the shard fails to finalize, this condition will
1057  // be handled by the finalization function itself, and
1058  // not here.
1059  success = true;
1060  }
1061  catch (std::exception const& e)
1062  {
1063  JLOG(j_.fatal()) << "shard index " << shardIndex
1064  << ". Exception caught in function "
1065  << __func__ << ". Error: " << e.what();
1066  }
1067  }
1068  }
1069 
1070  if (!success)
1071  {
1072  JLOG(j_.error()) << "shard " << shardIndex
1073  << " failed to import from the NodeStore";
1074 
1075  if (shard)
1076  shard->removeOnDestroy();
1077  }
1078  }
1079 
1080  if (shouldHalt())
1081  return;
1082 
1083  updateFileStats();
1084 }
1085 
1088 {
1089  std::shared_ptr<Shard> shard;
1090  {
1091  std::lock_guard lock(mutex_);
1092  assert(init_);
1093 
1094  auto const it{shards_.find(acquireIndex_)};
1095  if (it == shards_.end())
1096  return 0;
1097  shard = it->second;
1098  }
1099 
1100  return shard->getWriteLoad();
1101 }
1102 
1103 void
1105  NodeObjectType type,
1106  Blob&& data,
1107  uint256 const& hash,
1108  std::uint32_t ledgerSeq)
1109 {
1110  auto const shardIndex{seqToShardIndex(ledgerSeq)};
1111  std::shared_ptr<Shard> shard;
1112  {
1113  std::lock_guard lock(mutex_);
1114  if (shardIndex != acquireIndex_)
1115  {
1116  JLOG(j_.trace())
1117  << "shard " << shardIndex << " is not being acquired";
1118  return;
1119  }
1120 
1121  auto const it{shards_.find(shardIndex)};
1122  if (it == shards_.end())
1123  {
1124  JLOG(j_.error())
1125  << "shard " << shardIndex << " is not being acquired";
1126  return;
1127  }
1128  shard = it->second;
1129  }
1130 
1131  auto const nodeObject{
1132  NodeObject::createObject(type, std::move(data), hash)};
1133  if (shard->storeNodeObject(nodeObject))
1134  storeStats(1, nodeObject->getData().size());
1135 }
1136 
1137 bool
1139 {
1140  auto const ledgerSeq{srcLedger->info().seq};
1141  auto const shardIndex{seqToShardIndex(ledgerSeq)};
1142  std::shared_ptr<Shard> shard;
1143  {
1144  std::lock_guard lock(mutex_);
1145  assert(init_);
1146 
1147  if (shardIndex != acquireIndex_)
1148  {
1149  JLOG(j_.trace())
1150  << "shard " << shardIndex << " is not being acquired";
1151  return false;
1152  }
1153 
1154  auto const it{shards_.find(shardIndex)};
1155  if (it == shards_.end())
1156  {
1157  JLOG(j_.error())
1158  << "shard " << shardIndex << " is not being acquired";
1159  return false;
1160  }
1161  shard = it->second;
1162  }
1163 
1164  auto const result{shard->storeLedger(srcLedger, nullptr)};
1165  storeStats(result.count, result.size);
1166  if (result.error || result.count == 0 || result.size == 0)
1167  return false;
1168 
1169  return setStoredInShard(shard, srcLedger);
1170 }
1171 
1172 void
1174 {
1176  {
1177  std::lock_guard lock(mutex_);
1178  assert(init_);
1179 
1180  shards.reserve(shards_.size());
1181  for (auto const& e : shards_)
1182  shards.push_back(e.second);
1183  }
1184 
1186  openFinals.reserve(openFinalLimit_);
1187 
1188  for (auto const& weak : shards)
1189  {
1190  if (auto const shard{weak.lock()}; shard && shard->isOpen())
1191  {
1192  if (shard->getState() == ShardState::finalized)
1193  openFinals.emplace_back(std::move(shard));
1194  }
1195  }
1196 
1197  if (openFinals.size() > openFinalLimit_)
1198  {
1199  JLOG(j_.trace()) << "Open shards exceed configured limit of "
1200  << openFinalLimit_ << " by "
1201  << (openFinals.size() - openFinalLimit_);
1202 
1203  // Try to close enough shards to be within the limit.
1204  // Sort ascending on last use so the oldest are removed first.
1205  std::sort(
1206  openFinals.begin(),
1207  openFinals.end(),
1208  [&](std::shared_ptr<Shard> const& lhsShard,
1209  std::shared_ptr<Shard> const& rhsShard) {
1210  return lhsShard->getLastUse() < rhsShard->getLastUse();
1211  });
1212 
1213  for (auto it{openFinals.cbegin()};
1214  it != openFinals.cend() && openFinals.size() > openFinalLimit_;)
1215  {
1216  if ((*it)->tryClose())
1217  it = openFinals.erase(it);
1218  else
1219  ++it;
1220  }
1221  }
1222 }
1223 
1226 {
1228  {
1230 
1231  ret[jss::firstShardIndex] = databaseImportStatus_->earliestIndex;
1232  ret[jss::lastShardIndex] = databaseImportStatus_->latestIndex;
1233  ret[jss::currentShardIndex] = databaseImportStatus_->currentIndex;
1234 
1235  Json::Value currentShard(Json::objectValue);
1236  currentShard[jss::firstSequence] = databaseImportStatus_->firstSeq;
1237  currentShard[jss::lastSequence] = databaseImportStatus_->lastSeq;
1238 
1239  if (auto shard = databaseImportStatus_->currentShard.lock(); shard)
1240  currentShard[jss::storedSeqs] = shard->getStoredSeqs();
1241 
1242  ret[jss::currentShard] = currentShard;
1243 
1244  if (haltDatabaseImport_)
1245  ret[jss::message] = "Database import halt initiated...";
1246 
1247  return ret;
1248  }
1249 
1250  return RPC::make_error(rpcINTERNAL, "Database import not running");
1251 }
1252 
1255 {
1256  std::lock_guard lock(mutex_);
1257 
1258  if (!init_)
1259  return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
1260 
1262  return RPC::make_error(
1263  rpcINTERNAL, "Database import already in progress");
1264 
1265  if (isStopping())
1266  return RPC::make_error(rpcINTERNAL, "Node is shutting down");
1267 
1269 
1271  result[jss::message] = "Database import initiated...";
1272 
1273  return result;
1274 }
1275 
1278 {
1279  std::lock_guard lock(mutex_);
1280 
1281  if (!init_)
1282  return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
1283 
1284  if (!databaseImporter_.joinable())
1285  return RPC::make_error(rpcINTERNAL, "Database import not running");
1286 
1287  if (isStopping())
1288  return RPC::make_error(rpcINTERNAL, "Node is shutting down");
1289 
1290  haltDatabaseImport_ = true;
1291 
1293  result[jss::message] = "Database import halt initiated...";
1294 
1295  return result;
1296 }
1297 
1300 {
1301  std::lock_guard lock(mutex_);
1302 
1303  if (!databaseImportStatus_)
1304  return {};
1305 
1306  return databaseImportStatus_->firstSeq;
1307 }
1308 
1309 bool
1311 {
1312  auto fail = [j = j_](std::string const& msg) {
1313  JLOG(j.error()) << "[" << ConfigSection::shardDatabase() << "] " << msg;
1314  return false;
1315  };
1316 
1317  Config const& config{app_.config()};
1318  Section const& section{config.section(ConfigSection::shardDatabase())};
1319 
1320  auto compare = [&](std::string const& name, std::uint32_t defaultValue) {
1321  std::uint32_t shardDBValue{defaultValue};
1322  get_if_exists<std::uint32_t>(section, name, shardDBValue);
1323 
1324  std::uint32_t nodeDBValue{defaultValue};
1325  get_if_exists<std::uint32_t>(
1326  config.section(ConfigSection::nodeDatabase()), name, nodeDBValue);
1327 
1328  return shardDBValue == nodeDBValue;
1329  };
1330 
1331  // If ledgers_per_shard or earliest_seq are specified,
1332  // they must be equally assigned in 'node_db'
1333  if (!compare("ledgers_per_shard", DEFAULT_LEDGERS_PER_SHARD))
1334  {
1335  return fail(
1336  "and [" + ConfigSection::nodeDatabase() + "] define different '" +
1337  "ledgers_per_shard" + "' values");
1338  }
1339  if (!compare("earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
1340  {
1341  return fail(
1342  "and [" + ConfigSection::nodeDatabase() + "] define different '" +
1343  "earliest_seq" + "' values");
1344  }
1345 
1346  using namespace boost::filesystem;
1347  if (!get_if_exists<path>(section, "path", dir_))
1348  return fail("'path' missing");
1349 
1350  {
1351  get_if_exists(section, "max_historical_shards", maxHistoricalShards_);
1352 
1353  Section const& historicalShardPaths =
1354  config.section(SECTION_HISTORICAL_SHARD_PATHS);
1355 
1356  auto values = historicalShardPaths.values();
1357 
1358  std::sort(values.begin(), values.end());
1359  values.erase(std::unique(values.begin(), values.end()), values.end());
1360 
1361  for (auto const& s : values)
1362  {
1363  auto const dir = path(s);
1364  if (dir_ == dir)
1365  {
1366  return fail(
1367  "the 'path' cannot also be in the "
1368  "'historical_shard_path' section");
1369  }
1370 
1372  }
1373  }
1374 
1375  // NuDB is the default and only supported permanent storage backend
1376  backendName_ = get(section, "type", "nudb");
1377  if (!boost::iequals(backendName_, "NuDB"))
1378  return fail("'type' value unsupported");
1379 
1380  return true;
1381 }
1382 
1385  uint256 const& hash,
1386  std::uint32_t ledgerSeq,
1387  FetchReport& fetchReport,
1388  bool duplicate)
1389 {
1390  auto const shardIndex{seqToShardIndex(ledgerSeq)};
1391  std::shared_ptr<Shard> shard;
1392  {
1393  std::lock_guard lock(mutex_);
1394  auto const it{shards_.find(shardIndex)};
1395  if (it == shards_.end())
1396  return nullptr;
1397  shard = it->second;
1398  }
1399 
1400  return shard->fetchNodeObject(hash, fetchReport);
1401 }
1402 
1405  std::uint32_t validLedgerSeq,
1407 {
1408  if (validLedgerSeq < earliestLedgerSeq_)
1409  return std::nullopt;
1410 
1411  auto const maxShardIndex{[this, validLedgerSeq]() {
1412  auto shardIndex{seqToShardIndex(validLedgerSeq)};
1413  if (validLedgerSeq != lastLedgerSeq(shardIndex))
1414  --shardIndex;
1415  return shardIndex;
1416  }()};
1417  auto const maxNumShards{maxShardIndex - earliestShardIndex_ + 1};
1418 
1419  // Check if the shard store has all shards
1420  if (shards_.size() >= maxNumShards)
1421  return std::nullopt;
1422 
1423  if (maxShardIndex < 1024 ||
1424  static_cast<float>(shards_.size()) / maxNumShards > 0.5f)
1425  {
1426  // Small or mostly full index space to sample
1427  // Find the available indexes and select one at random
1429  available.reserve(maxNumShards - shards_.size());
1430 
1431  for (auto shardIndex = earliestShardIndex_; shardIndex <= maxShardIndex;
1432  ++shardIndex)
1433  {
1434  if (shards_.find(shardIndex) == shards_.end() &&
1435  preparedIndexes_.find(shardIndex) == preparedIndexes_.end())
1436  {
1437  available.push_back(shardIndex);
1438  }
1439  }
1440 
1441  if (available.empty())
1442  return std::nullopt;
1443 
1444  if (available.size() == 1)
1445  return available.front();
1446 
1447  return available[rand_int(
1448  0u, static_cast<std::uint32_t>(available.size() - 1))];
1449  }
1450 
1451  // Large, sparse index space to sample
1452  // Keep choosing indexes at random until an available one is found
1453  // chances of running more than 30 times is less than 1 in a billion
1454  for (int i = 0; i < 40; ++i)
1455  {
1456  auto const shardIndex{rand_int(earliestShardIndex_, maxShardIndex)};
1457  if (shards_.find(shardIndex) == shards_.end() &&
1458  preparedIndexes_.find(shardIndex) == preparedIndexes_.end())
1459  {
1460  return shardIndex;
1461  }
1462  }
1463 
1464  assert(false);
1465  return std::nullopt;
1466 }
1467 
1468 void
1470  std::shared_ptr<Shard>& shard,
1471  bool const writeSQLite,
1472  std::optional<uint256> const& expectedHash)
1473 {
1474  taskQueue_.addTask([this,
1475  wptr = std::weak_ptr<Shard>(shard),
1476  writeSQLite,
1477  expectedHash]() {
1478  if (isStopping())
1479  return;
1480 
1481  auto shard{wptr.lock()};
1482  if (!shard)
1483  {
1484  JLOG(j_.debug()) << "Shard removed before being finalized";
1485  return;
1486  }
1487 
1488  if (!shard->finalize(writeSQLite, expectedHash))
1489  {
1490  if (isStopping())
1491  return;
1492 
1493  // Invalid or corrupt shard, remove it
1494  removeFailedShard(shard);
1495  return;
1496  }
1497 
1498  if (isStopping())
1499  return;
1500 
1501  {
1502  auto const boundaryIndex{shardBoundaryIndex()};
1503  std::lock_guard lock(mutex_);
1504 
1505  if (shard->index() < boundaryIndex)
1506  {
1507  // This is a historical shard
1508  if (!historicalPaths_.empty() &&
1509  shard->getDir().parent_path() == dir_)
1510  {
1511  // Shard wasn't placed at a separate historical path
1512  JLOG(j_.warn()) << "shard " << shard->index()
1513  << " is not stored at a historical path";
1514  }
1515  }
1516  else
1517  {
1518  // Not a historical shard. Shift recent shards if necessary
1519  assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1520  relocateOutdatedShards(lock);
1521 
1522  // Set the appropriate recent shard index
1523  if (shard->index() == boundaryIndex)
1524  secondLatestShardIndex_ = shard->index();
1525  else
1526  latestShardIndex_ = shard->index();
1527 
1528  if (shard->getDir().parent_path() != dir_)
1529  {
1530  JLOG(j_.warn()) << "shard " << shard->index()
1531  << " is not stored at the path";
1532  }
1533  }
1534 
1535  updatePeers(lock);
1536  }
1537 
1538  updateFileStats();
1539  });
1540 }
1541 
1542 void
1544 {
1546  {
1547  std::lock_guard lock(mutex_);
1548  if (shards_.empty())
1549  return;
1550 
1551  shards.reserve(shards_.size());
1552  for (auto const& e : shards_)
1553  shards.push_back(e.second);
1554  }
1555 
1556  std::uint64_t sumSz{0};
1557  std::uint32_t sumFd{0};
1558  std::uint32_t numShards{0};
1559  for (auto const& weak : shards)
1560  {
1561  if (auto const shard{weak.lock()}; shard)
1562  {
1563  auto const [sz, fd] = shard->getFileInfo();
1564  sumSz += sz;
1565  sumFd += fd;
1566  ++numShards;
1567  }
1568  }
1569 
1570  std::lock_guard lock(mutex_);
1571  fileSz_ = sumSz;
1572  fdRequired_ = sumFd;
1573  avgShardFileSz_ = (numShards == 0 ? fileSz_ : fileSz_ / numShards);
1574 
1575  if (!canAdd_)
1576  return;
1577 
1578  if (auto const count = numHistoricalShards(lock);
1579  count >= maxHistoricalShards_)
1580  {
1582  {
1583  // In order to avoid excessive output, don't produce
1584  // this warning if the server isn't configured to
1585  // store historical shards.
1586  JLOG(j_.warn()) << "maximum number of historical shards reached";
1587  }
1588 
1589  canAdd_ = false;
1590  }
1591  else if (!sufficientStorage(
1592  maxHistoricalShards_ - count,
1594  lock))
1595  {
1596  JLOG(j_.warn())
1597  << "maximum shard store size exceeds available storage space";
1598 
1599  canAdd_ = false;
1600  }
1601 }
1602 
1603 bool
1605  std::uint32_t numShards,
1606  PathDesignation pathDesignation,
1607  std::lock_guard<std::mutex> const&) const
1608 {
1609  try
1610  {
1611  std::vector<std::uint64_t> capacities;
1612 
1613  if (pathDesignation == PathDesignation::historical &&
1615  {
1616  capacities.reserve(historicalPaths_.size());
1617 
1618  for (auto const& path : historicalPaths_)
1619  {
1620  // Get the available storage for each historical path
1621  auto const availableSpace =
1622  boost::filesystem::space(path).available;
1623 
1624  capacities.push_back(availableSpace);
1625  }
1626  }
1627  else
1628  {
1629  // Get the available storage for the main shard path
1630  capacities.push_back(boost::filesystem::space(dir_).available);
1631  }
1632 
1633  for (std::uint64_t const capacity : capacities)
1634  {
1635  // Leverage all the historical shard paths to
1636  // see if collectively they can fit the specified
1637  // number of shards. For this to work properly,
1638  // each historical path must correspond to a separate
1639  // physical device or filesystem.
1640 
1641  auto const shardCap = capacity / avgShardFileSz_;
1642  if (numShards <= shardCap)
1643  return true;
1644 
1645  numShards -= shardCap;
1646  }
1647  }
1648  catch (std::exception const& e)
1649  {
1650  JLOG(j_.fatal()) << "Exception caught in function " << __func__
1651  << ". Error: " << e.what();
1652  return false;
1653  }
1654 
1655  return false;
1656 }
1657 
1658 bool
1660  std::shared_ptr<Shard>& shard,
1661  std::shared_ptr<Ledger const> const& ledger)
1662 {
1663  if (!shard->setLedgerStored(ledger))
1664  {
1665  // Invalid or corrupt shard, remove it
1666  removeFailedShard(shard);
1667  return false;
1668  }
1669 
1670  if (shard->getState() == ShardState::complete)
1671  {
1672  std::lock_guard lock(mutex_);
1673  if (auto const it{shards_.find(shard->index())}; it != shards_.end())
1674  {
1675  if (shard->index() == acquireIndex_)
1676  acquireIndex_ = 0;
1677 
1678  finalizeShard(it->second, false, std::nullopt);
1679  }
1680  else
1681  {
1682  JLOG(j_.debug())
1683  << "shard " << shard->index() << " is no longer being acquired";
1684  }
1685  }
1686 
1687  updateFileStats();
1688  return true;
1689 }
1690 
1691 void
1693 {
1694  {
1695  std::lock_guard lock(mutex_);
1696 
1697  if (shard->index() == acquireIndex_)
1698  acquireIndex_ = 0;
1699 
1700  if (shard->index() == latestShardIndex_)
1701  latestShardIndex_ = std::nullopt;
1702 
1703  if (shard->index() == secondLatestShardIndex_)
1704  secondLatestShardIndex_ = std::nullopt;
1705  }
1706 
1707  shard->removeOnDestroy();
1708 
1709  // Reset the shared_ptr to invoke the shard's
1710  // destructor and remove it from the server
1711  shard.reset();
1712  updateFileStats();
1713 }
1714 
1717 {
1718  auto const validIndex = app_.getLedgerMaster().getValidLedgerIndex();
1719 
1720  if (validIndex < earliestLedgerSeq_)
1721  return 0;
1722 
1723  // Shards with an index earlier than the recent shard boundary index
1724  // are considered historical. The three shards at or later than
1725  // this index consist of the two most recently validated shards
1726  // and the shard still in the process of being built by live
1727  // transactions.
1728  return seqToShardIndex(validIndex) - 1;
1729 }
1730 
1733  std::lock_guard<std::mutex> const& lock) const
1734 {
1735  auto const boundaryIndex{shardBoundaryIndex()};
1736  return std::count_if(
1737  shards_.begin(), shards_.end(), [boundaryIndex](auto const& entry) {
1738  return entry.first < boundaryIndex;
1739  });
1740 }
1741 
1742 void
1744  std::lock_guard<std::mutex> const& lock)
1745 {
1746  auto& cur{latestShardIndex_};
1747  auto& prev{secondLatestShardIndex_};
1748  if (!cur && !prev)
1749  return;
1750 
1751  auto const latestShardIndex =
1753  auto const separateHistoricalPath = !historicalPaths_.empty();
1754 
1755  auto const removeShard = [this](std::uint32_t const shardIndex) -> void {
1756  canAdd_ = false;
1757 
1758  if (auto it = shards_.find(shardIndex); it != shards_.end())
1759  {
1760  if (it->second)
1761  removeFailedShard(it->second);
1762  else
1763  {
1764  JLOG(j_.warn()) << "can't find shard to remove";
1765  }
1766  }
1767  else
1768  {
1769  JLOG(j_.warn()) << "can't find shard to remove";
1770  }
1771  };
1772 
1773  auto const keepShard = [this, &lock, removeShard, separateHistoricalPath](
1774  std::uint32_t const shardIndex) -> bool {
1776  {
1777  JLOG(j_.error()) << "maximum number of historical shards reached";
1778  removeShard(shardIndex);
1779  return false;
1780  }
1781  if (separateHistoricalPath &&
1783  {
1784  JLOG(j_.error()) << "insufficient storage space available";
1785  removeShard(shardIndex);
1786  return false;
1787  }
1788 
1789  return true;
1790  };
1791 
1792  // Move a shard from the main shard path to a historical shard
1793  // path by copying the contents, and creating a new shard.
1794  auto const moveShard = [this,
1795  &lock](std::uint32_t const shardIndex) -> void {
1796  auto it{shards_.find(shardIndex)};
1797  if (it == shards_.end())
1798  {
1799  JLOG(j_.warn()) << "can't find shard to move to historical path";
1800  return;
1801  }
1802 
1803  auto& shard{it->second};
1804 
1805  // Close any open file descriptors before moving the shard
1806  // directory. Don't call removeOnDestroy since that would
1807  // attempt to close the fds after the directory has been moved.
1808  if (!shard->tryClose())
1809  {
1810  JLOG(j_.warn()) << "can't close shard to move to historical path";
1811  return;
1812  }
1813 
1814  auto const dst{chooseHistoricalPath(lock)};
1815  try
1816  {
1817  // Move the shard directory to the new path
1818  boost::filesystem::rename(
1819  shard->getDir().string(), dst / std::to_string(shardIndex));
1820  }
1821  catch (...)
1822  {
1823  JLOG(j_.error()) << "shard " << shardIndex
1824  << " failed to move to historical storage";
1825  return;
1826  }
1827 
1828  // Create a shard instance at the new location
1829  shard = std::make_shared<Shard>(app_, *this, shardIndex, dst, j_);
1830 
1831  // Open the new shard
1832  if (!shard->init(scheduler_, *ctx_))
1833  {
1834  JLOG(j_.error()) << "shard " << shardIndex
1835  << " failed to open in historical storage";
1836  shard->removeOnDestroy();
1837  shard.reset();
1838  }
1839  };
1840 
1841  // See if either of the recent shards needs to be updated
1842  bool const curNotSynched =
1843  latestShardIndex_ && *latestShardIndex_ != latestShardIndex;
1844  bool const prevNotSynched = secondLatestShardIndex_ &&
1845  *secondLatestShardIndex_ != latestShardIndex - 1;
1846 
1847  // A new shard has been published. Move outdated
1848  // shards to historical storage as needed
1849  if (curNotSynched || prevNotSynched)
1850  {
1851  if (prev)
1852  {
1853  // Move the formerly second latest shard to historical storage
1854  if (keepShard(*prev) && separateHistoricalPath)
1855  moveShard(*prev);
1856 
1857  prev = std::nullopt;
1858  }
1859 
1860  if (cur)
1861  {
1862  // The formerly latest shard is now the second latest
1863  if (cur == latestShardIndex - 1)
1864  prev = cur;
1865 
1866  // The formerly latest shard is no longer a 'recent' shard
1867  else
1868  {
1869  // Move the formerly latest shard to historical storage
1870  if (keepShard(*cur) && separateHistoricalPath)
1871  moveShard(*cur);
1872  }
1873 
1874  cur = std::nullopt;
1875  }
1876  }
1877 }
1878 
1879 auto
1881  std::uint32_t shardIndex,
1884 {
1885  // Any shard earlier than the two most recent shards is a historical shard
1886  auto const boundaryIndex{shardBoundaryIndex()};
1887  auto const isHistoricalShard = shardIndex < boundaryIndex;
1888 
1889  auto const designation = isHistoricalShard && !historicalPaths_.empty()
1892 
1893  // Check shard count and available storage space
1894  if (isHistoricalShard && numHistoricalShards >= maxHistoricalShards_)
1895  {
1896  JLOG(j_.error()) << "maximum number of historical shards reached";
1897  canAdd_ = false;
1898  return std::nullopt;
1899  }
1900  if (!sufficientStorage(1, designation, lock))
1901  {
1902  JLOG(j_.error()) << "insufficient storage space available";
1903  canAdd_ = false;
1904  return std::nullopt;
1905  }
1906 
1907  return designation;
1908 }
1909 
1910 boost::filesystem::path
1912 {
1913  // If not configured with separate historical paths,
1914  // use the main path (dir_) by default.
1915  if (historicalPaths_.empty())
1916  return dir_;
1917 
1918  boost::filesystem::path historicalShardPath;
1919  std::vector<boost::filesystem::path> potentialPaths;
1920 
1921  for (boost::filesystem::path const& path : historicalPaths_)
1922  {
1923  if (boost::filesystem::space(path).available >= avgShardFileSz_)
1924  potentialPaths.push_back(path);
1925  }
1926 
1927  if (potentialPaths.empty())
1928  {
1929  JLOG(j_.error()) << "failed to select a historical shard path";
1930  return "";
1931  }
1932 
1933  std::sample(
1934  potentialPaths.begin(),
1935  potentialPaths.end(),
1936  &historicalShardPath,
1937  1,
1938  default_prng());
1939 
1940  return historicalShardPath;
1941 }
1942 
1943 bool
1945 {
1946 #if BOOST_OS_LINUX
1947  // Each historical shard path must correspond
1948  // to a directory on a distinct device or file system.
1949  // Currently, this constraint is enforced only on Linux.
1952 
1953  for (auto const& path : historicalPaths_)
1954  {
1955  struct statvfs buffer;
1956  if (statvfs(path.c_str(), &buffer))
1957  {
1958  JLOG(j_.error())
1959  << "failed to acquire stats for 'historical_shard_path': "
1960  << path;
1961  return false;
1962  }
1963 
1964  filesystemIDs[buffer.f_fsid].push_back(path.string());
1965  }
1966 
1967  bool ret = true;
1968  for (auto const& entry : filesystemIDs)
1969  {
1970  // Check to see if any of the paths are stored on the same file system
1971  if (entry.second.size() > 1)
1972  {
1973  // Two or more historical storage paths
1974  // correspond to the same file system.
1975  JLOG(j_.error())
1976  << "The following paths correspond to the same filesystem: "
1977  << boost::algorithm::join(entry.second, ", ")
1978  << ". Each configured historical storage path should"
1979  " be on a unique device or filesystem.";
1980 
1981  ret = false;
1982  }
1983  }
1984 
1985  return ret;
1986 
1987 #else
1988  // The requirement that each historical storage path
1989  // corresponds to a distinct device or file system is
1990  // enforced only on Linux, so on other platforms
1991  // keep track of the available capacities for each
1992  // path. Issue a warning if we suspect any of the paths
1993  // may violate this requirement.
1994 
1995  // Map byte counts to each path that shares that byte count.
1997  uniqueCapacities(historicalPaths_.size());
1998 
1999  for (auto const& path : historicalPaths_)
2000  uniqueCapacities[boost::filesystem::space(path).available].push_back(
2001  path.string());
2002 
2003  for (auto const& entry : uniqueCapacities)
2004  {
2005  // Check to see if any paths have the same amount of available bytes.
2006  if (entry.second.size() > 1)
2007  {
2008  // Two or more historical storage paths may
2009  // correspond to the same device or file system.
2010  JLOG(j_.warn())
2011  << "Each of the following paths have " << entry.first
2012  << " bytes free, and may be located on the same device"
2013  " or file system: "
2014  << boost::algorithm::join(entry.second, ", ")
2015  << ". Each configured historical storage path should"
2016  " be on a unique device or file system.";
2017  }
2018  }
2019 #endif
2020 
2021  return true;
2022 }
2023 
2024 bool
2026  LedgerIndex ledgerSeq,
2027  std::function<bool(soci::session& session)> const& callback)
2028 {
2029  return callForLedgerSQLByShardIndex(seqToShardIndex(ledgerSeq), callback);
2030 }
2031 
2032 bool
2034  const uint32_t shardIndex,
2035  std::function<bool(soci::session& session)> const& callback)
2036 {
2037  std::lock_guard lock(mutex_);
2038 
2039  auto const it{shards_.find(shardIndex)};
2040 
2041  return it != shards_.end() &&
2042  it->second->getState() == ShardState::finalized &&
2043  it->second->callForLedgerSQL(callback);
2044 }
2045 
2046 bool
2048  LedgerIndex ledgerSeq,
2049  std::function<bool(soci::session& session)> const& callback)
2050 {
2052  seqToShardIndex(ledgerSeq), callback);
2053 }
2054 
2055 bool
2057  std::uint32_t const shardIndex,
2058  std::function<bool(soci::session& session)> const& callback)
2059 {
2060  std::lock_guard lock(mutex_);
2061 
2062  auto const it{shards_.find(shardIndex)};
2063 
2064  return it != shards_.end() &&
2065  it->second->getState() == ShardState::finalized &&
2066  it->second->callForTransactionSQL(callback);
2067 }
2068 
2069 bool
2071  std::optional<std::uint32_t> minShardIndex,
2072  std::function<bool(Shard& shard)> const& visit)
2073 {
2074  std::lock_guard lock(mutex_);
2075 
2077 
2078  if (!minShardIndex)
2079  it = shards_.begin();
2080  else
2081  it = shards_.lower_bound(*minShardIndex);
2082 
2083  eit = shards_.end();
2084 
2085  for (; it != eit; it++)
2086  {
2087  if (it->second->getState() == ShardState::finalized)
2088  {
2089  if (!visit(*it->second))
2090  return false;
2091  }
2092  }
2093 
2094  return true;
2095 }
2096 
2097 bool
2099  std::optional<std::uint32_t> minShardIndex,
2100  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2101  callback)
2102 {
2103  return iterateShardsForward(
2104  minShardIndex, [&callback](Shard& shard) -> bool {
2105  return shard.callForLedgerSQL(callback);
2106  });
2107 }
2108 
2109 bool
2111  std::optional<std::uint32_t> minShardIndex,
2112  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2113  callback)
2114 {
2115  return iterateShardsForward(
2116  minShardIndex, [&callback](Shard& shard) -> bool {
2117  return shard.callForTransactionSQL(callback);
2118  });
2119 }
2120 
2121 bool
2123  std::optional<std::uint32_t> maxShardIndex,
2124  std::function<bool(Shard& shard)> const& visit)
2125 {
2126  std::lock_guard lock(mutex_);
2127 
2128  std::map<std::uint32_t, std::shared_ptr<Shard>>::reverse_iterator it, eit;
2129 
2130  if (!maxShardIndex)
2131  it = shards_.rbegin();
2132  else
2133  it = std::make_reverse_iterator(shards_.upper_bound(*maxShardIndex));
2134 
2135  eit = shards_.rend();
2136 
2137  for (; it != eit; it++)
2138  {
2139  if (it->second->getState() == ShardState::finalized &&
2140  (!maxShardIndex || it->first <= *maxShardIndex))
2141  {
2142  if (!visit(*it->second))
2143  return false;
2144  }
2145  }
2146 
2147  return true;
2148 }
2149 
2150 bool
2152  std::optional<std::uint32_t> maxShardIndex,
2153  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2154  callback)
2155 {
2156  return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool {
2157  return shard.callForLedgerSQL(callback);
2158  });
2159 }
2160 
2161 bool
2163  std::optional<std::uint32_t> maxShardIndex,
2164  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2165  callback)
2166 {
2167  return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool {
2168  return shard.callForTransactionSQL(callback);
2169  });
2170 }
2171 
2174 {
2175  auto shardInfo{std::make_unique<ShardInfo>()};
2176  for (auto const& [_, shard] : shards_)
2177  {
2178  shardInfo->update(
2179  shard->index(), shard->getState(), shard->getPercentProgress());
2180  }
2181 
2182  for (auto const shardIndex : preparedIndexes_)
2183  shardInfo->update(shardIndex, ShardState::queued, 0);
2184 
2185  return shardInfo;
2186 }
2187 
2188 size_t
2190 {
2191  std::lock_guard lock(mutex_);
2192  return taskQueue_.size();
2193 }
2194 
2195 void
2197 {
2198  if (!app_.config().standalone() &&
2200  {
2201  auto const message{getShardInfo(lock)->makeMessage(app_)};
2202  app_.overlay().foreach(send_always(std::make_shared<Message>(
2203  message, protocol::mtPEER_SHARD_INFO_V2)));
2204  }
2205 }
2206 
2207 void
2209 {
2210  // Run the lengthy node store import process in the background
2211  // on a dedicated thread.
2212  databaseImporter_ = std::thread([this] {
2213  doImportDatabase();
2214 
2215  std::lock_guard lock(mutex_);
2216 
2217  // Make sure to clear this in case the import
2218  // exited early.
2219  databaseImportStatus_.reset();
2220 
2221  // Detach the thread so subsequent attempts
2222  // to start the import won't get held up by
2223  // the old thread of execution
2225  });
2226 }
2227 
2228 //------------------------------------------------------------------------------
2229 
2232  Application& app,
2233  Scheduler& scheduler,
2234  int readThreads,
2235  beast::Journal j)
2236 {
2237  // The shard store is optional. Future changes will require it.
2238  Section const& section{
2240  if (section.empty())
2241  return nullptr;
2242 
2243  return std::make_unique<DatabaseShardImp>(app, scheduler, readThreads, j);
2244 }
2245 
2246 } // namespace NodeStore
2247 } // namespace ripple
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
ripple::NodeStore::DatabaseShardImp::iterateLedgerSQLsForward
bool iterateLedgerSQLsForward(std::optional< std::uint32_t > minShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateLedgerSQLsForward Checks out ledger databases for all shards in ascending order starting from ...
Definition: DatabaseShardImp.cpp:2098
ripple::SizedItem::openFinalLimit
@ openFinalLimit
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::NodeStore::Database::lastLedgerSeq
std::uint32_t lastLedgerSeq(std::uint32_t shardIndex) const noexcept
Calculates the last ledger sequence for a given shard index.
Definition: Database.h:271
ripple::Application
Definition: Application.h:115
ripple::hotUNKNOWN
@ hotUNKNOWN
Definition: NodeObject.h:33
std::this_thread::sleep_for
T sleep_for(T... args)
ripple::NodeStore::make_ShardStore
std::unique_ptr< DatabaseShard > make_ShardStore(Application &app, Scheduler &scheduler, int readThreads, beast::Journal j)
Definition: DatabaseShardImp.cpp:2231
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:240
ripple::NodeStore::DatabaseShardImp::mutex_
std::mutex mutex_
Definition: DatabaseShardImp.h:233
ripple::NodeStore::DatabaseShardImp::app_
Application & app_
Definition: DatabaseShardImp.h:232
ripple::ShardState::complete
@ complete
ripple::DEFAULT_LEDGERS_PER_SHARD
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD
The number of ledgers in a shard.
Definition: SystemParameters.h:64
ripple::NodeStore::DatabaseShardImp::storeLedger
bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger) override
Store a ledger from a different database.
Definition: DatabaseShardImp.cpp:1138
ripple::NodeStore::Database
Persistency layer for NodeObject.
Definition: Database.h:51
std::string
STL class.
std::shared_ptr< Ledger >
ripple::loadByIndex
std::shared_ptr< Ledger > loadByIndex(std::uint32_t ledgerIndex, Application &app, bool acquire)
Definition: Ledger.cpp:1064
ripple::SizedItem
SizedItem
Definition: Config.h:48
ripple::NodeStore::DatabaseShardImp::shards_
std::map< std::uint32_t, std::shared_ptr< Shard > > shards_
Definition: DatabaseShardImp.h:243
std::exception
STL class.
std::stoul
T stoul(T... args)
ripple::NodeStore::DatabaseShardImp::PathDesignation
PathDesignation
Definition: DatabaseShardImp.h:196
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
ripple::NodeStore::DatabaseShardImp::callForLedgerSQLByLedgerSeq
bool callForLedgerSQLByLedgerSeq(LedgerIndex ledgerSeq, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the SQLite db holding the corresponding ledger.
Definition: DatabaseShardImp.cpp:2025
ripple::NodeStore::TaskQueue::size
size_t size() const
Return the queue size.
Definition: TaskQueue.cpp:48
std::vector::reserve
T reserve(T... args)
ripple::NodeStore::DatabaseShardImp::removePreShard
void removePreShard(std::uint32_t shardIndex) override
Remove a previously prepared shard index for import.
Definition: DatabaseShardImp.cpp:416
ripple::LedgerMaster::getValidLedgerIndex
LedgerIndex getValidLedgerIndex()
Definition: LedgerMaster.cpp:214
ripple::NodeStore::Database::fdRequired_
int fdRequired_
Definition: Database.h:303
ripple::NodeStore::DatabaseShardImp::fileSz_
std::uint64_t fileSz_
Definition: DatabaseShardImp.h:267
ripple::InboundLedger::Reason::GENERIC
@ GENERIC
std::vector
STL class.
std::set::find
T find(T... args)
ripple::ConfigSection::shardDatabase
static std::string shardDatabase()
Definition: ConfigSections.h:38
ripple::NodeStore::Shard::callForLedgerSQL
bool callForLedgerSQL(std::function< bool(Args... args)> const &callback)
Invoke a callback on the ledger SQLite db.
Definition: Shard.h:226
ripple::NodeStore::DatabaseShardImp::stop
void stop() override
Definition: DatabaseShardImp.cpp:699
std::vector::size
T size(T... args)
ripple::Application::getRelationalDBInterface
virtual RelationalDBInterface & getRelationalDBInterface()=0
ripple::NodeObjectType
NodeObjectType
The types of node objects.
Definition: NodeObject.h:32
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
std::chrono::milliseconds
ripple::NodeStore::DatabaseShardImp::taskQueue_
TaskQueue taskQueue_
Definition: DatabaseShardImp.h:240
ripple::NodeStore::DatabaseShardImp::setStored
void setStored(std::shared_ptr< Ledger const > const &ledger) override
Notifies the database that the given ledger has been fully acquired and stored.
Definition: DatabaseShardImp.cpp:630
std::set::emplace
T emplace(T... args)
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::lock_guard
STL class.
ripple::kilobytes
constexpr auto kilobytes(T value) noexcept
Definition: ByteUtilities.h:27
ripple::NetworkOPs::getOperatingMode
virtual OperatingMode getOperatingMode() const =0
ripple::NodeStore::Database::stop
virtual void stop()
Definition: Database.cpp:89
ripple::NodeStore::FetchReport
Contains information about a fetch operation.
Definition: ripple/nodestore/Scheduler.h:32
ripple::NodeStore::DatabaseShardImp::getDatabaseImportSequence
std::optional< std::uint32_t > getDatabaseImportSequence() const override
Returns the first ledger sequence of the shard currently being imported from the NodeStore.
Definition: DatabaseShardImp.cpp:1299
std::function
std::all_of
T all_of(T... args)
ripple::NodeStore::Shard::finalKey
static const uint256 finalKey
Definition: Shard.h:249
std::atomic_bool::compare_exchange_strong
T compare_exchange_strong(T... args)
ripple::LedgerMaster::walkHashBySeq
std::optional< LedgerHash > walkHashBySeq(std::uint32_t index, InboundLedger::Reason reason)
Walk to a ledger's hash using the skip list.
Definition: LedgerMaster.cpp:1713
ripple::getLimitedNewestLedgerInfo
std::optional< LedgerInfo > getLimitedNewestLedgerInfo(soci::session &session, LedgerIndex ledgerFirstIndex, beast::Journal j)
getLimitedNewestLedgerInfo Returns info of newest ledger from ledgers with sequences greater or equa...
Definition: RelationalDBInterface_nodes.cpp:486
ripple::NodeStore::DatabaseShardImp::importDatabase
void importDatabase(Database &source) override
Import the application local node store.
Definition: DatabaseShardImp.cpp:765
ripple::deserializePrefixedHeader
LedgerInfo deserializePrefixedHeader(Slice data, bool hasHash)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
Definition: InboundLedger.cpp:293
ripple::NodeStore::DatabaseShardImp::databaseImporter_
std::thread databaseImporter_
Definition: DatabaseShardImp.h:292
ripple::NodeStore::DatabaseShardImp::openFinalLimit_
const std::uint32_t openFinalLimit_
Definition: DatabaseShardImp.h:273
std::sort
T sort(T... args)
std::shared_ptr::reset
T reset(T... args)
ripple::SHAMapHash
Definition: SHAMapHash.h:32
ripple::NodeStore::DatabaseShardImp::iterateShardsForward
bool iterateShardsForward(std::optional< std::uint32_t > minShardIndex, std::function< bool(Shard &shard)> const &visit)
iterateShardsForward Visits all shards starting from given in ascending order and calls given callbac...
Definition: DatabaseShardImp.cpp:2070
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::NodeStore::DatabaseShardImp::sweep
void sweep() override
Remove expired entries from the positive and negative caches.
Definition: DatabaseShardImp.cpp:1173
ripple::getLimitedOldestLedgerInfo
std::optional< LedgerInfo > getLimitedOldestLedgerInfo(soci::session &session, LedgerIndex ledgerFirstIndex, beast::Journal j)
getLimitedOldestLedgerInfo Returns info of oldest ledger from ledgers with sequences greather or equa...
Definition: RelationalDBInterface_nodes.cpp:474
ripple::NodeStore::DatabaseShardImp::stopNodeToShard
Json::Value stopNodeToShard() override
Terminates a NodeStore to ShardStore import and returns the result in a JSON object.
Definition: DatabaseShardImp.cpp:1277
ripple::Section::values
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:77
std::thread::detach
T detach(T... args)
ripple::send_always
Sends a message to all peers.
Definition: predicates.h:31
ripple::NodeStore::DatabaseShardImp::PathDesignation::historical
@ historical
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:384
ripple::NodeStore::Shard::version
static constexpr std::uint32_t version
Definition: Shard.h:244
ripple::NodeStore::DatabaseShardImp::getDatabaseImportStatus
Json::Value getDatabaseImportStatus() const override
Returns a JSON object detailing the status of an ongoing database import if one is running,...
Definition: DatabaseShardImp.cpp:1225
std::vector::push_back
T push_back(T... args)
ripple::NodeStore::DatabaseShardImp::secondLatestShardIndex_
std::optional< std::uint32_t > secondLatestShardIndex_
Definition: DatabaseShardImp.h:286
ripple::NodeStore::DatabaseShardImp::avgShardFileSz_
std::uint64_t avgShardFileSz_
Definition: DatabaseShardImp.h:270
ripple::NodeStore::DatabaseShardImp::callForTransactionSQLByLedgerSeq
bool callForTransactionSQLByLedgerSeq(LedgerIndex ledgerSeq, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the transaction SQLite db for the corresponding ledger.
Definition: DatabaseShardImp.cpp:2047
ripple::base_uint< 256 >
ripple::NodeStore::DatabaseShardImp::updatePeers
void updatePeers(std::lock_guard< std::mutex > const &lock) const
Definition: DatabaseShardImp.cpp:2196
std::sample
T sample(T... args)
ripple::NodeStore::DatabaseShardImp::getPreShards
std::string getPreShards() override
Get shard indexes being imported.
Definition: DatabaseShardImp.cpp:426
ripple::NodeStore::DatabaseShardImp::databaseImportStatus_
std::unique_ptr< DatabaseImportStatus > databaseImportStatus_
Definition: DatabaseShardImp.h:289
ripple::NodeStore::DatabaseShardImp::getWriteLoad
std::int32_t getWriteLoad() const override
Retrieve the estimated number of pending write operations.
Definition: DatabaseShardImp.cpp:1087
std::thread::joinable
T joinable(T... args)
ripple::NodeStore::DatabaseShardImp::findAcquireIndex
std::optional< std::uint32_t > findAcquireIndex(std::uint32_t validLedgerSeq, std::lock_guard< std::mutex > const &)
Definition: DatabaseShardImp.cpp:1404
ripple::NodeStore::Database::firstLedgerSeq
std::uint32_t firstLedgerSeq(std::uint32_t shardIndex) const noexcept
Calculates the first ledger sequence for a given shard index.
Definition: Database.h:257
ripple::Config::reporting
bool reporting() const
Definition: Config.h:316
ripple::OperatingMode::DISCONNECTED
@ DISCONNECTED
not ready to process requests
ripple::NodeStore::DatabaseShardImp::iterateTransactionSQLsBack
bool iterateTransactionSQLsBack(std::optional< std::uint32_t > maxShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateTransactionSQLsBack Checks out transaction databases for all shards in descending order starti...
Definition: DatabaseShardImp.cpp:2162
ripple::NodeStore::DatabaseShardImp::haltDatabaseImport_
std::atomic_bool haltDatabaseImport_
Definition: DatabaseShardImp.h:295
ripple::NodeStore::DatabaseShardImp::chooseHistoricalPath
boost::filesystem::path chooseHistoricalPath(std::lock_guard< std::mutex > const &) const
Definition: DatabaseShardImp.cpp:1911
ripple::rand_int
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
Definition: ripple/basics/random.h:115
ripple::NodeStore::DatabaseShardImp::sufficientStorage
bool sufficientStorage(std::uint32_t numShards, PathDesignation pathDesignation, std::lock_guard< std::mutex > const &) const
Definition: DatabaseShardImp.cpp:1604
ripple::NodeStore::DatabaseShardImp::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq, FetchReport &fetchReport, bool duplicate) override
Definition: DatabaseShardImp.cpp:1384
ripple::NodeStore::DatabaseShardImp::init_
bool init_
Definition: DatabaseShardImp.h:234
std::thread
STL class.
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::NodeStore::TaskQueue::addTask
void addTask(std::function< void()> task)
Adds a task to the queue.
Definition: TaskQueue.cpp:38
ripple::NodeStore::DatabaseShardImp::callForLedgerSQLByShardIndex
bool callForLedgerSQLByShardIndex(std::uint32_t const shardIndex, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the ledger SQLite db for the corresponding shard.
Definition: DatabaseShardImp.cpp:2033
ripple::Config
Definition: Config.h:68
ripple::RelationalDBInterfaceSqlite
Definition: RelationalDBInterfaceSqlite.h:27
ripple::NodeStore::DatabaseShardImp::doImportDatabase
void doImportDatabase()
Definition: DatabaseShardImp.cpp:784
ripple::compare
int compare(base_uint< Bits, Tag > const &a, base_uint< Bits, Tag > const &b)
Definition: base_uint.h:533
std::ofstream
STL class.
ripple::Application::config
virtual Config & config()=0
ripple::NodeStore::DatabaseShardImp::dir_
boost::filesystem::path dir_
Definition: DatabaseShardImp.h:252
ripple::Config::standalone
bool standalone() const
Definition: Config.h:311
std::unique_lock
STL class.
ripple::NodeStore::DatabaseShardImp::removeFailedShard
void removeFailedShard(std::shared_ptr< Shard > &shard)
Definition: DatabaseShardImp.cpp:1692
ripple::NodeStore::DatabaseShard
A collection of historical shards.
Definition: DatabaseShard.h:37
std::to_string
T to_string(T... args)
ripple::NodeStore::DatabaseShardImp::getNumTasks
size_t getNumTasks() const override
Returns the number of queued tasks.
Definition: DatabaseShardImp.cpp:2189
ripple::NodeStore::DatabaseShardImp::importShard
bool importShard(std::uint32_t shardIndex, boost::filesystem::path const &srcDir) override
Import a shard from the shard archive handler into the shard database.
Definition: DatabaseShardImp.cpp:444
ripple::default_prng
beast::xor_shift_engine & default_prng()
Return the default random engine.
Definition: ripple/basics/random.h:65
ripple::NodeStore::DatabaseShardImp::store
void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t ledgerSeq) override
Definition: DatabaseShardImp.cpp:1104
ripple::NodeStore::TaskQueue::stop
void stop()
Definition: TaskQueue.cpp:32
ripple::NodeStore::DatabaseShardImp::PathDesignation::none
@ none
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::ShardState::finalized
@ finalized
std::set::erase
T erase(T... args)
ripple::NodeStore::DatabaseShardImp::initConfig
bool initConfig(std::lock_guard< std::mutex > const &)
Definition: DatabaseShardImp.cpp:1310
ripple::ConfigSection
Definition: ConfigSections.h:28
ripple::NodeStore::DatabaseShardImp::latestShardIndex_
std::optional< std::uint32_t > latestShardIndex_
Definition: DatabaseShardImp.h:285
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
ripple::NodeStore::DatabaseShardImp::acquireIndex_
std::uint32_t acquireIndex_
Definition: DatabaseShardImp.h:249
ripple::Overlay::foreach
void foreach(Function f) const
Visit every active peer.
Definition: Overlay.h:198
std::map
STL class.
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:60
std::transform
T transform(T... args)
ripple::NodeStore::Database::storeStats
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:328
ripple::NodeStore::DatabaseShardImp::startNodeToShard
Json::Value startNodeToShard() override
Initiates a NodeStore to ShardStore import and returns the result in a JSON object.
Definition: DatabaseShardImp.cpp:1254
ripple::NodeStore::DatabaseShardImp::preparedIndexes_
std::set< std::uint32_t > preparedIndexes_
Definition: DatabaseShardImp.h:246
ripple::NodeStore::DatabaseShardImp::init
bool init() override
Initialize the database.
Definition: DatabaseShardImp.cpp:70
std::weak_ptr
STL class.
ripple::NodeStore::Database::isStopping
bool isStopping() const
Definition: Database.cpp:69
ripple::rpcINTERNAL
@ rpcINTERNAL
Definition: ErrorCodes.h:130
ripple::Serializer
Definition: Serializer.h:39
ripple::NodeStore::DatabaseShardImp::historicalPaths_
std::vector< boost::filesystem::path > historicalPaths_
Definition: DatabaseShardImp.h:264
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Serializer::addBitString
int addBitString(base_uint< Bits, Tag > const &v)
Definition: Serializer.h:97
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::NodeStore::DatabaseShardImp::checkHistoricalPaths
bool checkHistoricalPaths(std::lock_guard< std::mutex > const &) const
Definition: DatabaseShardImp.cpp:1944
ripple::NodeStore::DatabaseShardImp::maxHistoricalShards_
std::uint32_t maxHistoricalShards_
Definition: DatabaseShardImp.h:261
ripple::NodeStore::Shard::callForTransactionSQL
bool callForTransactionSQL(std::function< bool(Args... args)> const &callback)
Invoke a callback on the transaction SQLite db.
Definition: Shard.h:238
ripple::ShardState::acquire
@ acquire
ripple::Application::getShardFamily
virtual Family * getShardFamily()=0
ripple::NodeStore::Database::j_
const beast::Journal j_
Definition: Database.h:301
ripple::NodeStore::DatabaseShardImp::callForTransactionSQLByShardIndex
bool callForTransactionSQLByShardIndex(std::uint32_t const shardIndex, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the transaction SQLite db for the corresponding shard.
Definition: DatabaseShardImp.cpp:2056
ripple::NodeStore::DatabaseShardImp::fetchLedger
std::shared_ptr< Ledger > fetchLedger(uint256 const &hash, std::uint32_t ledgerSeq) override
Fetch a ledger from the shard store.
Definition: DatabaseShardImp.cpp:552
std::vector::begin
T begin(T... args)
ripple::NodeStore::Database::seqToShardIndex
std::uint32_t seqToShardIndex(std::uint32_t ledgerSeq) const noexcept
Calculates the shard index for a given ledger sequence.
Definition: Database.h:283
std
STL namespace.
ripple::XRP_LEDGER_EARLIEST_SEQ
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
Definition: SystemParameters.h:61
ripple::NodeStore::DatabaseShardImp::iterateShardsBack
bool iterateShardsBack(std::optional< std::uint32_t > maxShardIndex, std::function< bool(Shard &shard)> const &visit)
iterateShardsBack Visits all shards starting from given in descending order and calls given callback ...
Definition: DatabaseShardImp.cpp:2122
ripple::NodeStore::DatabaseShardImp::numHistoricalShards
std::uint32_t numHistoricalShards(std::lock_guard< std::mutex > const &lock) const
Definition: DatabaseShardImp.cpp:1732
ripple::LedgerMaster::getCurrentLedgerIndex
LedgerIndex getCurrentLedgerIndex()
Definition: LedgerMaster.cpp:208
ripple::NodeStore::DatabaseShardImp::relocateOutdatedShards
void relocateOutdatedShards(std::lock_guard< std::mutex > const &lock)
Definition: DatabaseShardImp.cpp:1743
ripple::NodeStore::DatabaseShardImp::iterateLedgerSQLsBack
bool iterateLedgerSQLsBack(std::optional< std::uint32_t > maxShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateLedgerSQLsBack Checks out ledger databases for all shards in descending order starting from gi...
Definition: DatabaseShardImp.cpp:2151
ripple::NodeStore::DatabaseShardImp::updateFileStats
void updateFileStats()
Definition: DatabaseShardImp.cpp:1543
ripple::NodeStore::Database::earliestLedgerSeq_
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
ripple::Application::overlay
virtual Overlay & overlay()=0
ripple::NodeStore::DatabaseShardImp::shardBoundaryIndex
std::uint32_t shardBoundaryIndex() const
Definition: DatabaseShardImp.cpp:1716
std::count_if
T count_if(T... args)
std::vector::empty
T empty(T... args)
ripple::NodeStore::DatabaseShardImp::prepareShards
bool prepareShards(std::vector< std::uint32_t > const &shardIndexes) override
Prepare one or more shard indexes to be imported into the database.
Definition: DatabaseShardImp.cpp:299
std::unique
T unique(T... args)
std::optional< std::uint32_t >
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::NodeStore::Database::earliestShardIndex_
const std::uint32_t earliestShardIndex_
Definition: Database.h:325
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:38
ripple::NodeStore::DatabaseShardImp::startDatabaseImportThread
void startDatabaseImportThread(std::lock_guard< std::mutex > const &)
Definition: DatabaseShardImp.cpp:2208
ripple::NodeStore::DatabaseShardImp::setStoredInShard
bool setStoredInShard(std::shared_ptr< Shard > &shard, std::shared_ptr< Ledger const > const &ledger)
Definition: DatabaseShardImp.cpp:1659
ripple::NodeStore::DatabaseShardImp::canAdd_
bool canAdd_
Definition: DatabaseShardImp.h:255
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
std::vector::end
T end(T... args)
ripple::NodeStore::Database::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:158
ripple::NodeStore::Database::scheduler_
Scheduler & scheduler_
Definition: Database.h:302
ripple::RangeSet
boost::icl::interval_set< T, std::less, ClosedInterval< T > > RangeSet
A set of closed intervals over the domain T.
Definition: RangeSet.h:69
ripple::NodeStore::Database::earliestLedgerSeq
std::uint32_t earliestLedgerSeq() const noexcept
Definition: Database.h:238
ripple::NodeStore::DatabaseShardImp::finalizeShard
void finalizeShard(std::shared_ptr< Shard > &shard, bool writeSQLite, std::optional< uint256 > const &expectedHash)
Definition: DatabaseShardImp.cpp:1469
std::max
T max(T... args)
ripple::RelationalDBInterface::getHashesByIndex
virtual std::optional< LedgerHashPair > getHashesByIndex(LedgerIndex ledgerIndex)=0
getHashesByIndex Returns hash of the ledger and hash of parent ledger for the ledger of given sequenc...
ripple::NodeStore::Shard
Definition: Shard.h:54
ripple::NodeStore::Database::maxLedgers
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:76
ripple::NodeStore::DatabaseShardImp::DatabaseShardImp
DatabaseShardImp()=delete
std::make_reverse_iterator
T make_reverse_iterator(T... args)
std::unique_ptr
STL class.
ripple::loadLedgerHelper
std::shared_ptr< Ledger > loadLedgerHelper(LedgerInfo const &info, Application &app, bool acquire)
Definition: Ledger.cpp:1020
ripple::NodeStore::DatabaseShardImp::databaseImportMarker_
static constexpr auto databaseImportMarker_
Definition: DatabaseShardImp.h:276
std::unordered_map
STL class.
ripple::RPC::make_error
Json::Value make_error(error_code_i code)
Returns a new json object that reflects the error code.
Definition: ErrorCodes.cpp:209
ripple::PublisherStatus::available
@ available
ripple::NodeStore::DatabaseShardImp::prepareForNewShard
std::optional< PathDesignation > prepareForNewShard(std::uint32_t shardIndex, std::uint32_t numHistoricalShards, std::lock_guard< std::mutex > const &lock)
Definition: DatabaseShardImp.cpp:1880
ripple::ConfigSection::nodeDatabase
static std::string nodeDatabase()
Definition: ConfigSections.h:33
std::thread::join
T join(T... args)
std::exception::what
T what(T... args)
ripple::ShardState::queued
@ queued
ripple::NodeStore::DatabaseShardImp::iterateTransactionSQLsForward
bool iterateTransactionSQLsForward(std::optional< std::uint32_t > minShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateTransactionSQLsForward Checks out transaction databases for all shards in ascending order star...
Definition: DatabaseShardImp.cpp:2110
ripple::HashPrefix::shardInfo
@ shardInfo
shard info for signing
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::NodeStore::DatabaseShardImp::prepareLedger
std::optional< std::uint32_t > prepareLedger(std::uint32_t validLedgerSeq) override
Prepare to store a new ledger in the shard being acquired.
Definition: DatabaseShardImp.cpp:229
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::NodeStore::DatabaseShardImp::ctx_
std::unique_ptr< nudb::context > ctx_
Definition: DatabaseShardImp.h:237
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:127
ripple::NodeStore::DatabaseShardImp::backendName_
std::string backendName_
Definition: DatabaseShardImp.h:258
ripple::NodeStore::DatabaseShardImp::getShardInfo
std::unique_ptr< ShardInfo > getShardInfo() const override
Query information about shards held.
Definition: DatabaseShardImp.cpp:692