rippled
RocksDBFactory.cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <xrpl/basics/rocksdb.h>

#if RIPPLE_ROCKSDB_AVAILABLE
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Factory.h>
#include <xrpl/nodestore/Manager.h>
#include <xrpl/nodestore/detail/BatchWriter.h>
#include <xrpl/nodestore/detail/DecodedBlob.h>
#include <xrpl/nodestore/detail/EncodedBlob.h>

#include <atomic>
#include <memory>

namespace ripple {
namespace NodeStore {

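// Wraps the default RocksDB environment so that worker threads started by
// RocksDB are given readable names ("rocksdb #1", "rocksdb #2", ...) via
// beast::setCurrentThreadName before running the supplied thread function.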
class RocksDBEnv : public rocksdb::EnvWrapper
{
public:
    RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
    {
    }

    struct ThreadParams
    {
        ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
        {
        }

        void (*f)(void*);
        void* a;
    };

    static void
    thread_entry(void* ptr)
    {
        ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
        void (*f)(void*) = p->f;
        void* a(p->a);
        delete p;

        static std::atomic<std::size_t> n;
        std::size_t const id(++n);
        std::stringstream ss;
        ss << "rocksdb #" << id;
        beast::setCurrentThreadName(ss.str());

        (*f)(a);
    }

    void
    StartThread(void (*f)(void*), void* a) override
    {
        ThreadParams* const p(new ThreadParams(f, a));
        EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
    }
};

//------------------------------------------------------------------------------

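// NodeStore backend that keeps NodeObjects in a RocksDB database. Writes are
// funneled through a BatchWriter (see store/writeBatch below); tuning
// parameters are taken from the backend's configuration section.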
class RocksDBBackend : public Backend, public BatchWriter::Callback
{
private:
    std::atomic<bool> m_deletePath;

public:
    beast::Journal m_journal;
    size_t const m_keyBytes;
    BatchWriter m_batch;
    std::string m_name;
    std::unique_ptr<rocksdb::DB> m_db;
    int fdRequired_ = 2048;
    rocksdb::Options m_options;

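    // The constructor pulls its settings from the supplied configuration
    // Section. Keys consumed below: path (required), hard_set, cache_mb,
    // filter_bits, filter_full, open_files, file_size_mb, file_size_mult,
    // bg_threads, high_threads, block_size, universal_compaction,
    // bbt_options and options.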
    RocksDBBackend(
        int keyBytes,
        Section const& keyValues,
        Scheduler& scheduler,
        beast::Journal journal,
        RocksDBEnv* env)
        : m_deletePath(false)
        , m_journal(journal)
        , m_keyBytes(keyBytes)
        , m_batch(*this, scheduler)
    {
        if (!get_if_exists(keyValues, "path", m_name))
            Throw<std::runtime_error>("Missing path in RocksDBFactory backend");

        rocksdb::BlockBasedTableOptions table_options;
        m_options.env = env;

        bool hard_set =
            keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");

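        // Unless "hard_set" is specified, a few legacy defaults carried over
        // from older configuration files are upgraded below: cache_mb 256 ->
        // 1024, open_files 2000 -> 8000, and file_size_mb 8 -> 256.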
        if (keyValues.exists("cache_mb"))
        {
            auto size = get<int>(keyValues, "cache_mb");

            if (!hard_set && size == 256)
                size = 1024;

            table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
        }

        if (auto const v = get<int>(keyValues, "filter_bits"))
        {
            bool const filter_blocks = !keyValues.exists("filter_full") ||
                (get<int>(keyValues, "filter_full") == 0);
            table_options.filter_policy.reset(
                rocksdb::NewBloomFilterPolicy(v, filter_blocks));
        }

        if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
        {
            if (!hard_set && m_options.max_open_files == 2000)
                m_options.max_open_files = 8000;

            fdRequired_ = m_options.max_open_files + 128;
        }

        if (keyValues.exists("file_size_mb"))
        {
            auto file_size_mb = get<int>(keyValues, "file_size_mb");

            if (!hard_set && file_size_mb == 8)
                file_size_mb = 256;

            m_options.target_file_size_base = megabytes(file_size_mb);
            m_options.max_bytes_for_level_base =
                5 * m_options.target_file_size_base;
            m_options.write_buffer_size = 2 * m_options.target_file_size_base;
        }

        get_if_exists(
            keyValues, "file_size_mult", m_options.target_file_size_multiplier);

        if (keyValues.exists("bg_threads"))
        {
            m_options.env->SetBackgroundThreads(
                get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
        }

        if (keyValues.exists("high_threads"))
        {
            auto const highThreads = get<int>(keyValues, "high_threads");
            m_options.env->SetBackgroundThreads(
                highThreads, rocksdb::Env::HIGH);

            // If we have high-priority threads, presumably we want to
            // use them for background flushes
            if (highThreads > 0)
                m_options.max_background_flushes = highThreads;
        }

        m_options.compression = rocksdb::kSnappyCompression;

        get_if_exists(keyValues, "block_size", table_options.block_size);

        if (keyValues.exists("universal_compaction") &&
            (get<int>(keyValues, "universal_compaction") != 0))
        {
            m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
            m_options.min_write_buffer_number_to_merge = 2;
            m_options.max_write_buffer_number = 6;
            m_options.write_buffer_size = 6 * m_options.target_file_size_base;
        }

        if (keyValues.exists("bbt_options"))
        {
            rocksdb::ConfigOptions config_options;
            auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
                config_options,
                table_options,
                get(keyValues, "bbt_options"),
                &table_options);
            if (!s.ok())
                Throw<std::runtime_error>(
                    std::string("Unable to set RocksDB bbt_options: ") +
                    s.ToString());
        }

        m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));

        if (keyValues.exists("options"))
        {
            auto const s = rocksdb::GetOptionsFromString(
                m_options, get(keyValues, "options"), &m_options);
            if (!s.ok())
                Throw<std::runtime_error>(
                    std::string("Unable to set RocksDB options: ") +
                    s.ToString());
        }

        std::string s1, s2;
        rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
        rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
        JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
        JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
    }

    ~RocksDBBackend() override
    {
        close();
    }

    void
    open(bool createIfMissing) override
    {
        if (m_db)
        {
            // LCOV_EXCL_START
            UNREACHABLE(
                "ripple::NodeStore::RocksDBBackend::open : database is already "
                "open");
            JLOG(m_journal.error()) << "database is already open";
            return;
            // LCOV_EXCL_STOP
        }
        rocksdb::DB* db = nullptr;
        m_options.create_if_missing = createIfMissing;
        rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
        if (!status.ok() || !db)
            Throw<std::runtime_error>(
                std::string("Unable to open/create RocksDB: ") +
                status.ToString());
        m_db.reset(db);
    }

    bool
    isOpen() override
    {
        return static_cast<bool>(m_db);
    }

    void
    close() override
    {
        if (m_db)
        {
            m_db.reset();
            if (m_deletePath)
            {
                boost::filesystem::path dir = m_name;
                boost::filesystem::remove_all(dir);
            }
        }
    }

    std::string
    getName() override
    {
        return m_name;
    }

    //--------------------------------------------------------------------------

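    // Reads a single NodeObject by key. Returns ok, notFound, dataCorrupt, or
    // a customCode-based Status wrapping any other RocksDB error code.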
    Status
    fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
    {
        XRPL_ASSERT(
            m_db,
            "ripple::NodeStore::RocksDBBackend::fetch : non-null database");
        pObject->reset();

        Status status(ok);

        rocksdb::ReadOptions const options;
        rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);

        std::string string;

        rocksdb::Status getStatus = m_db->Get(options, slice, &string);

        if (getStatus.ok())
        {
            DecodedBlob decoded(key, string.data(), string.size());

            if (decoded.wasOk())
            {
                *pObject = decoded.createObject();
            }
            else
            {
                // Decoding failed, probably corrupted!
                //
                status = dataCorrupt;
            }
        }
        else
        {
            if (getStatus.IsCorruption())
            {
                status = dataCorrupt;
            }
            else if (getStatus.IsNotFound())
            {
                status = notFound;
            }
            else
            {
                status =
                    Status(customCode + unsafe_cast<int>(getStatus.code()));

                JLOG(m_journal.error()) << getStatus.ToString();
            }
        }

        return status;
    }

    std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
    fetchBatch(std::vector<uint256 const*> const& hashes) override
    {
        std::vector<std::shared_ptr<NodeObject>> results;
        results.reserve(hashes.size());
        for (auto const& h : hashes)
        {
            std::shared_ptr<NodeObject> nObj;
            Status status = fetch(h->begin(), &nObj);
            if (status != ok)
                results.push_back({});
            else
                results.push_back(nObj);
        }

        return {results, ok};
    }

    void
    store(std::shared_ptr<NodeObject> const& object) override
    {
        m_batch.store(object);
    }

    void
    storeBatch(Batch const& batch) override
    {
        XRPL_ASSERT(
            m_db,
            "ripple::NodeStore::RocksDBBackend::storeBatch : non-null "
            "database");
        rocksdb::WriteBatch wb;

        for (auto const& e : batch)
        {
            EncodedBlob encoded(e);

            wb.Put(
                rocksdb::Slice(
                    reinterpret_cast<char const*>(encoded.getKey()),
                    m_keyBytes),
                rocksdb::Slice(
                    reinterpret_cast<char const*>(encoded.getData()),
                    encoded.getSize()));
        }

        rocksdb::WriteOptions const options;

        auto ret = m_db->Write(options, &wb);

        if (!ret.ok())
            Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
    }

    void
    sync() override
    {
    }

    void
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
    {
        XRPL_ASSERT(
            m_db,
            "ripple::NodeStore::RocksDBBackend::for_each : non-null database");
        rocksdb::ReadOptions const options;

        std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));

        for (it->SeekToFirst(); it->Valid(); it->Next())
        {
            if (it->key().size() == m_keyBytes)
            {
                DecodedBlob decoded(
                    it->key().data(), it->value().data(), it->value().size());

                if (decoded.wasOk())
                {
                    f(decoded.createObject());
                }
                else
                {
                    // Uh oh, corrupted data!
                    JLOG(m_journal.fatal())
                        << "Corrupt NodeObject #" << it->key().ToString(true);
                }
            }
            else
            {
                // VFALCO NOTE What does it mean to find an
                //             incorrectly sized key? Corruption?
                JLOG(m_journal.fatal())
                    << "Bad key size = " << it->key().size();
            }
        }
    }

    int
    getWriteLoad() override
    {
        return m_batch.getWriteLoad();
    }

    void
    setDeletePath() override
    {
        m_deletePath = true;
    }

    //--------------------------------------------------------------------------

    void
    writeBatch(Batch const& batch) override
    {
        storeBatch(batch);
    }

    /** Returns the number of file descriptors the backend expects to need */
    int
    fdRequired() const override
    {
        return fdRequired_;
    }
};

//------------------------------------------------------------------------------

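// Factory that produces RocksDBBackend instances. Constructing the factory
// registers it with the NodeStore Manager so the backend can be found by the
// name returned from getName() ("RocksDB").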
class RocksDBFactory : public Factory
{
private:
    Manager& manager_;

public:
    RocksDBEnv m_env;

    RocksDBFactory(Manager& manager) : manager_(manager)
    {
        manager_.insert(*this);
    }

    std::string
    getName() const override
    {
        return "RocksDB";
    }

    std::unique_ptr<Backend>
    createInstance(
        size_t keyBytes,
        Section const& keyValues,
        std::size_t,
        Scheduler& scheduler,
        beast::Journal journal) override
    {
        return std::make_unique<RocksDBBackend>(
            keyBytes, keyValues, scheduler, journal, &m_env);
    }
};

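// Registers the RocksDB factory exactly once: the function-local static is
// constructed on the first call, and its constructor inserts the factory into
// the given Manager.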
void
registerRocksDBFactory(Manager& manager)
{
    static RocksDBFactory instance{manager};
}

}  // namespace NodeStore
}  // namespace ripple

#endif