diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 8c1a47c6f5..e87fa586ea 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -860,11 +860,14 @@ public: // transaction database mTxnDB = std::make_unique( - setup, TxDBName, TxDBPragma, TxDBInit); + setup, + TxDBName, + TxDBPragma, + TxDBInit, + DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()}); mTxnDB->getSession() << boost::str( boost::format("PRAGMA cache_size=-%d;") % kilobytes(config_->getValueFor(SizedItem::txnDBCache))); - mTxnDB->setupCheckpointing(m_jobQueue.get(), logs()); if (!setup.standAlone || setup.startUp == Config::LOAD || setup.startUp == Config::LOAD_FILE || @@ -899,11 +902,14 @@ public: // ledger database mLedgerDB = std::make_unique( - setup, LgrDBName, LgrDBPragma, LgrDBInit); + setup, + LgrDBName, + LgrDBPragma, + LgrDBInit, + DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()}); mLedgerDB->getSession() << boost::str( boost::format("PRAGMA cache_size=-%d;") % kilobytes(config_->getValueFor(SizedItem::lgrDBCache))); - mLedgerDB->setupCheckpointing(m_jobQueue.get(), logs()); // wallet database setup.useGlobalPragma = false; diff --git a/src/ripple/core/DatabaseCon.h b/src/ripple/core/DatabaseCon.h index 5cdabb08f0..8c6f8bb0b4 100644 --- a/src/ripple/core/DatabaseCon.h +++ b/src/ripple/core/DatabaseCon.h @@ -34,52 +34,50 @@ class session; namespace ripple { -template -class LockedPointer +class LockedSociSession { public: - using mutex = TMutex; + using mutex = std::recursive_mutex; private: - T* it_; + std::shared_ptr session_; std::unique_lock lock_; public: - LockedPointer(T* it, mutex& m) : it_(it), lock_(m) + LockedSociSession(std::shared_ptr it, mutex& m) + : session_(std::move(it)), lock_(m) { } - LockedPointer(LockedPointer&& rhs) noexcept - : it_(rhs.it_), lock_(std::move(rhs.lock_)) + LockedSociSession(LockedSociSession&& rhs) noexcept + : session_(std::move(rhs.session_)), 
lock_(std::move(rhs.lock_)) { } - LockedPointer() = delete; - LockedPointer(LockedPointer const& rhs) = delete; - LockedPointer& - operator=(LockedPointer const& rhs) = delete; + LockedSociSession() = delete; + LockedSociSession(LockedSociSession const& rhs) = delete; + LockedSociSession& + operator=(LockedSociSession const& rhs) = delete; - T* + soci::session* get() { - return it_; + return session_.get(); } - T& + soci::session& operator*() { - return *it_; + return *session_; } - T* + soci::session* operator->() { - return it_; + return session_.get(); } explicit operator bool() const { - return bool(it_); + return bool(session_); } }; -using LockedSociSession = LockedPointer; - class DatabaseCon { public: @@ -105,10 +103,16 @@ public: static std::unique_ptr const> globalPragma; }; + struct CheckpointerSetup + { + JobQueue* jobQueue; + Logs* logs; + }; + template DatabaseCon( Setup const& setup, - std::string const& DBName, + std::string const& dbName, std::array const& pragma, std::array const& initSQL) // Use temporary files or regular DB files? @@ -117,74 +121,114 @@ public: setup.startUp != Config::LOAD_FILE && setup.startUp != Config::REPLAY ? 
"" - : (setup.dataDir / DBName), + : (setup.dataDir / dbName), setup.commonPragma(), pragma, initSQL) { } + // Use this constructor to setup checkpointing + template + DatabaseCon( + Setup const& setup, + std::string const& dbName, + std::array const& pragma, + std::array const& initSQL, + CheckpointerSetup const& checkpointerSetup) + : DatabaseCon(setup, dbName, pragma, initSQL) + { + setupCheckpointing(checkpointerSetup.jobQueue, *checkpointerSetup.logs); + } + template DatabaseCon( boost::filesystem::path const& dataDir, - std::string const& DBName, + std::string const& dbName, std::array const& pragma, std::array const& initSQL) - : DatabaseCon(dataDir / DBName, nullptr, pragma, initSQL) + : DatabaseCon(dataDir / dbName, nullptr, pragma, initSQL) { } + // Use this constructor to setup checkpointing + template + DatabaseCon( + boost::filesystem::path const& dataDir, + std::string const& dbName, + std::array const& pragma, + std::array const& initSQL, + CheckpointerSetup const& checkpointerSetup) + : DatabaseCon(dataDir, dbName, pragma, initSQL) + { + setupCheckpointing(checkpointerSetup.jobQueue, *checkpointerSetup.logs); + } + + ~DatabaseCon(); + soci::session& getSession() { - return session_; + return *session_; } LockedSociSession checkoutDb() { - return LockedSociSession(&session_, lock_); + return LockedSociSession(session_, lock_); } +private: void setupCheckpointing(JobQueue*, Logs&); -private: template DatabaseCon( boost::filesystem::path const& pPath, std::vector const* commonPragma, std::array const& pragma, std::array const& initSQL) + : session_(std::make_shared()) { - open(session_, "sqlite", pPath.string()); + open(*session_, "sqlite", pPath.string()); if (commonPragma) { for (auto const& p : *commonPragma) { - soci::statement st = session_.prepare << p; + soci::statement st = session_->prepare << p; st.execute(true); } } for (auto const& p : pragma) { - soci::statement st = session_.prepare << p; + soci::statement st = session_->prepare << p; 
st.execute(true); } for (auto const& sql : initSQL) { - soci::statement st = session_.prepare << sql; + soci::statement st = session_->prepare << sql; st.execute(true); } } LockedSociSession::mutex lock_; - soci::session session_; - std::unique_ptr checkpointer_; + // checkpointer may outlive the DatabaseCon when the checkpointer jobQueue + // callback locks a weak pointer and the DatabaseCon is then destroyed. In + // this case, the checkpointer needs to make sure it doesn't use an already + // destroyed session. Thus this class keeps a shared_ptr to the session (so + // the checkpointer can keep a weak_ptr) and the checkpointer is a + // shared_ptr in this class. session_ will never be null. + std::shared_ptr const session_; + std::shared_ptr checkpointer_; }; +// Return the checkpointer from its id. If the checkpointer no longer exists, a +// nullptr is returned +std::shared_ptr +checkpointerFromId(std::uintptr_t id); + DatabaseCon::Setup setup_DatabaseCon( Config const& c, diff --git a/src/ripple/core/SociDB.h b/src/ripple/core/SociDB.h index 20c545da4e..0c66deed51 100644 --- a/src/ripple/core/SociDB.h +++ b/src/ripple/core/SociDB.h @@ -131,20 +131,32 @@ convert(std::vector const& from, soci::blob& to); void convert(std::string const& from, soci::blob& to); -class Checkpointer +class Checkpointer : public std::enable_shared_from_this { public: + virtual std::uintptr_t + id() const = 0; virtual ~Checkpointer() = default; + + virtual void + schedule() = 0; + + virtual void + checkpoint() = 0; }; /** Returns a new checkpointer which makes checkpoints of a soci database every checkpointPageCount pages, using a job on the job queue. - The Checkpointer contains references to the session and job queue + The checkpointer contains references to the session and job queue and so must outlive them both. 
*/ -std::unique_ptr -makeCheckpointer(soci::session&, JobQueue&, Logs&); +std::shared_ptr +makeCheckpointer( + std::uintptr_t id, + std::weak_ptr, + JobQueue&, + Logs&); } // namespace ripple diff --git a/src/ripple/core/impl/DatabaseCon.cpp b/src/ripple/core/impl/DatabaseCon.cpp index 89c4ee1f29..064f44b50a 100644 --- a/src/ripple/core/impl/DatabaseCon.cpp +++ b/src/ripple/core/impl/DatabaseCon.cpp @@ -21,12 +21,74 @@ #include #include #include + #include #include + #include +#include namespace ripple { +class CheckpointersCollection +{ + std::uintptr_t nextId_{0}; + // Mutex protects the CheckpointersCollection + std::mutex mutex_; + // Each checkpointer is given a unique id. All the checkpointers that are + // part of a DatabaseCon are part of this collection. When the DatabaseCon + // is destroyed, its checkpointer is removed from the collection + std::unordered_map> + checkpointers_; + +public: + std::shared_ptr + fromId(std::uintptr_t id) + { + std::lock_guard l{mutex_}; + auto it = checkpointers_.find(id); + if (it != checkpointers_.end()) + return it->second; + return {}; + } + + void + erase(std::uintptr_t id) + { + std::lock_guard lock{mutex_}; + checkpointers_.erase(id); + } + + std::shared_ptr + create( + std::shared_ptr const& session, + JobQueue& jobQueue, + Logs& logs) + { + std::lock_guard lock{mutex_}; + auto const id = nextId_++; + auto const r = makeCheckpointer(id, session, jobQueue, logs); + checkpointers_[id] = r; + return r; + } +}; + +CheckpointersCollection checkpointers; + +std::shared_ptr +checkpointerFromId(std::uintptr_t id) +{ + return checkpointers.fromId(id); +} + +DatabaseCon::~DatabaseCon() +{ + if (checkpointer_) + { + checkpointers.erase(checkpointer_->id()); + } +} + DatabaseCon::Setup setup_DatabaseCon(Config const& c, boost::optional j) { @@ -173,7 +235,7 @@ DatabaseCon::setupCheckpointing(JobQueue* q, Logs& l) { if (!q) Throw("No JobQueue"); - checkpointer_ = makeCheckpointer(session_, *q, l); + checkpointer_ = 
checkpointers.create(session_, *q, l); } } // namespace ripple diff --git a/src/ripple/core/impl/SociDB.cpp b/src/ripple/core/impl/SociDB.cpp index c21b185e7d..5c81d9f9e0 100644 --- a/src/ripple/core/impl/SociDB.cpp +++ b/src/ripple/core/impl/SociDB.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -196,44 +197,48 @@ namespace { is the default behavior of sqlite. We may be able to remove this class. */ + class WALCheckpointer : public Checkpointer { public: - WALCheckpointer(sqlite_api::sqlite3& conn, JobQueue& q, Logs& logs) - : conn_(conn), jobQueue_(q), j_(logs.journal("WALCheckpointer")) + WALCheckpointer( + std::uintptr_t id, + std::weak_ptr session, + JobQueue& q, + Logs& logs) + : id_(id) + , session_(std::move(session)) + , jobQueue_(q) + , j_(logs.journal("WALCheckpointer")) { - sqlite_api::sqlite3_wal_hook(&conn_, &sqliteWALHook, this); + if (auto [conn, keepAlive] = getConnection(); conn) + { + (void)keepAlive; + sqlite_api::sqlite3_wal_hook( + conn, &sqliteWALHook, reinterpret_cast(id_)); + } + } + + std::pair> + getConnection() const + { + if (auto p = session_.lock()) + { + return {ripple::getConnection(*p), p}; + } + return {nullptr, std::shared_ptr{}}; + } + + std::uintptr_t + id() const override + { + return id_; } ~WALCheckpointer() override = default; -private: - sqlite_api::sqlite3& conn_; - std::mutex mutex_; - JobQueue& jobQueue_; - - bool running_ = false; - beast::Journal const j_; - - static int - sqliteWALHook( - void* cp, - sqlite_api::sqlite3*, - const char* dbName, - int walSize) - { - if (walSize >= checkpointPageCount) - { - if (auto checkpointer = reinterpret_cast(cp)) - checkpointer->scheduleCheckpoint(); - else - Throw("Didn't get a WALCheckpointer"); - } - return SQLITE_OK; - } - void - scheduleCheckpoint() + schedule() override { { std::lock_guard lock(mutex_); @@ -243,7 +248,18 @@ private: } // If the Job is not added to the JobQueue then we're not running_. 
- if (!jobQueue_.addJob(jtWAL, "WAL", [this](Job&) { checkpoint(); })) + if (!jobQueue_.addJob( + jtWAL, + "WAL", + // If the owning DatabaseCon is destroyed, no need to checkpoint + // or keep the checkpointer alive so use a weak_ptr to this. + // There is a separate check in `checkpoint` for a valid + // connection in the rare case when the DatabaseCon is destroyed + // after locking this weak_ptr + [wp = std::weak_ptr{shared_from_this()}](Job&) { + if (auto self = wp.lock()) + self->checkpoint(); + })) { std::lock_guard lock(mutex_); running_ = false; @@ -251,13 +267,18 @@ private: } void - checkpoint() + checkpoint() override { + auto [conn, keepAlive] = getConnection(); + (void)keepAlive; + if (!conn) + return; + int log = 0, ckpt = 0; int ret = sqlite3_wal_checkpoint_v2( - &conn_, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); + conn, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); - auto fname = sqlite3_db_filename(&conn_, "main"); + auto fname = sqlite3_db_filename(conn, "main"); if (ret != SQLITE_OK) { auto jm = (ret == SQLITE_LOCKED) ? j_.trace() : j_.warn(); @@ -272,16 +293,53 @@ private: std::lock_guard lock(mutex_); running_ = false; } + +protected: + std::uintptr_t const id_; + // session is owned by the DatabaseCon parent that holds the checkpointer. + // It is possible (though rare) for the DatabaseCon class to be destroyed + // before the checkpointer. 
+ std::weak_ptr session_; + std::mutex mutex_; + JobQueue& jobQueue_; + + bool running_ = false; + beast::Journal const j_; + + static int + sqliteWALHook( + void* cpId, + sqlite_api::sqlite3* conn, + const char* dbName, + int walSize) + { + if (walSize >= checkpointPageCount) + { + if (auto checkpointer = + checkpointerFromId(reinterpret_cast(cpId))) + { + checkpointer->schedule(); + } + else + { + sqlite_api::sqlite3_wal_hook(conn, nullptr, nullptr); + } + } + return SQLITE_OK; + } }; } // namespace -std::unique_ptr -makeCheckpointer(soci::session& session, JobQueue& queue, Logs& logs) +std::shared_ptr +makeCheckpointer( + std::uintptr_t id, + std::weak_ptr session, + JobQueue& queue, + Logs& logs) { - if (auto conn = getConnection(session)) - return std::make_unique(*conn, queue, logs); - return {}; + return std::make_shared( + id, std::move(session), queue, logs); } } // namespace ripple diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 1bf7877c75..7bc2c1cec2 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -131,9 +131,8 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx) setup, AcquireShardDBName, AcquireShardDBPragma, - AcquireShardDBInit); - acquireInfo_->SQLiteDB->setupCheckpointing( - &app_.getJobQueue(), app_.logs()); + AcquireShardDBInit, + DatabaseCon::CheckpointerSetup{&app_.getJobQueue(), &app_.logs()}); }; try @@ -741,18 +740,26 @@ Shard::initSQLite(std::lock_guard const&) { // The incomplete shard uses a Write Ahead Log for performance lgrSQLiteDB_ = std::make_unique( - setup, LgrDBName, LgrDBPragma, LgrDBInit); + setup, + LgrDBName, + LgrDBPragma, + LgrDBInit, + DatabaseCon::CheckpointerSetup{ + &app_.getJobQueue(), &app_.logs()}); lgrSQLiteDB_->getSession() << boost::str( boost::format("PRAGMA cache_size=-%d;") % kilobytes(config.getValueFor(SizedItem::lgrDBCache))); - lgrSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs()); txSQLiteDB_ = 
std::make_unique( - setup, TxDBName, TxDBPragma, TxDBInit); + setup, + TxDBName, + TxDBPragma, + TxDBInit, + DatabaseCon::CheckpointerSetup{ + &app_.getJobQueue(), &app_.logs()}); txSQLiteDB_->getSession() << boost::str( boost::format("PRAGMA cache_size=-%d;") % kilobytes(config.getValueFor(SizedItem::txnDBCache))); - txSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs()); } } catch (std::exception const& e)