rippled
RocksDBFactory.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/unity/rocksdb.h>
21 
22 #if RIPPLE_ROCKSDB_AVAILABLE
23 
24 #include <ripple/basics/ByteUtilities.h>
25 #include <ripple/basics/contract.h>
26 #include <ripple/basics/safe_cast.h>
27 #include <ripple/beast/core/CurrentThreadName.h>
28 #include <ripple/core/Config.h> // VFALCO Bad dependency
29 #include <ripple/nodestore/Factory.h>
30 #include <ripple/nodestore/Manager.h>
31 #include <ripple/nodestore/impl/BatchWriter.h>
32 #include <ripple/nodestore/impl/DecodedBlob.h>
33 #include <ripple/nodestore/impl/EncodedBlob.h>
34 
35 #include <atomic>
36 #include <memory>
37 
38 namespace ripple {
39 namespace NodeStore {
40 
41 class RocksDBEnv : public rocksdb::EnvWrapper
42 {
43 public:
44  RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
45  {
46  }
47 
48  struct ThreadParams
49  {
50  ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
51  {
52  }
53 
54  void (*f)(void*);
55  void* a;
56  };
57 
58  static void
59  thread_entry(void* ptr)
60  {
61  ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
62  void (*f)(void*) = p->f;
63  void* a(p->a);
64  delete p;
65 
66  static std::atomic<std::size_t> n;
67  std::size_t const id(++n);
69  ss << "rocksdb #" << id;
71 
72  (*f)(a);
73  }
74 
75  void
76  StartThread(void (*f)(void*), void* a) override
77  {
78  ThreadParams* const p(new ThreadParams(f, a));
79  EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
80  }
81 };
82 
83 //------------------------------------------------------------------------------
84 
85 class RocksDBBackend : public Backend, public BatchWriter::Callback
86 {
87 private:
88  std::atomic<bool> m_deletePath;
89 
90 public:
91  beast::Journal m_journal;
92  size_t const m_keyBytes;
93  BatchWriter m_batch;
94  std::string m_name;
96  int fdRequired_ = 2048;
97  rocksdb::Options m_options;
98 
99  RocksDBBackend(
100  int keyBytes,
101  Section const& keyValues,
102  Scheduler& scheduler,
103  beast::Journal journal,
104  RocksDBEnv* env)
105  : m_deletePath(false)
106  , m_journal(journal)
107  , m_keyBytes(keyBytes)
108  , m_batch(*this, scheduler)
109  {
110  if (!get_if_exists(keyValues, "path", m_name))
111  Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
112 
113  rocksdb::BlockBasedTableOptions table_options;
114  m_options.env = env;
115 
116  bool hard_set =
117  keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");
118 
119  if (keyValues.exists("cache_mb"))
120  {
121  auto size = get<int>(keyValues, "cache_mb");
122 
123  if (!hard_set && size == 256)
124  size = 1024;
125 
126  table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
127  }
128 
129  if (auto const v = get<int>(keyValues, "filter_bits"))
130  {
131  bool const filter_blocks = !keyValues.exists("filter_full") ||
132  (get<int>(keyValues, "filter_full") == 0);
133  table_options.filter_policy.reset(
134  rocksdb::NewBloomFilterPolicy(v, filter_blocks));
135  }
136 
137  if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
138  {
139  if (!hard_set && m_options.max_open_files == 2000)
140  m_options.max_open_files = 8000;
141 
142  fdRequired_ = m_options.max_open_files + 128;
143  }
144 
145  if (keyValues.exists("file_size_mb"))
146  {
147  auto file_size_mb = get<int>(keyValues, "file_size_mb");
148 
149  if (!hard_set && file_size_mb == 8)
150  file_size_mb = 256;
151 
152  m_options.target_file_size_base = megabytes(file_size_mb);
153  m_options.max_bytes_for_level_base =
154  5 * m_options.target_file_size_base;
155  m_options.write_buffer_size = 2 * m_options.target_file_size_base;
156  }
157 
159  keyValues, "file_size_mult", m_options.target_file_size_multiplier);
160 
161  if (keyValues.exists("bg_threads"))
162  {
163  m_options.env->SetBackgroundThreads(
164  get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
165  }
166 
167  if (keyValues.exists("high_threads"))
168  {
169  auto const highThreads = get<int>(keyValues, "high_threads");
170  m_options.env->SetBackgroundThreads(
171  highThreads, rocksdb::Env::HIGH);
172 
173  // If we have high-priority threads, presumably we want to
174  // use them for background flushes
175  if (highThreads > 0)
176  m_options.max_background_flushes = highThreads;
177  }
178 
179  m_options.compression = rocksdb::kSnappyCompression;
180 
181  get_if_exists(keyValues, "block_size", table_options.block_size);
182 
183  if (keyValues.exists("universal_compaction") &&
184  (get<int>(keyValues, "universal_compaction") != 0))
185  {
186  m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
187  m_options.min_write_buffer_number_to_merge = 2;
188  m_options.max_write_buffer_number = 6;
189  m_options.write_buffer_size = 6 * m_options.target_file_size_base;
190  }
191 
192  if (keyValues.exists("bbt_options"))
193  {
194  auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
195  table_options, get(keyValues, "bbt_options"), &table_options);
196  if (!s.ok())
197  Throw<std::runtime_error>(
198  std::string("Unable to set RocksDB bbt_options: ") +
199  s.ToString());
200  }
201 
202  m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
203 
204  if (keyValues.exists("options"))
205  {
206  auto const s = rocksdb::GetOptionsFromString(
207  m_options, get(keyValues, "options"), &m_options);
208  if (!s.ok())
209  Throw<std::runtime_error>(
210  std::string("Unable to set RocksDB options: ") +
211  s.ToString());
212  }
213 
214  std::string s1, s2;
215  rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
216  rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
217  JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
218  JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
219  }
220 
221  ~RocksDBBackend() override
222  {
223  close();
224  }
225 
226  void
227  open(bool createIfMissing) override
228  {
229  if (m_db)
230  {
231  assert(false);
232  JLOG(m_journal.error()) << "database is already open";
233  return;
234  }
235  rocksdb::DB* db = nullptr;
236  m_options.create_if_missing = createIfMissing;
237  rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
238  if (!status.ok() || !db)
239  Throw<std::runtime_error>(
240  std::string("Unable to open/create RocksDB: ") +
241  status.ToString());
242  m_db.reset(db);
243  }
244 
245  bool
246  isOpen() override
247  {
248  return static_cast<bool>(m_db);
249  }
250 
251  void
252  close() override
253  {
254  if (m_db)
255  {
256  m_db.reset();
257  if (m_deletePath)
258  {
259  boost::filesystem::path dir = m_name;
260  boost::filesystem::remove_all(dir);
261  }
262  }
263  }
264 
266  getName() override
267  {
268  return m_name;
269  }
270 
271  //--------------------------------------------------------------------------
272 
273  Status
274  fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
275  {
276  assert(m_db);
277  pObject->reset();
278 
279  Status status(ok);
280 
281  rocksdb::ReadOptions const options;
282  rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
283 
284  std::string string;
285 
286  rocksdb::Status getStatus = m_db->Get(options, slice, &string);
287 
288  if (getStatus.ok())
289  {
290  DecodedBlob decoded(key, string.data(), string.size());
291 
292  if (decoded.wasOk())
293  {
294  *pObject = decoded.createObject();
295  }
296  else
297  {
298  // Decoding failed, probably corrupted!
299  //
301  }
302  }
303  else
304  {
305  if (getStatus.IsCorruption())
306  {
308  }
309  else if (getStatus.IsNotFound())
310  {
311  status = notFound;
312  }
313  else
314  {
315  status =
316  Status(customCode + unsafe_cast<int>(getStatus.code()));
317 
318  JLOG(m_journal.error()) << getStatus.ToString();
319  }
320  }
321 
322  return status;
323  }
324 
326  fetchBatch(std::vector<uint256 const*> const& hashes) override
327  {
329  results.reserve(hashes.size());
330  for (auto const& h : hashes)
331  {
333  Status status = fetch(h->begin(), &nObj);
334  if (status != ok)
335  results.push_back({});
336  else
337  results.push_back(nObj);
338  }
339 
340  return {results, ok};
341  }
342 
343  void
344  store(std::shared_ptr<NodeObject> const& object) override
345  {
346  m_batch.store(object);
347  }
348 
349  void
350  storeBatch(Batch const& batch) override
351  {
352  assert(m_db);
353  rocksdb::WriteBatch wb;
354 
355  EncodedBlob encoded;
356 
357  for (auto const& e : batch)
358  {
359  encoded.prepare(e);
360 
361  wb.Put(
362  rocksdb::Slice(
363  reinterpret_cast<char const*>(encoded.getKey()),
364  m_keyBytes),
365  rocksdb::Slice(
366  reinterpret_cast<char const*>(encoded.getData()),
367  encoded.getSize()));
368  }
369 
370  rocksdb::WriteOptions const options;
371 
372  auto ret = m_db->Write(options, &wb);
373 
374  if (!ret.ok())
375  Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
376  }
377 
378  void
379  sync() override
380  {
381  }
382 
383  void
385  {
386  assert(m_db);
387  rocksdb::ReadOptions const options;
388 
389  std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
390 
391  for (it->SeekToFirst(); it->Valid(); it->Next())
392  {
393  if (it->key().size() == m_keyBytes)
394  {
395  DecodedBlob decoded(
396  it->key().data(), it->value().data(), it->value().size());
397 
398  if (decoded.wasOk())
399  {
400  f(decoded.createObject());
401  }
402  else
403  {
404  // Uh oh, corrupted data!
405  JLOG(m_journal.fatal())
406  << "Corrupt NodeObject #" << it->key().ToString(true);
407  }
408  }
409  else
410  {
411  // VFALCO NOTE What does it mean to find an
412  // incorrectly sized key? Corruption?
413  JLOG(m_journal.fatal())
414  << "Bad key size = " << it->key().size();
415  }
416  }
417  }
418 
419  int
420  getWriteLoad() override
421  {
422  return m_batch.getWriteLoad();
423  }
424 
425  void
426  setDeletePath() override
427  {
428  m_deletePath = true;
429  }
430 
431  //--------------------------------------------------------------------------
432 
433  void
434  writeBatch(Batch const& batch) override
435  {
436  storeBatch(batch);
437  }
438 
440  int
441  fdRequired() const override
442  {
443  return fdRequired_;
444  }
445 };
446 
447 //------------------------------------------------------------------------------
448 
449 class RocksDBFactory : public Factory
450 {
451 public:
452  RocksDBEnv m_env;
453 
454  RocksDBFactory()
455  {
456  Manager::instance().insert(*this);
457  }
458 
459  ~RocksDBFactory() override
460  {
461  Manager::instance().erase(*this);
462  }
463 
465  getName() const override
466  {
467  return "RocksDB";
468  }
469 
471  createInstance(
472  size_t keyBytes,
473  Section const& keyValues,
474  std::size_t,
475  Scheduler& scheduler,
476  beast::Journal journal) override
477  {
478  return std::make_unique<RocksDBBackend>(
479  keyBytes, keyValues, scheduler, journal, &m_env);
480  }
481 };
482 
483 static RocksDBFactory rocksDBFactory;
484 
485 } // namespace NodeStore
486 } // namespace ripple
487 
488 #endif
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
std::for_each
T for_each(T... args)
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::NodeStore::ok
@ ok
Definition: nodestore/Types.h:45
ripple::NodeStore::Manager::erase
virtual void erase(Factory &factory)=0
Remove a factory.
std::pair
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
std::size
T size(T... args)
std::stringstream
STL class.
std::function
std::unique_ptr::reset
T reset(T... args)
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:384
std::vector::push_back
T push_back(T... args)
ripple::NodeStore::notFound
@ notFound
Definition: nodestore/Types.h:46
ripple::NodeStore::Batch
std::vector< std::shared_ptr< NodeObject > > Batch
A batch of NodeObjects to write at once.
Definition: nodestore/Types.h:55
ripple::NodeStore::Manager::insert
virtual void insert(Factory &factory)=0
Add a factory.
ripple::NodeStore::customCode
@ customCode
Definition: nodestore/Types.h:51
ripple::megabytes
constexpr auto megabytes(T value) noexcept
Definition: ByteUtilities.h:34
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::NodeStore::dataCorrupt
@ dataCorrupt
Definition: nodestore/Types.h:47
atomic
memory
std::experimental::filesystem::status
T status(T... args)
ripple::NodeStore::Status
Status
Return codes from Backend operations.
Definition: nodestore/Types.h:44
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::NodeStore::Manager::instance
static Manager & instance()
Returns the instance of the manager singleton.
Definition: ManagerImp.cpp:120
std::unique_ptr
STL class.
std::data
T data(T... args)
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::open
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition: SociDB.cpp:98