rippled
RocksDBFactory.cpp
#include <xrpl/basics/rocksdb.h>

#if XRPL_ROCKSDB_AVAILABLE
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Factory.h>
#include <xrpl/nodestore/Manager.h>
#include <xrpl/nodestore/detail/BatchWriter.h>
#include <xrpl/nodestore/detail/DecodedBlob.h>
#include <xrpl/nodestore/detail/EncodedBlob.h>

#include <atomic>
#include <memory>

namespace xrpl {
namespace NodeStore {

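// RocksDBEnv wraps the default rocksdb::Env so that every background thread
// RocksDB launches is given a human-readable name ("rocksdb #N") via
// beast::setCurrentThreadName, which makes the threads easier to identify in
// debuggers and diagnostic output.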
class RocksDBEnv : public rocksdb::EnvWrapper
{
public:
    RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
    {
    }

    struct ThreadParams
    {
        ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
        {
        }

        void (*f)(void*);
        void* a;
    };

    static void
    thread_entry(void* ptr)
    {
        ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
        void (*f)(void*) = p->f;
        void* a(p->a);
        delete p;

        static std::atomic<std::size_t> n;
        std::size_t const id(++n);
        std::stringstream ss;
        ss << "rocksdb #" << id;
        beast::setCurrentThreadName(ss.str());

        (*f)(a);
    }

    void
    StartThread(void (*f)(void*), void* a) override
    {
        ThreadParams* const p(new ThreadParams(f, a));
        EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
    }
};

//------------------------------------------------------------------------------

class RocksDBBackend : public Backend, public BatchWriter::Callback
{
private:
    std::atomic<bool> m_deletePath;

public:
    beast::Journal m_journal;
    size_t const m_keyBytes;
    BatchWriter m_batch;
    std::string m_name;
    std::unique_ptr<rocksdb::DB> m_db;
    int fdRequired_ = 2048;
    rocksdb::Options m_options;

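    // The constructor below pulls its tuning knobs from the backend's
    // configuration section (the [node_db] stanza in a typical rippled.cfg,
    // where type=RocksDB selects this factory). An illustrative stanza with
    // made-up values might look like:
    //
    //     [node_db]
    //     type=RocksDB
    //     path=/var/lib/rippled/db/rocksdb
    //     open_files=2000
    //     filter_bits=12
    //     cache_mb=256
    //     file_size_mb=8
    //
    // Only "path" is required; any key left out keeps the corresponding
    // RocksDB default.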
    RocksDBBackend(
        int keyBytes,
        Section const& keyValues,
        Scheduler& scheduler,
        beast::Journal journal,
        RocksDBEnv* env)
        : m_deletePath(false), m_journal(journal), m_keyBytes(keyBytes), m_batch(*this, scheduler)
    {
        if (!get_if_exists(keyValues, "path", m_name))
            Throw<std::runtime_error>("Missing path in RocksDBFactory backend");

        rocksdb::BlockBasedTableOptions table_options;
        m_options.env = env;

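        // "hard_set" tells the backend to take the configured values
        // literally. Without it, values that match the historical example
        // configuration (cache_mb=256, open_files=2000, file_size_mb=8) are
        // treated as stale defaults and bumped to larger values below.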
        bool hard_set = keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");

        if (keyValues.exists("cache_mb"))
        {
            auto size = get<int>(keyValues, "cache_mb");

            if (!hard_set && size == 256)
                size = 1024;

            table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
        }

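        // "filter_bits" enables a Bloom filter with the given number of bits
        // per key; setting "filter_full" to a nonzero value switches from
        // per-block filters to full (per-file) filters. RocksDB's guidance
        // is that roughly 10 bits per key gives about a 1% false-positive
        // rate.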
        if (auto const v = get<int>(keyValues, "filter_bits"))
        {
            bool const filter_blocks = !keyValues.exists("filter_full") || (get<int>(keyValues, "filter_full") == 0);
            table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(v, filter_blocks));
        }

        if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
        {
            if (!hard_set && m_options.max_open_files == 2000)
                m_options.max_open_files = 8000;

            fdRequired_ = m_options.max_open_files + 128;
        }

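        // "file_size_mb" drives three related sizes: the SST target file
        // size, the level-1 size budget (5x), and the memtable write buffer
        // (2x). For example, file_size_mb=256 yields a 256 MB target file
        // size, a 1280 MB max_bytes_for_level_base, and a 512 MB
        // write_buffer_size.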
        if (keyValues.exists("file_size_mb"))
        {
            auto file_size_mb = get<int>(keyValues, "file_size_mb");

            if (!hard_set && file_size_mb == 8)
                file_size_mb = 256;

            m_options.target_file_size_base = megabytes(file_size_mb);
            m_options.max_bytes_for_level_base = 5 * m_options.target_file_size_base;
            m_options.write_buffer_size = 2 * m_options.target_file_size_base;
        }

        get_if_exists(keyValues, "file_size_mult", m_options.target_file_size_multiplier);

        if (keyValues.exists("bg_threads"))
        {
            m_options.env->SetBackgroundThreads(get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
        }

        if (keyValues.exists("high_threads"))
        {
            auto const highThreads = get<int>(keyValues, "high_threads");
            m_options.env->SetBackgroundThreads(highThreads, rocksdb::Env::HIGH);

            // If we have high-priority threads, presumably we want to
            // use them for background flushes
            if (highThreads > 0)
                m_options.max_background_flushes = highThreads;
        }

        m_options.compression = rocksdb::kSnappyCompression;

        get_if_exists(keyValues, "block_size", table_options.block_size);

        if (keyValues.exists("universal_compaction") && (get<int>(keyValues, "universal_compaction") != 0))
        {
            m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
            m_options.min_write_buffer_number_to_merge = 2;
            m_options.max_write_buffer_number = 6;
            m_options.write_buffer_size = 6 * m_options.target_file_size_base;
        }

        if (keyValues.exists("bbt_options"))
        {
            rocksdb::ConfigOptions config_options;
            auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
                config_options, table_options, get(keyValues, "bbt_options"), &table_options);
            if (!s.ok())
                Throw<std::runtime_error>(std::string("Unable to set RocksDB bbt_options: ") + s.ToString());
        }

        m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));

        if (keyValues.exists("options"))
        {
            auto const s = rocksdb::GetOptionsFromString(m_options, get(keyValues, "options"), &m_options);
            if (!s.ok())
                Throw<std::runtime_error>(std::string("Unable to set RocksDB options: ") + s.ToString());
        }

        std::string s1, s2;
        rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
        rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
        JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
        JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
    }

    ~RocksDBBackend() override
    {
        close();
    }

    void
    open(bool createIfMissing) override
    {
        if (m_db)
        {
            // LCOV_EXCL_START
            UNREACHABLE(
                "xrpl::NodeStore::RocksDBBackend::open : database is already "
                "open");
            JLOG(m_journal.error()) << "database is already open";
            return;
            // LCOV_EXCL_STOP
        }
        rocksdb::DB* db = nullptr;
        m_options.create_if_missing = createIfMissing;
        rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
        if (!status.ok() || !db)
            Throw<std::runtime_error>(std::string("Unable to open/create RocksDB: ") + status.ToString());
        m_db.reset(db);
    }

    bool
    isOpen() override
    {
        return static_cast<bool>(m_db);
    }

    void
    close() override
    {
        if (m_db)
        {
            m_db.reset();
            if (m_deletePath)
            {
                boost::filesystem::path dir = m_name;
                boost::filesystem::remove_all(dir);
            }
        }
    }

    std::string
    getName() override
    {
        return m_name;
    }

    //--------------------------------------------------------------------------

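    // fetch translates RocksDB outcomes into the Backend Status codes: a
    // value that reads successfully but fails to decode, or a read RocksDB
    // reports as corrupted, yields dataCorrupt; a missing key yields
    // notFound; any other RocksDB error is surfaced as customCode plus the
    // native status code and logged.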
    Status
    fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
    {
        XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
        pObject->reset();

        Status status(ok);

        rocksdb::ReadOptions const options;
        rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);

        std::string string;

        rocksdb::Status getStatus = m_db->Get(options, slice, &string);

        if (getStatus.ok())
        {
            DecodedBlob decoded(key, string.data(), string.size());

            if (decoded.wasOk())
            {
                *pObject = decoded.createObject();
            }
            else
            {
                // Decoding failed, probably corrupted!
                //
                status = dataCorrupt;
            }
        }
        else
        {
            if (getStatus.IsCorruption())
            {
                status = dataCorrupt;
            }
            else if (getStatus.IsNotFound())
            {
                status = notFound;
            }
            else
            {
                status = Status(customCode + unsafe_cast<int>(getStatus.code()));

                JLOG(m_journal.error()) << getStatus.ToString();
            }
        }

        return status;
    }

    std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
    fetchBatch(std::vector<uint256 const*> const& hashes) override
    {
        std::vector<std::shared_ptr<NodeObject>> results;
        results.reserve(hashes.size());
        for (auto const& h : hashes)
        {
            std::shared_ptr<NodeObject> nObj;
            Status status = fetch(h->begin(), &nObj);
            if (status != ok)
                results.push_back({});
            else
                results.push_back(nObj);
        }

        return {results, ok};
    }

    void
    store(std::shared_ptr<NodeObject> const& object) override
    {
        m_batch.store(object);
    }

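    // storeBatch writes the whole batch through a single rocksdb::WriteBatch,
    // so it is applied atomically; any write failure is escalated to an
    // exception rather than being silently dropped.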
    void
    storeBatch(Batch const& batch) override
    {
        XRPL_ASSERT(
            m_db,
            "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
            "database");
        rocksdb::WriteBatch wb;

        for (auto const& e : batch)
        {
            EncodedBlob encoded(e);

            wb.Put(
                rocksdb::Slice(reinterpret_cast<char const*>(encoded.getKey()), m_keyBytes),
                rocksdb::Slice(reinterpret_cast<char const*>(encoded.getData()), encoded.getSize()));
        }

        rocksdb::WriteOptions const options;

        auto ret = m_db->Write(options, &wb);

        if (!ret.ok())
            Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
    }

    void
    sync() override
    {
    }

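    // for_each walks every key in the database, invoking the callback for
    // each NodeObject that decodes cleanly and logging (but otherwise
    // skipping) entries that look corrupt or have an unexpected key size.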
    void
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
    {
        XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::for_each : non-null database");
        rocksdb::ReadOptions const options;

        std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));

        for (it->SeekToFirst(); it->Valid(); it->Next())
        {
            if (it->key().size() == m_keyBytes)
            {
                DecodedBlob decoded(it->key().data(), it->value().data(), it->value().size());

                if (decoded.wasOk())
                {
                    f(decoded.createObject());
                }
                else
                {
                    // Uh oh, corrupted data!
                    JLOG(m_journal.fatal()) << "Corrupt NodeObject #" << it->key().ToString(true);
                }
            }
            else
            {
                // VFALCO NOTE What does it mean to find an
                //             incorrectly sized key? Corruption?
                JLOG(m_journal.fatal()) << "Bad key size = " << it->key().size();
            }
        }
    }

    int
    getWriteLoad() override
    {
        return m_batch.getWriteLoad();
    }

    void
    setDeletePath() override
    {
        m_deletePath = true;
    }

    //--------------------------------------------------------------------------

    void
    writeBatch(Batch const& batch) override
    {
        storeBatch(batch);
    }

    int
    fdRequired() const override
    {
        return fdRequired_;
    }
};

//------------------------------------------------------------------------------

class RocksDBFactory : public Factory
{
private:
    Manager& manager_;

public:
    RocksDBEnv m_env;

    RocksDBFactory(Manager& manager) : manager_(manager)
    {
        manager_.insert(*this);
    }

    std::string
    getName() const override
    {
        return "RocksDB";
    }

    std::unique_ptr<Backend>
    createInstance(
        size_t keyBytes,
        Section const& keyValues,
        std::size_t,
        Scheduler& scheduler,
        beast::Journal journal) override
    {
        return std::make_unique<RocksDBBackend>(keyBytes, keyValues, scheduler, journal, &m_env);
    }
};

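// Constructing the factory registers it with the Manager (see the
// constructor above), so a single function-local static is enough to make
// the "RocksDB" backend type available for the lifetime of the process.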
void
registerRocksDBFactory(Manager& manager)
{
    static RocksDBFactory instance{manager};
}

}  // namespace NodeStore
}  // namespace xrpl

#endif