Fix memory management issues with checkpointers:

The checkpointer class had assumed that the database would exist for the
lifetime of the application. This is no long true. These changes resolve bugs
involving dangling pointers.
This commit is contained in:
seelabs
2020-07-28 13:24:16 -04:00
committed by Nik Bougalis
parent a931b020b5
commit 8cf542abb0
6 changed files with 276 additions and 87 deletions

View File

@@ -860,11 +860,14 @@ public:
// transaction database
mTxnDB = std::make_unique<DatabaseCon>(
setup, TxDBName, TxDBPragma, TxDBInit);
setup,
TxDBName,
TxDBPragma,
TxDBInit,
DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()});
mTxnDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::txnDBCache)));
mTxnDB->setupCheckpointing(m_jobQueue.get(), logs());
if (!setup.standAlone || setup.startUp == Config::LOAD ||
setup.startUp == Config::LOAD_FILE ||
@@ -899,11 +902,14 @@ public:
// ledger database
mLedgerDB = std::make_unique<DatabaseCon>(
setup, LgrDBName, LgrDBPragma, LgrDBInit);
setup,
LgrDBName,
LgrDBPragma,
LgrDBInit,
DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()});
mLedgerDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::lgrDBCache)));
mLedgerDB->setupCheckpointing(m_jobQueue.get(), logs());
// wallet database
setup.useGlobalPragma = false;

View File

