rippled
RocksDBFactory.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
#include <ripple/unity/rocksdb.h>

#if RIPPLE_ROCKSDB_AVAILABLE

#include <ripple/basics/ByteUtilities.h>
#include <ripple/basics/contract.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/core/Config.h> // VFALCO Bad dependency
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/BatchWriter.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
35 
36 namespace ripple {
37 namespace NodeStore {
38 
39 class RocksDBEnv : public rocksdb::EnvWrapper
40 {
41 public:
42  RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
43  {
44  }
45 
46  struct ThreadParams
47  {
48  ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
49  {
50  }
51 
52  void (*f)(void*);
53  void* a;
54  };
55 
56  static void
57  thread_entry(void* ptr)
58  {
59  ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
60  void (*f)(void*) = p->f;
61  void* a(p->a);
62  delete p;
63 
64  static std::atomic<std::size_t> n;
65  std::size_t const id(++n);
67  ss << "rocksdb #" << id;
69 
70  (*f)(a);
71  }
72 
73  void
74  StartThread(void (*f)(void*), void* a) override
75  {
76  ThreadParams* const p(new ThreadParams(f, a));
77  EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
78  }
79 };
80 
81 //------------------------------------------------------------------------------
82 
83 class RocksDBBackend : public Backend, public BatchWriter::Callback
84 {
85 private:
86  std::atomic<bool> m_deletePath;
87 
88 public:
89  beast::Journal m_journal;
90  size_t const m_keyBytes;
91  Scheduler& m_scheduler;
92  BatchWriter m_batch;
93  std::string m_name;
95  int fdRequired_ = 2048;
96  rocksdb::Options m_options;
97 
98  RocksDBBackend(
99  int keyBytes,
100  Section const& keyValues,
101  Scheduler& scheduler,
102  beast::Journal journal,
103  RocksDBEnv* env)
104  : m_deletePath(false)
105  , m_journal(journal)
106  , m_keyBytes(keyBytes)
107  , m_scheduler(scheduler)
108  , m_batch(*this, scheduler)
109  {
110  if (!get_if_exists(keyValues, "path", m_name))
111  Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
112 
113  rocksdb::BlockBasedTableOptions table_options;
114  m_options.env = env;
115 
116  if (keyValues.exists("cache_mb"))
117  table_options.block_cache = rocksdb::NewLRUCache(
118  get<int>(keyValues, "cache_mb") * megabytes(1));
119 
120  if (auto const v = get<int>(keyValues, "filter_bits"))
121  {
122  bool const filter_blocks = !keyValues.exists("filter_full") ||
123  (get<int>(keyValues, "filter_full") == 0);
124  table_options.filter_policy.reset(
125  rocksdb::NewBloomFilterPolicy(v, filter_blocks));
126  }
127 
128  if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
129  fdRequired_ = m_options.max_open_files;
130 
131  if (keyValues.exists("file_size_mb"))
132  {
133  m_options.target_file_size_base =
134  megabytes(1) * get<int>(keyValues, "file_size_mb");
135  m_options.max_bytes_for_level_base =
136  5 * m_options.target_file_size_base;
137  m_options.write_buffer_size = 2 * m_options.target_file_size_base;
138  }
139 
141  keyValues, "file_size_mult", m_options.target_file_size_multiplier);
142 
143  if (keyValues.exists("bg_threads"))
144  {
145  m_options.env->SetBackgroundThreads(
146  get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
147  }
148 
149  if (keyValues.exists("high_threads"))
150  {
151  auto const highThreads = get<int>(keyValues, "high_threads");
152  m_options.env->SetBackgroundThreads(
153  highThreads, rocksdb::Env::HIGH);
154 
155  // If we have high-priority threads, presumably we want to
156  // use them for background flushes
157  if (highThreads > 0)
158  m_options.max_background_flushes = highThreads;
159  }
160 
161  m_options.compression = rocksdb::kSnappyCompression;
162 
163  get_if_exists(keyValues, "block_size", table_options.block_size);
164 
165  if (keyValues.exists("universal_compaction") &&
166  (get<int>(keyValues, "universal_compaction") != 0))
167  {
168  m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
169  m_options.min_write_buffer_number_to_merge = 2;
170  m_options.max_write_buffer_number = 6;
171  m_options.write_buffer_size = 6 * m_options.target_file_size_base;
172  }
173 
174  if (keyValues.exists("bbt_options"))
175  {
176  auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
177  table_options,
178  get<std::string>(keyValues, "bbt_options"),
179  &table_options);
180  if (!s.ok())
181  Throw<std::runtime_error>(
182  std::string("Unable to set RocksDB bbt_options: ") +
183  s.ToString());
184  }
185 
186  m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
187 
188  if (keyValues.exists("options"))
189  {
190  auto const s = rocksdb::GetOptionsFromString(
191  m_options, get<std::string>(keyValues, "options"), &m_options);
192  if (!s.ok())
193  Throw<std::runtime_error>(
194  std::string("Unable to set RocksDB options: ") +
195  s.ToString());
196  }
197 
198  std::string s1, s2;
199  rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
200  rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
201  JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
202  JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
203  }
204 
205  ~RocksDBBackend() override
206  {
207  close();
208  }
209 
210  void
211  open(bool createIfMissing) override
212  {
213  if (m_db)
214  {
215  assert(false);
216  JLOG(m_journal.error()) << "database is already open";
217  return;
218  }
219  rocksdb::DB* db = nullptr;
220  m_options.create_if_missing = createIfMissing;
221  rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
222  if (!status.ok() || !db)
223  Throw<std::runtime_error>(
224  std::string("Unable to open/create RocksDB: ") +
225  status.ToString());
226  m_db.reset(db);
227  }
228 
229  bool
230  isOpen() override
231  {
232  return static_cast<bool>(m_db);
233  }
234 
235  void
236  close() override
237  {
238  if (m_db)
239  {
240  m_db.reset();
241  if (m_deletePath)
242  {
243  boost::filesystem::path dir = m_name;
244  boost::filesystem::remove_all(dir);
245  }
246  }
247  }
248 
250  getName() override
251  {
252  return m_name;
253  }
254 
255  //--------------------------------------------------------------------------
256 
257  Status
258  fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
259  {
260  assert(m_db);
261  pObject->reset();
262 
263  Status status(ok);
264 
265  rocksdb::ReadOptions const options;
266  rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
267 
268  std::string string;
269 
270  rocksdb::Status getStatus = m_db->Get(options, slice, &string);
271 
272  if (getStatus.ok())
273  {
274  DecodedBlob decoded(key, string.data(), string.size());
275 
276  if (decoded.wasOk())
277  {
278  *pObject = decoded.createObject();
279  }
280  else
281  {
282  // Decoding failed, probably corrupted!
283  //
285  }
286  }
287  else
288  {
289  if (getStatus.IsCorruption())
290  {
292  }
293  else if (getStatus.IsNotFound())
294  {
295  status = notFound;
296  }
297  else
298  {
299  status = Status(customCode + getStatus.code());
300 
301  JLOG(m_journal.error()) << getStatus.ToString();
302  }
303  }
304 
305  return status;
306  }
307 
308  bool
309  canFetchBatch() override
310  {
311  return false;
312  }
313 
315  fetchBatch(std::size_t n, void const* const* keys) override
316  {
317  Throw<std::runtime_error>("pure virtual called");
318  return {};
319  }
320 
321  void
322  store(std::shared_ptr<NodeObject> const& object) override
323  {
324  m_batch.store(object);
325  }
326 
327  void
328  storeBatch(Batch const& batch) override
329  {
330  assert(m_db);
331  rocksdb::WriteBatch wb;
332 
333  EncodedBlob encoded;
334 
335  for (auto const& e : batch)
336  {
337  encoded.prepare(e);
338 
339  wb.Put(
340  rocksdb::Slice(
341  reinterpret_cast<char const*>(encoded.getKey()),
342  m_keyBytes),
343  rocksdb::Slice(
344  reinterpret_cast<char const*>(encoded.getData()),
345  encoded.getSize()));
346  }
347 
348  rocksdb::WriteOptions const options;
349 
350  auto ret = m_db->Write(options, &wb);
351 
352  if (!ret.ok())
353  Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
354  }
355 
356  void
358  {
359  assert(m_db);
360  rocksdb::ReadOptions const options;
361 
362  std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
363 
364  for (it->SeekToFirst(); it->Valid(); it->Next())
365  {
366  if (it->key().size() == m_keyBytes)
367  {
368  DecodedBlob decoded(
369  it->key().data(), it->value().data(), it->value().size());
370 
371  if (decoded.wasOk())
372  {
373  f(decoded.createObject());
374  }
375  else
376  {
377  // Uh oh, corrupted data!
378  JLOG(m_journal.fatal())
379  << "Corrupt NodeObject #" << it->key().ToString(true);
380  }
381  }
382  else
383  {
384  // VFALCO NOTE What does it mean to find an
385  // incorrectly sized key? Corruption?
386  JLOG(m_journal.fatal())
387  << "Bad key size = " << it->key().size();
388  }
389  }
390  }
391 
392  int
393  getWriteLoad() override
394  {
395  return m_batch.getWriteLoad();
396  }
397 
398  void
399  setDeletePath() override
400  {
401  m_deletePath = true;
402  }
403 
404  //--------------------------------------------------------------------------
405 
406  void
407  writeBatch(Batch const& batch) override
408  {
409  storeBatch(batch);
410  }
411 
412  void
413  verify() override
414  {
415  }
416 
418  int
419  fdRequired() const override
420  {
421  return fdRequired_;
422  }
423 };
424 
425 //------------------------------------------------------------------------------
426 
427 class RocksDBFactory : public Factory
428 {
429 public:
430  RocksDBEnv m_env;
431 
432  RocksDBFactory()
433  {
434  Manager::instance().insert(*this);
435  }
436 
437  ~RocksDBFactory() override
438  {
439  Manager::instance().erase(*this);
440  }
441 
443  getName() const override
444  {
445  return "RocksDB";
446  }
447 
449  createInstance(
450  size_t keyBytes,
451  Section const& keyValues,
452  std::size_t,
453  Scheduler& scheduler,
454  beast::Journal journal) override
455  {
456  return std::make_unique<RocksDBBackend>(
457  keyBytes, keyValues, scheduler, journal, &m_env);
458  }
459 };
460 
461 static RocksDBFactory rocksDBFactory;
462 
463 } // namespace NodeStore
464 } // namespace ripple
465 
466 #endif
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
std::for_each
T for_each(T... args)
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::NodeStore::ok
@ ok
Definition: nodestore/Types.h:45
ripple::NodeStore::Manager::erase
virtual void erase(Factory &factory)=0
Remove a factory.
std::vector
STL class.
std::size
T size(T... args)
std::stringstream
STL class.
std::function
std::unique_ptr::reset
T reset(T... args)
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:347
ripple::NodeStore::notFound
@ notFound
Definition: nodestore/Types.h:46
ripple::verify
bool verify(PublicKey const &publicKey, Slice const &m, Slice const &sig, bool mustBeFullyCanonical)
Verify a signature on a message.
Definition: PublicKey.cpp:268
ripple::NodeStore::Batch
std::vector< std::shared_ptr< NodeObject > > Batch
A batch of NodeObjects to write at once.
Definition: nodestore/Types.h:54
ripple::NodeStore::Manager::insert
virtual void insert(Factory &factory)=0
Add a factory.
ripple::NodeStore::customCode
@ customCode
Definition: nodestore/Types.h:50
ripple::megabytes
constexpr auto megabytes(T value) noexcept
Definition: ByteUtilities.h:34
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::NodeStore::dataCorrupt
@ dataCorrupt
Definition: nodestore/Types.h:47
atomic
memory
std::experimental::filesystem::status
T status(T... args)
ripple::NodeStore::Status
Status
Return codes from Backend operations.
Definition: nodestore/Types.h:44
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::NodeStore::Manager::instance
static Manager & instance()
Returns the instance of the manager singleton.
Definition: ManagerImp.cpp:119
std::unique_ptr
STL class.
std::data
T data(T... args)
ripple::open
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition: SociDB.cpp:100