rippled
Database.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/Ledger.h>
21 #include <ripple/basics/chrono.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/nodestore/Database.h>
24 #include <ripple/protocol/HashPrefix.h>
25 
26 namespace ripple {
27 namespace NodeStore {
28 
// Database constructor: parameter list, member initializers, and body.
// NOTE(review): the line that opens this definition is absent from this
// listing (the embedded numbering jumps from 28 to 30).
//
// Passes `name` and `parent.getRoot()` to the Stoppable base, stores the
// journal and the scheduler reference, and initializes earliestLedgerSeq_
// from the "earliest_seq" entry of `config`, with XRP_LEDGER_EARLIEST_SEQ
// supplied as the fallback argument (presumably used when the key is
// absent - confirm against get<>()).
// Calls Throw<std::runtime_error> when the resulting sequence is below 1.
30  std::string name,
31  Stoppable& parent,
32  Scheduler& scheduler,
33  int readThreads,
34  Section const& config,
35  beast::Journal journal)
36  : Stoppable(name, parent.getRoot())
37  , j_(journal)
38  , scheduler_(scheduler)
39  , earliestLedgerSeq_(
40  get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
41 {
 // A ledger sequence of zero is rejected as a configuration error.
42  if (earliestLedgerSeq_ < 1)
43  Throw<std::runtime_error>("Invalid earliest_seq");
44 
 // Runs once per requested read thread.
 // NOTE(review): the loop body (numbered line 46) is absent from this
 // listing; presumably it adds one thread to readThreads_ - confirm.
45  while (readThreads-- > 0)
47 }
48 
// Destructor body of Database.
// NOTE(review): the destructor's name line (numbered 49) and the statement on
// numbered line 57 are absent from this listing; given the note below, the
// missing statement presumably calls stopReadThreads() - confirm.
50 {
51  // NOTE!
52  // Any derived class should call the stopReadThreads() method in its
53  // destructor. Otherwise, occasionally, the derived class may
54  // crash during shutdown when its members are accessed by one of
55  // these threads after the derived class is destroyed but before
56  // this base class is destroyed.
58 }
59 
// Database::waitReads - wait for all currently pending async reads to
// complete.
//
// Blocks on readGenCondVar_ until one of the following holds: the read
// threads have been shut down (readShut_), the pending-read table read_ is
// empty, or readGen_ has reached the value it had on entry plus two.
//
// NOTE(review): the name line (numbered 61) and the declaration of `lock`
// (numbered 63) are absent from this listing; `lock` is presumably a
// std::unique_lock on readLock_ - confirm.
60 void
62 {
64  // Wake in two generations.
65  // Each generation is a full pass over the space.
66  // If we're in generation N and you issue a request,
67  // that request will only be done during generation N
68  // if it happens to land after where the pass currently is.
69  // But, if not, it will definitely be done during generation
70  // N+1 since the request was in the table before that pass
71  // even started. So when you reach generation N+2,
72  // you know the request is done.
73  std::uint64_t const wakeGen = readGen_ + 2;
 // The predicate is re-tested after every wakeup, so spurious wakeups of
 // the condition variable are harmless.
74  while (!readShut_ && !read_.empty() && (readGen_ < wakeGen))
75  readGenCondVar_.wait(lock);
76 }
77 
// Database::onStop - override called when the stop notification is issued.
// NOTE(review): the name line (numbered 79) and the body statement
// (numbered 83) are absent from this listing; per the comment below, the
// missing statement presumably joins the read threads via
// stopReadThreads() - confirm.
78 void
80 {
81  // After stop time we can no longer use the JobQueue for background
82  // reads. Join the background read threads.
84 }
85 
// Database::onChildrenStopped - override called when all children have
// stopped. Reports this object as stopped by calling Stoppable::stopped().
// NOTE(review): the name line (numbered 87) is absent from this listing.
86 void
88 {
89  stopped();
90 }
91 
// Database::stopReadThreads - mark the read threads as shut down and join
// them.
//
// The readShut_ flag makes the operation one-shot: a second call returns
// before touching readThreads_, so no thread is joined twice.
//
// NOTE(review): numbered lines 93 (name), 96, 101 and 102 are absent from
// this listing. Line 96 presumably acquires readLock_ for the inner scope,
// and lines 101-102 presumably wake the threads waiting on readCondVar_ and
// readGenCondVar_ so they can observe readShut_ - confirm.
92 void
94 {
 // Inner scope: the flag is updated here, before any join is attempted.
95  {
97  if (readShut_) // Only stop threads once.
98  return;
99 
100  readShut_ = true;
103  }
104 
 // Wait for every read thread to exit.
105  for (auto& e : readThreads_)
106  e.join();
107 }
108 
// Posts an asynchronous read request: inserts (hash, ledgerSeq) into the
// pending-read map read_. std::map::emplace reports `.second == true` only
// when the key was not already present, so a hash that is already queued is
// not queued again.
//
// NOTE(review): numbered lines 110 (name and parameters), 113 and 115 are
// absent from this listing. The function is presumably an asyncFetch
// overload taking `hash` and `ledgerSeq`; line 113 presumably acquires
// readLock_, and line 115 (the body of the `if`) presumably notifies
// readCondVar_ to wake a read thread - confirm all three.
109 void
111 {
112  // Post a read
114  if (read_.emplace(hash, ledgerSeq).second)
116 }
117 
// Database::importInternal(Backend& dstBackend, Database& srcDB)
//
// Copies every object visited by srcDB.for_each() into dstBackend, writing
// in batches of up to batchWritePreallocationSize objects and recording each
// successful write through storeStats().
//
// NOTE(review): numbered lines 119 (name and parameters) and 122 are absent
// from this listing; line 122 presumably reserves capacity in `batch` -
// confirm.
118 void
120 {
121  Batch batch;
 // Writes the accumulated batch to the destination backend.
 // On an exception the error is logged and the lambda returns without
 // clearing `batch`, so the same objects stay queued and are written again
 // on the next call.
 // NOTE(review): __func__ is evaluated inside the lambda, so the logged
 // name is that of the lambda's call operator, not of this function.
123  auto storeBatch = [&]() {
124  try
125  {
126  dstBackend.storeBatch(batch);
127  }
128  catch (std::exception const& e)
129  {
130  JLOG(j_.error()) << "Exception caught in function " << __func__
131  << ". Error: " << e.what();
132  return;
133  }
134 
 // Total payload size of the batch, in bytes, for the store statistics.
135  std::uint64_t sz{0};
136  for (auto const& nodeObject : batch)
137  sz += nodeObject->getData().size();
138  storeStats(batch.size(), sz);
139  batch.clear();
140  };
141 
 // Visit every source object; flush whenever the batch reaches the
 // preallocation size.
142  srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
143  assert(nodeObject);
144  if (!nodeObject) // This should never happen
145  return;
146 
147  batch.emplace_back(std::move(nodeObject));
148  if (batch.size() >= batchWritePreallocationSize)
149  storeBatch();
150  });
151 
 // Flush whatever is left after the last full batch.
152  if (!batch.empty())
153  storeBatch();
154 }
155 
156 // Perform a fetch and report the time it took
//
// Database::fetchNodeObject(hash, ledgerSeq, fetchType)
//
// Delegates the lookup to the fetchNodeObject overload that takes a
// FetchReport, updates the hit statistics, measures the elapsed time with
// std::chrono::steady_clock, and passes the report to scheduler_.onFetch().
// Returns the fetched object; the pointer is null when nothing was found.
//
// NOTE(review): numbered lines 157-158 (return type and name) and 175 are
// absent from this listing; line 175, the body of the `wentToDisk` check,
// presumably increments fetchTotalCount_ - confirm.
159  uint256 const& hash,
160  std::uint32_t ledgerSeq,
161  FetchType fetchType)
162 {
163  FetchReport fetchReport(fetchType);
164 
165  using namespace std::chrono;
166  auto const begin{steady_clock::now()};
167 
168  auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport)};
 // A hit adds to both the hit counter and the running byte count.
