rippled
RocksDBFactory.cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <ripple/unity/rocksdb.h>

#if RIPPLE_ROCKSDB_AVAILABLE

#include <ripple/basics/ByteUtilities.h>
#include <ripple/basics/contract.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/core/Config.h>  // VFALCO Bad dependency
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/BatchWriter.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <atomic>
#include <memory>
namespace ripple {
namespace NodeStore {

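// Wraps the default RocksDB environment so that every thread RocksDB starts
// is given a recognizable name ("rocksdb #<n>"), which makes the worker
// threads easier to identify in debuggers and thread listings.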
class RocksDBEnv : public rocksdb::EnvWrapper
{
public:
    RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
    {
    }

    struct ThreadParams
    {
        ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
        {
        }

        void (*f)(void*);
        void* a;
    };

    static void
    thread_entry(void* ptr)
    {
        ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
        void (*f)(void*) = p->f;
        void* a(p->a);
        delete p;

        static std::atomic<std::size_t> n;
        std::size_t const id(++n);
        std::stringstream ss;
        ss << "rocksdb #" << id;
        beast::setCurrentThreadName(ss.str());

        (*f)(a);
    }

    void
    StartThread(void (*f)(void*), void* a) override
    {
        ThreadParams* const p(new ThreadParams(f, a));
        EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
    }
};

//------------------------------------------------------------------------------

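// A NodeStore backend that stores NodeObjects in a RocksDB database. Writes
// are funneled through a BatchWriter, which invokes writeBatch() on this
// object (the BatchWriter::Callback) to flush accumulated objects.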
class RocksDBBackend : public Backend, public BatchWriter::Callback
{
private:
    std::atomic<bool> m_deletePath;

public:
    beast::Journal m_journal;
    size_t const m_keyBytes;
    Scheduler& m_scheduler;
    BatchWriter m_batch;
    std::string m_name;
    std::unique_ptr<rocksdb::DB> m_db;
    int fdRequired_ = 2048;
    rocksdb::Options m_options;

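    // The tuning keys read below (path, cache_mb, filter_bits, open_files,
    // file_size_mb, bg_threads, high_threads, bbt_options, options, ...) come
    // from the configuration section handed to the factory, typically the
    // node_db settings of the server's configuration file.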
    RocksDBBackend(
        int keyBytes,
        Section const& keyValues,
        Scheduler& scheduler,
        beast::Journal journal,
        RocksDBEnv* env)
        : m_deletePath(false)
        , m_journal(journal)
        , m_keyBytes(keyBytes)
        , m_scheduler(scheduler)
        , m_batch(*this, scheduler)
    {
        if (!get_if_exists(keyValues, "path", m_name))
            Throw<std::runtime_error>("Missing path in RocksDBFactory backend");

        rocksdb::BlockBasedTableOptions table_options;
        m_options.env = env;

        if (keyValues.exists("cache_mb"))
            table_options.block_cache = rocksdb::NewLRUCache(
                get<int>(keyValues, "cache_mb") * megabytes(1));

        if (auto const v = get<int>(keyValues, "filter_bits"))
        {
            bool const filter_blocks = !keyValues.exists("filter_full") ||
                (get<int>(keyValues, "filter_full") == 0);
            table_options.filter_policy.reset(
                rocksdb::NewBloomFilterPolicy(v, filter_blocks));
        }

        if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
            fdRequired_ = m_options.max_open_files;

        if (keyValues.exists("file_size_mb"))
        {
            m_options.target_file_size_base =
                megabytes(1) * get<int>(keyValues, "file_size_mb");
            m_options.max_bytes_for_level_base =
                5 * m_options.target_file_size_base;
            m_options.write_buffer_size = 2 * m_options.target_file_size_base;
        }

        get_if_exists(
            keyValues, "file_size_mult", m_options.target_file_size_multiplier);

        if (keyValues.exists("bg_threads"))
        {
            m_options.env->SetBackgroundThreads(
                get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
        }

        if (keyValues.exists("high_threads"))
        {
            auto const highThreads = get<int>(keyValues, "high_threads");
            m_options.env->SetBackgroundThreads(
                highThreads, rocksdb::Env::HIGH);

            // If we have high-priority threads, presumably we want to
            // use them for background flushes
            if (highThreads > 0)
                m_options.max_background_flushes = highThreads;
        }

        m_options.compression = rocksdb::kSnappyCompression;

        get_if_exists(keyValues, "block_size", table_options.block_size);

        if (keyValues.exists("universal_compaction") &&
            (get<int>(keyValues, "universal_compaction") != 0))
        {
            m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
            m_options.min_write_buffer_number_to_merge = 2;
            m_options.max_write_buffer_number = 6;
            m_options.write_buffer_size = 6 * m_options.target_file_size_base;
        }

        if (keyValues.exists("bbt_options"))
        {
            auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
                table_options,
                get<std::string>(keyValues, "bbt_options"),
                &table_options);
            if (!s.ok())
                Throw<std::runtime_error>(
                    std::string("Unable to set RocksDB bbt_options: ") +
                    s.ToString());
        }

        m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));

        if (keyValues.exists("options"))
        {
            auto const s = rocksdb::GetOptionsFromString(
                m_options, get<std::string>(keyValues, "options"), &m_options);
            if (!s.ok())
                Throw<std::runtime_error>(
                    std::string("Unable to set RocksDB options: ") +
                    s.ToString());
        }

        std::string s1, s2;
        rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
        rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
        JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
        JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
    }

    ~RocksDBBackend() override
    {
        close();
    }

    void
    open(bool createIfMissing) override
    {
        if (m_db)
        {
            assert(false);
            JLOG(m_journal.error()) << "database is already open";
            return;
        }
        rocksdb::DB* db = nullptr;
        m_options.create_if_missing = createIfMissing;
        rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
        if (!status.ok() || !db)
            Throw<std::runtime_error>(
                std::string("Unable to open/create RocksDB: ") +
                status.ToString());
        m_db.reset(db);
    }

    void
    close() override
    {
        if (m_db)
        {
            m_db.reset();
            if (m_deletePath)
            {
                boost::filesystem::path dir = m_name;
                boost::filesystem::remove_all(dir);
            }
        }
    }

    std::string
    getName() override
    {
        return m_name;
    }

    //--------------------------------------------------------------------------

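    // Look up a single NodeObject by key. RocksDB status codes are mapped to
    // NodeStore status codes: a missing key becomes notFound, a corruption
    // error (or an undecodable blob) becomes dataCorrupt, and anything else
    // is reported as customCode plus the RocksDB error code.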
    Status
    fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
    {
        assert(m_db);
        pObject->reset();

        Status status(ok);

        rocksdb::ReadOptions const options;
        rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);

        std::string string;

        rocksdb::Status getStatus = m_db->Get(options, slice, &string);

        if (getStatus.ok())
        {
            DecodedBlob decoded(key, string.data(), string.size());

            if (decoded.wasOk())
            {
                *pObject = decoded.createObject();
            }
            else
            {
                // Decoding failed, probably corrupted!
                //
                status = dataCorrupt;
            }
        }
        else
        {
            if (getStatus.IsCorruption())
            {
                status = dataCorrupt;
            }
            else if (getStatus.IsNotFound())
            {
                status = notFound;
            }
            else
            {
                status = Status(customCode + getStatus.code());

                JLOG(m_journal.error()) << getStatus.ToString();
            }
        }

        return status;
    }

    bool
    canFetchBatch() override
    {
        return false;
    }

    std::vector<std::shared_ptr<NodeObject>>
    fetchBatch(std::size_t n, void const* const* keys) override
    {
        Throw<std::runtime_error>("pure virtual called");
        return {};
    }

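    // store() does not write directly; it hands the object to the
    // BatchWriter, which later calls writeBatch() (and thus storeBatch())
    // to commit the accumulated objects in a single RocksDB WriteBatch.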
    void
    store(std::shared_ptr<NodeObject> const& object) override
    {
        m_batch.store(object);
    }

    void
    storeBatch(Batch const& batch) override
    {
        assert(m_db);
        rocksdb::WriteBatch wb;

        EncodedBlob encoded;

        for (auto const& e : batch)
        {
            encoded.prepare(e);

            wb.Put(
                rocksdb::Slice(
                    reinterpret_cast<char const*>(encoded.getKey()),
                    m_keyBytes),
                rocksdb::Slice(
                    reinterpret_cast<char const*>(encoded.getData()),
                    encoded.getSize()));
        }

        rocksdb::WriteOptions const options;

        auto ret = m_db->Write(options, &wb);

        if (!ret.ok())
            Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
    }

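    // Visit every record in the database, decode it into a NodeObject, and
    // pass it to the supplied callback. Records with an unexpected key size
    // or an undecodable payload are logged at fatal severity and skipped.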
    void
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
    {
        assert(m_db);
        rocksdb::ReadOptions const options;

        std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));

        for (it->SeekToFirst(); it->Valid(); it->Next())
        {
            if (it->key().size() == m_keyBytes)
            {
                DecodedBlob decoded(
                    it->key().data(), it->value().data(), it->value().size());

                if (decoded.wasOk())
                {
                    f(decoded.createObject());
                }
                else
                {
                    // Uh oh, corrupted data!
                    JLOG(m_journal.fatal())
                        << "Corrupt NodeObject #"
                        << from_hex_text<uint256>(it->key().data());
                }
            }
            else
            {
                // VFALCO NOTE What does it mean to find an
                //             incorrectly sized key? Corruption?
                JLOG(m_journal.fatal())
                    << "Bad key size = " << it->key().size();
            }
        }
    }

    int
    getWriteLoad() override
    {
        return m_batch.getWriteLoad();
    }

    void
    setDeletePath() override
    {
        m_deletePath = true;
    }

    //--------------------------------------------------------------------------

    void
    writeBatch(Batch const& batch) override
    {
        storeBatch(batch);
    }

    void
    verify() override
    {
    }

    int
    fdRequired() const override
    {
        return fdRequired_;
    }
};

//------------------------------------------------------------------------------

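// Factory for RocksDB backends. All backends created by this factory share a
// single RocksDBEnv, so their background threads are named consistently.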
class RocksDBFactory : public Factory
{
public:
    RocksDBEnv m_env;

    RocksDBFactory()
    {
        Manager::instance().insert(*this);
    }

    ~RocksDBFactory() override
    {
        Manager::instance().erase(*this);
    }

    std::string
    getName() const override
    {
        return "RocksDB";
    }

    std::unique_ptr<Backend>
    createInstance(
        size_t keyBytes,
        Section const& keyValues,
        Scheduler& scheduler,
        beast::Journal journal) override
    {
        return std::make_unique<RocksDBBackend>(
            keyBytes, keyValues, scheduler, journal, &m_env);
    }
};

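// A single static instance registers the factory with the NodeStore Manager
// at startup (and removes it on shutdown), making the "RocksDB" backend type
// available whenever RocksDB support is compiled in.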
static RocksDBFactory rocksDBFactory;

}  // namespace NodeStore
}  // namespace ripple

#endif