RocksDBFactory.cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <ripple/unity/rocksdb.h>

#if RIPPLE_ROCKSDB_AVAILABLE

#include <ripple/basics/ByteUtilities.h>
#include <ripple/basics/contract.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/core/Config.h> // VFALCO Bad dependency
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/BatchWriter.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <atomic>
#include <memory>

namespace ripple {
namespace NodeStore {

class RocksDBEnv : public rocksdb::EnvWrapper
{
public:
    RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
    {
    }

    struct ThreadParams
    {
        ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
        {
        }

        void (*f)(void*);
        void* a;
    };

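    // Trampoline used by StartThread() below: the ThreadParams holder is
    // heap-allocated by the caller and deleted here once the target function
    // and its argument have been copied out, so ownership passes to the new
    // thread. Each thread is given a readable name ("rocksdb #<n>") before
    // the real entry point runs.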
    static void
    thread_entry(void* ptr)
    {
        ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
        void (*f)(void*) = p->f;
        void* a(p->a);
        delete p;

        static std::atomic<std::size_t> n;
        std::size_t const id(++n);
        std::stringstream ss;
        ss << "rocksdb #" << id;
        beast::setCurrentThreadName(ss.str());

        (*f)(a);
    }

    void
    StartThread(void (*f)(void*), void* a) override
    {
        ThreadParams* const p(new ThreadParams(f, a));
        EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
    }
};

//------------------------------------------------------------------------------

class RocksDBBackend : public Backend, public BatchWriter::Callback
{
private:
    std::atomic<bool> m_deletePath;

public:
    beast::Journal m_journal;
    size_t const m_keyBytes;
    Scheduler& m_scheduler;
    BatchWriter m_batch;
    std::string m_name;
    std::unique_ptr<rocksdb::DB> m_db;
    int fdRequired_ = 2048;
    rocksdb::Options m_options;

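    // The constructor below pulls its tuning parameters out of the supplied
    // Section. As an illustration only (the key names are taken from the
    // parsing below; the stanza name and values are assumed examples, not
    // recommendations), a matching configuration might look roughly like:
    //
    //     [node_db]
    //     type=RocksDB
    //     path=/var/lib/rippled/db/rocksdb
    //     open_files=2000
    //     cache_mb=256
    //     filter_bits=12
    //     file_size_mb=8
    //     file_size_mult=2
    //     bg_threads=4
    //     high_threads=4
    //
    // filter_full, block_size, universal_compaction, bbt_options and options
    // are also recognized and passed through to RocksDB.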
    RocksDBBackend(
        int keyBytes,
        Section const& keyValues,
        Scheduler& scheduler,
        beast::Journal journal,
        RocksDBEnv* env)
        : m_deletePath(false)
        , m_journal(journal)
        , m_keyBytes(keyBytes)
        , m_scheduler(scheduler)
        , m_batch(*this, scheduler)
    {
        if (!get_if_exists(keyValues, "path", m_name))
            Throw<std::runtime_error>("Missing path in RocksDBFactory backend");

        rocksdb::BlockBasedTableOptions table_options;
        m_options.env = env;

        if (keyValues.exists("cache_mb"))
            table_options.block_cache = rocksdb::NewLRUCache(
                get<int>(keyValues, "cache_mb") * megabytes(1));

        if (auto const v = get<int>(keyValues, "filter_bits"))
        {
            bool const filter_blocks = !keyValues.exists("filter_full") ||
                (get<int>(keyValues, "filter_full") == 0);
            table_options.filter_policy.reset(
                rocksdb::NewBloomFilterPolicy(v, filter_blocks));
        }

        if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
            fdRequired_ = m_options.max_open_files;

        if (keyValues.exists("file_size_mb"))
        {
            m_options.target_file_size_base =
                megabytes(1) * get<int>(keyValues, "file_size_mb");
            m_options.max_bytes_for_level_base =
                5 * m_options.target_file_size_base;
            m_options.write_buffer_size = 2 * m_options.target_file_size_base;
        }

        get_if_exists(
            keyValues, "file_size_mult", m_options.target_file_size_multiplier);

        if (keyValues.exists("bg_threads"))
        {
            m_options.env->SetBackgroundThreads(
                get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
        }

        if (keyValues.exists("high_threads"))
        {
            auto const highThreads = get<int>(keyValues, "high_threads");
            m_options.env->SetBackgroundThreads(
                highThreads, rocksdb::Env::HIGH);

            // If we have high-priority threads, presumably we want to
            // use them for background flushes
            if (highThreads > 0)
                m_options.max_background_flushes = highThreads;
        }

        m_options.compression = rocksdb::kSnappyCompression;

        get_if_exists(keyValues, "block_size", table_options.block_size);

        if (keyValues.exists("universal_compaction") &&
            (get<int>(keyValues, "universal_compaction") != 0))
        {
            m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
            m_options.min_write_buffer_number_to_merge = 2;
            m_options.max_write_buffer_number = 6;
            m_options.write_buffer_size = 6 * m_options.target_file_size_base;
        }

        if (keyValues.exists("bbt_options"))
        {
            auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
                table_options,
                get<std::string>(keyValues, "bbt_options"),
                &table_options);
            if (!s.ok())
                Throw<std::runtime_error>(
                    std::string("Unable to set RocksDB bbt_options: ") +
                    s.ToString());
        }

        m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));

        if (keyValues.exists("options"))
        {
            auto const s = rocksdb::GetOptionsFromString(
                m_options, get<std::string>(keyValues, "options"), &m_options);
            if (!s.ok())
                Throw<std::runtime_error>(
                    std::string("Unable to set RocksDB options: ") +
                    s.ToString());
        }

        std::string s1, s2;
        rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
        rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
        JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
        JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
    }

    ~RocksDBBackend() override
    {
        close();
    }

    void
    open(bool createIfMissing) override
    {
        if (m_db)
        {
            assert(false);
            JLOG(m_journal.error()) << "database is already open";
            return;
        }
        rocksdb::DB* db = nullptr;
        m_options.create_if_missing = createIfMissing;
        rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
        if (!status.ok() || !db)
            Throw<std::runtime_error>(
                std::string("Unable to open/create RocksDB: ") +
                status.ToString());
        m_db.reset(db);
    }

    bool
    isOpen() override
    {
        return static_cast<bool>(m_db);
    }

    void
    close() override
    {
        if (m_db)
        {
            m_db.reset();
            if (m_deletePath)
            {
                boost::filesystem::path dir = m_name;
                boost::filesystem::remove_all(dir);
            }
        }
    }

    std::string
    getName() override
    {
        return m_name;
    }

    //--------------------------------------------------------------------------

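    // Fetch a single object: the raw key is wrapped in a rocksdb::Slice,
    // looked up with Get(), and the stored blob is decoded back into a
    // NodeObject. RocksDB status codes map onto the Backend Status enum:
    // corruption (or an undecodable blob) becomes dataCorrupt, a missing key
    // becomes notFound, and anything else is surfaced as customCode plus the
    // RocksDB error code and logged.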
    Status
    fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
    {
        assert(m_db);
        pObject->reset();

        Status status(ok);

        rocksdb::ReadOptions const options;
        rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);

        std::string string;

        rocksdb::Status getStatus = m_db->Get(options, slice, &string);

        if (getStatus.ok())
        {
            DecodedBlob decoded(key, string.data(), string.size());

            if (decoded.wasOk())
            {
                *pObject = decoded.createObject();
            }
            else
            {
                // Decoding failed, probably corrupted!
                //
                status = dataCorrupt;
            }
        }
        else
        {
            if (getStatus.IsCorruption())
            {
                status = dataCorrupt;
            }
            else if (getStatus.IsNotFound())
            {
                status = notFound;
            }
            else
            {
                status = Status(customCode + getStatus.code());

                JLOG(m_journal.error()) << getStatus.ToString();
            }
        }

        return status;
    }

    bool
    canFetchBatch() override
    {
        return false;
    }

    std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
    fetchBatch(std::vector<uint256 const*> const& hashes) override
    {
        std::vector<std::shared_ptr<NodeObject>> results;
        results.reserve(hashes.size());
        for (auto const& h : hashes)
        {
            std::shared_ptr<NodeObject> nObj;
            Status status = fetch(h->begin(), &nObj);
            if (status != ok)
                results.push_back({});
            else
                results.push_back(nObj);
        }

        return {results, ok};
    }

    void
    store(std::shared_ptr<NodeObject> const& object) override
    {
        m_batch.store(object);
    }

    void
    storeBatch(Batch const& batch) override
    {
        assert(m_db);
        rocksdb::WriteBatch wb;

        EncodedBlob encoded;

        for (auto const& e : batch)
        {
            encoded.prepare(e);

            wb.Put(
                rocksdb::Slice(
                    reinterpret_cast<char const*>(encoded.getKey()),
                    m_keyBytes),
                rocksdb::Slice(
                    reinterpret_cast<char const*>(encoded.getData()),
                    encoded.getSize()));
        }

        rocksdb::WriteOptions const options;

        auto ret = m_db->Write(options, &wb);

        if (!ret.ok())
            Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
    }

    void
    sync() override
    {
    }

    void
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
    {
        assert(m_db);
        rocksdb::ReadOptions const options;

        std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));

        for (it->SeekToFirst(); it->Valid(); it->Next())
        {
            if (it->key().size() == m_keyBytes)
            {
                DecodedBlob decoded(
                    it->key().data(), it->value().data(), it->value().size());

                if (decoded.wasOk())
                {
                    f(decoded.createObject());
                }
                else
                {
                    // Uh oh, corrupted data!
                    JLOG(m_journal.fatal())
                        << "Corrupt NodeObject #" << it->key().ToString(true);
                }
            }
            else
            {
                // VFALCO NOTE What does it mean to find an
                //             incorrectly sized key? Corruption?
                JLOG(m_journal.fatal())
                    << "Bad key size = " << it->key().size();
            }
        }
    }

    int
    getWriteLoad() override
    {
        return m_batch.getWriteLoad();
    }

    void
    setDeletePath() override
    {
        m_deletePath = true;
    }

    //--------------------------------------------------------------------------

    void
    writeBatch(Batch const& batch) override
    {
        storeBatch(batch);
    }

    void
    verify() override
    {
    }

    // Returns the number of file descriptors the backend expects to need.
    int
    fdRequired() const override
    {
        return fdRequired_;
    }
};

//------------------------------------------------------------------------------

class RocksDBFactory : public Factory
{
public:
    RocksDBEnv m_env;

    RocksDBFactory()
    {
        Manager::instance().insert(*this);
    }

    ~RocksDBFactory() override
    {
        Manager::instance().erase(*this);
    }

    std::string
    getName() const override
    {
        return "RocksDB";
    }

    std::unique_ptr<Backend>
    createInstance(
        size_t keyBytes,
        Section const& keyValues,
        std::size_t,
        Scheduler& scheduler,
        beast::Journal journal) override
    {
        return std::make_unique<RocksDBBackend>(
            keyBytes, keyValues, scheduler, journal, &m_env);
    }
};

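// A single file-scope instance registers this factory with the NodeStore
// Manager during static initialization (via the constructor above) and
// removes it again on shutdown.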
static RocksDBFactory rocksDBFactory;

} // namespace NodeStore
} // namespace ripple

#endif