diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index e5ee6409d..3aba2991b 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -219,7 +219,7 @@ private: run() { beast::setCurrentThreadName("LedgerCleaner"); - JLOG(j_.debug()) << "Started"; + JLOG(j_.warn()) << "Started ledger cleaner"; while (true) { @@ -392,7 +392,7 @@ private: if (app_.getFeeTrack().isLoadedLocal()) { - JLOG(j_.debug()) << "Waiting for load to subside"; + JLOG(j_.warn()) << "Ledger Cleaner: Waiting for load to subside"; std::this_thread::sleep_for(std::chrono::seconds(5)); continue; } @@ -415,13 +415,13 @@ private: bool fail = false; if (ledgerHash.isZero()) { - JLOG(j_.info()) - << "Unable to get hash for ledger " << ledgerIndex; + JLOG(j_.warn()) + << "Ledger Cleaner: Unable to get hash for ledger " << ledgerIndex; fail = true; } else if (!doLedger(ledgerIndex, ledgerHash, doNodes, doTxns)) { - JLOG(j_.info()) << "Failed to process ledger " << ledgerIndex; + JLOG(j_.warn()) << "Ledger Cleaner: Failed to process ledger " << ledgerIndex; fail = true; } diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h index d8cb7d318..a6e58b19e 100644 --- a/src/ripple/app/main/Application.h +++ b/src/ripple/app/main/Application.h @@ -19,7 +19,6 @@ #ifndef RIPPLE_APP_MAIN_APPLICATION_H_INCLUDED #define RIPPLE_APP_MAIN_APPLICATION_H_INCLUDED - #include #include #include diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index aa46cdf2d..d8fccbb43 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -118,7 +118,9 @@ SHAMapStoreImp::SHAMapStoreImp( get_if_exists(section, "online_delete", deleteInterval_); - if (deleteInterval_) + bool const isMem = config.mem_backend(); + + if (deleteInterval_ || isMem) { if (app_.config().reporting()) { @@ -127,15 +129,8 @@ SHAMapStoreImp::SHAMapStoreImp( "online_delete info from config"); } - if ((!app_.config().section(SECTION_RELATIONAL_DB).empty() && - boost::iequals(get(app.config().section(SECTION_RELATIONAL_DB), "backend"), "memory")) || - (!app_.config().section("node_db").empty() && - boost::iequals(get(app.config().section("node_db"), "type"), "memory"))) - { - Throw( - "Memory does not support online_delete. Remove " - "online_delete info from config. Use [ledger_history] to set a history limit."); - } + if (isMem) + deleteInterval_ = config.LEDGER_HISTORY; // Configuration that affects the behavior of online delete get_if_exists(section, "delete_batch", deleteBatch_); @@ -172,7 +167,8 @@ SHAMapStoreImp::SHAMapStoreImp( } state_db_.init(config, dbName_); - dbPaths(); + if (!isMem) + dbPaths(); } } @@ -205,6 +201,7 @@ SHAMapStoreImp::makeNodeStore(int readThreads) "online_delete info from config"); } SavedState state = state_db_.getState(); + auto writableBackend = makeBackendRotating(state.writableDb); auto archiveBackend = makeBackendRotating(state.archiveDb); if (!state.writableDb.size()) @@ -303,9 +300,14 @@ SHAMapStoreImp::run() fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache(0)); treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache(0)); + bool const isMem = app_.config().mem_backend(); + + std::cout << "SHAMapStoreImp: isMem = " << (isMem ? "true" : "false") << "\n"; + + std::cout << "SHAMapStoreImp: lastRotated = " << lastRotated << "\n"; if (advisoryDelete_) canDelete_ = state_db_.getCanDelete(); - + while (true) { healthy_ = true; @@ -353,7 +355,7 @@ SHAMapStoreImp::run() if (waitForImport) { - JLOG(journal_.info()) + JLOG(journal_.warn()) << "NOT rotating validatedSeq " << validatedSeq << " as rotation would interfere with ShardStore import"; } @@ -372,12 +374,12 @@ SHAMapStoreImp::run() if (healthWait() == stopping) return; - JLOG(journal_.debug()) << "copying ledger " << validatedSeq; + JLOG(journal_.warn()) << "copying ledger " << validatedSeq; std::uint64_t nodeCount = 0; try { - validatedLedger->stateMap().snapShot(false)->visitNodes( + validatedLedger->stateMap().snapShot(isMem)->visitNodes( std::bind( &SHAMapStoreImp::copyNode, this, @@ -395,19 +397,19 @@ SHAMapStoreImp::run() if (healthWait() == stopping) return; // Only log if we completed without a "health" abort - JLOG(journal_.debug()) << "copied ledger " << validatedSeq + JLOG(journal_.warn()) << "copied ledger " << validatedSeq << " nodecount " << nodeCount; - JLOG(journal_.debug()) << "freshening caches"; + JLOG(journal_.warn()) << "freshening caches"; freshenCaches(); if (healthWait() == stopping) return; // Only log if we completed without a "health" abort - JLOG(journal_.debug()) << validatedSeq << " freshened caches"; + JLOG(journal_.warn()) << validatedSeq << " freshened caches"; - JLOG(journal_.trace()) << "Making a new backend"; + JLOG(journal_.warn()) << "Making a new backend"; auto newBackend = makeBackendRotating(); - JLOG(journal_.debug()) + JLOG(journal_.warn()) << validatedSeq << " new backend " << newBackend->getName(); clearCaches(validatedSeq); diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index 995ee0267..1ddcd3877 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -40,6 +40,7 @@ class NetworkOPs; class SHAMapStoreImp : public SHAMapStore { private: + class SavedStateDB { public: diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index d3d032b01..2c69af860 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -39,6 +39,7 @@ #include #include #include +#include namespace ripple { @@ -350,6 +351,17 @@ public: { return RUN_REPORTING; } + bool + mem_backend() const + { + static bool const isMem = + (!section(SECTION_RELATIONAL_DB).empty() && + boost::beast::iequals(get(section(SECTION_RELATIONAL_DB), "backend"), "memory")) || + (!section("node_db").empty() && + boost::beast::iequals(get(section("node_db"), "type"), "memory")); + + return isMem; + } bool useTxTables() const diff --git a/src/ripple/nodestore/backend/MemoryFactory.cpp b/src/ripple/nodestore/backend/MemoryFactory.cpp index a2738019b..ef056b176 100644 --- a/src/ripple/nodestore/backend/MemoryFactory.cpp +++ b/src/ripple/nodestore/backend/MemoryFactory.cpp @@ -8,6 +8,11 @@ #include #include +// Define the map implementation selector +#ifndef MEMORY_DB_USE_CONCURRENT_MAP +#define MEMORY_DB_USE_CONCURRENT_MAP 0 // Set to 0 to use mutex-guarded std::map +#endif + namespace ripple { struct base_uint_hasher @@ -23,17 +28,74 @@ struct base_uint_hasher namespace NodeStore { +#if MEMORY_DB_USE_CONCURRENT_MAP + using DataStore = boost::unordered::concurrent_flat_map< + uint256, + std::shared_ptr, + base_uint_hasher>; +#else + using DataStore = std::map>; +#endif + struct MemoryDB { explicit MemoryDB() = default; - std::mutex mutex; +#if !MEMORY_DB_USE_CONCURRENT_MAP + mutable std::mutex mutex; // Only needed for std::map implementation +#endif bool open = false; - boost::unordered::concurrent_flat_map< - uint256, - std::shared_ptr, - base_uint_hasher> - table; + DataStore table; + + // Helper functions to abstract the different map implementations + Status fetch(uint256 const& hash, std::shared_ptr* pObject) const + { +#if MEMORY_DB_USE_CONCURRENT_MAP + bool found = table.visit(hash, [&](const auto& key_value_pair) { + *pObject = key_value_pair.second; + }); + return found ? ok : notFound; +#else + std::lock_guard lock(mutex); + auto it = table.find(hash); + if (it == table.end()) + return notFound; + *pObject = it->second; + return ok; +#endif + } + + void store(uint256 const& hash, std::shared_ptr const& object) + { +#if MEMORY_DB_USE_CONCURRENT_MAP + table.insert_or_assign(hash, object); +#else + std::lock_guard lock(mutex); + table[hash] = object; +#endif + } + + template + void for_each(Func&& f) const + { +#if MEMORY_DB_USE_CONCURRENT_MAP + table.visit_all([&f](const auto& entry) { f(entry.second); }); +#else + std::lock_guard lock(mutex); + for (const auto& entry : table) + f(entry.second); +#endif + } + + size_t size() const + { +#if MEMORY_DB_USE_CONCURRENT_MAP + return table.size(); +#else + std::lock_guard lock(mutex); + return table.size(); +#endif + } }; class MemoryFactory : public Factory @@ -77,11 +139,6 @@ static MemoryFactory memoryFactory; class MemoryBackend : public Backend { private: - using Map = boost::unordered::concurrent_flat_map< - uint256, - std::shared_ptr, - base_uint_hasher>; - std::string name_; beast::Journal const journal_; MemoryDB* db_{nullptr}; @@ -93,7 +150,7 @@ public: beast::Journal journal) : name_(get(keyValues, "path")), journal_(journal) { - boost::ignore_unused(journal_); // Keep unused journal_ just in case. + boost::ignore_unused(journal_); if (name_.empty()) name_ = "node_db"; } @@ -134,12 +191,7 @@ public: { assert(db_); uint256 const hash(uint256::fromVoid(key)); - - bool found = db_->table.visit(hash, [&](const auto& key_value_pair) { - *pObject = key_value_pair.second; - }); - - return found ? ok : notFound; + return db_->fetch(hash, pObject); } std::pair>, Status> @@ -156,7 +208,6 @@ public: else results.push_back(nObj); } - return {results, ok}; } @@ -164,7 +215,14 @@ public: store(std::shared_ptr const& object) override { assert(db_); - db_->table.insert_or_assign(object->getHash(), object); + if (!object) + std::cout << "mapping null object\n"; + + db_->store(object->getHash(), object); + + static int counter = 0; + if (counter++ % 1000 == 0) + std::cout << "map size: " << db_->size() << "\n"; } void @@ -183,7 +241,7 @@ public: for_each(std::function)> f) override { assert(db_); - db_->table.visit_all([&f](const auto& entry) { f(entry.second); }); + db_->for_each(f); } int @@ -204,8 +262,7 @@ public: } }; -//------------------------------------------------------------------------------ - +// Factory implementation remains the same MemoryFactory::MemoryFactory() { Manager::instance().insert(*this); diff --git a/src/ripple/shamap/impl/SHAMap.cpp b/src/ripple/shamap/impl/SHAMap.cpp index ce031003c..5bcda2063 100644 --- a/src/ripple/shamap/impl/SHAMap.cpp +++ b/src/ripple/shamap/impl/SHAMap.cpp @@ -244,6 +244,7 @@ SHAMap::checkFilter(SHAMapHash const& hash, SHAMapSyncFilter* filter) const // Get a node without throwing // Used on maps where missing nodes are expected +/* std::shared_ptr SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const { @@ -266,6 +267,49 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const return node; } +*/ + +std::shared_ptr +SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const +{ + using namespace std::chrono; + auto start = high_resolution_clock::now(); + auto timeout = nanoseconds(600); + + while (true) + { + // Try to fetch from cache first + auto node = cacheLookup(hash); + if (node) + return node; + + if (backed_) + { + node = fetchNodeFromDB(hash); + if (node) + { + canonicalize(hash, node); + return node; + } + } + + if (filter) + node = checkFilter(hash, filter); + + if (node) + return node; + + // Check if we've exceeded timeout + auto elapsed = high_resolution_clock::now() - start; + if (elapsed >= timeout) + break; + + // Short yield to avoid overwhelming CPU + std::this_thread::yield(); + } + + return nullptr; +} std::shared_ptr SHAMap::fetchNodeNT(SHAMapHash const& hash) const