rippled
Loading...
Searching...
No Matches
RocksDBFactory.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/unity/rocksdb.h>
21
22#if RIPPLE_ROCKSDB_AVAILABLE
23#include <xrpld/core/Config.h> // VFALCO Bad dependency
24#include <xrpld/nodestore/Factory.h>
25#include <xrpld/nodestore/Manager.h>
26#include <xrpld/nodestore/detail/BatchWriter.h>
27#include <xrpld/nodestore/detail/DecodedBlob.h>
28#include <xrpld/nodestore/detail/EncodedBlob.h>
29
30#include <xrpl/basics/ByteUtilities.h>
31#include <xrpl/basics/contract.h>
32#include <xrpl/basics/safe_cast.h>
33#include <xrpl/beast/core/CurrentThreadName.h>
34
35#include <atomic>
36#include <memory>
37
38namespace ripple {
39namespace NodeStore {
40
41class RocksDBEnv : public rocksdb::EnvWrapper
42{
43public:
44 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
45 {
46 }
47
48 struct ThreadParams
49 {
50 ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
51 {
52 }
53
54 void (*f)(void*);
55 void* a;
56 };
57
58 static void
59 thread_entry(void* ptr)
60 {
61 ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
62 void (*f)(void*) = p->f;
63 void* a(p->a);
64 delete p;
65
67 std::size_t const id(++n);
69 ss << "rocksdb #" << id;
71
72 (*f)(a);
73 }
74
75 void
76 StartThread(void (*f)(void*), void* a) override
77 {
78 ThreadParams* const p(new ThreadParams(f, a));
79 EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
80 }
81};
82
83//------------------------------------------------------------------------------
84
85class RocksDBBackend : public Backend, public BatchWriter::Callback
86{
87private:
88 std::atomic<bool> m_deletePath;
89
90public:
91 beast::Journal m_journal;
92 size_t const m_keyBytes;
93 BatchWriter m_batch;
94 std::string m_name;
96 int fdRequired_ = 2048;
97 rocksdb::Options m_options;
98
99 RocksDBBackend(
100 int keyBytes,
101 Section const& keyValues,
102 Scheduler& scheduler,
103 beast::Journal journal,
104 RocksDBEnv* env)
105 : m_deletePath(false)
106 , m_journal(journal)
107 , m_keyBytes(keyBytes)
108 , m_batch(*this, scheduler)
109 {
110 if (!get_if_exists(keyValues, "path", m_name))
111 Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
112
113 rocksdb::BlockBasedTableOptions table_options;
114 m_options.env = env;
115
116 bool hard_set =
117 keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");
118
119 if (keyValues.exists("cache_mb"))
120 {
121 auto size = get<int>(keyValues, "cache_mb");
122
123 if (!hard_set && size == 256)
124 size = 1024;
125
126 table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
127 }
128
129 if (auto const v = get<int>(keyValues, "filter_bits"))
130 {
131 bool const filter_blocks = !keyValues.exists("filter_full") ||
132 (get<int>(keyValues, "filter_full") == 0);
133 table_options.filter_policy.reset(
134 rocksdb::NewBloomFilterPolicy(v, filter_blocks));
135 }
136
137 if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
138 {
139 if (!hard_set && m_options.max_open_files == 2000)
140 m_options.max_open_files = 8000;
141
142 fdRequired_ = m_options.max_open_files + 128;
143 }
144
145 if (keyValues.exists("file_size_mb"))
146 {
147 auto file_size_mb = get<int>(keyValues, "file_size_mb");
148
149 if (!hard_set && file_size_mb == 8)
150 file_size_mb = 256;
151
152 m_options.target_file_size_base = megabytes(file_size_mb);
153 m_options.max_bytes_for_level_base =
154 5 * m_options.target_file_size_base;
155 m_options.write_buffer_size = 2 * m_options.target_file_size_base;
156 }
157
159 keyValues, "file_size_mult", m_options.target_file_size_multiplier);
160
161 if (keyValues.exists("bg_threads"))
162 {
163 m_options.env->SetBackgroundThreads(
164 get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
165 }
166
167 if (keyValues.exists("high_threads"))
168 {
169 auto const highThreads = get<int>(keyValues, "high_threads");
170 m_options.env->SetBackgroundThreads(
171 highThreads, rocksdb::Env::HIGH);
172
173 // If we have high-priority threads, presumably we want to
174 // use them for background flushes
175 if (highThreads > 0)
176 m_options.max_background_flushes = highThreads;
177 }
178
179 m_options.compression = rocksdb::kSnappyCompression;
180
181 get_if_exists(keyValues, "block_size", table_options.block_size);
182
183 if (keyValues.exists("universal_compaction") &&
184 (get<int>(keyValues, "universal_compaction") != 0))
185 {
186 m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
187 m_options.min_write_buffer_number_to_merge = 2;
188 m_options.max_write_buffer_number = 6;
189 m_options.write_buffer_size = 6 * m_options.target_file_size_base;
190 }
191
192 if (keyValues.exists("bbt_options"))
193 {
194 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
195 table_options, get(keyValues, "bbt_options"), &table_options);
196 if (!s.ok())
197 Throw<std::runtime_error>(
198 std::string("Unable to set RocksDB bbt_options: ") +
199 s.ToString());
200 }
201
202 m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
203
204 if (keyValues.exists("options"))
205 {
206 auto const s = rocksdb::GetOptionsFromString(
207 m_options, get(keyValues, "options"), &m_options);
208 if (!s.ok())
209 Throw<std::runtime_error>(
210 std::string("Unable to set RocksDB options: ") +
211 s.ToString());
212 }
213
214 std::string s1, s2;
215 rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
216 rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
217 JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
218 JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
219 }
220
221 ~RocksDBBackend() override
222 {
223 close();
224 }
225
226 void
227 open(bool createIfMissing) override
228 {
229 if (m_db)
230 {
231 UNREACHABLE(
232 "ripple::NodeStore::RocksDBBackend::open : database is already "
233 "open");
234 JLOG(m_journal.error()) << "database is already open";
235 return;
236 }
237 rocksdb::DB* db = nullptr;
238 m_options.create_if_missing = createIfMissing;
239 rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
240 if (!status.ok() || !db)
241 Throw<std::runtime_error>(
242 std::string("Unable to open/create RocksDB: ") +
243 status.ToString());
244 m_db.reset(db);
245 }
246
247 bool
248 isOpen() override
249 {
250 return static_cast<bool>(m_db);
251 }
252
253 void
254 close() override
255 {
256 if (m_db)
257 {
258 m_db.reset();
259 if (m_deletePath)
260 {
261 boost::filesystem::path dir = m_name;
262 boost::filesystem::remove_all(dir);
263 }
264 }
265 }
266
268 getName() override
269 {
270 return m_name;
271 }
272
273 //--------------------------------------------------------------------------
274
275 Status
276 fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
277 {
278 XRPL_ASSERT(
279 m_db,
280 "ripple::NodeStore::RocksDBBackend::fetch : non-null database");
281 pObject->reset();
282
283 Status status(ok);
284
285 rocksdb::ReadOptions const options;
286 rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
287
288 std::string string;
289
290 rocksdb::Status getStatus = m_db->Get(options, slice, &string);
291
292 if (getStatus.ok())
293 {
294 DecodedBlob decoded(key, string.data(), string.size());
295
296 if (decoded.wasOk())
297 {
298 *pObject = decoded.createObject();
299 }
300 else
301 {
302 // Decoding failed, probably corrupted!
303 //
305 }
306 }
307 else
308 {
309 if (getStatus.IsCorruption())
310 {
312 }
313 else if (getStatus.IsNotFound())
314 {
316 }
317 else
318 {
319 status =
320 Status(customCode + unsafe_cast<int>(getStatus.code()));
321
322 JLOG(m_journal.error()) << getStatus.ToString();
323 }
324 }
325
326 return status;
327 }
328
330 fetchBatch(std::vector<uint256 const*> const& hashes) override
331 {
333 results.reserve(hashes.size());
334 for (auto const& h : hashes)
335 {
337 Status status = fetch(h->begin(), &nObj);
338 if (status != ok)
339 results.push_back({});
340 else
341 results.push_back(nObj);
342 }
343
344 return {results, ok};
345 }
346
347 void
348 store(std::shared_ptr<NodeObject> const& object) override
349 {
350 m_batch.store(object);
351 }
352
353 void
354 storeBatch(Batch const& batch) override
355 {
356 XRPL_ASSERT(
357 m_db,
358 "ripple::NodeStore::RocksDBBackend::storeBatch : non-null "
359 "database");
360 rocksdb::WriteBatch wb;
361
362 for (auto const& e : batch)
363 {
364 EncodedBlob encoded(e);
365
366 wb.Put(
367 rocksdb::Slice(
368 reinterpret_cast<char const*>(encoded.getKey()),
369 m_keyBytes),
370 rocksdb::Slice(
371 reinterpret_cast<char const*>(encoded.getData()),
372 encoded.getSize()));
373 }
374
375 rocksdb::WriteOptions const options;
376
377 auto ret = m_db->Write(options, &wb);
378
379 if (!ret.ok())
380 Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
381 }
382
383 void
384 sync() override
385 {
386 }
387
388 void
390 {
391 XRPL_ASSERT(
392 m_db,
393 "ripple::NodeStore::RocksDBBackend::for_each : non-null database");
394 rocksdb::ReadOptions const options;
395
396 std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
397
398 for (it->SeekToFirst(); it->Valid(); it->Next())
399 {
400 if (it->key().size() == m_keyBytes)
401 {
402 DecodedBlob decoded(
403 it->key().data(), it->value().data(), it->value().size());
404
405 if (decoded.wasOk())
406 {
407 f(decoded.createObject());
408 }
409 else
410 {
411 // Uh oh, corrupted data!
412 JLOG(m_journal.fatal())
413 << "Corrupt NodeObject #" << it->key().ToString(true);
414 }
415 }
416 else
417 {
418 // VFALCO NOTE What does it mean to find an
419 // incorrectly sized key? Corruption?
420 JLOG(m_journal.fatal())
421 << "Bad key size = " << it->key().size();
422 }
423 }
424 }
425
426 int
427 getWriteLoad() override
428 {
429 return m_batch.getWriteLoad();
430 }
431
432 void
433 setDeletePath() override
434 {
435 m_deletePath = true;
436 }
437
438 //--------------------------------------------------------------------------
439
440 void
441 writeBatch(Batch const& batch) override
442 {
443 storeBatch(batch);
444 }
445
447 int
448 fdRequired() const override
449 {
450 return fdRequired_;
451 }
452};
453
454//------------------------------------------------------------------------------
455
456class RocksDBFactory : public Factory
457{
458public:
459 RocksDBEnv m_env;
460
461 RocksDBFactory()
462 {
463 Manager::instance().insert(*this);
464 }
465
466 ~RocksDBFactory() override
467 {
468 Manager::instance().erase(*this);
469 }
470
472 getName() const override
473 {
474 return "RocksDB";
475 }
476
478 createInstance(
479 size_t keyBytes,
480 Section const& keyValues,
482 Scheduler& scheduler,
483 beast::Journal journal) override
484 {
485 return std::make_unique<RocksDBBackend>(
486 keyBytes, keyValues, scheduler, journal, &m_env);
487 }
488};
489
490static RocksDBFactory rocksDBFactory;
491
492} // namespace NodeStore
493} // namespace ripple
494
495#endif
A generic endpoint for log messages.
Definition: Journal.h:60
Stream fatal() const
Definition: Journal.h:352
Stream error() const
Definition: Journal.h:346
Stream debug() const
Definition: Journal.h:328
T data(T... args)
T for_each(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Status
Return codes from Backend operations.
@ ok
No action required.
Definition: Disposition.h:29
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
constexpr auto megabytes(T value) noexcept
Definition: ByteUtilities.h:34
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:386
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition: SociDB.cpp:101
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
Definition: BasicConfig.h:355
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)