rippled
Database.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/Ledger.h>
21 #include <ripple/basics/chrono.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_value.h>
24 #include <ripple/nodestore/Database.h>
25 #include <ripple/protocol/HashPrefix.h>
26 #include <ripple/protocol/jss.h>
27 #include <chrono>
28 
29 namespace ripple {
30 namespace NodeStore {
31 
// NOTE(review): this listing omits several numbered source lines (32, 42, 64,
// 66, 74, 78, 80), so the opening signature line and a few statements of this
// definition are not visible. It is presumably the Database constructor; the
// comments below describe only the code that is visible.
//
// Visible behavior:
//  - stores the journal and scheduler, and reads "ledgers_per_shard",
//    "earliest_seq" and "rq_bundle" from `config`;
//  - throws std::runtime_error when any of those values fails validation;
//  - starts readThreads_ detached worker threads that service the read_ queue
//    until isStopping() returns true.
33  Scheduler& scheduler,
34  int readThreads,
35  Section const& config,
36  beast::Journal journal)
37  : j_(journal)
38  , scheduler_(scheduler)
39  , ledgersPerShard_(get<std::uint32_t>(
40  config,
41  "ledgers_per_shard",
43  , earliestLedgerSeq_(
44  get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
// NOTE(review): this division runs in the initializer list, i.e. before the
// `ledgersPerShard_ == 0` check in the body below. A configured value of 0
// would divide by zero before the "Invalid ledgers_per_shard" exception can
// be thrown -- confirm and consider validating first.
45  , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
46  , requestBundle_(get<int>(config, "rq_bundle", 4))
// At least one read thread is always requested, whatever value was passed in.
47  , readThreads_(std::max(1, readThreads))
48 {
49  assert(readThreads != 0);
50 
// ledgers_per_shard must be a non-zero multiple of 256.
51  if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
52  Throw<std::runtime_error>("Invalid ledgers_per_shard");
53 
54  if (earliestLedgerSeq_ < 1)
55  Throw<std::runtime_error>("Invalid earliest_seq");
56 
// rq_bundle must lie in the inclusive range [1, 64].
57  if (requestBundle_ < 1 || requestBundle_ > 64)
58  Throw<std::runtime_error>("Invalid rq_bundle");
59 
// Launch one worker per configured read thread. The loop counts down, so the
// index passed to each worker runs from readThreads_ down to 1.
60  for (int i = readThreads_.load(); i != 0; --i)
61  {
62  std::thread t(
63  [this](int i) {
65 
// The string below is built from the worker index; the call that consumes it
// is on a line missing from this listing (presumably it names the thread).
67  "db prefetch #" + std::to_string(i));
68 
// Thread-local container with the same type as read_; holds the requests this
// worker has taken off the shared queue.
69  decltype(read_) read;
70 
71  while (!isStopping())
72  {
// Scoped section that touches the shared read_ queue. The declaration of
// `lock` is on a line missing from this listing.
73  {
75 
76  if (read_.empty())
77  {
// Block until notified. There is no predicate here, so read_ may still be
// empty after waking; in that case the extraction loop below does nothing.
79  readCondVar_.wait(lock);
81  }
82 
// Re-check after waking: `continue` re-evaluates the while condition, which
// ends the loop once stopping has been requested.
83  if (isStopping())
84  continue;
85 
86  // If configured, extract multiple object at a time to
87  // minimize the overhead of acquiring the mutex.
88  for (int cnt = 0;
89  !read_.empty() && cnt != requestBundle_;
90  ++cnt)
91  read.insert(read_.extract(read_.begin()));
92  }
93 
// Service the extracted requests outside the scoped section above.
94  for (auto it = read.begin(); it != read.end(); ++it)
95  {
96  assert(!it->second.empty());
97 
98  auto const& hash = it->first;
99  auto const& data = it->second;
// The ledger sequence of the first queued request for this hash drives the
// initial fetch.
100  auto const seqn = data[0].first;
101 
102  auto obj =
103  fetchNodeObject(hash, seqn, FetchType::async);
104 
105  // This could be further optimized: if there are
106  // multiple requests for sequence numbers mapping to
107  // multiple databases by sorting requests such that all
108  // indices mapping to the same database are grouped
109  // together and serviced by a single read.
// Invoke every callback registered for this hash. The object fetched above is
// reused when the request has the same sequence or isSameDB() reports the two
// sequences map to the same database; otherwise a separate fetch is made.
110  for (auto const& req : data)
111  {
112  req.second(
113  (seqn == req.first) || isSameDB(req.first, seqn)
114  ? obj
115  : fetchNodeObject(
116  hash, req.first, FetchType::async));
117  }
118  }
119 
120  read.clear();
121  }
122 
// The worker is exiting: decrement the counter that stop() polls until it
// reaches zero.
123  --readThreads_;
124  },
125  i);
// The thread is detached rather than joined; the readThreads_ counter is the
// only visible mechanism for knowing when the workers have finished.
126  t.detach();
127  }
128 }
129 
// NOTE(review): the signature line (source line 130) is absent from this
// listing; this is presumably the body of Database::~Database().
// The only statement is a call to stop().
131 {
132  // NOTE!
133  // Any derived class should call the stop() method in its
134  // destructor. Otherwise, occasionally, the derived class may
135  // crash during shutdown when its members are accessed by one of
136  // these threads after the derived class is destroyed but before
137  // this base class is destroyed.
138  stop();
139 }
140 
// NOTE(review): the name line (source line 142) is absent from this listing;
// presumably Database::isStopping() const.
// Returns the current value of the readStopping_ flag.
141 bool
143 {
// Relaxed load: only the flag value itself is read here.
144  return readStopping_.load(std::memory_order_relaxed);
145 }
146 
// Returns the number of ledgers for the shard at `shardIndex`.
// NOTE(review): the return-type line (source line 147) is absent from this
// listing.
//  - Index above earliestShardIndex_: ledgersPerShard_.
//  - Index equal to earliestShardIndex_: the inclusive span
//    lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1.
//  - Index below earliestShardIndex_: triggers the assert and returns 0.
148 Database::maxLedgers(std::uint32_t shardIndex) const noexcept
149 {
150  if (shardIndex > earliestShardIndex_)
151  return ledgersPerShard_;
152 
153  if (shardIndex == earliestShardIndex_)
154  return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
155 
// Negating a string literal always yields false, so this assert always fires
// when reached; the message is carried in the asserted expression.
156  assert(!"Invalid shard index");
157  return 0;
158 }
159 
// NOTE(review): source lines 161, 165, 167 and 171 are absent from this
// listing, including the name line (presumably Database::stop()) and the body
// of the final while loop.
// Visible behavior: sets readStopping_, discards queued reads on the first
// call only, then loops until readThreads_ reaches zero.
160 void
162 {
// exchange() returns the previous value, so this branch runs only for the
// caller that flips the flag from false to true.
163  if (!readStopping_.exchange(true, std::memory_order_relaxed))
164  {
166  read_.clear();
168  }
169 
// Wait for every worker to decrement readThreads_ on exit. The statement
// executed on each iteration is on a missing line -- TODO confirm what it is.
170  while (readThreads_.load() != 0)
172 }
173 
// NOTE(review): source lines 175, 181 and 183 are absent from this listing,
// including the name line (presumably Database::asyncFetch).
// Visible behavior: queues a read request by appending the pair
// (ledgerSeq, cb) to the list stored in read_ under `hash`.
174 void
176  uint256 const& hash,
177  std::uint32_t ledgerSeq,
178  std::function<void(std::shared_ptr<NodeObject> const&)>&& cb)
179 {
180  // Post a read
// operator[] creates the entry for `hash` if none exists yet; the callback is
// moved into the queue, so `cb` must not be used afterwards.
182  read_[hash].emplace_back(ledgerSeq, std::move(cb));
184 }
185 
// NOTE(review): source lines 187 and 190 are absent from this listing,
// including the signature line (presumably Database::importInternal, taking
// `dstBackend` and `srcDB`).
// Visible behavior: visits every object in srcDB through for_each(), collects
// them in `batch`, and writes the batch to dstBackend whenever it reaches
// batchWritePreallocationSize, plus once more for any remainder.
186 void
188 {
189  Batch batch;
// Helper that writes the current batch, records store statistics and empties
// the batch. __func__ is captured here so the log message names the enclosing
// function rather than the lambda.
191  auto storeBatch = [&, fname = __func__]() {
192  try
193  {
194  dstBackend.storeBatch(batch);
195  }
196  catch (std::exception const& e)
197  {
198  JLOG(j_.error()) << "Exception caught in function " << fname
199  << ". Error: " << e.what();
// NOTE(review): returning here skips batch.clear(), so after a failure the
// batch stays at or above the threshold and the write is retried, with a
// growing batch, for every subsequent object -- confirm this is intended.
200  return;
201  }
202 
// Total payload size of the stored objects, reported to storeStats().
203  std::uint64_t sz{0};
204  for (auto const& nodeObject : batch)
205  sz += nodeObject->getData().size();
206  storeStats(batch.size(), sz);
207  batch.clear();
208  };
209 
210  srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
211  assert(nodeObject);
212  if (!nodeObject) // This should never happen
213  return;
214 
215  batch.emplace_back(std::move(nodeObject));
216  if (batch.size() >= batchWritePreallocationSize)
217  storeBatch();
218  });
219 
// Flush whatever is left after the last full batch.
220  if (!batch.empty())
221  storeBatch();
222 }
223 
// NOTE(review): source lines 225, 226 and 245 are absent from this listing,
// including the return-type and name lines (presumably
// Database::fetchNodeObject).
// Visible behavior: calls the fetchNodeObject overload that takes a
// FetchReport, measures how long it took, updates the fetch statistics,
// reports the fetch to the scheduler and returns the fetched object.
224 // Perform a fetch and report the time it took
227  uint256 const& hash,
228  std::uint32_t ledgerSeq,
229  FetchType fetchType,
230  bool duplicate)
231 {
232  FetchReport fetchReport(fetchType);
233 
234  using namespace std::chrono;
235  auto const begin{steady_clock::now()};
236 
237  auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
// One measurement feeds both the microsecond counter and the millisecond
// value reported to the scheduler.
238  auto dur = steady_clock::now() - begin;
239  fetchDurationUs_ += duration_cast<microseconds>(dur).count();
// Hit statistics are updated only when an object was actually returned.
240  if (nodeObject)
241  {
242  ++fetchHitCount_;
243  fetchSz_ += nodeObject->getData().size();
244  }
246 
247  fetchReport.elapsed = duration_cast<milliseconds>(dur);
248  scheduler_.onFetch(fetchReport);
249  return nodeObject;
250 }
251 
// NOTE(review): source lines 253, 273 and 299 are absent from this listing,
// including the line carrying the function name, so the exact name is not
// visible here.
// Visible behavior: copies a ledger's header, state-map nodes and
// transaction-map nodes from the ledger's own database into `dstBackend`, in
// batches. Returns true on success; on any failure logs an error that
// includes the source ledger sequence and returns false.
252 bool
254  Ledger const& srcLedger,
255  std::shared_ptr<Backend> dstBackend)
256 {
// Logs `msg` prefixed with the source ledger sequence and always yields
// false, so callers can write `return fail(...)`.
257  auto fail = [&](std::string const& msg) {
258  JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
259  << ". " << msg;
260  return false;
261  };
262 
263  if (srcLedger.info().hash.isZero())
264  return fail("Invalid hash");
265  if (srcLedger.info().accountHash.isZero())
266  return fail("Invalid account hash");
267 
// The source database is reached through the ledger's state map; constness is
// cast away to obtain a mutable reference. Copying a database onto itself is
// rejected.
268  auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
269  if (&srcDB == this)
270  return fail("Source and destination databases are the same");
271 
272  Batch batch;
// Writes the current batch to dstBackend. Returns false, after logging through
// fail(), if the backend throws; otherwise records statistics, empties the
// batch and returns true.
// NOTE(review): dstBackend is dereferenced without a null check -- presumably
// callers guarantee it is non-null; verify.
274  auto storeBatch = [&, fname = __func__]() {
// The byte total is computed before the write attempt and is used only when
// the write succeeds.
275  std::uint64_t sz{0};
276  for (auto const& nodeObject : batch)
277  sz += nodeObject->getData().size();
278 
279  try
280  {
281  dstBackend->storeBatch(batch);
282  }
283  catch (std::exception const& e)
284  {
285  fail(
286  std::string("Exception caught in function ") + fname +
287  ". Error: " + e.what());
288  return false;
289  }
290 
291  storeStats(batch.size(), sz);
292  batch.clear();
293  return true;
294  };
295 
296  // Store ledger header
297  {
298  Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
// A statement between the Serializer construction and addRaw() is on a line
// missing from this listing (source line 299).
300  addRaw(srcLedger.info(), s);
// The serializer's buffer is moved into the new node object, keyed by the
// ledger hash.
301  auto nObj = NodeObject::createObject(
302  hotLEDGER, std::move(s.modData()), srcLedger.info().hash);
303  batch.emplace_back(std::move(nObj));
304  }
305 
// Visitor used for both maps. Returning true continues the traversal.
// Stopping, a failed fetch from srcDB, or a failed batch write sets `error`
// and returns false.
306  bool error = false;
307  auto visit = [&](SHAMapTreeNode& node) {
308  if (!isStopping())
309  {
310  if (auto nodeObject = srcDB.fetchNodeObject(
311  node.getHash().as_uint256(), srcLedger.info().seq))
312  {
313  batch.emplace_back(std::move(nodeObject));
// Short-circuit: storeBatch() runs only once the batch has reached the
// threshold.
314  if (batch.size() < batchWritePreallocationSize || storeBatch())
315  return true;
316  }
317  }
318 
319  error = true;
320  return false;
321  };
322 
323  // Store the state map
324  if (srcLedger.stateMap().getHash().isNonZero())
325  {
326  if (!srcLedger.stateMap().isValid())
327  return fail("Invalid state map");
328 
// Nodes are visited on a snapshot taken with isMutable = false.
329  srcLedger.stateMap().snapShot(false)->visitNodes(visit);
330  if (error)
331  return fail("Failed to store state map");
332  }
333 
334  // Store the transaction map
335  if (srcLedger.info().txHash.isNonZero())
336  {
337  if (!srcLedger.txMap().isValid())
338  return fail("Invalid transaction map");
339 
340  srcLedger.txMap().snapShot(false)->visitNodes(visit);
341  if (error)
342  return fail("Failed to store transaction map");
343  }
344 
// Flush the remainder that never reached the batch threshold.
345  if (!batch.empty() && !storeBatch())
346  return fail("Failed to store");
347 
348  return true;
349 }
350 
// NOTE(review): source lines 352 and 357 are absent from this listing,
// including the name line (presumably Database::getCountsJson, taking `obj`).
// Visible behavior: fills the JSON object `obj` with read-queue, read-thread
// and store/fetch statistics, plus backend counters when getCounters()
// returns a value.
351 void
353 {
354  assert(obj.isObject());
355 
// Scoped block around the read_ access; its first statement is on a missing
// line (presumably it takes the lock that protects read_ -- TODO confirm).
356  {
358  obj["read_queue"] = static_cast<Json::UInt>(read_.size());
359  }
360 
361  obj["read_threads_total"] = readThreads_.load();
362  obj["read_threads_running"] = runningThreads_.load();
363  obj["read_request_bundle"] = requestBundle_;
364 
// The counters below are written as decimal strings via std::to_string rather
// than as JSON numbers.
365  obj[jss::node_writes] = std::to_string(storeCount_);
366  obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
367  obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
368  obj[jss::node_written_bytes] = std::to_string(storeSz_);
369  obj[jss::node_read_bytes] = std::to_string(fetchSz_);
370  obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
371 
// Backend counters are added only when getCounters() yields a value.
372  if (auto c = getCounters())
373  {
374  obj[jss::node_read_errors] = std::to_string(c->readErrors);
375  obj[jss::node_read_retries] = std::to_string(c->readRetries);
376  obj[jss::node_write_retries] = std::to_string(c->writeRetries);
377  obj[jss::node_writes_delayed] = std::to_string(c->writesDelayed);
378  obj[jss::node_writes_duration_us] = std::to_string(c->writeDurationUs);
379  }
380 }
381 
382 } // namespace NodeStore
383 } // namespace ripple
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
ripple::SHAMap::isValid
bool isValid() const
Definition: SHAMap.h:617
ripple::DEFAULT_LEDGERS_PER_SHARD
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD
The number of ledgers in a shard.
Definition: SystemParameters.h:64
ripple::NodeStore::Database::fetchDurationUs_
std::atomic< std::uint64_t > fetchDurationUs_
Definition: Database.h:360
ripple::NodeStore::Database
Persistency layer for NodeObject.
Definition: Database.h:51
ripple::NodeStore::read
void read(nudb::detail::istream &is, std::size_t &u)
Definition: varint.h:120
Json::Value::isObject
bool isObject() const
Definition: json_value.cpp:1027
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::SHAMap::getHash
SHAMapHash getHash() const
Definition: SHAMap.cpp:843
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:530
ripple::SHAMap::family
Family const & family() const
Definition: SHAMap.h:139
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
ripple::NodeStore::Database::ledgersPerShard_
const std::uint32_t ledgersPerShard_
Definition: Database.h:314
std::vector::reserve
T reserve(T... args)
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:101
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:162
Json::UInt
unsigned int UInt
Definition: json_forwards.h:27
std::vector< std::shared_ptr< NodeObject > >
std::vector::size
T size(T... args)
ripple::NodeStore::Database::read_
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition: Database.h:372
ripple::NodeStore::Database::requestBundle_
const int requestBundle_
Definition: Database.h:330
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
ripple::NodeStore::Database::fetchTotalCount_
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:359
ripple::NodeStore::Database::asyncFetch
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:175
ripple::NodeStore::Database::fetchSz_
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:306
std::lock_guard
STL class.
ripple::NodeStore::Database::stop
virtual void stop()
Definition: Database.cpp:161
ripple::NodeStore::FetchReport
Contains information about a fetch operation.
Definition: ripple/nodestore/Scheduler.h:32
std::function
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:93
ripple::NodeStore::Database::readStopping_
std::atomic< bool > readStopping_
Definition: Database.h:374
ripple::NodeStore::Database::readThreads_
std::atomic< int > readThreads_
Definition: Database.h:375
ripple::SHAMapHash::isNonZero
bool isNonZero() const
Definition: SHAMapHash.h:58
ripple::SHAMap::snapShot
std::shared_ptr< SHAMap > snapShot(bool isMutable) const
Definition: SHAMap.cpp:70
ripple::NodeStore::Database::fetchHitCount_
std::atomic< std::uint32_t > fetchHitCount_
Definition: Database.h:305
ripple::Family::db
virtual NodeStore::Database & db()=0
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:102
std::thread::detach
T detach(T... args)
ripple::NodeStore::Database::storeCount_
std::atomic< std::uint64_t > storeCount_
Definition: Database.h:357
ripple::NodeStore::batchWritePreallocationSize
@ batchWritePreallocationSize
Definition: nodestore/Types.h:34
ripple::NodeStore::Database::readLock_
std::mutex readLock_
Definition: Database.h:363
ripple::base_uint< 256 >
ripple::NodeStore::Database::storeSz_
std::atomic< std::uint64_t > storeSz_
Definition: Database.h:358
ripple::Ledger::info
LedgerInfo const & info() const override
Returns information about the ledger.
Definition: Ledger.h:148
ripple::NodeStore::Database::importInternal
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:187
ripple::base_uint::isZero
bool isZero() const
Definition: base_uint.h:525
std::thread
STL class.
ripple::Ledger
Holds a ledger.
Definition: Ledger.h:76
std::atomic::load
T load(T... args)
chrono
ripple::Ledger::stateMap
SHAMap const & stateMap() const
Definition: Ledger.h:307
std::unique_lock
STL class.
ripple::NodeStore::Database::readCondVar_
std::condition_variable readCondVar_
Definition: Database.h:364
ripple::SHAMapTreeNode
Definition: SHAMapTreeNode.h:53
std::to_string
T to_string(T... args)
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::NodeStore::Scheduler::onFetch
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch. Allows the scheduler to monitor the node store's performance.
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
std::condition_variable::wait
T wait(T... args)
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:60
ripple::NodeStore::Database::~Database
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:130
ripple::NodeStore::Database::for_each
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
ripple::NodeStore::Database::storeStats
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:333
ripple::NodeStore::FetchType
FetchType
Definition: ripple/nodestore/Scheduler.h:29
ripple::NodeStore::Database::isStopping
bool isStopping() const
Definition: Database.cpp:142
std::condition_variable::notify_one
T notify_one(T... args)
ripple::Serializer
Definition: Serializer.h:39
ripple::Ledger::txMap
SHAMap const & txMap() const
Definition: Ledger.h:319
ripple::NodeStore::FetchType::async
@ async
ripple::NodeStore::Database::getCounters
virtual std::optional< Backend::Counters< std::uint64_t > > getCounters() const
Retrieve backend read and write stats.
Definition: Database.h:401
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
std::atomic::exchange
T exchange(T... args)
ripple::NodeStore::Database::j_
const beast::Journal j_
Definition: Database.h:301
std
STL namespace.
ripple::XRP_LEDGER_EARLIEST_SEQ
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
Definition: SystemParameters.h:61
ripple::NodeStore::Database::getCountsJson
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:352
ripple::NodeStore::Database::earliestLedgerSeq_
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
std::vector::empty
T empty(T... args)
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:85
ripple::NodeStore::Database::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:226
ripple::NodeStore::Database::scheduler_
Scheduler & scheduler_
Definition: Database.h:302
ripple::NodeStore::Database::runningThreads_
std::atomic< int > runningThreads_
Definition: Database.h:376
ripple::NodeStore::Database::isSameDB
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
ripple::NodeStore::Database::maxLedgers
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:148
ripple::NodeStore::Backend::storeBatch
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
ripple::NodeStore::FetchReport::elapsed
std::chrono::milliseconds elapsed
Definition: ripple/nodestore/Scheduler.h:38
std::condition_variable::notify_all
T notify_all(T... args)
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:103
std::this_thread::yield
T yield(T... args)
std::exception::what
T what(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::NodeStore::Database::Database
Database()=delete
std::chrono
ripple::NodeStore::Backend
A backend used for the NodeStore.
Definition: Backend.h:39