rippled
Database.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/Ledger.h>
21 #include <ripple/basics/chrono.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_value.h>
24 #include <ripple/nodestore/Database.h>
25 #include <ripple/protocol/HashPrefix.h>
26 #include <ripple/protocol/jss.h>
27 #include <chrono>
28 
29 namespace ripple {
30 namespace NodeStore {
31 
33  Scheduler& scheduler,
34  int readThreads,
35  Section const& config,
36  beast::Journal journal)
37  : j_(journal)
38  , scheduler_(scheduler)
39  , ledgersPerShard_(get<std::uint32_t>(
40  config,
41  "ledgers_per_shard",
43  , earliestLedgerSeq_(
44  get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
45  , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
46 {
47  if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
48  Throw<std::runtime_error>("Invalid ledgers_per_shard");
49 
50  if (earliestLedgerSeq_ < 1)
51  Throw<std::runtime_error>("Invalid earliest_seq");
52 
53  while (readThreads-- > 0)
55 }
56 
58 {
59  // NOTE!
60  // Any derived class should call the stop() method in its
61  // destructor. Otherwise, occasionally, the derived class may
62  // crash during shutdown when its members are accessed by one of
63  // these threads after the derived class is destroyed but before
64  // this base class is destroyed.
65  stop();
66 }
67 
68 bool
70 {
72  return readStopping_;
73 }
74 
// Returns the maximum number of ledgers the shard at `shardIndex` can hold.
//  - Any shard after the earliest shard holds a full ledgersPerShard_ ledgers.
//  - The earliest shard holds the inclusive range
//    lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1.
//    NOTE(review): presumably smaller than a full shard because the earliest
//    shard begins at earliestLedgerSeq_ rather than on a shard boundary;
//    confirm against firstLedgerSeq().
//  - An index below the earliest shard is a caller error: the assert below
//    always fails when assertions are enabled; otherwise 0 is returned.
76 Database::maxLedgers(std::uint32_t shardIndex) const noexcept
77 {
78  if (shardIndex > earliestShardIndex_)
79  return ledgersPerShard_;
80 
81  if (shardIndex == earliestShardIndex_)
82  return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
83 
84  assert(!"Invalid shard index");
85  return 0;
86 }
87 
88 void
90 {
91  // After stop time we can no longer use the JobQueue for background
92  // reads. Join the background read threads.
93  {
95  if (readStopping_) // Only stop threads once.
96  return;
97 
98  readStopping_ = true;
100  }
101 
102  for (auto& e : readThreads_)
103  e.join();
104 }
105 
106 void
108  uint256 const& hash,
109  std::uint32_t ledgerSeq,
110  std::function<void(std::shared_ptr<NodeObject> const&)>&& cb)
111 {
112  // Post a read
114  read_[hash].emplace_back(ledgerSeq, std::move(cb));
116 }
117 
118 void
120 {
121  Batch batch;
123  auto storeBatch = [&, fname = __func__]() {
124  try
125  {
126  dstBackend.storeBatch(batch);
127  }
128  catch (std::exception const& e)
129  {
130  JLOG(j_.error()) << "Exception caught in function " << fname
131  << ". Error: " << e.what();
132  return;
133  }
134 
135  std::uint64_t sz{0};
136  for (auto const& nodeObject : batch)
137  sz += nodeObject->getData().size();
138  storeStats(batch.size(), sz);
139  batch.clear();
140  };
141 
142  srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
143  assert(nodeObject);
144  if (!nodeObject) // This should never happen
145  return;
146 
147  batch.emplace_back(std::move(nodeObject));
148  if (batch.size() >= batchWritePreallocationSize)
149  storeBatch();
150  });
151 
152  if (!batch.empty())
153  storeBatch();
154 }
155 
156 // Perform a fetch and report the time it took
159  uint256 const& hash,
160  std::uint32_t ledgerSeq,
161  FetchType fetchType,
162  bool duplicate)
163 {
164  FetchReport fetchReport(fetchType);
165 
166  using namespace std::chrono;
167  auto const begin{steady_clock::now()};
168 
169  auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
170  auto dur = steady_clock::now() - begin;
171  fetchDurationUs_ += duration_cast<microseconds>(dur).count();
172  if (nodeObject)
173  {
174  ++fetchHitCount_;
175  fetchSz_ += nodeObject->getData().size();
176  }
178 
179  fetchReport.elapsed = duration_cast<milliseconds>(dur);
180  scheduler_.onFetch(fetchReport);
181  return nodeObject;
182 }
183 
184 bool
186  Ledger const& srcLedger,
187  std::shared_ptr<Backend> dstBackend)
188 {
189  auto fail = [&](std::string const& msg) {
190  JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
191  << ". " << msg;
192  return false;
193  };
194 
195  if (srcLedger.info().hash.isZero())
196  return fail("Invalid hash");
197  if (srcLedger.info().accountHash.isZero())
198  return fail("Invalid account hash");
199 
200  auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
201  if (&srcDB == this)
202  return fail("Source and destination databases are the same");
203 
204  Batch batch;
206  auto storeBatch = [&, fname = __func__]() {
207  std::uint64_t sz{0};
208  for (auto const& nodeObject : batch)
209  sz += nodeObject->getData().size();
210 
211  try
212  {
213  dstBackend->storeBatch(batch);
214  }
215  catch (std::exception const& e)
216  {
217  fail(
218  std::string("Exception caught in function ") + fname +
219  ". Error: " + e.what());
220  return false;
221  }
222 
223  storeStats(batch.size(), sz);
224  batch.clear();
225  return true;
226  };
227 
228  // Store ledger header
229  {
230  Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
232  addRaw(srcLedger.info(), s);
233  auto nObj = NodeObject::createObject(
234  hotLEDGER, std::move(s.modData()), srcLedger.info().hash);
235  batch.emplace_back(std::move(nObj));
236  }
237 
238  bool error = false;
239  auto visit = [&](SHAMapTreeNode& node) {
240  if (!isStopping())
241  {
242  if (auto nodeObject = srcDB.fetchNodeObject(
243  node.getHash().as_uint256(), srcLedger.info().seq))
244  {
245  batch.emplace_back(std::move(nodeObject));
246  if (batch.size() < batchWritePreallocationSize || storeBatch())
247  return true;
248  }
249  }
250 
251  error = true;
252  return false;
253  };
254 
255  // Store the state map
256  if (srcLedger.stateMap().getHash().isNonZero())
257  {
258  if (!srcLedger.stateMap().isValid())
259  return fail("Invalid state map");
260 
261  srcLedger.stateMap().snapShot(false)->visitNodes(visit);
262  if (error)
263  return fail("Failed to store state map");
264  }
265 
266  // Store the transaction map
267  if (srcLedger.info().txHash.isNonZero())
268  {
269  if (!srcLedger.txMap().isValid())
270  return fail("Invalid transaction map");
271 
272  srcLedger.txMap().snapShot(false)->visitNodes(visit);
273  if (error)
274  return fail("Failed to store transaction map");
275  }
276 
277  if (!batch.empty() && !storeBatch())
278  return fail("Failed to store");
279 
280  return true;
281 }
282 
283 // Entry point for async read threads
284 void
286 {
287  beast::setCurrentThreadName("prefetch");
288  while (true)
289  {
290  uint256 lastHash;
294  entry;
295 
296  {
299  lock, [this] { return readStopping_ || !read_.empty(); });
300  if (readStopping_)
301  break;
302 
303  // Read in key order to make the back end more efficient
304  auto it = read_.lower_bound(readLastHash_);
305  if (it == read_.end())
306  {
307  // start over from the beginning
308  it = read_.begin();
309  }
310  lastHash = it->first;
311  entry = std::move(it->second);
312  read_.erase(it);
313  readLastHash_ = lastHash;
314  }
315 
316  auto seq = entry[0].first;
317  auto obj = fetchNodeObject(lastHash, seq, FetchType::async);
318 
319  for (auto const& req : entry)
320  {
321  if ((seq == req.first) || isSameDB(req.first, seq))
322  req.second(obj);
323  else
324  req.second(
325  fetchNodeObject(lastHash, req.first, FetchType::async));
326  }
327  }
328 }
329 
330 void
332 {
333  assert(obj.isObject());
334  obj[jss::node_writes] = std::to_string(storeCount_);
335  obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
336  obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
337  obj[jss::node_written_bytes] = std::to_string(storeSz_);
338  obj[jss::node_read_bytes] = std::to_string(fetchSz_);
339  obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
340 
341  if (auto c = getCounters())
342  {
343  obj[jss::node_read_errors] = std::to_string(c->readErrors);
344  obj[jss::node_read_retries] = std::to_string(c->readRetries);
345  obj[jss::node_write_retries] = std::to_string(c->writeRetries);
346  obj[jss::node_writes_delayed] = std::to_string(c->writesDelayed);
347  obj[jss::node_writes_duration_us] = std::to_string(c->writeDurationUs);
348  }
349 }
350 
351 } // namespace NodeStore
352 } // namespace ripple
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
ripple::NodeStore::Database::readLastHash_
uint256 readLastHash_
Definition: Database.h:370
ripple::SHAMap::isValid
bool isValid() const
Definition: SHAMap.h:611
ripple::DEFAULT_LEDGERS_PER_SHARD
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD
The number of ledgers in a shard.
Definition: SystemParameters.h:64
ripple::NodeStore::Database::fetchDurationUs_
std::atomic< std::uint64_t > fetchDurationUs_
Definition: Database.h:355
ripple::NodeStore::Database
Persistency layer for NodeObject.
Definition: Database.h:51
Json::Value::isObject
bool isObject() const
Definition: json_value.cpp:1027
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::SHAMap::getHash
SHAMapHash getHash() const
Definition: SHAMap.cpp:845
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:516
ripple::SHAMap::family
Family const & family() const
Definition: SHAMap.h:139
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
ripple::NodeStore::Database::ledgersPerShard_
const std::uint32_t ledgersPerShard_
Definition: Database.h:314
std::pair
std::vector::reserve
T reserve(T... args)
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:100
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:164
std::vector< std::shared_ptr< NodeObject > >
std::vector::size
T size(T... args)
ripple::NodeStore::Database::read_
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition: Database.h:367
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
ripple::NodeStore::Database::fetchTotalCount_
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:354
ripple::NodeStore::Database::asyncFetch
void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:107
ripple::NodeStore::Database::fetchSz_
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:306
std::lock_guard
STL class.
ripple::NodeStore::Database::stop
virtual void stop()
Definition: Database.cpp:89
ripple::NodeStore::FetchReport
Contains information about a fetch operation.
Definition: ripple/nodestore/Scheduler.h:32
ripple::NodeStore::Database::readStopping_
bool readStopping_
Definition: Database.h:373
std::function
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:92
ripple::SHAMapHash::isNonZero
bool isNonZero() const
Definition: SHAMapHash.h:58
ripple::SHAMap::snapShot
std::shared_ptr< SHAMap > snapShot(bool isMutable) const
Definition: SHAMap.cpp:70
ripple::NodeStore::Database::fetchHitCount_
std::atomic< std::uint32_t > fetchHitCount_
Definition: Database.h:305
ripple::Family::db
virtual NodeStore::Database & db()=0
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:101
ripple::NodeStore::Database::storeCount_
std::atomic< std::uint64_t > storeCount_
Definition: Database.h:352
ripple::NodeStore::batchWritePreallocationSize
@ batchWritePreallocationSize
Definition: nodestore/Types.h:34
ripple::NodeStore::Database::readLock_
std::mutex readLock_
Definition: Database.h:358
ripple::base_uint
Integers of any length that is a multiple of 32 bits.
Definition: base_uint.h:75
ripple::NodeStore::Database::storeSz_
std::atomic< std::uint64_t > storeSz_
Definition: Database.h:353
ripple::Ledger::info
LedgerInfo const & info() const override
Returns information about the ledger.
Definition: Ledger.h:148
ripple::NodeStore::Database::importInternal
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:119
ripple::base_uint::isZero
bool isZero() const
Definition: base_uint.h:511
ripple::Ledger
Holds a ledger.
Definition: Ledger.h:76
chrono
ripple::Ledger::stateMap
SHAMap const & stateMap() const
Definition: Ledger.h:307
std::unique_lock
STL class.
ripple::NodeStore::Database::readCondVar_
std::condition_variable readCondVar_
Definition: Database.h:359
ripple::SHAMapTreeNode
Definition: SHAMapTreeNode.h:53
std::to_string
T to_string(T... args)
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::NodeStore::Scheduler::onFetch
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch. Allows the scheduler to monitor the node store's performance.
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
std::condition_variable::wait
T wait(T... args)
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:60
ripple::NodeStore::Database::~Database
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:57
ripple::NodeStore::Database::for_each
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
ripple::NodeStore::Database::storeStats
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:328
ripple::NodeStore::FetchType
FetchType
Definition: ripple/nodestore/Scheduler.h:29
ripple::NodeStore::Database::isStopping
bool isStopping() const
Definition: Database.cpp:69
std::condition_variable::notify_one
T notify_one(T... args)
ripple::NodeStore::Database::threadEntry
void threadEntry()
Definition: Database.cpp:285
ripple::Serializer
Definition: Serializer.h:39
ripple::Ledger::txMap
SHAMap const & txMap() const
Definition: Ledger.h:319
ripple::NodeStore::FetchType::async
@ async
ripple::NodeStore::Database::getCounters
virtual std::optional< Backend::Counters< std::uint64_t > > getCounters() const
Retrieve backend read and write stats.
Definition: Database.h:398
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
ripple::NodeStore::Database::j_
const beast::Journal j_
Definition: Database.h:301
ripple::NodeStore::Database::readThreads_
std::vector< std::thread > readThreads_
Definition: Database.h:372
std
STL namespace.
ripple::XRP_LEDGER_EARLIEST_SEQ
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
Definition: SystemParameters.h:61
ripple::NodeStore::Database::getCountsJson
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:331
ripple::NodeStore::Database::earliestLedgerSeq_
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
std::vector::empty
T empty(T... args)
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:84
ripple::NodeStore::Database::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:158
ripple::NodeStore::Database::scheduler_
Scheduler & scheduler_
Definition: Database.h:302
ripple::NodeStore::Database::isSameDB
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
ripple::NodeStore::Database::maxLedgers
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum number of ledgers for a given shard index.
Definition: Database.cpp:76
ripple::NodeStore::Backend::storeBatch
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
ripple::NodeStore::FetchReport::elapsed
std::chrono::milliseconds elapsed
Definition: ripple/nodestore/Scheduler.h:38
std::condition_variable::notify_all
T notify_all(T... args)
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:102
std::exception::what
T what(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::NodeStore::Database::Database
Database()=delete
std::chrono
ripple::NodeStore::Backend
A backend used for the NodeStore.
Definition: Backend.h:39