@@ -34,52 +34,50 @@ class session;
namespace ripple {
template <class T, class TMutex>
class LockedPointer
class LockedSociSession
{
public:
using mutex = TMutex;
using mutex = std::recursive_mutex;
private:
T* it_;
std::shared_ptr<soci::session> session_;
std::unique_lock<mutex> lock_;
public:
LockedPointer(T* it, mutex& m) : it_(it), lock_(m)
LockedSociSession(std::shared_ptr<soci::session> it, mutex& m)
: session_(std::move(it)), lock_(m)
{
}
LockedPointer(LockedPointer&& rhs) noexcept
: it_(rhs.it_), lock_(std::move(rhs.lock_))
LockedSociSession(LockedSociSession&& rhs) noexcept
: session_(std::move(rhs.session_)), lock_(std::move(rhs.lock_))
{
}
LockedPointer() = delete;
LockedPointer(LockedPointer const& rhs) = delete;
LockedPointer&
operator=(LockedPointer const& rhs) = delete;
LockedSociSession() = delete;
LockedSociSession(LockedSociSession const& rhs) = delete;
LockedSociSession&
operator=(LockedSociSession const& rhs) = delete;
T*
soci::session*
get()
{
return it_;
return session_.get();
}
T&
soci::session&
operator*()
{
return *it_;
return *session_;
}
T*
soci::session*
operator->()
{
return it_;
return session_.get();
}
explicit operator bool() const
{
return bool(it_);
return bool(session_);
}
};
using LockedSociSession = LockedPointer<soci::session, std::recursive_mutex>;
class DatabaseCon
{
public:
@@ -105,10 +103,16 @@ public:
static std::unique_ptr<std::vector<std::string> const> globalPragma;
};
struct CheckpointerSetup
{
JobQueue* jobQueue;
Logs* logs;
};
template <std::size_t N, std::size_t M>
DatabaseCon(
Setup const& setup,
std::string const& DBName,
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
// Use temporary files or regular DB files?
@@ -117,74 +121,114 @@ public:
setup.startUp != Config::LOAD_FILE &&
setup.startUp != Config::REPLAY
? ""
: (setup.dataDir / DBName),
: (setup.dataDir / dbName),
setup.commonPragma(),
pragma,
initSQL)
{
}
// Use this constructor to setup checkpointing
template <std::size_t N, std::size_t M>
DatabaseCon(
Setup const& setup,
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL,
CheckpointerSetup const& checkpointerSetup)
: DatabaseCon(setup, dbName, pragma, initSQL)
{
setupCheckpointing(checkpointerSetup.jobQueue, *checkpointerSetup.logs);
}
template <std::size_t N, std::size_t M>
DatabaseCon(
boost::filesystem::path const& dataDir,
std::string const& DBName,
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
: DatabaseCon(dataDir / DBName, nullptr, pragma, initSQL)
: DatabaseCon(dataDir / dbName, nullptr, pragma, initSQL)
{
}
// Use this constructor to setup checkpointing
template <std::size_t N, std::size_t M>
DatabaseCon(
boost::filesystem::path const& dataDir,
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL,
CheckpointerSetup const& checkpointerSetup)
: DatabaseCon(dataDir, dbName, pragma, initSQL)
{
setupCheckpointing(checkpointerSetup.jobQueue, *checkpointerSetup.logs);
}
~DatabaseCon();
soci::session&
getSession()
{
return session_;
return *session_;
}
LockedSociSession
checkoutDb()
{
return LockedSociSession(&session_, lock_);
return LockedSociSession(session_, lock_);
}
private:
void
setupCheckpointing(JobQueue*, Logs&);
private:
template <std::size_t N, std::size_t M>
DatabaseCon(
boost::filesystem::path const& pPath,
std::vector<std::string> const* commonPragma,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
: session_(std::make_shared<soci::session>())
{
open(session_, "sqlite", pPath.string());
open(*session_, "sqlite", pPath.string());
if (commonPragma)
{
for (auto const& p : *commonPragma)
{
soci::statement st = session_.prepare << p;
soci::statement st = session_->prepare << p;
st.execute(true);
}
}
for (auto const& p : pragma)
{
soci::statement st = session_.prepare << p;
soci::statement st = session_->prepare << p;
st.execute(true);
}
for (auto const& sql : initSQL)
{
soci::statement st = session_.prepare << sql;
soci::statement st = session_->prepare << sql;
st.execute(true);
}
}
LockedSociSession::mutex lock_;
soci::session session_;
std::unique_ptr<Checkpointer> checkpointer_;
// checkpointer may outlive the DatabaseCon when the checkpointer jobQueue
// callback locks a weak pointer and the DatabaseCon is then destroyed. In
// this case, the checkpointer needs to make sure it doesn't use an already
// destroyed session. Thus this class keeps a shared_ptr to the session (so
// the checkpointer can keep a weak_ptr) and the checkpointer is a
// shared_ptr in this class. session_ will never be null.
std::shared_ptr<soci::session> const session_;
std::shared_ptr<Checkpointer> checkpointer_;
};
// Return the checkpointer from its id. If the checkpointer no longer exists, an
// nullptr is returned
std::shared_ptr<Checkpointer>
checkpointerFromId(std::uintptr_t id);
DatabaseCon::Setup
setup_DatabaseCon(
Config const& c,

View File

@@ -131,20 +131,32 @@ convert(std::vector<std::uint8_t> const& from, soci::blob& to);
void
convert(std::string const& from, soci::blob& to);
class Checkpointer
class Checkpointer : public std::enable_shared_from_this<Checkpointer>
{
public:
virtual std::uintptr_t
id() const = 0;
virtual ~Checkpointer() = default;
virtual void
schedule() = 0;
virtual void
checkpoint() = 0;
};
/** Returns a new checkpointer which makes checkpoints of a
soci database every checkpointPageCount pages, using a job on the job queue.
The Checkpointer contains references to the session and job queue
The checkpointer contains references to the session and job queue
and so must outlive them both.
*/
std::unique_ptr<Checkpointer>
makeCheckpointer(soci::session&, JobQueue&, Logs&);
std::shared_ptr<Checkpointer>
makeCheckpointer(
std::uintptr_t id,
std::weak_ptr<soci::session>,
JobQueue&,
Logs&);
} // namespace ripple

View File

@@ -21,12 +21,74 @@
#include <ripple/basics/contract.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/SociDB.h>
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
#include <memory>
#include <unordered_map>
namespace ripple {
class CheckpointersCollection
{
std::uintptr_t nextId_{0};
// Mutex protects the CheckpointersCollection
std::mutex mutex_;
// Each checkpointer is given a unique id. All the checkpointers that are
// part of a DatabaseCon are part of this collection. When the DatabaseCon
// is destroyed, its checkpointer is removed from the collection
std::unordered_map<std::uintptr_t, std::shared_ptr<Checkpointer>>
checkpointers_;
public:
std::shared_ptr<Checkpointer>
fromId(std::uintptr_t id)
{
std::lock_guard l{mutex_};
auto it = checkpointers_.find(id);
if (it != checkpointers_.end())
return it->second;
return {};
}
void
erase(std::uintptr_t id)
{
std::lock_guard lock{mutex_};
checkpointers_.erase(id);
}
std::shared_ptr<Checkpointer>
create(
std::shared_ptr<soci::session> const& session,
JobQueue& jobQueue,
Logs& logs)
{
std::lock_guard lock{mutex_};
auto const id = nextId_++;
auto const r = makeCheckpointer(id, session, jobQueue, logs);
checkpointers_[id] = r;
return r;
}
};
CheckpointersCollection checkpointers;
std::shared_ptr<Checkpointer>
checkpointerFromId(std::uintptr_t id)
{
return checkpointers.fromId(id);
}
DatabaseCon::~DatabaseCon()
{
if (checkpointer_)
{
checkpointers.erase(checkpointer_->id());
}
}
DatabaseCon::Setup
setup_DatabaseCon(Config const& c, boost::optional<beast::Journal> j)
{
@@ -173,7 +235,7 @@ DatabaseCon::setupCheckpointing(JobQueue* q, Logs& l)
{
if (!q)
Throw<std::logic_error>("No JobQueue");
checkpointer_ = makeCheckpointer(session_, *q, l);
checkpointer_ = checkpointers.create(session_, *q, l);
}
} // namespace ripple

View File

@@ -26,6 +26,7 @@
#include <ripple/basics/contract.h>
#include <ripple/core/Config.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/SociDB.h>
#include <boost/filesystem.hpp>
#include <memory>
@@ -196,44 +197,48 @@ namespace {
is the default behavior of sqlite. We may be able to remove this
class.
*/
class WALCheckpointer : public Checkpointer
{
public:
WALCheckpointer(sqlite_api::sqlite3& conn, JobQueue& q, Logs& logs)
: conn_(conn), jobQueue_(q), j_(logs.journal("WALCheckpointer"))
WALCheckpointer(
std::uintptr_t id,
std::weak_ptr<soci::session> session,
JobQueue& q,
Logs& logs)
: id_(id)
, session_(std::move(session))
, jobQueue_(q)
, j_(logs.journal("WALCheckpointer"))
{
sqlite_api::sqlite3_wal_hook(&conn_, &sqliteWALHook, this);
if (auto [conn, keepAlive] = getConnection(); conn)
{
(void)keepAlive;
sqlite_api::sqlite3_wal_hook(
conn, &sqliteWALHook, reinterpret_cast<void*>(id_));
}
}
std::pair<sqlite_api::sqlite3*, std::shared_ptr<soci::session>>
getConnection() const
{
if (auto p = session_.lock())
{
return {ripple::getConnection(*p), p};
}
return {nullptr, std::shared_ptr<soci::session>{}};
}
std::uintptr_t
id() const override
{
return id_;
}
~WALCheckpointer() override = default;
private:
sqlite_api::sqlite3& conn_;
std::mutex mutex_;
JobQueue& jobQueue_;
bool running_ = false;
beast::Journal const j_;
static int
sqliteWALHook(
void* cp,
sqlite_api::sqlite3*,
const char* dbName,
int walSize)
{
if (walSize >= checkpointPageCount)
{
if (auto checkpointer = reinterpret_cast<WALCheckpointer*>(cp))
checkpointer->scheduleCheckpoint();
else
Throw<std::logic_error>("Didn't get a WALCheckpointer");
}
return SQLITE_OK;
}
void
scheduleCheckpoint()
schedule() override
{
{
std::lock_guard lock(mutex_);
@@ -243,7 +248,18 @@ private:
}
// If the Job is not added to the JobQueue then we're not running_.
if (!jobQueue_.addJob(jtWAL, "WAL", [this](Job&) { checkpoint(); }))
if (!jobQueue_.addJob(
jtWAL,
"WAL",
// If the owning DatabaseCon is destroyed, no need to checkpoint
// or keep the checkpointer alive so use a weak_ptr to this.
// There is a separate check in `checkpoint` for a valid
// connection in the rare case when the DatabaseCon is destroyed
// after locking this weak_ptr
[wp = std::weak_ptr<Checkpointer>{shared_from_this()}](Job&) {
if (auto self = wp.lock())
self->checkpoint();
}))
{
std::lock_guard lock(mutex_);
running_ = false;
@@ -251,13 +267,18 @@ private:
}
void
checkpoint()
checkpoint() override
{
auto [conn, keepAlive] = getConnection();
(void)keepAlive;
if (!conn)
return;
int log = 0, ckpt = 0;
int ret = sqlite3_wal_checkpoint_v2(
&conn_, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
conn, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
auto fname = sqlite3_db_filename(&conn_, "main");
auto fname = sqlite3_db_filename(conn, "main");
if (ret != SQLITE_OK)
{
auto jm = (ret == SQLITE_LOCKED) ? j_.trace() : j_.warn();
@@ -272,16 +293,53 @@ private:
std::lock_guard lock(mutex_);
running_ = false;
}
protected:
std::uintptr_t const id_;
// session is owned by the DatabaseCon parent that holds the checkpointer.
// It is possible (tho rare) for the DatabaseCon class to be destoryed
// before the checkpointer.
std::weak_ptr<soci::session> session_;
std::mutex mutex_;
JobQueue& jobQueue_;
bool running_ = false;
beast::Journal const j_;
static int
sqliteWALHook(
void* cpId,
sqlite_api::sqlite3* conn,
const char* dbName,
int walSize)
{
if (walSize >= checkpointPageCount)
{
if (auto checkpointer =
checkpointerFromId(reinterpret_cast<std::uintptr_t>(cpId)))
{
checkpointer->schedule();
}
else
{
sqlite_api::sqlite3_wal_hook(conn, nullptr, nullptr);
}
}
return SQLITE_OK;
}
};
} // namespace
std::unique_ptr<Checkpointer>
makeCheckpointer(soci::session& session, JobQueue& queue, Logs& logs)
std::shared_ptr<Checkpointer>
makeCheckpointer(
std::uintptr_t id,
std::weak_ptr<soci::session> session,
JobQueue& queue,
Logs& logs)
{
if (auto conn = getConnection(session))
return std::make_unique<WALCheckpointer>(*conn, queue, logs);
return {};
return std::make_shared<WALCheckpointer>(
id, std::move(session), queue, logs);
}
} // namespace ripple

View File

@@ -131,9 +131,8 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx)
setup,
AcquireShardDBName,
AcquireShardDBPragma,
AcquireShardDBInit);
acquireInfo_->SQLiteDB->setupCheckpointing(
&app_.getJobQueue(), app_.logs());
AcquireShardDBInit,
DatabaseCon::CheckpointerSetup{&app_.getJobQueue(), &app_.logs()});
};
try
@@ -741,18 +740,26 @@ Shard::initSQLite(std::lock_guard<std::recursive_mutex> const&)
{
// The incomplete shard uses a Write Ahead Log for performance
lgrSQLiteDB_ = std::make_unique<DatabaseCon>(
setup, LgrDBName, LgrDBPragma, LgrDBInit);
setup,
LgrDBName,
LgrDBPragma,
LgrDBInit,
DatabaseCon::CheckpointerSetup{
&app_.getJobQueue(), &app_.logs()});
lgrSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(SizedItem::lgrDBCache)));
lgrSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs());
txSQLiteDB_ = std::make_unique<DatabaseCon>(
setup, TxDBName, TxDBPragma, TxDBInit);
setup,
TxDBName,
TxDBPragma,
TxDBInit,
DatabaseCon::CheckpointerSetup{
&app_.getJobQueue(), &app_.logs()});
txSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(SizedItem::txnDBCache)));
txSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs());
}
}
catch (std::exception const& e)