rippled
RocksDBFactory.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/unity/rocksdb.h>
21 
22 #if RIPPLE_ROCKSDB_AVAILABLE
23 
#include <ripple/basics/ByteUtilities.h>
#include <ripple/basics/contract.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/core/Config.h>  // VFALCO Bad dependency
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/BatchWriter.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <atomic>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <sstream>
#include <string>
35 
36 namespace ripple {
37 namespace NodeStore {
38 
39 class RocksDBEnv : public rocksdb::EnvWrapper
40 {
41 public:
42  RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
43  {
44  }
45 
46  struct ThreadParams
47  {
48  ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
49  {
50  }
51 
52  void (*f)(void*);
53  void* a;
54  };
55 
56  static void
57  thread_entry(void* ptr)
58  {
59  ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
60  void (*f)(void*) = p->f;
61  void* a(p->a);
62  delete p;
63 
64  static std::atomic<std::size_t> n;
65  std::size_t const id(++n);
67  ss << "rocksdb #" << id;
69 
70  (*f)(a);
71  }
72 
73  void
74  StartThread(void (*f)(void*), void* a) override
75  {
76  ThreadParams* const p(new ThreadParams(f, a));
77  EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
78  }
79 };
80 
81 //------------------------------------------------------------------------------
82 
83 class RocksDBBackend : public Backend, public BatchWriter::Callback
84 {
85 private:
86  std::atomic<bool> m_deletePath;
87 
88 public:
89  beast::Journal m_journal;
90  size_t const m_keyBytes;
91  Scheduler& m_scheduler;
92  BatchWriter m_batch;
93  std::string m_name;
95  int fdRequired_ = 2048;
96  rocksdb::Options m_options;
97 
98  RocksDBBackend(
99  int keyBytes,
100  Section const& keyValues,
101  Scheduler& scheduler,
102  beast::Journal journal,
103  RocksDBEnv* env)
104  : m_deletePath(false)
105  , m_journal(journal)
106  , m_keyBytes(keyBytes)
107  , m_scheduler(scheduler)
108  , m_batch(*this, scheduler)
109  {
110  if (!get_if_exists(keyValues, "path", m_name))
111  Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
112 
113  rocksdb::BlockBasedTableOptions table_options;
114  m_options.env = env;
115 
116  if (keyValues.exists("cache_mb"))
117  table_options.block_cache = rocksdb::NewLRUCache(
118  get<int>(keyValues, "cache_mb") * megabytes(1));
119 
120  if (auto const v = get<int>(keyValues, "filter_bits"))
121  {
122  bool const filter_blocks = !keyValues.exists("filter_full") ||
123  (get<int>(keyValues, "filter_full") == 0);
124  table_options.filter_policy.reset(
125  rocksdb::NewBloomFilterPolicy(v, filter_blocks));
126  }
127 
128  if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
129  fdRequired_ = m_options.max_open_files;
130 
131  if (keyValues.exists("file_size_mb"))
132  {
133  m_options.target_file_size_base =
134  megabytes(1) * get<int>(keyValues, "file_size_mb");
135  m_options.max_bytes_for_level_base =
136  5 * m_options.target_file_size_base;
137  m_options.write_buffer_size = 2 * m_options.target_file_size_base;
138  }
139 
141  keyValues, "file_size_mult", m_options.target_file_size_multiplier);
142 
143  if (keyValues.exists("bg_threads"))
144  {
145  m_options.env->SetBackgroundThreads(
146  get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
147  }
148 
149  if (keyValues.exists("high_threads"))
150  {
151  auto const highThreads = get<int>(keyValues, "high_threads");
152  m_options.env->SetBackgroundThreads(
153  highThreads, rocksdb::Env::HIGH);
154 
155  // If we have high-priority threads, presumably we want to
156  // use them for background flushes
157  if (highThreads > 0)
158  m_options.max_background_flushes = highThreads;
159  }
160 
161  m_options.compression = rocksdb::kSnappyCompression;
162 
163  get_if_exists(keyValues, "block_size", table_options.block_size);
164 
165  if (keyValues.exists("universal_compaction") &&
166  (get<int>(keyValues, "universal_compaction") != 0))
167  {
168  m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
169  m_options.min_write_buffer_number_to_merge = 2;
170  m_options.max_write_buffer_number = 6;
171  m_options.write_buffer_size = 6 * m_options.target_file_size_base;
172  }
173 
174  if (keyValues.exists("bbt_options"))
175  {
176  auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
177  table_options,
178  get<std::string>(keyValues, "bbt_options"),
179  &table_options);
180  if (!s.ok())
181  Throw<std::runtime_error>(
182  std::string("Unable to set RocksDB bbt_options: ") +
183  s.ToString());
184  }
185 
186  m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
187 
188  if (keyValues.exists("options"))
189  {
190  auto const s = rocksdb::GetOptionsFromString(
191  m_options, get<std::string>(keyValues, "options"), &m_options);
192  if (!s.ok())
193  Throw<std::runtime_error>(
194  std::string("Unable to set RocksDB options: ") +
195  s.ToString());
196  }
197 
198  std::string s1, s2;
199  rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
200  rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
201  JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
202  JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
203  }
204 
205  ~RocksDBBackend() override
206  {
207  close();
208  }
209 
210  void
211  open(bool createIfMissing) override
212  {
213  if (m_db)
214  {
215  assert(false);
216  JLOG(m_journal.error()) << "database is already open";
217  return;
218  }
219  rocksdb::DB* db = nullptr;
220  m_options.create_if_missing = createIfMissing;
221  rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
222  if (!status.ok() || !db)
223  Throw<std::runtime_error>(
224  std::string("Unable to open/create RocksDB: ") +
225  status.ToString());
226  m_db.reset(db);
227  }
228 
229  bool
230  isOpen() override
231  {
232  return static_cast<bool>(m_db);
233  }
234 
235  void
236  close() override
237  {
238  if (m_db)
239  {
240  m_db.reset();
241  if (m_deletePath)
242  {
243  boost::filesystem::path dir = m_name;
244  boost::filesystem::remove_all(dir);
245  }
246  }
247  }
248 
250  getName() override
251  {
252  return m_name;
253  }
254 
255  //--------------------------------------------------------------------------
256 
257  Status
258  fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
259  {
260  assert(m_db);
261  pObject->reset();
262 
263  Status status(ok);
264 
265  rocksdb::ReadOptions const options;
266  rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
267 
268  std::string string;
269 
270  rocksdb::Status getStatus = m_db->Get(options, slice, &string);
271 
272  if (getStatus.ok())
273  {
274  DecodedBlob decoded(key, string.data(), string.size());
275 
276  if (decoded.wasOk())
277  {
278  *pObject = decoded.createObject();
279  }
280  else
281  {
282  // Decoding failed, probably corrupted!
283  //
285  }
286  }
287  else
288  {
289  if (getStatus.IsCorruption())
290  {
292  }
293  else if (getStatus.IsNotFound())
294  {
295  status = notFound;
296  }
297  else
298  {
299  status = Status(customCode + getStatus.code());
300 
301  JLOG(m_journal.error()) << getStatus.ToString();
302  }
303  }
304 
305  return status;
306  }
307 
308  void
309  store(std::shared_ptr<NodeObject> const& object) override
310  {
311  m_batch.store(object);
312  }
313 
314  void
315  storeBatch(Batch const& batch) override
316  {
317  assert(m_db);
318  rocksdb::WriteBatch wb;
319 
320  EncodedBlob encoded;
321 
322  for (auto const& e : batch)
323  {
324  encoded.prepare(e);
325 
326  wb.Put(
327  rocksdb::Slice(
328  reinterpret_cast<char const*>(encoded.getKey()),
329  m_keyBytes),
330  rocksdb::Slice(
331  reinterpret_cast<char const*>(encoded.getData()),
332  encoded.getSize()));
333  }
334 
335  rocksdb::WriteOptions const options;
336 
337  auto ret = m_db->Write(options, &wb);
338 
339  if (!ret.ok())
340  Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
341  }
342 
343  void
345  {
346  assert(m_db);
347  rocksdb::ReadOptions const options;
348 
349  std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
350 
351  for (it->SeekToFirst(); it->Valid(); it->Next())
352  {
353  if (it->key().size() == m_keyBytes)
354  {
355  DecodedBlob decoded(
356  it->key().data(), it->value().data(), it->value().size());
357 
358  if (decoded.wasOk())
359  {
360  f(decoded.createObject());
361  }
362  else
363  {
364  // Uh oh, corrupted data!
365  JLOG(m_journal.fatal())
366  << "Corrupt NodeObject #" << it->key().ToString(true);
367  }
368  }
369  else
370  {
371  // VFALCO NOTE What does it mean to find an
372  // incorrectly sized key? Corruption?
373  JLOG(m_journal.fatal())
374  << "Bad key size = " << it->key().size();
375  }
376  }
377  }
378 
379  int
380  getWriteLoad() override
381  {
382  return m_batch.getWriteLoad();
383  }
384 
385  void
386  setDeletePath() override
387  {
388  m_deletePath = true;
389  }
390 
391  //--------------------------------------------------------------------------
392 
393  void
394  writeBatch(Batch const& batch) override
395  {
396  storeBatch(batch);
397  }
398 
399  void
400  verify() override
401  {
402  }
403 
405  int
406  fdRequired() const override
407  {
408  return fdRequired_;
409  }
410 };
411 
412 //------------------------------------------------------------------------------
413 
414 class RocksDBFactory : public Factory
415 {
416 public:
417  RocksDBEnv m_env;
418 
419  RocksDBFactory()
420  {
421  Manager::instance().insert(*this);
422  }
423 
424  ~RocksDBFactory() override
425  {
426  Manager::instance().erase(*this);
427  }
428 
430  getName() const override
431  {
432  return "RocksDB";
433  }
434 
436  createInstance(
437  size_t keyBytes,
438  Section const& keyValues,
439  std::size_t,
440  Scheduler& scheduler,
441  beast::Journal journal) override
442  {
443  return std::make_unique<RocksDBBackend>(
444  keyBytes, keyValues, scheduler, journal, &m_env);
445  }
446 };
447 
448 static RocksDBFactory rocksDBFactory;
449 
450 } // namespace NodeStore
451 } // namespace ripple
452 
453 #endif
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
std::for_each
T for_each(T... args)
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::NodeStore::ok
@ ok
Definition: nodestore/Types.h:45
ripple::NodeStore::Manager::erase
virtual void erase(Factory &factory)=0
Remove a factory.
std::size
T size(T... args)
std::stringstream
STL class.
std::function
std::unique_ptr::reset
T reset(T... args)
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:347
ripple::NodeStore::notFound
@ notFound
Definition: nodestore/Types.h:46
ripple::verify
bool verify(PublicKey const &publicKey, Slice const &m, Slice const &sig, bool mustBeFullyCanonical)
Verify a signature on a message.
Definition: PublicKey.cpp:268
ripple::NodeStore::Batch
std::vector< std::shared_ptr< NodeObject > > Batch
A batch of NodeObjects to write at once.
Definition: nodestore/Types.h:54
ripple::NodeStore::Manager::insert
virtual void insert(Factory &factory)=0
Add a factory.
ripple::NodeStore::customCode
@ customCode
Definition: nodestore/Types.h:50
ripple::megabytes
constexpr auto megabytes(T value) noexcept
Definition: ByteUtilities.h:34
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::NodeStore::dataCorrupt
@ dataCorrupt
Definition: nodestore/Types.h:47
atomic
memory
std::experimental::filesystem::status
T status(T... args)
ripple::NodeStore::Status
Status
Return codes from Backend operations.
Definition: nodestore/Types.h:44
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::NodeStore::Manager::instance
static Manager & instance()
Returns the instance of the manager singleton.
Definition: ManagerImp.cpp:119
std::unique_ptr
STL class.
std::data
T data(T... args)
ripple::open
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition: SociDB.cpp:100