diff --git a/src/ripple/app/data/DatabaseCon.cpp b/src/ripple/app/data/DatabaseCon.cpp index 801783cd0..f51842a3f 100644 --- a/src/ripple/app/data/DatabaseCon.cpp +++ b/src/ripple/app/data/DatabaseCon.cpp @@ -25,11 +25,11 @@ namespace ripple { -DatabaseCon::DatabaseCon (Setup const& setup, - std::string const& strName, - const char* initStrings[], - int initCount) - :session_(std::make_shared()) +DatabaseCon::DatabaseCon ( + Setup const& setup, + std::string const& strName, + const char* initStrings[], + int initCount) { auto const useTempFiles // Use temporary files or regular DB files? = setup.standAlone && @@ -39,13 +39,13 @@ DatabaseCon::DatabaseCon (Setup const& setup, boost::filesystem::path pPath = useTempFiles ? "" : (setup.dataDir / strName); - open(*session_, "sqlite", pPath.string()); + open (session_, "sqlite", pPath.string()); for (int i = 0; i < initCount; ++i) { try { - *session_ << initStrings[i]; + session_ << initStrings[i]; } catch (soci::soci_error&) { @@ -67,6 +67,9 @@ DatabaseCon::Setup setup_DatabaseCon (Config const& c) void DatabaseCon::setupCheckpointing (JobQueue* q) { - walCheckpointer_ = std::make_unique (session_, q); + if (! q) + throw std::logic_error ("No JobQueue"); + checkpointer_ = makeCheckpointer (session_, *q); } + } // ripple diff --git a/src/ripple/app/data/DatabaseCon.h b/src/ripple/app/data/DatabaseCon.h index a7aa944cd..a2762e12b 100644 --- a/src/ripple/app/data/DatabaseCon.h +++ b/src/ripple/app/data/DatabaseCon.h @@ -68,7 +68,7 @@ public: } explicit operator bool() const { - return bool(it_); + return bool (it_); } }; @@ -85,27 +85,27 @@ public: }; DatabaseCon (Setup const& setup, - std::string const& name, - const char* initString[], - int countInit); + std::string const& name, + const char* initString[], + int countInit); soci::session& getSession() { - return *session_; + return session_; } - LockedSociSession checkoutDb() + LockedSociSession checkoutDb () { - return LockedSociSession(session_.get (), lock_); + return LockedSociSession (&session_, lock_); } void setupCheckpointing (JobQueue*); + private: - std::unique_ptr walCheckpointer_; - // shared_ptr to handle lifetime issues with walCheckpointer_ - // never null. - std::shared_ptr session_; LockedSociSession::mutex lock_; + + soci::session session_; + std::unique_ptr checkpointer_; }; DatabaseCon::Setup diff --git a/src/ripple/app/data/SociDB.cpp b/src/ripple/app/data/SociDB.cpp index 796eef71b..f400a0f23 100644 --- a/src/ripple/app/data/SociDB.cpp +++ b/src/ripple/app/data/SociDB.cpp @@ -27,12 +27,15 @@ #include namespace ripple { + +static auto checkpointPageCount = 1000; + namespace detail { std::pair getSociSqliteInit (std::string const& name, - std::string const& dir, - std::string const& ext) + std::string const& dir, + std::string const& ext) { if (dir.empty () || name.empty ()) { @@ -43,7 +46,7 @@ getSociSqliteInit (std::string const& name, boost::filesystem::path file (dir); if (is_directory (file)) file /= name + ext; - return std::make_pair (file.string (), std::ref(soci::sqlite3)); + return std::make_pair (file.string (), std::ref (soci::sqlite3)); } std::pair @@ -51,24 +54,21 @@ getSociInit (BasicConfig const& config, std::string const& dbName) { auto const& section = config.section ("sqdb"); - std::string const backendName(get(section, "backend", "sqlite")); + auto const backendName = get(section, "backend", "sqlite"); - if (backendName == "sqlite") - { - std::string const path = config.legacy ("database_path"); - std::string const ext = - (dbName == "validators" || dbName == "peerfinder") ? ".sqlite" - : ".db"; - return detail::getSociSqliteInit(dbName, path, ext); - } - else - { + if (backendName != "sqlite") throw std::runtime_error ("Unsupported soci backend: " + backendName); - } + + auto const path = config.legacy ("database_path"); + auto const ext = dbName == "validators" || dbName == "peerfinder" + ? ".sqlite" : ".db"; + return detail::getSociSqliteInit(dbName, path, ext); } + } // detail -SociConfig::SociConfig (std::pair init) +SociConfig::SociConfig ( + std::pair init) : connectionString_ (std::move (init.first)), backendFactory_ (init.second) { @@ -84,55 +84,63 @@ std::string SociConfig::connectionString () const return connectionString_; } -void SociConfig::open(soci::session& s) const +void SociConfig::open (soci::session& s) const { s.open (backendFactory_, connectionString ()); } -void open(soci::session& s, - BasicConfig const& config, - std::string const& dbName) +void open (soci::session& s, + BasicConfig const& config, + std::string const& dbName) { - SociConfig c(config, dbName); - c.open(s); + SociConfig (config, dbName).open(s); } -void open(soci::session& s, - std::string const& beName, - std::string const& connectionString) +void open (soci::session& s, + std::string const& beName, + std::string const& connectionString) { if (beName == "sqlite") - { s.open(soci::sqlite3, connectionString); - return; - } else - { throw std::runtime_error ("Unsupported soci backend: " + beName); - } +} + +static +sqlite_api::sqlite3* getConnection (soci::session& s) +{ + sqlite_api::sqlite3* result = nullptr; + auto be = s.get_backend (); + if (auto b = dynamic_cast (be)) + result = b->conn_; + + if (! result) + throw std::logic_error ("Didn't get a database connection."); + + return result; } size_t getKBUsedAll (soci::session& s) { - auto be = dynamic_cast(s.get_backend ()); - assert (be); // Make sure the backend is sqlite - (void) be; - return static_cast(sqlite_api::sqlite3_memory_used () / 1024); + if (! getConnection (s)) + throw std::logic_error ("No connection found."); + return static_cast (sqlite_api::sqlite3_memory_used () / 1024); } size_t getKBUsedDB (soci::session& s) { // This function will have to be customized when other backends are added - auto be = dynamic_cast(s.get_backend ()); - assert (be); - (void) be; - int cur = 0, hiw = 0; - sqlite_api::sqlite3_db_status ( - be->conn_, SQLITE_DBSTATUS_CACHE_USED, &cur, &hiw, 0); - return cur / 1024; + if (auto conn = getConnection (s)) + { + int cur = 0, hiw = 0; + sqlite_api::sqlite3_db_status ( + conn, SQLITE_DBSTATUS_CACHE_USED, &cur, &hiw, 0); + return cur / 1024; + } + throw std::logic_error (""); } -void convert(soci::blob& from, std::vector& to) +void convert (soci::blob& from, std::vector& to) { to.resize (from.get_len ()); if (to.empty ()) @@ -140,84 +148,103 @@ void convert(soci::blob& from, std::vector& to) from.read (0, reinterpret_cast(&to[0]), from.get_len ()); } -void convert(soci::blob& from, std::string& to) +void convert (soci::blob& from, std::string& to) { std::vector tmp; - convert(from, tmp); - to.assign(tmp.begin (), tmp.end()); + convert (from, tmp); + to.assign (tmp.begin (), tmp.end()); } -void convert(std::vector const& from, soci::blob& to) +void convert (std::vector const& from, soci::blob& to) { if (!from.empty ()) to.write (0, reinterpret_cast(&from[0]), from.size ()); } -int SqliteWALHook (void* s, - sqlite_api::sqlite3*, - const char* dbName, - int walSize) +namespace { + +/** Run a thread to checkpoint the write ahead log (wal) for + the given soci::session every 1000 pages. This is only implemented + for sqlite databases. + + Note: According to: https://www.sqlite.org/wal.html#ckpt this + is the default behavior of sqlite. We may be able to remove this + class. +*/ +class WALCheckpointer : public Checkpointer, private beast::Thread { - (reinterpret_cast(s))->doHook (dbName, walSize); - return SQLITE_OK; +public: + using Connection = sqlite_api::sqlite3; + + WALCheckpointer (sqlite_api::sqlite3&, JobQueue&); + ~WALCheckpointer () override; + + static + int sqliteWALHook (void* s, sqlite_api::sqlite3*, + const char* dbName, int walSize); + +private: + void runCheckpoint (const char* db, int walSize); + void run (); + void checkpoint (); + + using LockType = std::mutex; + using ScopedLockType = std::lock_guard; + + sqlite_api::sqlite3& conn_; + LockType mutex_; + JobQueue& jobQueue_; + bool running_ = false; + +}; + +int WALCheckpointer::sqliteWALHook ( + void* cp, sqlite_api::sqlite3*, const char* dbName, int walSize) +{ + if (auto checkpointer = reinterpret_cast (cp)) + { + checkpointer->runCheckpoint (dbName, walSize); + return SQLITE_OK; + } + throw std::logic_error ("Didn't get a WALCheckpointer"); } -WALCheckpointer::WALCheckpointer (std::shared_ptr const& s, - JobQueue* q) - : Thread ("sqlitedb"), session_(s) +WALCheckpointer::WALCheckpointer (sqlite_api::sqlite3& conn, JobQueue& q) + : Thread ("sqlitedb"), conn_ (conn), jobQueue_ (q) { - if (auto session = - dynamic_cast(s->get_backend ())) - conn_ = session->conn_; - - if (!conn_) return; startThread (); - setupCheckpointing (q); + sqlite_api::sqlite3_wal_hook (&conn_, &sqliteWALHook, this); } WALCheckpointer::~WALCheckpointer () { - if (!conn_) return; stopThread (); } -void WALCheckpointer::setupCheckpointing (JobQueue* q) +void WALCheckpointer::runCheckpoint (const char* db, int pages) { - if (!conn_) return; - q_ = q; - sqlite_api::sqlite3_wal_hook (conn_, SqliteWALHook, this); -} - -void WALCheckpointer::doHook (const char* db, int pages) -{ - if (!conn_) return; - - if (pages < 1000) + if (pages < checkpointPageCount) return; + // TODO: after it reaches 1000 pages, won't it checkpoint on every + // page after that? + // Should the line above be if ((1 + pages) % checkpointPageCount)? + { ScopedLockType sl (mutex_); - if (running_) return; - running_ = true; } - if (q_) - q_->addJob (jtWAL, - std::string ("WAL"), - std::bind (&WALCheckpointer::runWal, this)); - else - notify (); + jobQueue_.addJob ( + jtWAL, "WAL", std::bind (&WALCheckpointer::checkpoint, this)); } void WALCheckpointer::run () { - if (!conn_) return; - - // Simple thread loop runs Wal every time it wakes up via + // Simple thread loop checkpoints every time it wakes up via // the call to Thread::notify, unless Thread::threadShouldExit returns // true in which case we simply break. // @@ -226,32 +253,40 @@ void WALCheckpointer::run () wait (); if (threadShouldExit ()) break; - runWal (); + checkpoint (); } } -void WALCheckpointer::runWal () +void WALCheckpointer::checkpoint () { - if (!conn_) return; - int log = 0, ckpt = 0; int ret = sqlite3_wal_checkpoint_v2 ( - conn_, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); + &conn_, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt); + auto fname = sqlite3_db_filename (&conn_, "main"); if (ret != SQLITE_OK) { WriteLog ((ret == SQLITE_LOCKED) ? lsTRACE : lsWARNING, WALCheckpointer) - << "WAL(" << sqlite3_db_filename (conn_, "main") << "): error " - << ret; + << "WAL(" << fname << "): error " << ret; } else - WriteLog (lsTRACE, WALCheckpointer) - << "WAL(" << sqlite3_db_filename (conn_, "main") - << "): frames=" << log << ", written=" << ckpt; - { - ScopedLockType sl (mutex_); - running_ = false; + WriteLog (lsTRACE, WALCheckpointer) + << "WAL(" << fname << "): frames=" << log << ", written=" << ckpt; } + + ScopedLockType sl (mutex_); + running_ = false; } + +} // namespace + +std::unique_ptr makeCheckpointer ( + soci::session& session, JobQueue& queue) +{ + if (auto conn = getConnection (session)) + return std::make_unique (*conn, queue); + return {}; +} + } diff --git a/src/ripple/app/data/SociDB.h b/src/ripple/app/data/SociDB.h index 421b274ac..81c1cc4b5 100644 --- a/src/ripple/app/data/SociDB.h +++ b/src/ripple/app/data/SociDB.h @@ -33,7 +33,6 @@ #include #define SOCI_USE_BOOST #include -// #include #include #include #include @@ -43,6 +42,7 @@ namespace sqlite_api { } namespace ripple { + template T rangeCheckedCast (C c) { @@ -58,41 +58,42 @@ T rangeCheckedCast (C c) } return static_cast(c); } -} -namespace ripple { class BasicConfig; /** - * SociConfig is used when a client wants to delay opening a soci::session after - * parsing the config parameters. If a client want to open a session immediately, - * use the free function "open" below. + SociConfig is used when a client wants to delay opening a soci::session after + parsing the config parameters. If a client want to open a session + immediately, use the free function "open" below. */ class SociConfig { std::string connectionString_; soci::backend_factory const& backendFactory_; SociConfig(std::pair init); + public: SociConfig(BasicConfig const& config, std::string const& dbName); std::string connectionString () const; - void open(soci::session& s) const; + void open (soci::session& s) const; }; /** - * Open a soci session. - * - * @param s Session to open. - * @param config Parameters to pick the soci backend and how to connect to that - * backend. - * @param dbName Name of the database. This has different meaning for different backends. - * Sometimes it is part of a filename (sqlite3), othertimes it is a - * database name (postgresql). - */ -void open(soci::session& s, - BasicConfig const& config, - std::string const& dbName); + Open a soci session. + + @param s Session to open. + + @param config Parameters to pick the soci backend and how to connect to that + backend. + + @param dbName Name of the database. This has different meaning for different + backends. Sometimes it is part of a filename (sqlite3), + other times it is a database name (postgresql). +*/ +void open (soci::session& s, + BasicConfig const& config, + std::string const& dbName); /** * Open a soci session. @@ -103,48 +104,34 @@ void open(soci::session& s, * see the soci::open documentation for how to use this. * */ -void open(soci::session& s, - std::string const& beName, - std::string const& connectionString); +void open (soci::session& s, + std::string const& beName, + std::string const& connectionString); size_t getKBUsedAll (soci::session& s); size_t getKBUsedDB (soci::session& s); -void convert(soci::blob& from, std::vector& to); -void convert(soci::blob& from, std::string& to); -void convert(std::vector const& from, soci::blob& to); +void convert (soci::blob& from, std::vector& to); +void convert (soci::blob& from, std::string& to); +void convert (std::vector const& from, soci::blob& to); -/** Run a thread to checkpoint the write ahead log (wal) for - the given soci::session every 1000 pages. This is only implemented - for sqlite databases. - - Note: According to: https://www.sqlite.org/wal.html#ckpt this - is the default behavior of sqlite. We may be able to remove this - class. -*/ -class WALCheckpointer - :private beast::Thread +class Checkpointer { - friend int SqliteWALHook (void* s, sqlite_api::sqlite3*, - const char* dbName, int walSize); -public: - WALCheckpointer (std::shared_ptr const& s, - JobQueue* q); - ~WALCheckpointer (); -private: - void doHook (const char* db, int walSize); - void setupCheckpointing (JobQueue*); - void run (); - void runWal (); - - std::shared_ptr session_; - sqlite_api::sqlite3* conn_ = nullptr; - using LockType = std::mutex; - using ScopedLockType = std::lock_guard; - LockType mutex_; - JobQueue* q_ = nullptr; - bool running_ = false; + public: + virtual ~Checkpointer() = default; }; -} + +/** Returns a new checkpointer which start a thread that makes checkpoints of a + soci database every checkpointPageCount pages, using a new thread initially + and then subsequently scheduling later checkpoints on the job queue. + + TODO: couldn't we do without the additional thread? + + The Checkpointer contains references to the session and job queue + and so must outlive them both. + */ +std::unique_ptr makeCheckpointer (soci::session&, JobQueue&); + +} // ripple #endif