rippled
RocksDBFactory.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 
21 #include <ripple/unity/rocksdb.h>
22 
23 #if RIPPLE_ROCKSDB_AVAILABLE
24 
25 #include <ripple/basics/contract.h>
26 #include <ripple/basics/ByteUtilities.h>
27 #include <ripple/core/Config.h> // VFALCO Bad dependency
28 #include <ripple/nodestore/Factory.h>
29 #include <ripple/nodestore/Manager.h>
30 #include <ripple/nodestore/impl/BatchWriter.h>
31 #include <ripple/nodestore/impl/DecodedBlob.h>
32 #include <ripple/nodestore/impl/EncodedBlob.h>
33 #include <ripple/beast/core/CurrentThreadName.h>
34 #include <atomic>
35 #include <memory>
36 
37 namespace ripple {
38 namespace NodeStore {
39 
40 class RocksDBEnv : public rocksdb::EnvWrapper
41 {
42 public:
43  RocksDBEnv ()
44  : EnvWrapper (rocksdb::Env::Default())
45  {
46  }
47 
48  struct ThreadParams
49  {
50  ThreadParams (void (*f_)(void*), void* a_)
51  : f (f_)
52  , a (a_)
53  {
54  }
55 
56  void (*f)(void*);
57  void* a;
58  };
59 
60  static
61  void
62  thread_entry (void* ptr)
63  {
64  ThreadParams* const p (reinterpret_cast <ThreadParams*> (ptr));
65  void (*f)(void*) = p->f;
66  void* a (p->a);
67  delete p;
68 
70  std::size_t const id (++n);
72  ss << "rocksdb #" << id;
74 
75  (*f)(a);
76  }
77 
78  void
79  StartThread (void (*f)(void*), void* a) override
80  {
81  ThreadParams* const p (new ThreadParams (f, a));
82  EnvWrapper::StartThread (&RocksDBEnv::thread_entry, p);
83  }
84 };
85 
86 //------------------------------------------------------------------------------
87 
88 class RocksDBBackend
89  : public Backend
90  , public BatchWriter::Callback
91 {
92 private:
93  std::atomic <bool> m_deletePath;
94 
95 public:
96  beast::Journal m_journal;
97  size_t const m_keyBytes;
98  Scheduler& m_scheduler;
99  BatchWriter m_batch;
100  std::string m_name;
102  int fdRequired_ = 2048;
103  rocksdb::Options m_options;
104 
105  RocksDBBackend (int keyBytes, Section const& keyValues,
106  Scheduler& scheduler, beast::Journal journal, RocksDBEnv* env)
107  : m_deletePath (false)
108  , m_journal (journal)
109  , m_keyBytes (keyBytes)
110  , m_scheduler (scheduler)
111  , m_batch (*this, scheduler)
112  {
113  if (! get_if_exists(keyValues, "path", m_name))
114  Throw<std::runtime_error> ("Missing path in RocksDBFactory backend");
115 
116  rocksdb::BlockBasedTableOptions table_options;
117  m_options.env = env;
118 
119  if (keyValues.exists ("cache_mb"))
120  table_options.block_cache = rocksdb::NewLRUCache (
121  get<int>(keyValues, "cache_mb") * megabytes(1));
122 
123  if (auto const v = get<int>(keyValues, "filter_bits"))
124  {
125  bool const filter_blocks = !keyValues.exists ("filter_full") ||
126  (get<int>(keyValues, "filter_full") == 0);
127  table_options.filter_policy.reset (rocksdb::NewBloomFilterPolicy (v, filter_blocks));
128  }
129 
130  if (get_if_exists (keyValues, "open_files", m_options.max_open_files))
131  fdRequired_ = m_options.max_open_files;
132 
133  if (keyValues.exists ("file_size_mb"))
134  {
135  m_options.target_file_size_base = megabytes(1) * get<int>(keyValues,"file_size_mb");
136  m_options.max_bytes_for_level_base = 5 * m_options.target_file_size_base;
137  m_options.write_buffer_size = 2 * m_options.target_file_size_base;
138  }
139 
140  get_if_exists (keyValues, "file_size_mult", m_options.target_file_size_multiplier);
141 
142  if (keyValues.exists ("bg_threads"))
143  {
144  m_options.env->SetBackgroundThreads
145  (get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
146  }
147 
148  if (keyValues.exists ("high_threads"))
149  {
150  auto const highThreads = get<int>(keyValues, "high_threads");
151  m_options.env->SetBackgroundThreads (highThreads, rocksdb::Env::HIGH);
152 
153  // If we have high-priority threads, presumably we want to
154  // use them for background flushes
155  if (highThreads > 0)
156  m_options.max_background_flushes = highThreads;
157  }
158 
159  m_options.compression = rocksdb::kSnappyCompression;
160 
161  get_if_exists (keyValues, "block_size", table_options.block_size);
162 
163  if (keyValues.exists ("universal_compaction") &&
164  (get<int>(keyValues, "universal_compaction") != 0))
165  {
166  m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
167  m_options.min_write_buffer_number_to_merge = 2;
168  m_options.max_write_buffer_number = 6;
169  m_options.write_buffer_size = 6 * m_options.target_file_size_base;
170  }
171 
172  if (keyValues.exists("bbt_options"))
173  {
174  auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
175  table_options,
176  get<std::string>(keyValues, "bbt_options"),
177  &table_options);
178  if (! s.ok())
179  Throw<std::runtime_error> (
180  std::string("Unable to set RocksDB bbt_options: ") + s.ToString());
181  }
182 
183  m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
184 
185  if (keyValues.exists("options"))
186  {
187  auto const s = rocksdb::GetOptionsFromString(
188  m_options, get<std::string>(keyValues, "options"), &m_options);
189  if (! s.ok())
190  Throw<std::runtime_error> (
191  std::string("Unable to set RocksDB options: ") + s.ToString());
192  }
193 
194  std::string s1, s2;
195  rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
196  rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
197  JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
198  JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
199  }
200 
201  ~RocksDBBackend () override
202  {
203  close();
204  }
205 
206  void
207  open(bool createIfMissing) override
208  {
209  if (m_db)
210  {
211  assert(false);
212  JLOG(m_journal.error()) <<
213  "database is already open";
214  return;
215  }
216  rocksdb::DB* db = nullptr;
217  m_options.create_if_missing = createIfMissing;
218  rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
219  if (!status.ok() || !db)
220  Throw<std::runtime_error>(
221  std::string("Unable to open/create RocksDB: ") +
222  status.ToString());
223  m_db.reset(db);
224  }
225 
226  void
227  close() override
228  {
229  if (m_db)
230  {
231  m_db.reset();
232  if (m_deletePath)
233  {
234  boost::filesystem::path dir = m_name;
235  boost::filesystem::remove_all (dir);
236  }
237  }
238  }
239 
241  getName() override
242  {
243  return m_name;
244  }
245 
246  //--------------------------------------------------------------------------
247 
248  Status
249  fetch (void const* key, std::shared_ptr<NodeObject>* pObject) override
250  {
251  assert(m_db);
252  pObject->reset ();
253 
254  Status status (ok);
255 
256  rocksdb::ReadOptions const options;
257  rocksdb::Slice const slice (static_cast <char const*> (key), m_keyBytes);
258 
259  std::string string;
260 
261  rocksdb::Status getStatus = m_db->Get (options, slice, &string);
262 
263  if (getStatus.ok ())
264  {
265  DecodedBlob decoded (key, string.data (), string.size ());
266 
267  if (decoded.wasOk ())
268  {
269  *pObject = decoded.createObject ();
270  }
271  else
272  {
273  // Decoding failed, probably corrupted!
274  //
276  }
277  }
278  else
279  {
280  if (getStatus.IsCorruption ())
281  {
283  }
284  else if (getStatus.IsNotFound ())
285  {
286  status = notFound;
287  }
288  else
289  {
290  status = Status (customCode + getStatus.code());
291 
292  JLOG(m_journal.error()) << getStatus.ToString ();
293  }
294  }
295 
296  return status;
297  }
298 
299  bool
300  canFetchBatch() override
301  {
302  return false;
303  }
304 
306  fetchBatch (std::size_t n, void const* const* keys) override
307  {
308  Throw<std::runtime_error> ("pure virtual called");
309  return {};
310  }
311 
312  void
313  store (std::shared_ptr<NodeObject> const& object) override
314  {
315  m_batch.store (object);
316  }
317 
318  void
319  storeBatch (Batch const& batch) override
320  {
321  assert(m_db);
322  rocksdb::WriteBatch wb;
323 
324  EncodedBlob encoded;
325 
326  for (auto const& e : batch)
327  {
328  encoded.prepare (e);
329 
330  wb.Put (
331  rocksdb::Slice (reinterpret_cast <char const*> (
332  encoded.getKey ()), m_keyBytes),
333  rocksdb::Slice (reinterpret_cast <char const*> (
334  encoded.getData ()), encoded.getSize ()));
335  }
336 
337  rocksdb::WriteOptions const options;
338 
339  auto ret = m_db->Write (options, &wb);
340 
341  if (! ret.ok ())
342  Throw<std::runtime_error> ("storeBatch failed: " + ret.ToString());
343  }
344 
345  void
347  {
348  assert(m_db);
349  rocksdb::ReadOptions const options;
350 
351  std::unique_ptr <rocksdb::Iterator> it (m_db->NewIterator (options));
352 
353  for (it->SeekToFirst (); it->Valid (); it->Next ())
354  {
355  if (it->key ().size () == m_keyBytes)
356  {
357  DecodedBlob decoded (it->key ().data (),
358  it->value ().data (),
359  it->value ().size ());
360 
361  if (decoded.wasOk ())
362  {
363  f (decoded.createObject ());
364  }
365  else
366  {
367  // Uh oh, corrupted data!
368  JLOG(m_journal.fatal()) <<
369  "Corrupt NodeObject #" <<
370  from_hex_text<uint256>(it->key ().data ());
371  }
372  }
373  else
374  {
375  // VFALCO NOTE What does it mean to find an
376  // incorrectly sized key? Corruption?
377  JLOG(m_journal.fatal()) <<
378  "Bad key size = " << it->key ().size ();
379  }
380  }
381  }
382 
383  int
384  getWriteLoad () override
385  {
386  return m_batch.getWriteLoad ();
387  }
388 
389  void
390  setDeletePath() override
391  {
392  m_deletePath = true;
393  }
394 
395  //--------------------------------------------------------------------------
396 
397  void
398  writeBatch (Batch const& batch) override
399  {
400  storeBatch (batch);
401  }
402 
403  void
404  verify() override
405  {
406  }
407 
409  int
410  fdRequired() const override
411  {
412  return fdRequired_;
413  }
414 };
415 
416 //------------------------------------------------------------------------------
417 
418 class RocksDBFactory : public Factory
419 {
420 public:
421  RocksDBEnv m_env;
422 
423  RocksDBFactory ()
424  {
425  Manager::instance().insert(*this);
426  }
427 
428  ~RocksDBFactory () override
429  {
430  Manager::instance().erase(*this);
431  }
432 
434  getName () const override
435  {
436  return "RocksDB";
437  }
438 
440  createInstance (
441  size_t keyBytes,
442  Section const& keyValues,
443  Scheduler& scheduler,
444  beast::Journal journal) override
445  {
446  return std::make_unique <RocksDBBackend> (
447  keyBytes, keyValues, scheduler, journal, &m_env);
448  }
449 };
450 
451 static RocksDBFactory rocksDBFactory;
452 
453 }
454 }
455 
456 #endif
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:312
std::for_each
T for_each(T... args)
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::NodeStore::ok
@ ok
Definition: nodestore/Types.h:47
ripple::NodeStore::Manager::erase
virtual void erase(Factory &factory)=0
Remove a factory.
std::vector
STL class.
std::stringstream
STL class.
std::function
std::unique_ptr::reset
T reset(T... args)
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:342
ripple::NodeStore::notFound
@ notFound
Definition: nodestore/Types.h:48
ripple::verify
bool verify(PublicKey const &publicKey, Slice const &m, Slice const &sig, bool mustBeFullyCanonical)
Verify a signature on a message.
Definition: PublicKey.cpp:277
ripple::NodeStore::Manager::insert
virtual void insert(Factory &factory)=0
Add a factory.
ripple::NodeStore::customCode
@ customCode
Definition: nodestore/Types.h:52
ripple::megabytes
constexpr auto megabytes(T value) noexcept
Definition: ByteUtilities.h:32
beast::Journal::error
Stream error() const
Definition: Journal.h:307
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:60
ripple::NodeStore::dataCorrupt
@ dataCorrupt
Definition: nodestore/Types.h:49
atomic
memory
std::experimental::filesystem::status
T status(T... args)
ripple::NodeStore::Status
Status
Return codes from Backend operations.
Definition: nodestore/Types.h:45
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:113
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::NodeStore::Batch
std::vector< std::shared_ptr< NodeObject > > Batch
A batch of NodeObjects to write at once.
Definition: nodestore/Types.h:56
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:292
std::size_t
ripple::NodeStore::Manager::instance
static Manager & instance()
Returns the instance of the manager singleton.
Definition: ManagerImp.cpp:117
std::unique_ptr
STL class.
ripple::open
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition: SociDB.cpp:98