rippled
Database.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/Ledger.h>
21 #include <ripple/basics/chrono.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_value.h>
24 #include <ripple/nodestore/Database.h>
25 #include <ripple/protocol/HashPrefix.h>
26 #include <ripple/protocol/jss.h>
27 #include <chrono>
28 
29 namespace ripple {
30 namespace NodeStore {
31 
33  Scheduler& scheduler,
34  int readThreads,
35  Section const& config,
36  beast::Journal journal)
37  : j_(journal)
38  , scheduler_(scheduler)
39  , ledgersPerShard_(get<std::uint32_t>(
40  config,
41  "ledgers_per_shard",
43  , earliestLedgerSeq_(
44  get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
45  , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
46  , readThreads_(std::min(1, readThreads))
47 {
48  assert(readThreads != 0);
49 
50  if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
51  Throw<std::runtime_error>("Invalid ledgers_per_shard");
52 
53  if (earliestLedgerSeq_ < 1)
54  Throw<std::runtime_error>("Invalid earliest_seq");
55 
56  for (int i = 0; i != readThreads_.load(); ++i)
57  {
58  std::thread t(
59  [this](int i) {
61  "db prefetch #" + std::to_string(i));
62 
63  decltype(read_) read;
64 
65  while (!isStopping())
66  {
67  {
69 
70  if (read_.empty())
71  readCondVar_.wait(lock);
72 
73  if (isStopping())
74  continue;
75 
76  // We extract up to 64 objects to minimize the overhead
77  // of acquiring the mutex.
78  for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt)
79  read.insert(read_.extract(read_.begin()));
80  }
81 
82  for (auto it = read.begin(); it != read.end(); ++it)
83  {
84  assert(!it->second.empty());
85 
86  auto const& hash = it->first;
87  auto const& data = std::move(it->second);
88  auto const seqn = data[0].first;
89 
90  auto obj =
91  fetchNodeObject(hash, seqn, FetchType::async);
92 
93  // This could be further optimized: if there are
94  // multiple requests for sequence numbers mapping to
95  // multiple databases by sorting requests such that all
96  // indices mapping to the same database are grouped
97  // together and serviced by a single read.
98  for (auto const& req : data)
99  {
100  req.second(
101  (seqn == req.first) || isSameDB(req.first, seqn)
102  ? obj
103  : fetchNodeObject(
104  hash, req.first, FetchType::async));
105  }
106  }
107 
108  read.clear();
109  }
110 
111  --readThreads_;
112  },
113  i);
114  t.detach();
115  }
116 }
117 
// Destructor body: calls stop() as its only action.
// NOTE(review): the signature line (listing line 118) is missing from this
// extract; presumably `Database::~Database()` - confirm.
119 {
120  // NOTE!
121  // Any derived class should call the stop() method in its
122  // destructor. Otherwise, occasionally, the derived class may
123  // crash during shutdown when its members are accessed by one of
124  // these threads after the derived class is destroyed but before
125  // this base class is destroyed.
126  stop();
127 }
128 
// Returns the current value of readStopping_, read with a relaxed atomic
// load.
// NOTE(review): the function-name line (listing line 130) is missing from
// this extract; presumably `Database::isStopping() const` - confirm.
129 bool
131 {
132  return readStopping_.load(std::memory_order_relaxed);
133 }
134 
// Calculates the maximum number of ledgers for the shard at `shardIndex`:
//  - ledgersPerShard_ for any shard after the earliest shard;
//  - lastLedgerSeq - firstLedgerSeq + 1 for the earliest shard itself;
//  - 0 for an index below the earliest shard (asserts in debug builds).
// NOTE(review): the return-type line (listing line 135) is missing from this
// extract; presumably `std::uint32_t` - confirm.
136 Database::maxLedgers(std::uint32_t shardIndex) const noexcept
137 {
138  if (shardIndex > earliestShardIndex_)
139  return ledgersPerShard_;
140 
141  if (shardIndex == earliestShardIndex_)
142  return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
143 
// Reaching this point means shardIndex < earliestShardIndex_.
144  assert(!"Invalid shard index");
145  return 0;
146 }
147 
// Sets readStopping_ and discards all queued reads, then loops until
// readThreads_ reaches zero.
// NOTE(review): listing lines 149, 153, 155 and 159 are missing from this
// extract. Presumably they are the `Database::stop()` name line, a lock on
// readLock_, a notify on readCondVar_ and the body of the wait loop -
// confirm against the real file.
148 void
150 {
// exchange() returns the previous value, so only the first caller to flip
// the flag runs this body.
151  if (!readStopping_.exchange(true, std::memory_order_relaxed))
152  {
154  read_.clear();
156  }
157 
// Loop until the counter of read threads has dropped to zero.
158  while (readThreads_.load() != 0)
160 }
161 
// Queues a read request: appends (ledgerSeq, cb) to the list of requests
// stored under `hash` in read_.
//
// hash      - key under which the request is queued.
// ledgerSeq - sequence number stored alongside the callback.
// cb        - callback taking the fetched object; it is moved into the queue.
//
// NOTE(review): listing lines 163, 169 and 171 are missing from this extract.
// Presumably they are the `Database::asyncFetch(` name line, a lock on
// readLock_ and a notify on readCondVar_ - confirm against the real file.
162 void
164  uint256 const& hash,
165  std::uint32_t ledgerSeq,
166  std::function<void(std::shared_ptr<NodeObject> const&)>&& cb)
167 {
168  // Post a read
170  read_[hash].emplace_back(ledgerSeq, std::move(cb));
172 }
173 
// Copies every object visited by srcDB.for_each into dstBackend, writing in
// batches of batchWritePreallocationSize objects.
// NOTE(review): listing lines 175 and 178 are missing from this extract.
// Presumably they are the
// `Database::importInternal(Backend& dstBackend, Database& srcDB)` line and a
// reserve() on `batch` - confirm against the real file.
174 void
176 {
177  Batch batch;
// Writes the current batch to dstBackend, records store statistics and
// empties the batch. If storeBatch throws, the error is logged and the
// lambda returns early, leaving the batch contents in place.
179  auto storeBatch = [&, fname = __func__]() {
180  try
181  {
182  dstBackend.storeBatch(batch);
183  }
184  catch (std::exception const& e)
185  {
186  JLOG(j_.error()) << "Exception caught in function " << fname
187  << ". Error: " << e.what();
188  return;
189  }
190 
// Total payload bytes in this batch, reported to storeStats.
191  std::uint64_t sz{0};
192  for (auto const& nodeObject : batch)
193  sz += nodeObject->getData().size();
194  storeStats(batch.size(), sz);
195  batch.clear();
196  };
197 
// Accumulate visited objects and flush whenever the batch is full.
198  srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
199  assert(nodeObject);
200  if (!nodeObject) // This should never happen
201  return;
202 
203  batch.emplace_back(std::move(nodeObject));
204  if (batch.size() >= batchWritePreallocationSize)
205  storeBatch();
206  });
207 
// Flush whatever is left after the final full batch.
208  if (!batch.empty())
209  storeBatch();
210 }
211 
212 // Perform a fetch and report the time it took
//
// Delegates to the fetchNodeObject overload that takes a FetchReport, times
// the call with steady_clock, updates the fetch counters and reports the
// result to the scheduler. Returns whatever the delegated call returned; the
// hit counter and byte counter are only updated when that result is non-null.
//
// NOTE(review): listing lines 213-214 and 233 are missing from this extract.
// Presumably they are the return type, the `Database::fetchNodeObject(` name
// line and an increment of fetchTotalCount_ - confirm against the real file.
215  uint256 const& hash,
216  std::uint32_t ledgerSeq,
217  FetchType fetchType,
218  bool duplicate)
219 {
220  FetchReport fetchReport(fetchType);
221 
222  using namespace std::chrono;
223  auto const begin{steady_clock::now()};
224 
225  auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
226  auto dur = steady_clock::now() - begin;
// The running total is kept in microseconds ...
227  fetchDurationUs_ += duration_cast<microseconds>(dur).count();
228  if (nodeObject)
229  {
230  ++fetchHitCount_;
231  fetchSz_ += nodeObject->getData().size();
232  }
234 
// ... while the per-fetch report carries the duration truncated to
// milliseconds.
235  fetchReport.elapsed = duration_cast<milliseconds>(dur);
236  scheduler_.onFetch(fetchReport);
237  return nodeObject;
238 }
239 
// Copies a ledger (header, state map nodes, transaction map nodes) from the
// database backing srcLedger into dstBackend, writing in batches.
//
// Returns true when everything was stored. Returns false, after logging an
// error that includes the source ledger sequence, when:
//  - the ledger hash or account hash is zero;
//  - the source database is this database;
//  - a map is invalid, a node cannot be fetched or this database is stopping;
//  - dstBackend->storeBatch throws.
//
// NOTE(review): listing lines 241, 261 and 287 are missing from this extract.
// Presumably they are the `Database::storeLedger(` name line, a reserve() on
// `batch` and a write of a hash prefix into the serializer - confirm against
// the real file.
240 bool
242  Ledger const& srcLedger,
243  std::shared_ptr<Backend> dstBackend)
244 {
// Logs `msg` prefixed with the source ledger sequence; always returns false
// so callers can write `return fail(...)`.
245  auto fail = [&](std::string const& msg) {
246  JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
247  << ". " << msg;
248  return false;
249  };
250 
251  if (srcLedger.info().hash.isZero())
252  return fail("Invalid hash");
253  if (srcLedger.info().accountHash.isZero())
254  return fail("Invalid account hash");
255 
256  auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
257  if (&srcDB == this)
258  return fail("Source and destination databases are the same");
259 
260  Batch batch;
// Writes the current batch to dstBackend. On success it records store
// statistics, empties the batch and returns true; if storeBatch throws it
// logs through fail() and returns false without clearing the batch.
262  auto storeBatch = [&, fname = __func__]() {
263  std::uint64_t sz{0};
264  for (auto const& nodeObject : batch)
265  sz += nodeObject->getData().size();
266 
267  try
268  {
269  dstBackend->storeBatch(batch);
270  }
271  catch (std::exception const& e)
272  {
273  fail(
274  std::string("Exception caught in function ") + fname +
275  ". Error: " + e.what());
276  return false;
277  }
278 
279  storeStats(batch.size(), sz);
280  batch.clear();
281  return true;
282  };
283 
284  // Store ledger header
285  {
286  Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
288  addRaw(srcLedger.info(), s);
289  auto nObj = NodeObject::createObject(
290  hotLEDGER, std::move(s.modData()), srcLedger.info().hash);
291  batch.emplace_back(std::move(nObj));
292  }
293 
// Set by `visit` when a node cannot be copied; checked after each traversal.
294  bool error = false;
// Per-node callback for visitNodes: fetches the node from srcDB and appends
// it to the batch, flushing when the batch is full. Returns true while the
// node was handled; otherwise sets `error` and returns false.
295  auto visit = [&](SHAMapTreeNode& node) {
296  if (!isStopping())
297  {
298  if (auto nodeObject = srcDB.fetchNodeObject(
299  node.getHash().as_uint256(), srcLedger.info().seq))
300  {
301  batch.emplace_back(std::move(nodeObject));
// storeBatch() is only called once the batch has reached the flush size.
302  if (batch.size() < batchWritePreallocationSize || storeBatch())
303  return true;
304  }
305  }
306 
307  error = true;
308  return false;
309  };
310 
311  // Store the state map
312  if (srcLedger.stateMap().getHash().isNonZero())
313  {
314  if (!srcLedger.stateMap().isValid())
315  return fail("Invalid state map");
316 
317  srcLedger.stateMap().snapShot(false)->visitNodes(visit);
318  if (error)
319  return fail("Failed to store state map");
320  }
321 
322  // Store the transaction map
323  if (srcLedger.info().txHash.isNonZero())
324  {
325  if (!srcLedger.txMap().isValid())
326  return fail("Invalid transaction map");
327 
328  srcLedger.txMap().snapShot(false)->visitNodes(visit);
329  if (error)
330  return fail("Failed to store transaction map");
331  }
332 
// Flush the final, partially filled batch.
333  if (!batch.empty() && !storeBatch())
334  return fail("Failed to store");
335 
336  return true;
337 }
338 
// Writes this database's store/fetch counters into the JSON object `obj`,
// each value converted to a string with std::to_string. The backend counters
// are added only when getCounters() returns a value.
// NOTE(review): the signature line (listing line 340) is missing from this
// extract; presumably `Database::getCountsJson(Json::Value& obj)` - confirm.
339 void
341 {
342  assert(obj.isObject());
343  obj[jss::node_writes] = std::to_string(storeCount_);
344  obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
345  obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
346  obj[jss::node_written_bytes] = std::to_string(storeSz_);
347  obj[jss::node_read_bytes] = std::to_string(fetchSz_);
348  obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
349 
// Backend-level statistics.
350  if (auto c = getCounters())
351  {
352  obj[jss::node_read_errors] = std::to_string(c->readErrors);
353  obj[jss::node_read_retries] = std::to_string(c->readRetries);
354  obj[jss::node_write_retries] = std::to_string(c->writeRetries);
355  obj[jss::node_writes_delayed] = std::to_string(c->writesDelayed);
356  obj[jss::node_writes_duration_us] = std::to_string(c->writeDurationUs);
357  }
358 }
359 
360 } // namespace NodeStore
361 } // namespace ripple
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
ripple::SHAMap::isValid
bool isValid() const
Definition: SHAMap.h:617
ripple::DEFAULT_LEDGERS_PER_SHARD
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD
The number of ledgers in a shard.
Definition: SystemParameters.h:64
ripple::NodeStore::Database::fetchDurationUs_
std::atomic< std::uint64_t > fetchDurationUs_
Definition: Database.h:355
ripple::NodeStore::Database
Persistency layer for NodeObject.
Definition: Database.h:51
ripple::NodeStore::read
void read(nudb::detail::istream &is, std::size_t &u)
Definition: varint.h:120
Json::Value::isObject
bool isObject() const
Definition: json_value.cpp:1027
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::SHAMap::getHash
SHAMapHash getHash() const
Definition: SHAMap.cpp:843
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:530
ripple::SHAMap::family
Family const & family() const
Definition: SHAMap.h:139
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
ripple::NodeStore::Database::ledgersPerShard_
const std::uint32_t ledgersPerShard_
Definition: Database.h:314
std::vector::reserve
T reserve(T... args)
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:100
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:162
std::vector< std::shared_ptr< NodeObject > >
std::vector::size
T size(T... args)
ripple::NodeStore::Database::read_
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition: Database.h:367
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
ripple::NodeStore::Database::fetchTotalCount_
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:354
ripple::NodeStore::Database::asyncFetch
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:163
ripple::NodeStore::Database::fetchSz_
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:306
std::lock_guard
STL class.
ripple::NodeStore::Database::stop
virtual void stop()
Definition: Database.cpp:149
ripple::NodeStore::FetchReport
Contains information about a fetch operation.
Definition: ripple/nodestore/Scheduler.h:32
std::function
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:92
ripple::NodeStore::Database::readStopping_
std::atomic< bool > readStopping_
Definition: Database.h:369
ripple::NodeStore::Database::readThreads_
std::atomic< int > readThreads_
Definition: Database.h:370
ripple::SHAMapHash::isNonZero
bool isNonZero() const
Definition: SHAMapHash.h:58
ripple::SHAMap::snapShot
std::shared_ptr< SHAMap > snapShot(bool isMutable) const
Definition: SHAMap.cpp:70
ripple::NodeStore::Database::fetchHitCount_
std::atomic< std::uint32_t > fetchHitCount_
Definition: Database.h:305
ripple::Family::db
virtual NodeStore::Database & db()=0
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:101
std::thread::detach
T detach(T... args)
ripple::NodeStore::Database::storeCount_
std::atomic< std::uint64_t > storeCount_
Definition: Database.h:352
ripple::NodeStore::batchWritePreallocationSize
@ batchWritePreallocationSize
Definition: nodestore/Types.h:34
ripple::NodeStore::Database::readLock_
std::mutex readLock_
Definition: Database.h:358
ripple::base_uint< 256 >
ripple::NodeStore::Database::storeSz_
std::atomic< std::uint64_t > storeSz_
Definition: Database.h:353
ripple::Ledger::info
LedgerInfo const & info() const override
Returns information about the ledger.
Definition: Ledger.h:148
ripple::NodeStore::Database::importInternal
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:175
ripple::base_uint::isZero
bool isZero() const
Definition: base_uint.h:525
std::thread
STL class.
ripple::Ledger
Holds a ledger.
Definition: Ledger.h:76
std::atomic::load
T load(T... args)
chrono
ripple::Ledger::stateMap
SHAMap const & stateMap() const
Definition: Ledger.h:307
std::unique_lock
STL class.
ripple::NodeStore::Database::readCondVar_
std::condition_variable readCondVar_
Definition: Database.h:359
ripple::SHAMapTreeNode
Definition: SHAMapTreeNode.h:53
std::to_string
T to_string(T... args)
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::NodeStore::Scheduler::onFetch
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch Allows the scheduler to monitor the node store's performance.
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
std::condition_variable::wait
T wait(T... args)
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:60
ripple::NodeStore::Database::~Database
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:118
ripple::NodeStore::Database::for_each
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database This is usually called during import.
ripple::NodeStore::Database::storeStats
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:328
ripple::NodeStore::FetchType
FetchType
Definition: ripple/nodestore/Scheduler.h:29
ripple::NodeStore::Database::isStopping
bool isStopping() const
Definition: Database.cpp:130
std::condition_variable::notify_one
T notify_one(T... args)
ripple::Serializer
Definition: Serializer.h:39
ripple::Ledger::txMap
SHAMap const & txMap() const
Definition: Ledger.h:319
ripple::NodeStore::FetchType::async
@ async
ripple::NodeStore::Database::getCounters
virtual std::optional< Backend::Counters< std::uint64_t > > getCounters() const
Retrieve backend read and write stats.
Definition: Database.h:395
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
std::atomic::exchange
T exchange(T... args)
ripple::NodeStore::Database::j_
const beast::Journal j_
Definition: Database.h:301
std
STL namespace.
ripple::XRP_LEDGER_EARLIEST_SEQ
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
Definition: SystemParameters.h:61
ripple::NodeStore::Database::getCountsJson
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:340
ripple::NodeStore::Database::earliestLedgerSeq_
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
std::vector::empty
T empty(T... args)
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:84
ripple::NodeStore::Database::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:214
ripple::NodeStore::Database::scheduler_
Scheduler & scheduler_
Definition: Database.h:302
ripple::NodeStore::Database::isSameDB
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
ripple::NodeStore::Database::maxLedgers
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:136
ripple::NodeStore::Backend::storeBatch
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
ripple::NodeStore::FetchReport::elapsed
std::chrono::milliseconds elapsed
Definition: ripple/nodestore/Scheduler.h:38
std::condition_variable::notify_all
T notify_all(T... args)
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:102
std::this_thread::yield
T yield(T... args)
std::exception::what
T what(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::NodeStore::Database::Database
Database()=delete
std::chrono
ripple::NodeStore::Backend
A backend used for the NodeStore.
Definition: Backend.h:39