rippled
UnitaryShard.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2021 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/rdb/UnitaryShard.h>
21 #include <ripple/basics/StringUtilities.h>
22 #include <boost/format.hpp>
23 #include <boost/range/adaptor/transformed.hpp>
24 
25 namespace ripple {
26 
27 DatabasePair
29  Config const& config,
30  DatabaseCon::Setup const& setup)
31 {
32  auto tx{std::make_unique<DatabaseCon>(
34  tx->getSession() << boost::str(
35  boost::format("PRAGMA cache_size=-%d;") %
36  kilobytes(config.getValueFor(SizedItem::txnDBCache, std::nullopt)));
37 
38  auto lgr{std::make_unique<DatabaseCon>(
40  lgr->getSession() << boost::str(
41  boost::format("PRAGMA cache_size=-%d;") %
42  kilobytes(config.getValueFor(SizedItem::lgrDBCache, std::nullopt)));
43 
44  return {std::move(lgr), std::move(tx)};
45 }
46 
47 DatabasePair
49  Config const& config,
50  DatabaseCon::Setup const& setup,
51  DatabaseCon::CheckpointerSetup const& checkpointerSetup)
52 {
53  // transaction database
54  auto tx{std::make_unique<DatabaseCon>(
55  setup, TxDBName, TxDBPragma, TxDBInit, checkpointerSetup)};
56  tx->getSession() << boost::str(
57  boost::format("PRAGMA cache_size=-%d;") %
59 
60  // ledger database
61  auto lgr{std::make_unique<DatabaseCon>(
62  setup, LgrDBName, LgrDBPragma, LgrDBInit, checkpointerSetup)};
63  lgr->getSession() << boost::str(
64  boost::format("PRAGMA cache_size=-%d;") %
66 
67  return {std::move(lgr), std::move(tx)};
68 }
69 
70 bool
72  soci::session& txsession,
73  soci::session& lgrsession,
74  std::shared_ptr<Ledger const> const& ledger,
75  std::uint32_t index,
76  std::atomic<bool>& stop,
78 {
79  auto const ledgerSeq{ledger->info().seq};
80 
81  // Update the transactions database
82  {
83  auto& session{txsession};
84  soci::transaction tr(session);
85 
86  session << "DELETE FROM Transactions "
87  "WHERE LedgerSeq = :seq;",
88  soci::use(ledgerSeq);
89  session << "DELETE FROM AccountTransactions "
90  "WHERE LedgerSeq = :seq;",
91  soci::use(ledgerSeq);
92 
93  if (ledger->info().txHash.isNonZero())
94  {
95  auto const sSeq{std::to_string(ledgerSeq)};
96  if (!ledger->txMap().isValid())
97  {
98  JLOG(j.error())
99  << "shard " << index << " has an invalid transaction map"
100  << " on sequence " << sSeq;
101  return false;
102  }
103 
104  for (auto const& item : ledger->txs)
105  {
106  if (stop.load(std::memory_order_relaxed))
107  return false;
108 
109  TxMeta const txMeta{
110  item.first->getTransactionID(),
111  ledger->seq(),
112  *item.second};
113 
114  auto const sTxID = to_string(txMeta.getTxID());
115 
116  session << "DELETE FROM AccountTransactions "
117  "WHERE TransID = :txID;",
118  soci::use(sTxID);
119 
120  auto const& accounts = txMeta.getAffectedAccounts();
121  if (!accounts.empty())
122  {
123  auto const sTxnSeq{std::to_string(txMeta.getIndex())};
124  auto const s{boost::str(
125  boost::format("('%s','%s',%s,%s)") % sTxID % "%s" %
126  sSeq % sTxnSeq)};
127  std::string sql;
128  sql.reserve((accounts.size() + 1) * 128);
129  sql =
130  "INSERT INTO AccountTransactions "
131  "(TransID, Account, LedgerSeq, TxnSeq) VALUES ";
132  sql += boost::algorithm::join(
133  accounts |
134  boost::adaptors::transformed(
135  [&](AccountID const& accountID) {
136  return boost::str(
137  boost::format(s) %
138  ripple::toBase58(accountID));
139  }),
140  ",");
141  sql += ';';
142  session << sql;
143 
144  JLOG(j.trace())
145  << "shard " << index << " account transaction: " << sql;
146  }
147  else if (!isPseudoTx(*item.first))
148  {
149  // It's okay for pseudo transactions to not affect any
150  // accounts. But otherwise...
151  JLOG(j.warn())
152  << "shard " << index << " transaction in ledger "
153  << sSeq << " affects no accounts";
154  }
155 
156  Serializer s;
157  item.second->add(s);
158  session
160  item.first->getMetaSQL(
161  ledgerSeq, sqlBlobLiteral(s.modData())) +
162  ';');
163  }
164  }
165 
166  tr.commit();
167  }
168 
169  auto const sHash{to_string(ledger->info().hash)};
170 
171  // Update the ledger database
172  {
173  auto& session{lgrsession};
174  soci::transaction tr(session);
175 
176  auto const sParentHash{to_string(ledger->info().parentHash)};
177  auto const sDrops{to_string(ledger->info().drops)};
178  auto const closingTime{
179  ledger->info().closeTime.time_since_epoch().count()};
180  auto const prevClosingTime{
181  ledger->info().parentCloseTime.time_since_epoch().count()};
182  auto const closeTimeRes{ledger->info().closeTimeResolution.count()};
183  auto const sAccountHash{to_string(ledger->info().accountHash)};
184  auto const sTxHash{to_string(ledger->info().txHash)};
185 
186  session << "DELETE FROM Ledgers "
187  "WHERE LedgerSeq = :seq;",
188  soci::use(ledgerSeq);
189  session << "INSERT OR REPLACE INTO Ledgers ("
190  "LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime,"
191  "PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash,"
192  "TransSetHash)"
193  "VALUES ("
194  ":ledgerHash, :ledgerSeq, :prevHash, :totalCoins,"
195  ":closingTime, :prevClosingTime, :closeTimeRes,"
196  ":closeFlags, :accountSetHash, :transSetHash);",
197  soci::use(sHash), soci::use(ledgerSeq), soci::use(sParentHash),
198  soci::use(sDrops), soci::use(closingTime),
199  soci::use(prevClosingTime), soci::use(closeTimeRes),
200  soci::use(ledger->info().closeFlags), soci::use(sAccountHash),
201  soci::use(sTxHash);
202 
203  tr.commit();
204  }
205 
206  return true;
207 }
208 
211  DatabaseCon::Setup const& setup,
212  DatabaseCon::CheckpointerSetup const& checkpointerSetup)
213 {
214  return std::make_unique<DatabaseCon>(
215  setup,
219  checkpointerSetup);
220 }
221 
222 void
223 insertAcquireDBIndex(soci::session& session, std::uint32_t index)
224 {
225  session << "INSERT INTO Shard (ShardIndex) "
226  "VALUES (:shardIndex);",
227  soci::use(index);
228 }
229 
231 selectAcquireDBLedgerSeqs(soci::session& session, std::uint32_t index)
232 {
233  // resIndex and must be boost::optional (not std) because that's
234  // what SOCI expects in its interface.
235  boost::optional<std::uint32_t> resIndex;
236  soci::blob sociBlob(session);
237  soci::indicator blobPresent;
238 
239  session << "SELECT ShardIndex, StoredLedgerSeqs "
240  "FROM Shard "
241  "WHERE ShardIndex = :index;",
242  soci::into(resIndex), soci::into(sociBlob, blobPresent),
243  soci::use(index);
244 
245  if (!resIndex || index != resIndex)
246  return {false, {}};
247 
248  if (blobPresent != soci::i_ok)
249  return {true, {}};
250 
251  std::string s;
252  convert(sociBlob, s);
253 
254  return {true, s};
255 }
256 
258 selectAcquireDBLedgerSeqsHash(soci::session& session, std::uint32_t index)
259 {
260  // resIndex and sHash0 must be boost::optional (not std) because that's
261  // what SOCI expects in its interface.
262  boost::optional<std::uint32_t> resIndex;
263  boost::optional<std::string> sHash0;
264  soci::blob sociBlob(session);
265  soci::indicator blobPresent;
266 
267  session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs "
268  "FROM Shard "
269  "WHERE ShardIndex = :index;",
270  soci::into(resIndex), soci::into(sHash0),
271  soci::into(sociBlob, blobPresent), soci::use(index);
272 
274  (sHash0 ? *sHash0 : std::optional<std::string>());
275 
276  if (!resIndex || index != resIndex)
277  return {false, {{}, {}}};
278 
279  if (blobPresent != soci::i_ok)
280  return {true, {{}, sHash}};
281 
282  std::string s;
283  convert(sociBlob, s);
284 
285  return {true, {s, sHash}};
286 }
287 
288 void
290  soci::session& session,
291  std::shared_ptr<Ledger const> const& ledger,
292  std::uint32_t index,
293  std::uint32_t lastSeq,
294  std::optional<std::string> const& seqs)
295 {
296  soci::blob sociBlob(session);
297  auto const sHash{to_string(ledger->info().hash)};
298 
299  if (seqs)
300  convert(*seqs, sociBlob);
301 
302  if (ledger->info().seq == lastSeq)
303  {
304  // Store shard's last ledger hash
305  session << "UPDATE Shard "
306  "SET LastLedgerHash = :lastLedgerHash,"
307  "StoredLedgerSeqs = :storedLedgerSeqs "
308  "WHERE ShardIndex = :shardIndex;",
309  soci::use(sHash), soci::use(sociBlob), soci::use(index);
310  }
311  else
312  {
313  session << "UPDATE Shard "
314  "SET StoredLedgerSeqs = :storedLedgerSeqs "
315  "WHERE ShardIndex = :shardIndex;",
316  soci::use(sociBlob), soci::use(index);
317  }
318 }
319 
320 } // namespace ripple
ripple::AcquireShardDBPragma
constexpr std::array< char const *, 1 > AcquireShardDBPragma
Definition: DBInit.h:198
ripple::AcquireShardDBName
constexpr auto AcquireShardDBName
Definition: DBInit.h:196
ripple::SHAMap::isValid
bool isValid() const
Definition: SHAMap.h:625
ripple::makeAcquireDB
std::unique_ptr< DatabaseCon > makeAcquireDB(DatabaseCon::Setup const &setup, DatabaseCon::CheckpointerSetup const &checkpointerSetup)
makeAcquireDB Opens the shard acquire database and returns its descriptor.
Definition: UnitaryShard.cpp:210
std::string
STL class.
std::shared_ptr
STL class.
ripple::TxDBPragma
constexpr std::array< char const *, 4 > TxDBPragma
Definition: DBInit.h:78
ripple::LedgerHeader::closeFlags
int closeFlags
Definition: LedgerHeader.h:63
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:537
ripple::DatabaseCon::Setup
Definition: DatabaseCon.h:84
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:308
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:179
std::pair
std::string::reserve
T reserve(T... args)
ripple::convert
void convert(soci::blob &from, std::vector< std::uint8_t > &to)
Definition: SociDB.cpp:154
ripple::selectAcquireDBLedgerSeqsHash
std::pair< bool, AcquireShardSeqsHash > selectAcquireDBLedgerSeqsHash(soci::session &session, std::uint32_t index)
selectAcquireDBLedgerSeqsHash Returns the set of acquired ledger sequences and the last ledger hash f...
Definition: UnitaryShard.cpp:258
ripple::insertAcquireDBIndex
void insertAcquireDBIndex(soci::session &session, std::uint32_t index)
insertAcquireDBIndex Adds a new shard index to the shard acquire database.
Definition: UnitaryShard.cpp:223
ripple::LedgerHeader::parentHash
uint256 parentHash
Definition: LedgerHeader.h:52
ripple::toBase58
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition: AccountID.cpp:104
ripple::DatabaseCon::CheckpointerSetup
Definition: DatabaseCon.h:107
ripple::LedgerHeader::seq
LedgerIndex seq
Definition: LedgerHeader.h:41
beast::Journal::warn
Stream warn() const
Definition: Journal.h:326
ripple::kilobytes
constexpr auto kilobytes(T value) noexcept
Definition: ByteUtilities.h:27
ripple::AcquireShardDBInit
constexpr std::array< char const *, 1 > AcquireShardDBInit
Definition: DBInit.h:201
ripple::LedgerHeader::accountHash
uint256 accountHash
Definition: LedgerHeader.h:51
ripple::STTx::getMetaSQLInsertReplaceHeader
static std::string const & getMetaSQLInsertReplaceHeader()
Definition: STTx.cpp:268
ripple::LedgerHeader::txHash
uint256 txHash
Definition: LedgerHeader.h:50
ripple::LgrDBInit
constexpr std::array< char const *, 5 > LgrDBInit
Definition: DBInit.h:48
ripple::TxMeta
Definition: TxMeta.h:32
ripple::LedgerHeader::hash
uint256 hash
Definition: LedgerHeader.h:49
ripple::TxDBName
constexpr auto TxDBName
Definition: DBInit.h:73
ripple::base_uint
Integers of any length that is a multiple of 32-bits.
Definition: base_uint.h:82
ripple::updateAcquireDB
void updateAcquireDB(soci::session &session, std::shared_ptr< Ledger const > const &ledger, std::uint32_t index, std::uint32_t lastSeq, std::optional< std::string > const &seqs)
updateAcquireDB Updates information in the acquire DB.
Definition: UnitaryShard.cpp:289
ripple::Ledger::info
LedgerInfo const & info() const override
Returns information about the ledger.
Definition: Ledger.h:152
std::chrono::time_point::time_since_epoch
T time_since_epoch(T... args)
ripple::LedgerHeader::parentCloseTime
NetClock::time_point parentCloseTime
Definition: LedgerHeader.h:42
ripple::isPseudoTx
bool isPseudoTx(STObject const &tx)
Check whether a transaction is a pseudo-transaction.
Definition: STTx.cpp:576
ripple::Config::getValueFor
int getValueFor(SizedItem item, std::optional< std::size_t > node=std::nullopt) const
Retrieve the default value for the item at the specified node size.
Definition: Config.cpp:1009
ripple::LgrDBPragma
constexpr std::array< char const *, 1 > LgrDBPragma
Definition: DBInit.h:45
std::atomic::load
T load(T... args)
ripple::Config
Definition: Config.h:92
std::to_string
T to_string(T... args)
beast::Journal::error
Stream error() const
Definition: Journal.h:332
ripple::SizedItem::lgrDBCache
@ lgrDBCache
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
std::atomic< bool >
ripple::SizedItem::txnDBCache
@ txnDBCache
ripple::FinalShardDBPragma
constexpr std::array< char const *, 2 > FinalShardDBPragma
Definition: DBInit.h:212
ripple::LedgerHeader::closeTime
NetClock::time_point closeTime
Definition: LedgerHeader.h:72
ripple::Serializer
Definition: Serializer.h:40
ripple::Ledger::txMap
SHAMap const & txMap() const
Definition: Ledger.h:322
ripple::updateLedgerDBs
bool updateLedgerDBs(soci::session &txsession, soci::session &lgrsession, std::shared_ptr< Ledger const > const &ledger, std::uint32_t index, std::atomic< bool > &stop, beast::Journal j)
updateLedgerDBs Saves the given ledger to shard databases.
Definition: UnitaryShard.cpp:71
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::LedgerHeader::drops
XRPAmount drops
Definition: LedgerHeader.h:54
ripple::LedgerHeader::closeTimeResolution
NetClock::duration closeTimeResolution
Definition: LedgerHeader.h:66
ripple::ReadView::seq
LedgerIndex seq() const
Returns the sequence number of the base ledger.
Definition: ReadView.h:122
std::chrono::duration::count
T count(T... args)
ripple::makeShardIncompleteLedgerDBs
DatabasePair makeShardIncompleteLedgerDBs(Config const &config, DatabaseCon::Setup const &setup, DatabaseCon::CheckpointerSetup const &checkpointerSetup)
makeShardIncompleteLedgerDBs Opens shard databases for partially downloaded or unverified shards and ...
Definition: UnitaryShard.cpp:48
std::optional< std::string >
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
ripple::selectAcquireDBLedgerSeqs
std::pair< bool, std::optional< std::string > > selectAcquireDBLedgerSeqs(soci::session &session, std::uint32_t index)
selectAcquireDBLedgerSeqs Returns the set of acquired ledgers for the given shard.
Definition: UnitaryShard.cpp:231
ripple::TxDBInit
constexpr std::array< char const *, 8 > TxDBInit
Definition: DBInit.h:94
std::unique_ptr
STL class.
ripple::sqlBlobLiteral
std::string sqlBlobLiteral(Blob const &blob)
Format arbitrary binary data as an SQLite "blob literal".
Definition: StringUtilities.cpp:33
ripple::ReadView::txs
txs_type txs
Definition: ReadView.h:252
ripple::makeShardCompleteLedgerDBs
DatabasePair makeShardCompleteLedgerDBs(Config const &config, DatabaseCon::Setup const &setup)
makeShardCompleteLedgerDBs Opens shard databases for verified shards and returns their descriptors.
Definition: UnitaryShard.cpp:28
ripple::LgrDBName
constexpr auto LgrDBName
Definition: DBInit.h:43