// rippled: RocksDBFactory.cpp (recovered from a Doxygen source-browser export)
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
19
#include <xrpld/unity/rocksdb.h>

#if RIPPLE_ROCKSDB_AVAILABLE
#include <xrpld/core/Config.h>  // VFALCO Bad dependency
#include <xrpld/nodestore/Factory.h>
#include <xrpld/nodestore/Manager.h>
#include <xrpld/nodestore/detail/BatchWriter.h>
#include <xrpld/nodestore/detail/DecodedBlob.h>
#include <xrpld/nodestore/detail/EncodedBlob.h>

#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/CurrentThreadName.h>

#include <atomic>
#include <exception>
#include <memory>
#include <string>
#include <utility>
37
38namespace ripple {
39namespace NodeStore {
40
41class RocksDBEnv : public rocksdb::EnvWrapper
42{
43public:
44 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
45 {
46 }
47
48 struct ThreadParams
49 {
50 ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
51 {
52 }
53
54 void (*f)(void*);
55 void* a;
56 };
57
58 static void
59 thread_entry(void* ptr)
60 {
61 ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
62 void (*f)(void*) = p->f;
63 void* a(p->a);
64 delete p;
65
67 std::size_t const id(++n);
69 ss << "rocksdb #" << id;
71
72 (*f)(a);
73 }
74
75 void
76 StartThread(void (*f)(void*), void* a) override
77 {
78 ThreadParams* const p(new ThreadParams(f, a));
79 EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
80 }
81};
82
83//------------------------------------------------------------------------------
84
85class RocksDBBackend : public Backend, public BatchWriter::Callback
86{
87private:
88 std::atomic<bool> m_deletePath;
89
90public:
91 beast::Journal m_journal;
92 size_t const m_keyBytes;
93 BatchWriter m_batch;
94 std::string m_name;
96 int fdRequired_ = 2048;
97 rocksdb::Options m_options;
98
99 RocksDBBackend(
100 int keyBytes,
101 Section const& keyValues,
102 Scheduler& scheduler,
103 beast::Journal journal,
104 RocksDBEnv* env)
105 : m_deletePath(false)
106 , m_journal(journal)
107 , m_keyBytes(keyBytes)
108 , m_batch(*this, scheduler)
109 {
110 if (!get_if_exists(keyValues, "path", m_name))
111 Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
112
113 rocksdb::BlockBasedTableOptions table_options;
114 m_options.env = env;
115
116 bool hard_set =
117 keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");
118
119 if (keyValues.exists("cache_mb"))
120 {
121 auto size = get<int>(keyValues, "cache_mb");
122
123 if (!hard_set && size == 256)
124 size = 1024;
125
126 table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
127 }
128
129 if (auto const v = get<int>(keyValues, "filter_bits"))
130 {
131 bool const filter_blocks = !keyValues.exists("filter_full") ||
132 (get<int>(keyValues, "filter_full") == 0);
133 table_options.filter_policy.reset(
134 rocksdb::NewBloomFilterPolicy(v, filter_blocks));
135 }
136
137 if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
138 {
139 if (!hard_set && m_options.max_open_files == 2000)
140 m_options.max_open_files = 8000;
141
142 fdRequired_ = m_options.max_open_files + 128;
143 }
144
145 if (keyValues.exists("file_size_mb"))
146 {
147 auto file_size_mb = get<int>(keyValues, "file_size_mb");
148
149 if (!hard_set && file_size_mb == 8)
150 file_size_mb = 256;
151
152 m_options.target_file_size_base = megabytes(file_size_mb);
153 m_options.max_bytes_for_level_base =
154 5 * m_options.target_file_size_base;
155 m_options.write_buffer_size = 2 * m_options.target_file_size_base;
156 }
157
159 keyValues, "file_size_mult", m_options.target_file_size_multiplier);
160
161 if (keyValues.exists("bg_threads"))
162 {
163 m_options.env->SetBackgroundThreads(
164 get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
165 }
166
167 if (keyValues.exists("high_threads"))
168 {
169 auto const highThreads = get<int>(keyValues, "high_threads");
170 m_options.env->SetBackgroundThreads(
171 highThreads, rocksdb::Env::HIGH);
172
173 // If we have high-priority threads, presumably we want to
174 // use them for background flushes
175 if (highThreads > 0)
176 m_options.max_background_flushes = highThreads;
177 }
178
179 m_options.compression = rocksdb::kSnappyCompression;
180
181 get_if_exists(keyValues, "block_size", table_options.block_size);
182
183 if (keyValues.exists("universal_compaction") &&
184 (get<int>(keyValues, "universal_compaction") != 0))
185 {
186 m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
187 m_options.min_write_buffer_number_to_merge = 2;
188 m_options.max_write_buffer_number = 6;
189 m_options.write_buffer_size = 6 * m_options.target_file_size_base;
190 }
191
192 if (keyValues.exists("bbt_options"))
193 {
194 rocksdb::ConfigOptions config_options;
195 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
196 config_options,
197 table_options,
198 get(keyValues, "bbt_options"),
199 &table_options);
200 if (!s.ok())
201 Throw<std::runtime_error>(
202 std::string("Unable to set RocksDB bbt_options: ") +
203 s.ToString());
204 }
205
206 m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
207
208 if (keyValues.exists("options"))
209 {
210 auto const s = rocksdb::GetOptionsFromString(
211 m_options, get(keyValues, "options"), &m_options);
212 if (!s.ok())
213 Throw<std::runtime_error>(
214 std::string("Unable to set RocksDB options: ") +
215 s.ToString());
216 }
217
218 std::string s1, s2;
219 rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
220 rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
221 JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
222 JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
223 }
224
225 ~RocksDBBackend() override
226 {
227 close();
228 }
229
230 void
231 open(bool createIfMissing) override
232 {
233 if (m_db)
234 {
235 // LCOV_EXCL_START
236 UNREACHABLE(
237 "ripple::NodeStore::RocksDBBackend::open : database is already "
238 "open");
239 JLOG(m_journal.error()) << "database is already open";
240 return;
241 // LCOV_EXCL_STOP
242 }
243 rocksdb::DB* db = nullptr;
244 m_options.create_if_missing = createIfMissing;
245 rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
246 if (!status.ok() || !db)
247 Throw<std::runtime_error>(
248 std::string("Unable to open/create RocksDB: ") +
249 status.ToString());
250 m_db.reset(db);
251 }
252
253 bool
254 isOpen() override
255 {
256 return static_cast<bool>(m_db);
257 }
258
259 void
260 close() override
261 {
262 if (m_db)
263 {
264 m_db.reset();
265 if (m_deletePath)
266 {
267 boost::filesystem::path dir = m_name;
268 boost::filesystem::remove_all(dir);
269 }
270 }
271 }
272
274 getName() override
275 {
276 return m_name;
277 }
278
279 //--------------------------------------------------------------------------
280
281 Status
282 fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
283 {
284 XRPL_ASSERT(
285 m_db,
286 "ripple::NodeStore::RocksDBBackend::fetch : non-null database");
287 pObject->reset();
288
289 Status status(ok);
290
291 rocksdb::ReadOptions const options;
292 rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
293
294 std::string string;
295
296 rocksdb::Status getStatus = m_db->Get(options, slice, &string);
297
298 if (getStatus.ok())
299 {
300 DecodedBlob decoded(key, string.data(), string.size());
301
302 if (decoded.wasOk())
303 {
304 *pObject = decoded.createObject();
305 }
306 else
307 {
308 // Decoding failed, probably corrupted!
309 //
311 }
312 }
313 else
314 {
315 if (getStatus.IsCorruption())
316 {
318 }
319 else if (getStatus.IsNotFound())
320 {
322 }
323 else
324 {
325 status =
326 Status(customCode + unsafe_cast<int>(getStatus.code()));
327
328 JLOG(m_journal.error()) << getStatus.ToString();
329 }
330 }
331
332 return status;
333 }
334
336 fetchBatch(std::vector<uint256 const*> const& hashes) override
337 {
339 results.reserve(hashes.size());
340 for (auto const& h : hashes)
341 {
343 Status status = fetch(h->begin(), &nObj);
344 if (status != ok)
345 results.push_back({});
346 else
347 results.push_back(nObj);
348 }
349
350 return {results, ok};
351 }
352
353 void
354 store(std::shared_ptr<NodeObject> const& object) override
355 {
356 m_batch.store(object);
357 }
358
359 void
360 storeBatch(Batch const& batch) override
361 {
362 XRPL_ASSERT(
363 m_db,
364 "ripple::NodeStore::RocksDBBackend::storeBatch : non-null "
365 "database");
366 rocksdb::WriteBatch wb;
367
368 for (auto const& e : batch)
369 {
370 EncodedBlob encoded(e);
371
372 wb.Put(
373 rocksdb::Slice(
374 reinterpret_cast<char const*>(encoded.getKey()),
375 m_keyBytes),
376 rocksdb::Slice(
377 reinterpret_cast<char const*>(encoded.getData()),
378 encoded.getSize()));
379 }
380
381 rocksdb::WriteOptions const options;
382
383 auto ret = m_db->Write(options, &wb);
384
385 if (!ret.ok())
386 Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
387 }
388
389 void
390 sync() override
391 {
392 }
393
394 void
396 {
397 XRPL_ASSERT(
398 m_db,
399 "ripple::NodeStore::RocksDBBackend::for_each : non-null database");
400 rocksdb::ReadOptions const options;
401
402 std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
403
404 for (it->SeekToFirst(); it->Valid(); it->Next())
405 {
406 if (it->key().size() == m_keyBytes)
407 {
408 DecodedBlob decoded(
409 it->key().data(), it->value().data(), it->value().size());
410
411 if (decoded.wasOk())
412 {
413 f(decoded.createObject());
414 }
415 else
416 {
417 // Uh oh, corrupted data!
418 JLOG(m_journal.fatal())
419 << "Corrupt NodeObject #" << it->key().ToString(true);
420 }
421 }
422 else
423 {
424 // VFALCO NOTE What does it mean to find an
425 // incorrectly sized key? Corruption?
426 JLOG(m_journal.fatal())
427 << "Bad key size = " << it->key().size();
428 }
429 }
430 }
431
432 int
433 getWriteLoad() override
434 {
435 return m_batch.getWriteLoad();
436 }
437
438 void
439 setDeletePath() override
440 {
441 m_deletePath = true;
442 }
443
444 //--------------------------------------------------------------------------
445
446 void
447 writeBatch(Batch const& batch) override
448 {
449 storeBatch(batch);
450 }
451
453 int
454 fdRequired() const override
455 {
456 return fdRequired_;
457 }
458};
459
460//------------------------------------------------------------------------------
461
462class RocksDBFactory : public Factory
463{
464public:
465 RocksDBEnv m_env;
466
467 RocksDBFactory()
468 {
469 Manager::instance().insert(*this);
470 }
471
472 ~RocksDBFactory() override
473 {
474 Manager::instance().erase(*this);
475 }
476
478 getName() const override
479 {
480 return "RocksDB";
481 }
482
484 createInstance(
485 size_t keyBytes,
486 Section const& keyValues,
488 Scheduler& scheduler,
489 beast::Journal journal) override
490 {
492 keyBytes, keyValues, scheduler, journal, &m_env);
493 }
494};
495
496static RocksDBFactory rocksDBFactory;
497
498} // namespace NodeStore
499} // namespace ripple
500
501#endif
/*
    Doxygen cross-reference tooltips captured with this source listing:

    A generic endpoint for log messages.
    Definition Journal.h:60
    Stream fatal() const
    Definition Journal.h:352
    Stream error() const
    Definition Journal.h:346
    Stream debug() const
    Definition Journal.h:328
    T for_each(T... args)
    T is_same_v
    void setCurrentThreadName(std::string_view newThreadName)
    Changes the name of the caller thread.
    Status
    Return codes from Backend operations.
    @ ok
    No action required.
    Definition Disposition.h:29
    auto const data
    General field definitions, or fields used in multiple transaction namespaces.
    Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
    Definition algorithm.h:25
    constexpr auto megabytes(T value) noexcept
    bool get_if_exists(Section const &section, std::string const &name, T &v)
    void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
    Open a soci session.
    Definition SociDB.cpp:101
    T get(Section const &section, std::string const &name, T const &defaultValue=T{})
    Retrieve a key/value pair from a section.
    T push_back(T... args)
    T reserve(T... args)
    T reset(T... args)
    T size(T... args)
    T str(T... args)
*/