169  if (nodeObject)
170  {
171  ++fetchHitCount_;
172  fetchSz_ += nodeObject->getData().size();
173  }
174  if (fetchReport.wentToDisk)
176 
 // Elapsed time is reported in whole milliseconds.
177  fetchReport.elapsed =
178  duration_cast<milliseconds>(steady_clock::now() - begin);
179  scheduler_.onFetch(fetchReport);
180  return nodeObject;
181 }
182 
// Copies a ledger (header, state map, transaction map) from the database
// that backs `srcLedger` into `dstBackend`, updating the destination caches
// as it goes. Returns true on success; on any failure an error is logged
// through `fail` and false is returned.
//
// NOTE(review): numbered lines 184 (function name), 187-188, 208 and 239 are
// absent from this listing. The function is presumably a storeLedger
// overload; lines 187-188 presumably declare the `dstPCache` and `dstNCache`
// parameters used below; line 208 presumably reserves capacity in `batch`;
// line 239 presumably writes a hash prefix into the serializer before the
// ledger header - confirm all of these against the original source.
183 bool
185  Ledger const& srcLedger,
186  std::shared_ptr<Backend> dstBackend,
189 {
 // Logs `msg` prefixed with the source ledger sequence and yields false, so
 // callers can write `return fail(...)`.
190  auto fail = [&](std::string const& msg) {
191  JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
192  << ". " << msg;
193  return false;
194  };
195 
 // Validate the arguments before touching any data.
196  if (!dstPCache || !dstNCache)
197  return fail("Invalid destination cache");
198  if (srcLedger.info().hash.isZero())
199  return fail("Invalid hash");
200  if (srcLedger.info().accountHash.isZero())
201  return fail("Invalid account hash");
202 
 // The source database is reached through the state map's family; the
 // const_cast removes the constness of that accessor's result.
 // Copying a database onto itself is rejected.
203  auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
204  if (&srcDB == this)
205  return fail("Source and destination databases are the same");
206 
207  Batch batch;
 // Flushes `batch` to the destination. The caches are updated first, i.e.
 // before the backend write is attempted. Returns false if the backend
 // throws; in that case `batch` is not cleared.
 // NOTE(review): __func__ is evaluated inside the lambda, so the logged
 // name is that of the lambda's call operator, not of this function.
209  auto storeBatch = [&]() {
210  std::uint64_t sz{0};
211  for (auto const& nodeObject : batch)
212  {
213  dstPCache->canonicalize_replace_cache(
214  nodeObject->getHash(), nodeObject);
215  dstNCache->erase(nodeObject->getHash());
216  sz += nodeObject->getData().size();
217  }
218 
219  try
220  {
221  dstBackend->storeBatch(batch);
222  }
223  catch (std::exception const& e)
224  {
225  fail(
226  std::string("Exception caught in function ") + __func__ +
227  ". Error: " + e.what());
228  return false;
229  }
230 
231  storeStats(batch.size(), sz);
232  batch.clear();
233  return true;
234  };
235 
236  // Store ledger header
237  {
238  Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
240  addRaw(srcLedger.info(), s);
241  auto nObj = NodeObject::createObject(
242  hotLEDGER, std::move(s.modData()), srcLedger.info().hash);
243  batch.emplace_back(std::move(nObj));
244  }
245 
 // Per-node callback for the map traversals below. Returns true to keep
 // visiting. Sets `error` and returns false when this object is stopping,
 // when the node cannot be fetched from the source database, or when a
 // full batch fails to store.
246  bool error = false;
247  auto visit = [&](SHAMapTreeNode& node) {
248  if (!isStopping())
249  {
250  if (auto nodeObject = srcDB.fetchNodeObject(
251  node.getHash().as_uint256(), srcLedger.info().seq))
252  {
253  batch.emplace_back(std::move(nodeObject));
 // storeBatch() only runs once the batch has reached the
 // preallocation size (short-circuit evaluation).
254  if (batch.size() < batchWritePreallocationSize || storeBatch())
255  return true;
256  }
257  }
258 
259  error = true;
260  return false;
261  };
262 
263  // Store the state map
264  if (srcLedger.stateMap().getHash().isNonZero())
265  {
266  if (!srcLedger.stateMap().isValid())
267  return fail("Invalid state map");
268 
 // Traverse a snapshot of the map (snapShot(false): isMutable = false).
269  srcLedger.stateMap().snapShot(false)->visitNodes(visit);
270  if (error)
271  return fail("Failed to store state map");
272  }
273 
274  // Store the transaction map
275  if (srcLedger.info().txHash.isNonZero())
276  {
277  if (!srcLedger.txMap().isValid())
278  return fail("Invalid transaction map");
279 
280  srcLedger.txMap().snapShot(false)->visitNodes(visit);
281  if (error)
282  return fail("Failed to store transaction map");
283  }
284 
 // Flush the final, partially filled batch.
285  if (!batch.empty() && !storeBatch())
286  return fail("Failed to store");
287 
288  return true;
289 }
290 
291 // Entry point for async read threads
//
// Database::threadEntry - names the thread "prefetch", then loops: wait
// until a read is pending or shutdown is requested, take one (hash, sequence)
// entry out of read_, and fetch it with FetchType::async. The loop exits
// when readShut_ is set.
//
// NOTE(review): numbered lines 293 (name), 301, 305 and 318 are absent from
// this listing. Line 301 presumably declares `lock` as a std::unique_lock on
// readLock_; lines 305 and 318 presumably notify readGenCondVar_ (when all
// work is done, and when a generation completes) so that waitReads() can
// re-check its condition - confirm.
292 void
294 {
295  beast::setCurrentThreadName("prefetch");
296  while (true)
297  {
298  uint256 lastHash;
299  std::uint32_t lastSeq;
 // Inner scope: the table is inspected and modified here; the fetch itself
 // is issued after this scope closes.
300  {
302  while (!readShut_ && read_.empty())
303  {
304  // All work is done
306  readCondVar_.wait(lock);
307  }
308  if (readShut_)
309  break;
310 
311  // Read in key order to make the back end more efficient
 // lower_bound yields the first pending key not less than the last hash
 // this pass handled; reaching end() means the pass wraps around to the
 // smallest key.
312  auto it = read_.lower_bound(readLastHash_);
313  if (it == read_.end())
314  {
315  it = read_.begin();
316  // A generation has completed
317  ++readGen_;
319  }
 // Copy the entry out before erasing it, and remember where the pass is.
320  lastHash = it->first;
321  lastSeq = it->second;
322  read_.erase(it);
323  readLastHash_ = lastHash;
324  }
325 
326  // Perform the read
 // The returned object is discarded here.
327  fetchNodeObject(lastHash, lastSeq, FetchType::async);
328  }
329 }
330 
331 } // namespace NodeStore
332 } // namespace ripple
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:43
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
ripple::NodeStore::Database::readLastHash_
uint256 readLastHash_
Definition: Database.h:294
ripple::SHAMap::isValid
bool isValid() const
Definition: SHAMap.h:539
ripple::NodeStore::Database
Persistency layer for NodeObject.
Definition: Database.h:53
std::string
STL class.
ripple::NodeStore::Database::readShut_
bool readShut_
Definition: Database.h:297
std::shared_ptr< NodeObject >
ripple::TaggedCache
Map/cache combination.
Definition: TaggedCache.h:52
ripple::SHAMap::getHash
SHAMapHash getHash() const
Definition: SHAMap.cpp:778
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:444
ripple::Stoppable::stopped
void stopped()
Called by derived classes to indicate that the stoppable has stopped.
Definition: Stoppable.cpp:72
ripple::SHAMap::family
Family const & family() const
Definition: SHAMap.h:139
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:176
std::vector::reserve
T reserve(T... args)
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:100
std::vector< std::shared_ptr< NodeObject > >
std::vector::size
T size(T... args)
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
ripple::NodeStore::Database::fetchTotalCount_
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:284
ripple::NodeStore::Database::fetchSz_
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:252
std::lock_guard
STL class.
ripple::NodeStore::FetchReport
Contains information about a fetch operation.
Definition: ripple/nodestore/Scheduler.h:32
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s)
Definition: View.cpp:43
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:92
ripple::SHAMapHash::isNonZero
bool isNonZero() const
Definition: SHAMapTreeNode.h:73
ripple::NodeStore::Database::read_
std::map< uint256, std::uint32_t > read_
Definition: Database.h:291
ripple::SHAMap::snapShot
std::shared_ptr< SHAMap > snapShot(bool isMutable) const
Definition: SHAMap.cpp:70
ripple::NodeStore::Database::fetchHitCount_
std::atomic< std::uint32_t > fetchHitCount_
Definition: Database.h:251
ripple::Family::db
virtual NodeStore::Database & db()=0
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:101
ripple::NodeStore::Database::readLock_
std::mutex readLock_
Definition: Database.h:286
ripple::NodeStore::Database::onChildrenStopped
void onChildrenStopped() override
Override called when all children have stopped.
Definition: Database.cpp:87
ripple::base_uint< 256 >
ripple::Ledger::info
LedgerInfo const & info() const override
Returns information about the ledger.
Definition: Ledger.h:149
ripple::Stoppable
Provides an interface for starting and stopping.
Definition: Stoppable.h:201
ripple::NodeStore::Database::importInternal
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:119
ripple::base_uint::isZero
bool isZero() const
Definition: base_uint.h:439
ripple::Ledger
Holds a ledger.
Definition: Ledger.h:77
ripple::Ledger::stateMap
SHAMap const & stateMap() const
Definition: Ledger.h:283
std::unique_lock
STL class.
ripple::NodeStore::Database::readCondVar_
std::condition_variable readCondVar_
Definition: Database.h:287
ripple::SHAMapTreeNode
Definition: SHAMapTreeNode.h:133
ripple::NodeStore::Database::waitReads
void waitReads()
Wait for all currently pending async reads to complete.
Definition: Database.cpp:61
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::NodeStore::batchWritePreallocationSize
@ batchWritePreallocationSize
Definition: nodestore/Types.h:34
ripple::NodeStore::Scheduler::onFetch
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch. Allows the scheduler to monitor the node store's performance.
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint64_t
std::condition_variable::wait
T wait(T... args)
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:61
ripple::NodeStore::Database::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous)
Fetch a node object.
Definition: Database.cpp:158
ripple::NodeStore::Database::~Database
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:49
ripple::NodeStore::Database::for_each
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
ripple::NodeStore::Database::storeStats
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:258
ripple::NodeStore::FetchType
FetchType
Definition: ripple/nodestore/Scheduler.h:29
std::condition_variable::notify_one
T notify_one(T... args)
ripple::NodeStore::Database::threadEntry
void threadEntry()
Definition: Database.cpp:293
ripple::Serializer
Definition: Serializer.h:39
ripple::NodeStore::Database::stopReadThreads
void stopReadThreads()
Definition: Database.cpp:93
ripple::Ledger::txMap
SHAMap const & txMap() const
Definition: Ledger.h:295
ripple::NodeStore::FetchType::async
@ async
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::NodeStore::Database::onStop
void onStop() override
Override called when the stop notification is issued.
Definition: Database.cpp:79
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
ripple::NodeStore::Database::readGenCondVar_
std::condition_variable readGenCondVar_
Definition: Database.h:288
ripple::NodeStore::Database::asyncFetch
virtual bool asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::shared_ptr< NodeObject > &nodeObject)=0
Fetch an object without waiting.
ripple::NodeStore::Database::j_
const beast::Journal j_
Definition: Database.h:247
ripple::NodeStore::Database::readThreads_
std::vector< std::thread > readThreads_
Definition: Database.h:296
std
STL namespace.
ripple::XRP_LEDGER_EARLIEST_SEQ
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
Definition: SystemParameters.h:61
ripple::NodeStore::Database::earliestLedgerSeq_
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:304
std::vector::empty
T empty(T... args)
ripple::NodeStore::Database::readGen_
uint64_t readGen_
Definition: Database.h:300
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:84
ripple::NodeStore::Database::scheduler_
Scheduler & scheduler_
Definition: Database.h:248
ripple::KeyCache
Maintains a cache of keys with no associated data.
Definition: KeyCache.h:43
ripple::NodeStore::Backend::storeBatch
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
ripple::NodeStore::FetchReport::wentToDisk
bool wentToDisk
Definition: ripple/nodestore/Scheduler.h:40
ripple::NodeStore::FetchReport::elapsed
std::chrono::milliseconds elapsed
Definition: ripple/nodestore/Scheduler.h:38
std::condition_variable::notify_all
T notify_all(T... args)
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:102
std::exception::what
T what(T... args)
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:116
ripple::NodeStore::Database::Database
Database()=delete
ripple::Stoppable::isStopping
bool isStopping() const
Returns true if the stoppable should stop.
Definition: Stoppable.cpp:54
std::chrono
ripple::NodeStore::Backend
A backend used for the NodeStore.
Definition: Backend.h:37