rippled
Shard.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2017 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedger.h>
21 #include <ripple/app/main/DBInit.h>
22 #include <ripple/basics/StringUtilities.h>
23 #include <ripple/core/ConfigSections.h>
24 #include <ripple/nodestore/Manager.h>
25 #include <ripple/nodestore/impl/DatabaseShardImp.h>
26 #include <ripple/nodestore/impl/Shard.h>
27 #include <ripple/protocol/digest.h>
28 
29 #include <boost/algorithm/string.hpp>
30 #include <boost/range/adaptor/transformed.hpp>
31 
32 namespace ripple {
33 namespace NodeStore {
34 
// Backend key (initialized from 0) under which the shard's final key record
// is looked up; see the backend_->fetch(finalKey.data(), ...) calls below.
35 uint256 const Shard::finalKey{0};
36 
38  Application& app,
39  DatabaseShard const& db,
40  std::uint32_t index,
42  : app_(app)
43  , db_(db)
44  , index_(index)
45  , firstSeq_(db.firstLedgerSeq(index))
46  , lastSeq_(std::max(firstSeq_, db.lastLedgerSeq(index)))
47  , maxLedgers_(
48  index == db.earliestShardIndex() ? lastSeq_ - firstSeq_ + 1
49  : db.ledgersPerShard())
50  , dir_(db.getRootDir() / std::to_string(index_))
51  , j_(j)
52 {
53  if (index_ < db.earliestShardIndex())
54  Throw<std::runtime_error>("Shard: Invalid index");
55 }
56 
58 {
59  if (removeOnDestroy_)
60  {
61  backend_.reset();
62  lgrSQLiteDB_.reset();
63  txSQLiteDB_.reset();
64  acquireInfo_.reset();
65 
66  try
67  {
68  boost::filesystem::remove_all(dir_);
69  }
70  catch (std::exception const& e)
71  {
72  JLOG(j_.error()) << "shard " << index_ << " exception " << e.what()
73  << " in function " << __func__;
74  }
75  }
76 }
77 
78 bool
79 Shard::open(Scheduler& scheduler, nudb::context& ctx)
80 {
81  std::lock_guard lock{mutex_};
82  assert(!backend_);
83 
84  Config const& config{app_.config()};
85  {
86  Section section{config.section(ConfigSection::shardDatabase())};
87  std::string const type{get<std::string>(section, "type", "nudb")};
88  auto factory{Manager::instance().find(type)};
89  if (!factory)
90  {
91  JLOG(j_.error()) << "shard " << index_
92  << " failed to create backend type " << type;
93  return false;
94  }
95 
96  section.set("path", dir_.string());
97  backend_ = factory->createInstance(
98  NodeObject::keyBytes, section, scheduler, ctx, j_);
99  }
100 
101  using namespace boost::filesystem;
102  auto preexist{false};
103  auto fail = [this, &preexist](std::string const& msg) {
104  pCache_.reset();
105  nCache_.reset();
106  backend_.reset();
107  lgrSQLiteDB_.reset();
108  txSQLiteDB_.reset();
109  acquireInfo_.reset();
110 
111  if (!preexist)
112  remove_all(dir_);
113 
114  if (!msg.empty())
115  {
116  JLOG(j_.fatal()) << "shard " << index_ << " " << msg;
117  }
118  return false;
119  };
120 
121  auto createAcquireInfo = [this, &config]() {
122  acquireInfo_ = std::make_unique<AcquireInfo>();
123 
124  DatabaseCon::Setup setup;
125  setup.startUp = config.START_UP;
126  setup.standAlone = config.standalone();
127  setup.dataDir = dir_;
128  setup.useGlobalPragma = true;
129 
130  acquireInfo_->SQLiteDB = std::make_unique<DatabaseCon>(
131  setup,
135  acquireInfo_->SQLiteDB->setupCheckpointing(
136  &app_.getJobQueue(), app_.logs());
137  };
138 
139  try
140  {
141  // Open or create the NuDB key/value store
142  preexist = exists(dir_);
143  backend_->open(!preexist);
144 
145  if (!preexist)
146  {
147  // A new shard
148  createAcquireInfo();
149  acquireInfo_->SQLiteDB->getSession()
150  << "INSERT INTO Shard (ShardIndex) "
151  "VALUES (:shardIndex);",
152  soci::use(index_);
153  }
154  else if (exists(dir_ / AcquireShardDBName))
155  {
156  // An incomplete shard, being acquired
157  createAcquireInfo();
158 
159  auto& session{acquireInfo_->SQLiteDB->getSession()};
160  boost::optional<std::uint32_t> index;
161  soci::blob sociBlob(session);
162  soci::indicator blobPresent;
163 
164  session << "SELECT ShardIndex, StoredLedgerSeqs "
165  "FROM Shard "
166  "WHERE ShardIndex = :index;",
167  soci::into(index), soci::into(sociBlob, blobPresent),
168  soci::use(index_);
169 
170  if (!index || index != index_)
171  return fail("invalid acquire SQLite database");
172 
173  if (blobPresent == soci::i_ok)
174  {
175  std::string s;
176  auto& storedSeqs{acquireInfo_->storedSeqs};
177  if (convert(sociBlob, s); !from_string(storedSeqs, s))
178  return fail("invalid StoredLedgerSeqs");
179 
180  if (boost::icl::first(storedSeqs) < firstSeq_ ||
181  boost::icl::last(storedSeqs) > lastSeq_)
182  {
183  return fail("invalid StoredLedgerSeqs");
184  }
185 
186  if (boost::icl::length(storedSeqs) == maxLedgers_)
187  // All ledgers have been acquired, shard backend is complete
188  backendComplete_ = true;
189  }
190  }
191  else
192  {
193  // A finalized shard or has all ledgers stored in the backend
195  if (backend_->fetch(finalKey.data(), &nObj) != Status::ok)
196  {
197  legacy_ = true;
198  return fail("incompatible, missing backend final key");
199  }
200 
201  // Check final key's value
202  SerialIter sIt(nObj->getData().data(), nObj->getData().size());
203  if (sIt.get32() != version)
204  return fail("invalid version");
205 
206  if (sIt.get32() != firstSeq_ || sIt.get32() != lastSeq_)
207  return fail("out of range ledger sequences");
208 
209  if (sIt.get256().isZero())
210  return fail("invalid last ledger hash");
211 
212  if (exists(dir_ / LgrDBName) && exists(dir_ / TxDBName))
213  final_ = true;
214 
215  backendComplete_ = true;
216  }
217  }
218  catch (std::exception const& e)
219  {
220  return fail(
221  std::string("exception ") + e.what() + " in function " + __func__);
222  }
223 
224  setBackendCache(lock);
225  if (!initSQLite(lock))
226  return fail({});
227 
228  setFileStats(lock);
229  return true;
230 }
231 
232 boost::optional<std::uint32_t>
234 {
235  std::lock_guard lock(mutex_);
236  assert(backend_);
237 
238  if (backendComplete_)
239  {
240  JLOG(j_.warn()) << "shard " << index_
241  << " prepare called when shard backend is complete";
242  return {};
243  }
244 
245  assert(acquireInfo_);
246  auto const& storedSeqs{acquireInfo_->storedSeqs};
247  if (storedSeqs.empty())
248  return lastSeq_;
249  return prevMissing(storedSeqs, 1 + lastSeq_, firstSeq_);
250 }
251 
252 bool
254 {
255  auto const seq{ledger->info().seq};
256  if (seq < firstSeq_ || seq > lastSeq_)
257  {
258  JLOG(j_.error()) << "shard " << index_ << " invalid ledger sequence "
259  << seq;
260  return false;
261  }
262 
263  std::lock_guard lock(mutex_);
264  assert(backend_);
265 
266  if (backendComplete_)
267  {
268  JLOG(j_.debug()) << "shard " << index_ << " ledger sequence " << seq
269  << " already stored";
270  return true;
271  }
272 
273  assert(acquireInfo_);
274  auto& storedSeqs{acquireInfo_->storedSeqs};
275  if (boost::icl::contains(storedSeqs, seq))
276  {
277  JLOG(j_.debug()) << "shard " << index_ << " ledger sequence " << seq
278  << " already stored";
279  return true;
280  }
281  // storeSQLite looks at storedSeqs so insert before the call
282  storedSeqs.insert(seq);
283 
284  if (!storeSQLite(ledger, lock))
285  return false;
286 
287  if (boost::icl::length(storedSeqs) >= maxLedgers_)
288  {
289  if (!initSQLite(lock))
290  return false;
291 
292  backendComplete_ = true;
293  setBackendCache(lock);
294  }
295 
296  JLOG(j_.debug()) << "shard " << index_ << " stored ledger sequence " << seq
297  << (backendComplete_ ? " . All ledgers stored" : "");
298 
299  setFileStats(lock);
300  return true;
301 }
302 
// Reports whether the ledger sequence seq is held by this shard.
// NOTE(review): the declarator line is absent from this listing; seq is
// presumably an unsigned 32-bit ledger sequence - confirm against Shard.h.
// Returns false for a sequence outside [firstSeq_, lastSeq_], true when the
// backend is complete, otherwise whether seq is in the acquired set.
303 bool
305 {
// Range check uses only immutable members, so it is done before locking
306  if (seq < firstSeq_ || seq > lastSeq_)
307  return false;
308 
309  std::lock_guard lock(mutex_);
310  if (backendComplete_)
311  return true;
312 
313  assert(acquireInfo_);
314  return boost::icl::contains(acquireInfo_->storedSeqs, seq);
315 }
316 
// Calls sweep() on both caches (pCache_ and nCache_) while holding mutex_.
// Both caches must already exist. NOTE(review): the declarator line is
// absent from this listing.
317 void
319 {
320  std::lock_guard lock(mutex_);
321  assert(pCache_ && nCache_);
322 
323  pCache_->sweep();
324  nCache_->sweep();
325 }
326 
// Returns the backend together with both caches as one tuple, read under a
// single lock of mutex_. NOTE(review): the tuple's element types and the
// declarator line are absent from this listing.
327 std::tuple<
332 {
333  std::lock_guard lock(mutex_);
334  assert(backend_);
335 
336  return {backend_, pCache_, nCache_};
337 }
338 
341 {
342  std::lock_guard lock(mutex_);
343  assert(backend_);
344 
345  return backend_;
346 }
347 
// Returns backendComplete_, read while holding mutex_.
// NOTE(review): the declarator line is absent from this listing.
348 bool
350 {
351  std::lock_guard lock(mutex_);
352  return backendComplete_;
353 }
354 
357 {
358  std::lock_guard lock(mutex_);
359  assert(pCache_);
360 
361  return pCache_;
362 }
363 
// Returns the cache held in nCache_, read while holding mutex_; the cache
// must already exist. NOTE(review): the return type and declarator lines
// are absent from this listing.
366 {
367  std::lock_guard lock(mutex_);
368  assert(nCache_);
369 
370  return nCache_;
371 }
372 
// Returns the pair {fileSz_, fdRequired_}, read while holding mutex_.
// NOTE(review): the return type and declarator lines are absent from this
// listing.
375 {
376  std::lock_guard lock(mutex_);
377  return {fileSz_, fdRequired_};
378 }
379 
// Returns final_, read while holding mutex_.
// NOTE(review): the declarator line is absent from this listing.
380 bool
382 {
383  std::lock_guard lock(mutex_);
384  return final_;
385 }
386 
387 bool
389 {
390  std::lock_guard lock(mutex_);
391  return legacy_;
392 }
393 
394 bool
396  bool const writeSQLite,
397  boost::optional<uint256> const& expectedHash,
398  const bool writeDeterministicShard)
399 {
400  assert(backend_);
401 
402  if (stop_)
403  return false;
404 
405  uint256 hash{0};
406  std::uint32_t seq{0};
407  auto fail =
408  [j = j_, index = index_, &hash, &seq](std::string const& msg) {
409  JLOG(j.fatal())
410  << "shard " << index << ". " << msg
411  << (hash.isZero() ? "" : ". Ledger hash " + to_string(hash))
412  << (seq == 0 ? "" : ". Ledger sequence " + std::to_string(seq));
413  return false;
414  };
415 
416  try
417  {
418  std::unique_lock lock(mutex_);
419  if (!backendComplete_)
420  return fail("backend incomplete");
421 
422  /*
423  TODO MP
424  A lock is required when calling the NuDB verify function. Because
425  this can be a time consuming process, the server may desync.
426  Until this function is modified to work on an open database, we
427  are unable to use it from rippled.
428 
429  // Verify backend integrity
430  backend_->verify();
431  */
432 
433  // Check if a final key has been stored
434  lock.unlock();
436  backend_->fetch(finalKey.data(), &nObj) == Status::ok)
437  {
438  // Check final key's value
439  SerialIter sIt(nObj->getData().data(), nObj->getData().size());
440  if (sIt.get32() != version)
441  return fail("invalid version");
442 
443  if (sIt.get32() != firstSeq_ || sIt.get32() != lastSeq_)
444  return fail("out of range ledger sequences");
445 
446  if (hash = sIt.get256(); hash.isZero())
447  return fail("invalid last ledger hash");
448  }
449  else
450  {
451  // In the absence of a final key, an acquire SQLite database
452  // must be present in order to validate the shard
453  lock.lock();
454  if (!acquireInfo_)
455  return fail("missing acquire SQLite database");
456 
457  auto& session{acquireInfo_->SQLiteDB->getSession()};
458  boost::optional<std::uint32_t> index;
459  boost::optional<std::string> sHash;
460  soci::blob sociBlob(session);
461  soci::indicator blobPresent;
462  session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs "
463  "FROM Shard "
464  "WHERE ShardIndex = :index;",
465  soci::into(index), soci::into(sHash),
466  soci::into(sociBlob, blobPresent), soci::use(index_);
467 
468  lock.unlock();
469  if (!index || index != index_)
470  return fail("missing or invalid ShardIndex");
471 
472  if (!sHash)
473  return fail("missing LastLedgerHash");
474 
475  if (hash.SetHexExact(*sHash); hash.isZero())
476  return fail("invalid LastLedgerHash");
477 
478  if (blobPresent != soci::i_ok)
479  return fail("missing StoredLedgerSeqs");
480 
481  std::string s;
482  convert(sociBlob, s);
483 
484  lock.lock();
485 
486  auto& storedSeqs{acquireInfo_->storedSeqs};
487  if (!from_string(storedSeqs, s) ||
488  boost::icl::first(storedSeqs) != firstSeq_ ||
489  boost::icl::last(storedSeqs) != lastSeq_ ||
490  storedSeqs.size() != maxLedgers_)
491  {
492  return fail("invalid StoredLedgerSeqs");
493  }
494  }
495  }
496  catch (std::exception const& e)
497  {
498  return fail(
499  std::string("exception ") + e.what() + " in function " + __func__);
500  }
501 
502  // Validate the last ledger hash of a downloaded shard
503  // using a ledger hash obtained from the peer network
504  if (expectedHash && *expectedHash != hash)
505  return fail("invalid last ledger hash");
506 
507  // Validate every ledger stored in the backend
510  auto const lastLedgerHash{hash};
511 
513  if (writeDeterministicShard)
514  {
515  dsh = std::make_shared<DeterministicShard>(
516  app_, db_, index_, lastLedgerHash, j_);
517  if (!dsh->init())
518  {
519  return fail("can't create deterministic shard");
520  }
521  }
522 
523  // Start with the last ledger in the shard and walk backwards from
524  // child to parent until we reach the first ledger
525  seq = lastSeq_;
526  while (seq >= firstSeq_)
527  {
528  if (stop_)
529  return false;
530 
531  auto nObj = valFetch(hash);
532  if (!nObj)
533  return fail("invalid ledger");
534 
535  ledger = std::make_shared<Ledger>(
536  deserializePrefixedHeader(makeSlice(nObj->getData())),
537  app_.config(),
538  *app_.getShardFamily());
539  if (ledger->info().seq != seq)
540  return fail("invalid ledger sequence");
541  if (ledger->info().hash != hash)
542  return fail("invalid ledger hash");
543 
544  ledger->stateMap().setLedgerSeq(seq);
545  ledger->txMap().setLedgerSeq(seq);
546  ledger->setImmutable(app_.config());
547  if (!ledger->stateMap().fetchRoot(
548  SHAMapHash{ledger->info().accountHash}, nullptr))
549  {
550  return fail("missing root STATE node");
551  }
552  if (ledger->info().txHash.isNonZero() &&
553  !ledger->txMap().fetchRoot(
554  SHAMapHash{ledger->info().txHash}, nullptr))
555  {
556  return fail("missing root TXN node");
557  }
558 
559  if (dsh)
560  dsh->store(nObj);
561 
562  if (!verifyLedger(ledger, next, dsh))
563  return fail("verification check failed");
564 
565  if (writeSQLite)
566  {
567  std::lock_guard lock(mutex_);
568  if (!storeSQLite(ledger, lock))
569  return fail("failed storing to SQLite databases");
570  }
571 
572  hash = ledger->info().parentHash;
573  next = std::move(ledger);
574  --seq;
575  }
576 
577  JLOG(j_.debug()) << "shard " << index_ << " is valid";
578 
579  /*
580  TODO MP
581  SQLite VACUUM blocks all database access while processing.
582  Depending on the file size, that can take a while. Until we find
583  a non-blocking way of doing this, we cannot enable vacuum as
584  it can desync a server.
585 
586  try
587  {
588  // VACUUM the SQLite databases
589  auto const tmpDir {dir_ / "tmp_vacuum"};
590  create_directory(tmpDir);
591 
592  auto vacuum = [&tmpDir](std::unique_ptr<DatabaseCon>& sqliteDB)
593  {
594  auto& session {sqliteDB->getSession()};
595  session << "PRAGMA synchronous=OFF;";
596  session << "PRAGMA journal_mode=OFF;";
597  session << "PRAGMA temp_store_directory='" <<
598  tmpDir.string() << "';";
599  session << "VACUUM;";
600  };
601  vacuum(lgrSQLiteDB_);
602  vacuum(txSQLiteDB_);
603  remove_all(tmpDir);
604  }
605  catch (std::exception const& e)
606  {
607  return fail(std::string("exception ") +
608  e.what() + " in function " + __func__);
609  }
610  */
611 
612  // Store final key's value, may already be stored
613  Serializer s;
614  s.add32(version);
615  s.add32(firstSeq_);
616  s.add32(lastSeq_);
617  s.addBitString(lastLedgerHash);
618  auto nObj{
620  try
621  {
622  backend_->store(nObj);
623 
624  if (dsh)
625  {
626  dsh->store(nObj);
627  dsh->flush();
628  }
629 
630  std::lock_guard lock(mutex_);
631  final_ = true;
632 
633  // Remove the acquire SQLite database if present
634  if (acquireInfo_)
635  acquireInfo_.reset();
636  remove_all(dir_ / AcquireShardDBName);
637 
638  if (!initSQLite(lock))
639  return fail("failed to initialize SQLite databases");
640 
641  setFileStats(lock);
642  }
643  catch (std::exception const& e)
644  {
645  return fail(
646  std::string("exception ") + e.what() + " in function " + __func__);
647  }
648 
649  if (dsh)
650  {
651  /* Close non-deterministic shard database. */
652  backend_->close();
653  /* Replace non-deterministic shard by deterministic one. */
654  dsh->close();
655  /* Re-open deterministic shard database. */
656  backend_->open(false);
663  return finalize(false, expectedHash, false);
664  }
665 
666  return true;
667 }
668 
669 void
671 {
672  // Complete shards use the smallest cache and
673  // fastest expiration to reduce memory consumption.
674  // An incomplete shard is set according to configuration.
675 
676  Config const& config{app_.config()};
677  if (!pCache_)
678  {
679  auto const name{"shard " + std::to_string(index_)};
680  auto const sz{config.getValueFor(
682  backendComplete_ ? boost::optional<std::size_t>(0) : boost::none)};
683  auto const age{std::chrono::seconds{config.getValueFor(
685  backendComplete_ ? boost::optional<std::size_t>(0) : boost::none)}};
686 
687  pCache_ = std::make_shared<PCache>(name, sz, age, stopwatch(), j_);
688  nCache_ = std::make_shared<NCache>(name, stopwatch(), sz, age);
689  }
690  else
691  {
692  auto const sz{config.getValueFor(SizedItem::nodeCacheSize, 0)};
693  pCache_->setTargetSize(sz);
694  nCache_->setTargetSize(sz);
695 
696  auto const age{std::chrono::seconds{
697  config.getValueFor(SizedItem::nodeCacheAge, 0)}};
698  pCache_->setTargetAge(age);
699  nCache_->setTargetAge(age);
700  }
701 }
702 
// Closes any open ledger/transaction SQLite connections and opens fresh
// ones, choosing between the complete-shard and incomplete-shard setup.
// Returns true on success; on an exception it logs at fatal level and
// returns false.
// NOTE(review): this listing omits several lines of the block (the
// declarator, one assignment inside the setup lambda, and the constructor
// arguments of both DatabaseCon objects in the complete-shard branch).
703 bool
705 {
706  Config const& config{app_.config()};
// Connection setup shared by both databases, built once from the config
707  DatabaseCon::Setup const setup = [&]() {
708  DatabaseCon::Setup result;
709  result.startUp = config.START_UP;
710  result.standAlone = config.standalone();
711  result.dataDir = dir_;
713  return result;
714  }();
715 
716  try
717  {
// Release existing connections before reopening
718  if (lgrSQLiteDB_)
719  lgrSQLiteDB_.reset();
720 
721  if (txSQLiteDB_)
722  txSQLiteDB_.reset();
723 
724  if (backendComplete_)
725  {
// Complete shard: cache size looked up with boost::none; no checkpointing
// is set up in this branch
726  lgrSQLiteDB_ = std::make_unique<DatabaseCon>(
728  lgrSQLiteDB_->getSession() << boost::str(
729  boost::format("PRAGMA cache_size=-%d;") %
730  kilobytes(
731  config.getValueFor(SizedItem::lgrDBCache, boost::none)));
732 
733  txSQLiteDB_ = std::make_unique<DatabaseCon>(
735  txSQLiteDB_->getSession() << boost::str(
736  boost::format("PRAGMA cache_size=-%d;") %
737  kilobytes(
738  config.getValueFor(SizedItem::txnDBCache, boost::none)));
739  }
740  else
741  {
742  // The incomplete shard uses a Write Ahead Log for performance
743  lgrSQLiteDB_ = std::make_unique<DatabaseCon>(
744  setup, LgrDBName, LgrDBPragma, LgrDBInit);
745  lgrSQLiteDB_->getSession() << boost::str(
746  boost::format("PRAGMA cache_size=-%d;") %
747  kilobytes(config.getValueFor(SizedItem::lgrDBCache)));
748  lgrSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs());
749 
750  txSQLiteDB_ = std::make_unique<DatabaseCon>(
751  setup, TxDBName, TxDBPragma, TxDBInit);
752  txSQLiteDB_->getSession() << boost::str(
753  boost::format("PRAGMA cache_size=-%d;") %
754  kilobytes(config.getValueFor(SizedItem::txnDBCache)));
755  txSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs());
756  }
757  }
758  catch (std::exception const& e)
759  {
760  JLOG(j_.fatal()) << "shard " << index_ << " exception " << e.what()
761  << " in function " << __func__;
762  return false;
763  }
764  return true;
765 }
766 
767 bool
769  std::shared_ptr<Ledger const> const& ledger,
771 {
772  if (stop_)
773  return false;
774 
775  auto const seq{ledger->info().seq};
776 
777  try
778  {
779  // Update the transactions database
780  {
781  auto& session{txSQLiteDB_->getSession()};
782  soci::transaction tr(session);
783 
784  session << "DELETE FROM Transactions "
785  "WHERE LedgerSeq = :seq;",
786  soci::use(seq);
787  session << "DELETE FROM AccountTransactions "
788  "WHERE LedgerSeq = :seq;",
789  soci::use(seq);
790 
791  if (ledger->info().txHash.isNonZero())
792  {
793  auto const sSeq{std::to_string(seq)};
794  if (!ledger->txMap().isValid())
795  {
796  JLOG(j_.error()) << "shard " << index_
797  << " has an invalid transaction map"
798  << " on sequence " << sSeq;
799  return false;
800  }
801 
802  for (auto const& item : ledger->txs)
803  {
804  if (stop_)
805  return false;
806 
807  auto const txID{item.first->getTransactionID()};
808  auto const sTxID{to_string(txID)};
809  auto const txMeta{std::make_shared<TxMeta>(
810  txID, ledger->seq(), *item.second)};
811 
812  session << "DELETE FROM AccountTransactions "
813  "WHERE TransID = :txID;",
814  soci::use(sTxID);
815 
816  auto const& accounts = txMeta->getAffectedAccounts(j_);
817  if (!accounts.empty())
818  {
819  auto const sTxnSeq{std::to_string(txMeta->getIndex())};
820  auto const s{boost::str(
821  boost::format("('%s','%s',%s,%s)") % sTxID % "%s" %
822  sSeq % sTxnSeq)};
823  std::string sql;
824  sql.reserve((accounts.size() + 1) * 128);
825  sql =
826  "INSERT INTO AccountTransactions "
827  "(TransID, Account, LedgerSeq, TxnSeq) VALUES ";
828  sql += boost::algorithm::join(
829  accounts |
830  boost::adaptors::transformed(
831  [&](AccountID const& accountID) {
832  return boost::str(
833  boost::format(s) %
834  ripple::toBase58(accountID));
835  }),
836  ",");
837  sql += ';';
838  session << sql;
839 
840  JLOG(j_.trace()) << "shard " << index_
841  << " account transaction: " << sql;
842  }
843  else
844  {
845  JLOG(j_.warn())
846  << "shard " << index_ << " transaction in ledger "
847  << sSeq << " affects no accounts";
848  }
849 
850  Serializer s;
851  item.second->add(s);
852  session
854  item.first->getMetaSQL(
855  seq, sqlEscape(s.modData())) +
856  ';');
857  }
858  }
859 
860  tr.commit();
861  }
862 
863  auto const sHash{to_string(ledger->info().hash)};
864 
865  // Update the ledger database
866  {
867  auto& session{lgrSQLiteDB_->getSession()};
868  soci::transaction tr(session);
869 
870  auto const sParentHash{to_string(ledger->info().parentHash)};
871  auto const sDrops{to_string(ledger->info().drops)};
872  auto const sAccountHash{to_string(ledger->info().accountHash)};
873  auto const sTxHash{to_string(ledger->info().txHash)};
874 
875  session << "DELETE FROM Ledgers "
876  "WHERE LedgerSeq = :seq;",
877  soci::use(seq);
878  session
879  << "INSERT OR REPLACE INTO Ledgers ("
880  "LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime,"
881  "PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash,"
882  "TransSetHash)"
883  "VALUES ("
884  ":ledgerHash, :ledgerSeq, :prevHash, :totalCoins,"
885  ":closingTime, :prevClosingTime, :closeTimeRes,"
886  ":closeFlags, :accountSetHash, :transSetHash);",
887  soci::use(sHash), soci::use(seq), soci::use(sParentHash),
888  soci::use(sDrops),
889  soci::use(ledger->info().closeTime.time_since_epoch().count()),
890  soci::use(
891  ledger->info().parentCloseTime.time_since_epoch().count()),
892  soci::use(ledger->info().closeTimeResolution.count()),
893  soci::use(ledger->info().closeFlags), soci::use(sAccountHash),
894  soci::use(sTxHash);
895 
896  tr.commit();
897  }
898 
899  // Update the acquire database if present
900  if (acquireInfo_)
901  {
902  auto& session{acquireInfo_->SQLiteDB->getSession()};
903  soci::blob sociBlob(session);
904 
905  if (!acquireInfo_->storedSeqs.empty())
906  convert(to_string(acquireInfo_->storedSeqs), sociBlob);
907 
908  if (ledger->info().seq == lastSeq_)
909  {
910  // Store shard's last ledger hash
911  session << "UPDATE Shard "
912  "SET LastLedgerHash = :lastLedgerHash,"
913  "StoredLedgerSeqs = :storedLedgerSeqs "
914  "WHERE ShardIndex = :shardIndex;",
915  soci::use(sHash), soci::use(sociBlob), soci::use(index_);
916  }
917  else
918  {
919  session << "UPDATE Shard "
920  "SET StoredLedgerSeqs = :storedLedgerSeqs "
921  "WHERE ShardIndex = :shardIndex;",
922  soci::use(sociBlob), soci::use(index_);
923  }
924  }
925  }
926  catch (std::exception const& e)
927  {
928  JLOG(j_.fatal()) << "shard " << index_ << " exception " << e.what()
929  << " in function " << __func__;
930  return false;
931  }
932  return true;
933 }
934 
// Recomputes fileSz_ and fdRequired_ from the regular files found directly
// in dir_ (directory_iterator does not descend into subdirectories).
// Both counters are reset first, so when an exception interrupts the scan
// they hold only what was counted up to that point; the exception is logged
// at error level and not propagated.
// NOTE(review): the declarator line is absent from this listing; callers in
// this file pass the held lock (setFileStats(lock)).
935 void
937 {
938  fileSz_ = 0;
939  fdRequired_ = 0;
940  try
941  {
942  using namespace boost::filesystem;
943  for (auto const& d : directory_iterator(dir_))
944  {
945  if (is_regular_file(d))
946  {
// One file descriptor is counted per regular file
947  fileSz_ += file_size(d);
948  ++fdRequired_;
949  }
950  }
951  }
952  catch (std::exception const& e)
953  {
954  JLOG(j_.error()) << "shard " << index_ << " exception " << e.what()
955  << " in function " << __func__;
956  }
957 }
958 
// Verifies that the nodes of a ledger's state map and transaction map can
// be fetched from the backend through valFetch(). When dsh is set, every
// successfully fetched node object is also stored into it.
//
// ledger  the ledger to verify
// next    the ledger verified just before this one, or null; when it is the
//         child of 'ledger' only the state map differences are visited
//
// Returns true when both maps verified; false on a missing or invalid node,
// an invalid map, an exception, or when stop_ is observed.
// NOTE(review): the declarator line and the line declaring the dsh
// parameter are absent from this listing.
959 bool
961  std::shared_ptr<Ledger const> const& ledger,
962  std::shared_ptr<Ledger const> const& next,
964 {
// Logs msg with the ledger's hash and sequence, when they are non-zero
965  auto fail = [j = j_, index = index_, &ledger](std::string const& msg) {
966  JLOG(j.fatal()) << "shard " << index << ". " << msg
967  << (ledger->info().hash.isZero() ? ""
968  : ". Ledger hash " +
969  to_string(ledger->info().hash))
970  << (ledger->info().seq == 0 ? ""
971  : ". Ledger sequence " +
972  std::to_string(ledger->info().seq));
973  return false;
974  };
975 
976  if (ledger->info().hash.isZero())
977  return fail("Invalid ledger hash");
978  if (ledger->info().accountHash.isZero())
979  return fail("Invalid ledger account hash");
980 
// Visitor shared by both maps. Returning false ends the traversal, either
// because stop_ was observed or because a node failed to fetch (recorded in
// 'error').
981  bool error{false};
982  auto visit = [this, &error, dsh](SHAMapAbstractNode& node) {
983  if (stop_)
984  return false;
985  auto nObj = valFetch(node.getNodeHash().as_uint256());
986  if (!nObj)
987  error = true;
988  else if (dsh)
989  dsh->store(nObj);
990  return !error;
991  };
992 
993  // Validate the state map
994  if (ledger->stateMap().getHash().isNonZero())
995  {
996  if (!ledger->stateMap().isValid())
997  return fail("Invalid state map");
998 
999  try
1000  {
// Use the difference walk only when 'next' really is this ledger's child
1001  if (next && next->info().parentHash == ledger->info().hash)
1002  ledger->stateMap().visitDifferences(&next->stateMap(), visit);
1003  else
1004  ledger->stateMap().visitNodes(visit);
1005  }
1006  catch (std::exception const& e)
1007  {
1008  return fail(
1009  std::string("exception ") + e.what() + " in function " +
1010  __func__);
1011  }
// A stop request is not an error, so it returns without logging
1012  if (stop_)
1013  return false;
1014  if (error)
1015  return fail("Invalid state map");
1016  }
1017 
1018  // Validate the transaction map
1019  if (ledger->info().txHash.isNonZero())
1020  {
1021  if (!ledger->txMap().isValid())
1022  return fail("Invalid transaction map");
1023 
1024  try
1025  {
1026  ledger->txMap().visitNodes(visit);
1027  }
1028  catch (std::exception const& e)
1029  {
1030  return fail(
1031  std::string("exception ") + e.what() + " in function " +
1032  __func__);
1033  }
1034  if (stop_)
1035  return false;
1036  if (error)
1037  return fail("Invalid transaction map");
1038  }
1039 
1040  return true;
1041 }
1042 
1044 Shard::valFetch(uint256 const& hash) const
1045 {
1047  auto fail = [j = j_, index = index_, &hash, &nObj](std::string const& msg) {
1048  JLOG(j.fatal()) << "shard " << index << ". " << msg
1049  << ". Node object hash " << to_string(hash);
1050  nObj.reset();
1051  return nObj;
1052  };
1053 
1054  try
1055  {
1056  switch (backend_->fetch(hash.data(), &nObj))
1057  {
1058  case ok:
1059  // This verifies that the hash of node object matches the
1060  // payload
1061  if (nObj->getHash() != sha512Half(makeSlice(nObj->getData())))
1062  return fail("Node object hash does not match payload");
1063  return nObj;
1064  case notFound:
1065  return fail("Missing node object");
1066  case dataCorrupt:
1067  return fail("Corrupt node object");
1068  default:
1069  return fail("Unknown error");
1070  }
1071  }
1072  catch (std::exception const& e)
1073  {
1074  return fail(
1075  std::string("exception ") + e.what() + " in function " + __func__);
1076  }
1077 }
1078 
1079 } // namespace NodeStore
1080 } // namespace ripple
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
ripple::AcquireShardDBPragma
constexpr std::array< char const *, 1 > AcquireShardDBPragma
Definition: DBInit.h:119
ripple::NodeStore::Shard::getBackend
std::shared_ptr< Backend > getBackend() const
Definition: Shard.cpp:340
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:43
ripple::NodeStore::Shard::dir_
const boost::filesystem::path dir_
Definition: Shard.h:199
ripple::Application
Definition: Application.h:97
ripple::AcquireShardDBName
constexpr auto AcquireShardDBName
Definition: DBInit.h:117
ripple::hotUNKNOWN
@ hotUNKNOWN
Definition: NodeObject.h:33
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:240
ripple::NodeStore::DatabaseShard::earliestShardIndex
virtual std::uint32_t earliestShardIndex() const =0
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::NodeStore::Shard::lgrSQLiteDB_
std::unique_ptr< DatabaseCon > lgrSQLiteDB_
Definition: Shard.h:211
ripple::NodeStore::Shard::removeOnDestroy_
std::atomic< bool > removeOnDestroy_
Definition: Shard.h:236
ripple::NodeStore::Shard::setBackendCache
void setBackendCache(std::lock_guard< std::recursive_mutex > const &lock)
Definition: Shard.cpp:670
std::exception
STL class.
ripple::DatabaseCon::Setup
Definition: DatabaseCon.h:86
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::SizedItem::nodeCacheSize
@ nodeCacheSize
ripple::NodeStore::ok
@ ok
Definition: nodestore/Types.h:45
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:176
std::pair
std::string::reserve
T reserve(T... args)
ripple::NodeStore::Shard::store
bool store(std::shared_ptr< Ledger const > const &ledger)
Definition: Shard.cpp:253
ripple::NodeStore::Shard::backendComplete_
bool backendComplete_
Definition: Shard.h:223
ripple::convert
void convert(soci::blob &from, std::vector< std::uint8_t > &to)
Definition: SociDB.cpp:155
ripple::DatabaseCon::Setup::startUp
Config::StartUpType startUp
Definition: DatabaseCon.h:90
ripple::ConfigSection::shardDatabase
static std::string shardDatabase()
Definition: ConfigSections.h:38
ripple::NodeStore::Shard::final_
bool final_
Definition: Shard.h:230
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
std::chrono::seconds
ripple::NodeStore::Shard::initSQLite
bool initSQLite(std::lock_guard< std::recursive_mutex > const &lock)
Definition: Shard.cpp:704
ripple::toBase58
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition: AccountID.cpp:29
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::lock_guard
STL class.
ripple::kilobytes
constexpr auto kilobytes(T value) noexcept
Definition: ByteUtilities.h:27
ripple::AcquireShardDBInit
constexpr std::array< char const *, 1 > AcquireShardDBInit
Definition: DBInit.h:122
std::tuple
ripple::DatabaseCon::Setup::dataDir
boost::filesystem::path dataDir
Definition: DatabaseCon.h:92
ripple::STTx::getMetaSQLInsertReplaceHeader
static std::string const & getMetaSQLInsertReplaceHeader()
Definition: STTx.cpp:216
ripple::from_string
bool from_string(RangeSet< T > &rs, std::string const &s)
Convert the given styled string to a RangeSet.
Definition: RangeSet.h:126
ripple::stopwatch
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition: chrono.h:86
ripple::SizedItem::nodeCacheAge
@ nodeCacheAge
ripple::sqlEscape
static std::string sqlEscape(std::string const &strSrc)
Definition: StringUtilities.h:34
ripple::to_string
std::string to_string(ListDisposition disposition)
Definition: ValidatorList.cpp:42
ripple::NodeStore::Shard::finalKey
static const uint256 finalKey
Definition: Shard.h:162
ripple::NodeStore::Shard::lastSeq_
const std::uint32_t lastSeq_
Definition: Shard.h:186
std::shared_ptr::reset
T reset(T... args)
ripple::base_uint::data
pointer data()
Definition: base_uint.h:103
ripple::LgrDBInit
constexpr std::array< char const *, 5 > LgrDBInit
Definition: DBInit.h:47
ripple::SHAMapHash
Definition: SHAMapTreeNode.h:43
ripple::NodeStore::Shard::finalize
bool finalize(bool const writeSQLite, boost::optional< uint256 > const &referenceHash, const bool writeDeterministicShard=true)
Finalize shard by walking its ledgers and verifying each Merkle tree.
Definition: Shard.cpp:395
ripple::NodeStore::Shard::isLegacy
bool isLegacy() const
Returns true if the shard is older, without final key data.
Definition: Shard.cpp:388
ripple::uint256
base_uint< 256 > uint256
Definition: base_uint.h:493
ripple::deserializePrefixedHeader
LedgerInfo deserializePrefixedHeader(Slice data)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
Definition: InboundLedger.cpp:292
ripple::NodeStore::Shard::version
static constexpr std::uint32_t version
Definition: Shard.h:157
ripple::NodeStore::notFound
@ notFound
Definition: nodestore/Types.h:46
ripple::TxDBName
constexpr auto TxDBName
Definition: DBInit.h:72
ripple::base_uint< 256 >
ripple::DatabaseCon::Setup::useGlobalPragma
bool useGlobalPragma
Definition: DatabaseCon.h:95
ripple::NodeStore::Shard::pCache
std::shared_ptr< PCache > pCache() const
Definition: Shard.cpp:356
ripple::NodeStore::Shard::j_
const beast::Journal j_
Definition: Shard.h:220
ripple::NodeStore::Shard::fileSz_
std::uint64_t fileSz_
Definition: Shard.h:202
ripple::NodeStore::Shard::valFetch
std::shared_ptr< NodeObject > valFetch(uint256 const &hash) const
Definition: Shard.cpp:1044
ripple::NodeStore::Shard::db_
DatabaseShard const & db_
Definition: Shard.h:177
ripple::DatabaseCon::Setup::standAlone
bool standAlone
Definition: DatabaseCon.h:91
ripple::base_uint::isZero
bool isZero() const
Definition: base_uint.h:475
ripple::LgrDBPragma
constexpr std::array< char const *, 1 > LgrDBPragma
Definition: DBInit.h:44
ripple::NodeStore::Shard::sweep
void sweep()
Definition: Shard.cpp:318
ripple::SerialIter::get256
uint256 get256()
Definition: Serializer.h:374
ripple::Config
Definition: Config.h:67
ripple::Application::config
virtual Config & config()=0
ripple::prevMissing
boost::optional< T > prevMissing(RangeSet< T > const &rs, T t, T minVal=0)
Find the largest value not in the set that is less than a given value.
Definition: RangeSet.h:184
std::unique_lock
STL class.
ripple::NodeStore::DatabaseShard
A collection of historical shards.
Definition: DatabaseShard.h:37
ripple::NodeStore::Shard::nCache_
std::shared_ptr< NCache > nCache_
Definition: Shard.h:196
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::NodeStore::Shard::backend_
std::shared_ptr< Backend > backend_
Definition: Shard.h:208
ripple::NodeStore::Shard::firstSeq_
const std::uint32_t firstSeq_
Definition: Shard.h:183
ripple::NodeStore::Shard::pCache_
std::shared_ptr< PCache > pCache_
Definition: Shard.h:193
ripple::NodeStore::Shard::isFinal
bool isFinal() const
Returns true if the shard is complete, validated, and immutable.
Definition: Shard.cpp:381
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::Application::logs
virtual Logs & logs()=0
ripple::NodeStore::Shard::maxLedgers_
const std::uint32_t maxLedgers_
Definition: Shard.h:190
ripple::SerialIter
Definition: Serializer.h:308
ripple::SizedItem::lgrDBCache
@ lgrDBCache
ripple::NodeStore::Shard::~Shard
~Shard()
Definition: Shard.cpp:57
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::NodeStore::dataCorrupt
@ dataCorrupt
Definition: nodestore/Types.h:47
std::uint32_t
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:57
ripple::SizedItem::txnDBCache
@ txnDBCache
ripple::NodeStore::Shard::acquireInfo_
std::unique_ptr< AcquireInfo > acquireInfo_
Definition: Shard.h:218
ripple::NodeStore::Shard::app_
Application & app_
Definition: Shard.h:174
ripple::NodeStore::Shard::isBackendComplete
bool isBackendComplete() const
Returns true if all shard ledgers have been stored in the backend.
Definition: Shard.cpp:349
ripple::NodeStore::Shard::mutex_
std::recursive_mutex mutex_
Definition: Shard.h:175
ripple::Serializer
Definition: Serializer.h:39
ripple::NodeStore::Manager::find
virtual Factory * find(std::string const &name)=0
Return a pointer to the matching factory if it exists.
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Serializer::addBitString
int addBitString(base_uint< Bits, Tag > const &v)
Definition: Serializer.h:97
ripple::NodeObject::keyBytes
static constexpr std::size_t keyBytes
Definition: NodeObject.h:57
ripple::NodeStore::Shard::verifyLedger
bool verifyLedger(std::shared_ptr< Ledger const > const &ledger, std::shared_ptr< Ledger const > const &next, std::shared_ptr< DeterministicShard > dsh={}) const
Definition: Shard.cpp:960
ripple::Application::getShardFamily
virtual Family * getShardFamily()=0
ripple::NodeStore::Shard::prepare
boost::optional< std::uint32_t > prepare()
Definition: Shard.cpp:233
std
STL namespace.
ripple::NodeStore::Shard::Shard
Shard(Application &app, DatabaseShard const &db, std::uint32_t index, beast::Journal j)
Definition: Shard.cpp:37
ripple::sha512Half
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition: digest.h:227
ripple::TxDBPragma
constexpr std::array TxDBPragma
Definition: DBInit.h:75
ripple::NodeStore::Shard::setFileStats
void setFileStats(std::lock_guard< std::recursive_mutex > const &lock)
Definition: Shard.cpp:936
ripple::SHAMapAbstractNode
Definition: SHAMapTreeNode.h:122
ripple::NodeStore::Shard::legacy_
bool legacy_
Definition: Shard.h:227
ripple::NodeStore::Shard::fdRequired_
std::uint32_t fdRequired_
Definition: Shard.h:205
ripple::NodeStore::Shard::index
std::uint32_t index() const
Definition: Shard.h:79
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::NodeStore::Shard::fileInfo
std::pair< std::uint64_t, std::uint32_t > fileInfo() const
Returns a pair where the first item describes the storage space utilized and the second item is the number of file descriptors required.
Definition: Shard.cpp:374
ripple::NodeStore::Shard::open
bool open(Scheduler &scheduler, nudb::context &ctx)
Definition: Shard.cpp:79
ripple::CompleteShardDBPragma
constexpr std::array< char const *, 2 > CompleteShardDBPragma
Definition: DBInit.h:133
ripple::TxDBInit
constexpr std::array< char const *, 8 > TxDBInit
Definition: DBInit.h:83
ripple::NodeStore::Manager::instance
static Manager & instance()
Returns the instance of the manager singleton.
Definition: ManagerImp.cpp:117
ripple::SerialIter::get32
std::uint32_t get32()
Definition: Serializer.cpp:378
ripple::NodeStore::Shard::txSQLiteDB_
std::unique_ptr< DatabaseCon > txSQLiteDB_
Definition: Shard.h:214
ripple::NodeStore::Shard::containsLedger
bool containsLedger(std::uint32_t seq) const
Definition: Shard.cpp:304
ripple::NodeStore::Shard::index_
const std::uint32_t index_
Definition: Shard.h:180
ripple::NodeStore::Shard::stop_
std::atomic< bool > stop_
Definition: Shard.h:233
ripple::NodeStore::Shard::nCache
std::shared_ptr< NCache > nCache() const
Definition: Shard.cpp:365
std::exception::what
T what(T... args)
ripple::NodeStore::Shard::getBackendAll
std::tuple< std::shared_ptr< Backend >, std::shared_ptr< PCache >, std::shared_ptr< NCache > > getBackendAll() const
Definition: Shard.cpp:331
ripple::NodeStore::Shard::storeSQLite
bool storeSQLite(std::shared_ptr< Ledger const > const &ledger, std::lock_guard< std::recursive_mutex > const &lock)
Definition: Shard.cpp:768
ripple::LgrDBName
constexpr auto LgrDBName
Definition: DBInit.h:42