mirror of
https://github.com/Xahau/xahaud.git
synced 2025-12-06 17:27:52 +00:00
Clean SOCI code.
* Throw exception rather than SEGV. * Hide details of checkpointing from clients. * Restrict to 80 columns and minor style tweaks.
This commit is contained in:
committed by
Nik Bougalis
parent
1b8c77eee0
commit
03d1c0ed21
@@ -25,11 +25,11 @@
|
||||
|
||||
namespace ripple {
|
||||
|
||||
DatabaseCon::DatabaseCon (Setup const& setup,
|
||||
std::string const& strName,
|
||||
const char* initStrings[],
|
||||
int initCount)
|
||||
:session_(std::make_shared<soci::session>())
|
||||
DatabaseCon::DatabaseCon (
|
||||
Setup const& setup,
|
||||
std::string const& strName,
|
||||
const char* initStrings[],
|
||||
int initCount)
|
||||
{
|
||||
auto const useTempFiles // Use temporary files or regular DB files?
|
||||
= setup.standAlone &&
|
||||
@@ -39,13 +39,13 @@ DatabaseCon::DatabaseCon (Setup const& setup,
|
||||
boost::filesystem::path pPath = useTempFiles
|
||||
? "" : (setup.dataDir / strName);
|
||||
|
||||
open(*session_, "sqlite", pPath.string());
|
||||
open (session_, "sqlite", pPath.string());
|
||||
|
||||
for (int i = 0; i < initCount; ++i)
|
||||
{
|
||||
try
|
||||
{
|
||||
*session_ << initStrings[i];
|
||||
session_ << initStrings[i];
|
||||
}
|
||||
catch (soci::soci_error&)
|
||||
{
|
||||
@@ -67,6 +67,9 @@ DatabaseCon::Setup setup_DatabaseCon (Config const& c)
|
||||
|
||||
void DatabaseCon::setupCheckpointing (JobQueue* q)
|
||||
{
|
||||
walCheckpointer_ = std::make_unique<WALCheckpointer> (session_, q);
|
||||
if (! q)
|
||||
throw std::logic_error ("No JobQueue");
|
||||
checkpointer_ = makeCheckpointer (session_, *q);
|
||||
}
|
||||
|
||||
} // ripple
|
||||
|
||||
@@ -68,7 +68,7 @@ public:
|
||||
}
|
||||
explicit operator bool() const
|
||||
{
|
||||
return bool(it_);
|
||||
return bool (it_);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -85,27 +85,27 @@ public:
|
||||
};
|
||||
|
||||
DatabaseCon (Setup const& setup,
|
||||
std::string const& name,
|
||||
const char* initString[],
|
||||
int countInit);
|
||||
std::string const& name,
|
||||
const char* initString[],
|
||||
int countInit);
|
||||
|
||||
soci::session& getSession()
|
||||
{
|
||||
return *session_;
|
||||
return session_;
|
||||
}
|
||||
|
||||
LockedSociSession checkoutDb()
|
||||
LockedSociSession checkoutDb ()
|
||||
{
|
||||
return LockedSociSession(session_.get (), lock_);
|
||||
return LockedSociSession (&session_, lock_);
|
||||
}
|
||||
|
||||
void setupCheckpointing (JobQueue*);
|
||||
|
||||
private:
|
||||
std::unique_ptr<WALCheckpointer> walCheckpointer_;
|
||||
// shared_ptr to handle lifetime issues with walCheckpointer_
|
||||
// never null.
|
||||
std::shared_ptr<soci::session> session_;
|
||||
LockedSociSession::mutex lock_;
|
||||
|
||||
soci::session session_;
|
||||
std::unique_ptr<Checkpointer> checkpointer_;
|
||||
};
|
||||
|
||||
DatabaseCon::Setup
|
||||
|
||||
@@ -27,12 +27,15 @@
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
static auto checkpointPageCount = 1000;
|
||||
|
||||
namespace detail {
|
||||
|
||||
std::pair<std::string, soci::backend_factory const&>
|
||||
getSociSqliteInit (std::string const& name,
|
||||
std::string const& dir,
|
||||
std::string const& ext)
|
||||
std::string const& dir,
|
||||
std::string const& ext)
|
||||
{
|
||||
if (dir.empty () || name.empty ())
|
||||
{
|
||||
@@ -43,7 +46,7 @@ getSociSqliteInit (std::string const& name,
|
||||
boost::filesystem::path file (dir);
|
||||
if (is_directory (file))
|
||||
file /= name + ext;
|
||||
return std::make_pair (file.string (), std::ref(soci::sqlite3));
|
||||
return std::make_pair (file.string (), std::ref (soci::sqlite3));
|
||||
}
|
||||
|
||||
std::pair<std::string, soci::backend_factory const&>
|
||||
@@ -51,24 +54,21 @@ getSociInit (BasicConfig const& config,
|
||||
std::string const& dbName)
|
||||
{
|
||||
auto const& section = config.section ("sqdb");
|
||||
std::string const backendName(get(section, "backend", "sqlite"));
|
||||
auto const backendName = get(section, "backend", "sqlite");
|
||||
|
||||
if (backendName == "sqlite")
|
||||
{
|
||||
std::string const path = config.legacy ("database_path");
|
||||
std::string const ext =
|
||||
(dbName == "validators" || dbName == "peerfinder") ? ".sqlite"
|
||||
: ".db";
|
||||
return detail::getSociSqliteInit(dbName, path, ext);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (backendName != "sqlite")
|
||||
throw std::runtime_error ("Unsupported soci backend: " + backendName);
|
||||
}
|
||||
|
||||
auto const path = config.legacy ("database_path");
|
||||
auto const ext = dbName == "validators" || dbName == "peerfinder"
|
||||
? ".sqlite" : ".db";
|
||||
return detail::getSociSqliteInit(dbName, path, ext);
|
||||
}
|
||||
|
||||
} // detail
|
||||
|
||||
SociConfig::SociConfig (std::pair<std::string, soci::backend_factory const&> init)
|
||||
SociConfig::SociConfig (
|
||||
std::pair<std::string, soci::backend_factory const&> init)
|
||||
: connectionString_ (std::move (init.first)),
|
||||
backendFactory_ (init.second)
|
||||
{
|
||||
@@ -84,55 +84,63 @@ std::string SociConfig::connectionString () const
|
||||
return connectionString_;
|
||||
}
|
||||
|
||||
void SociConfig::open(soci::session& s) const
|
||||
void SociConfig::open (soci::session& s) const
|
||||
{
|
||||
s.open (backendFactory_, connectionString ());
|
||||
}
|
||||
|
||||
void open(soci::session& s,
|
||||
BasicConfig const& config,
|
||||
std::string const& dbName)
|
||||
void open (soci::session& s,
|
||||
BasicConfig const& config,
|
||||
std::string const& dbName)
|
||||
{
|
||||
SociConfig c(config, dbName);
|
||||
c.open(s);
|
||||
SociConfig (config, dbName).open(s);
|
||||
}
|
||||
|
||||
void open(soci::session& s,
|
||||
std::string const& beName,
|
||||
std::string const& connectionString)
|
||||
void open (soci::session& s,
|
||||
std::string const& beName,
|
||||
std::string const& connectionString)
|
||||
{
|
||||
if (beName == "sqlite")
|
||||
{
|
||||
s.open(soci::sqlite3, connectionString);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw std::runtime_error ("Unsupported soci backend: " + beName);
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
sqlite_api::sqlite3* getConnection (soci::session& s)
|
||||
{
|
||||
sqlite_api::sqlite3* result = nullptr;
|
||||
auto be = s.get_backend ();
|
||||
if (auto b = dynamic_cast<soci::sqlite3_session_backend*> (be))
|
||||
result = b->conn_;
|
||||
|
||||
if (! result)
|
||||
throw std::logic_error ("Didn't get a database connection.");
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
size_t getKBUsedAll (soci::session& s)
|
||||
{
|
||||
auto be = dynamic_cast<soci::sqlite3_session_backend*>(s.get_backend ());
|
||||
assert (be); // Make sure the backend is sqlite
|
||||
(void) be;
|
||||
return static_cast<int>(sqlite_api::sqlite3_memory_used () / 1024);
|
||||
if (! getConnection (s))
|
||||
throw std::logic_error ("No connection found.");
|
||||
return static_cast <size_t> (sqlite_api::sqlite3_memory_used () / 1024);
|
||||
}
|
||||
|
||||
size_t getKBUsedDB (soci::session& s)
|
||||
{
|
||||
// This function will have to be customized when other backends are added
|
||||
auto be = dynamic_cast<soci::sqlite3_session_backend*>(s.get_backend ());
|
||||
assert (be);
|
||||
(void) be;
|
||||
int cur = 0, hiw = 0;
|
||||
sqlite_api::sqlite3_db_status (
|
||||
be->conn_, SQLITE_DBSTATUS_CACHE_USED, &cur, &hiw, 0);
|
||||
return cur / 1024;
|
||||
if (auto conn = getConnection (s))
|
||||
{
|
||||
int cur = 0, hiw = 0;
|
||||
sqlite_api::sqlite3_db_status (
|
||||
conn, SQLITE_DBSTATUS_CACHE_USED, &cur, &hiw, 0);
|
||||
return cur / 1024;
|
||||
}
|
||||
throw std::logic_error ("");
|
||||
}
|
||||
|
||||
void convert(soci::blob& from, std::vector<std::uint8_t>& to)
|
||||
void convert (soci::blob& from, std::vector<std::uint8_t>& to)
|
||||
{
|
||||
to.resize (from.get_len ());
|
||||
if (to.empty ())
|
||||
@@ -140,84 +148,103 @@ void convert(soci::blob& from, std::vector<std::uint8_t>& to)
|
||||
from.read (0, reinterpret_cast<char*>(&to[0]), from.get_len ());
|
||||
}
|
||||
|
||||
void convert(soci::blob& from, std::string& to)
|
||||
void convert (soci::blob& from, std::string& to)
|
||||
{
|
||||
std::vector<std::uint8_t> tmp;
|
||||
convert(from, tmp);
|
||||
to.assign(tmp.begin (), tmp.end());
|
||||
convert (from, tmp);
|
||||
to.assign (tmp.begin (), tmp.end());
|
||||
|
||||
}
|
||||
|
||||
void convert(std::vector<std::uint8_t> const& from, soci::blob& to)
|
||||
void convert (std::vector<std::uint8_t> const& from, soci::blob& to)
|
||||
{
|
||||
if (!from.empty ())
|
||||
to.write (0, reinterpret_cast<char const*>(&from[0]), from.size ());
|
||||
}
|
||||
|
||||
int SqliteWALHook (void* s,
|
||||
sqlite_api::sqlite3*,
|
||||
const char* dbName,
|
||||
int walSize)
|
||||
namespace {
|
||||
|
||||
/** Run a thread to checkpoint the write ahead log (wal) for
|
||||
the given soci::session every 1000 pages. This is only implemented
|
||||
for sqlite databases.
|
||||
|
||||
Note: According to: https://www.sqlite.org/wal.html#ckpt this
|
||||
is the default behavior of sqlite. We may be able to remove this
|
||||
class.
|
||||
*/
|
||||
class WALCheckpointer : public Checkpointer, private beast::Thread
|
||||
{
|
||||
(reinterpret_cast<WALCheckpointer*>(s))->doHook (dbName, walSize);
|
||||
return SQLITE_OK;
|
||||
public:
|
||||
using Connection = sqlite_api::sqlite3;
|
||||
|
||||
WALCheckpointer (sqlite_api::sqlite3&, JobQueue&);
|
||||
~WALCheckpointer () override;
|
||||
|
||||
static
|
||||
int sqliteWALHook (void* s, sqlite_api::sqlite3*,
|
||||
const char* dbName, int walSize);
|
||||
|
||||
private:
|
||||
void runCheckpoint (const char* db, int walSize);
|
||||
void run ();
|
||||
void checkpoint ();
|
||||
|
||||
using LockType = std::mutex;
|
||||
using ScopedLockType = std::lock_guard<LockType>;
|
||||
|
||||
sqlite_api::sqlite3& conn_;
|
||||
LockType mutex_;
|
||||
JobQueue& jobQueue_;
|
||||
bool running_ = false;
|
||||
|
||||
};
|
||||
|
||||
int WALCheckpointer::sqliteWALHook (
|
||||
void* cp, sqlite_api::sqlite3*, const char* dbName, int walSize)
|
||||
{
|
||||
if (auto checkpointer = reinterpret_cast <WALCheckpointer*> (cp))
|
||||
{
|
||||
checkpointer->runCheckpoint (dbName, walSize);
|
||||
return SQLITE_OK;
|
||||
}
|
||||
throw std::logic_error ("Didn't get a WALCheckpointer");
|
||||
}
|
||||
|
||||
WALCheckpointer::WALCheckpointer (std::shared_ptr<soci::session> const& s,
|
||||
JobQueue* q)
|
||||
: Thread ("sqlitedb"), session_(s)
|
||||
WALCheckpointer::WALCheckpointer (sqlite_api::sqlite3& conn, JobQueue& q)
|
||||
: Thread ("sqlitedb"), conn_ (conn), jobQueue_ (q)
|
||||
{
|
||||
if (auto session =
|
||||
dynamic_cast<soci::sqlite3_session_backend*>(s->get_backend ()))
|
||||
conn_ = session->conn_;
|
||||
|
||||
if (!conn_) return;
|
||||
startThread ();
|
||||
setupCheckpointing (q);
|
||||
sqlite_api::sqlite3_wal_hook (&conn_, &sqliteWALHook, this);
|
||||
}
|
||||
|
||||
WALCheckpointer::~WALCheckpointer ()
|
||||
{
|
||||
if (!conn_) return;
|
||||
stopThread ();
|
||||
}
|
||||
|
||||
void WALCheckpointer::setupCheckpointing (JobQueue* q)
|
||||
void WALCheckpointer::runCheckpoint (const char* db, int pages)
|
||||
{
|
||||
if (!conn_) return;
|
||||
q_ = q;
|
||||
sqlite_api::sqlite3_wal_hook (conn_, SqliteWALHook, this);
|
||||
}
|
||||
|
||||
void WALCheckpointer::doHook (const char* db, int pages)
|
||||
{
|
||||
if (!conn_) return;
|
||||
|
||||
if (pages < 1000)
|
||||
if (pages < checkpointPageCount)
|
||||
return;
|
||||
|
||||
// TODO: after it reaches 1000 pages, won't it checkpoint on every
|
||||
// page after that?
|
||||
// Should the line above be if ((1 + pages) % checkpointPageCount)?
|
||||
|
||||
{
|
||||
ScopedLockType sl (mutex_);
|
||||
|
||||
if (running_)
|
||||
return;
|
||||
|
||||
running_ = true;
|
||||
}
|
||||
|
||||
if (q_)
|
||||
q_->addJob (jtWAL,
|
||||
std::string ("WAL"),
|
||||
std::bind (&WALCheckpointer::runWal, this));
|
||||
else
|
||||
notify ();
|
||||
jobQueue_.addJob (
|
||||
jtWAL, "WAL", std::bind (&WALCheckpointer::checkpoint, this));
|
||||
}
|
||||
|
||||
void WALCheckpointer::run ()
|
||||
{
|
||||
if (!conn_) return;
|
||||
|
||||
// Simple thread loop runs Wal every time it wakes up via
|
||||
// Simple thread loop checkpoints every time it wakes up via
|
||||
// the call to Thread::notify, unless Thread::threadShouldExit returns
|
||||
// true in which case we simply break.
|
||||
//
|
||||
@@ -226,32 +253,40 @@ void WALCheckpointer::run ()
|
||||
wait ();
|
||||
if (threadShouldExit ())
|
||||
break;
|
||||
runWal ();
|
||||
checkpoint ();
|
||||
}
|
||||
}
|
||||
|
||||
void WALCheckpointer::runWal ()
|
||||
void WALCheckpointer::checkpoint ()
|
||||
{
|
||||
if (!conn_) return;
|
||||
|
||||
int log = 0, ckpt = 0;
|
||||
int ret = sqlite3_wal_checkpoint_v2 (
|
||||
conn_, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
|
||||
&conn_, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
|
||||
|
||||
auto fname = sqlite3_db_filename (&conn_, "main");
|
||||
if (ret != SQLITE_OK)
|
||||
{
|
||||
WriteLog ((ret == SQLITE_LOCKED) ? lsTRACE : lsWARNING, WALCheckpointer)
|
||||
<< "WAL(" << sqlite3_db_filename (conn_, "main") << "): error "
|
||||
<< ret;
|
||||
<< "WAL(" << fname << "): error " << ret;
|
||||
}
|
||||
else
|
||||
WriteLog (lsTRACE, WALCheckpointer)
|
||||
<< "WAL(" << sqlite3_db_filename (conn_, "main")
|
||||
<< "): frames=" << log << ", written=" << ckpt;
|
||||
|
||||
{
|
||||
ScopedLockType sl (mutex_);
|
||||
running_ = false;
|
||||
WriteLog (lsTRACE, WALCheckpointer)
|
||||
<< "WAL(" << fname << "): frames=" << log << ", written=" << ckpt;
|
||||
}
|
||||
|
||||
ScopedLockType sl (mutex_);
|
||||
running_ = false;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
std::unique_ptr <Checkpointer> makeCheckpointer (
|
||||
soci::session& session, JobQueue& queue)
|
||||
{
|
||||
if (auto conn = getConnection (session))
|
||||
return std::make_unique <WALCheckpointer> (*conn, queue);
|
||||
return {};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -33,7 +33,6 @@
|
||||
#include <beast/threads/Thread.h>
|
||||
#define SOCI_USE_BOOST
|
||||
#include <core/soci.h>
|
||||
// #include <core/unsigned-types.h>
|
||||
#include <string>
|
||||
#include <cstdint>
|
||||
#include <vector>
|
||||
@@ -43,6 +42,7 @@ namespace sqlite_api {
|
||||
}
|
||||
|
||||
namespace ripple {
|
||||
|
||||
template <class T, class C>
|
||||
T rangeCheckedCast (C c)
|
||||
{
|
||||
@@ -58,41 +58,42 @@ T rangeCheckedCast (C c)
|
||||
}
|
||||
return static_cast<T>(c);
|
||||
}
|
||||
}
|
||||
|
||||
namespace ripple {
|
||||
class BasicConfig;
|
||||
|
||||
/**
|
||||
* SociConfig is used when a client wants to delay opening a soci::session after
|
||||
* parsing the config parameters. If a client want to open a session immediately,
|
||||
* use the free function "open" below.
|
||||
SociConfig is used when a client wants to delay opening a soci::session after
|
||||
parsing the config parameters. If a client want to open a session
|
||||
immediately, use the free function "open" below.
|
||||
*/
|
||||
class SociConfig
|
||||
{
|
||||
std::string connectionString_;
|
||||
soci::backend_factory const& backendFactory_;
|
||||
SociConfig(std::pair<std::string, soci::backend_factory const&> init);
|
||||
|
||||
public:
|
||||
SociConfig(BasicConfig const& config,
|
||||
std::string const& dbName);
|
||||
std::string connectionString () const;
|
||||
void open(soci::session& s) const;
|
||||
void open (soci::session& s) const;
|
||||
};
|
||||
|
||||
/**
|
||||
* Open a soci session.
|
||||
*
|
||||
* @param s Session to open.
|
||||
* @param config Parameters to pick the soci backend and how to connect to that
|
||||
* backend.
|
||||
* @param dbName Name of the database. This has different meaning for different backends.
|
||||
* Sometimes it is part of a filename (sqlite3), othertimes it is a
|
||||
* database name (postgresql).
|
||||
*/
|
||||
void open(soci::session& s,
|
||||
BasicConfig const& config,
|
||||
std::string const& dbName);
|
||||
Open a soci session.
|
||||
|
||||
@param s Session to open.
|
||||
|
||||
@param config Parameters to pick the soci backend and how to connect to that
|
||||
backend.
|
||||
|
||||
@param dbName Name of the database. This has different meaning for different
|
||||
backends. Sometimes it is part of a filename (sqlite3),
|
||||
other times it is a database name (postgresql).
|
||||
*/
|
||||
void open (soci::session& s,
|
||||
BasicConfig const& config,
|
||||
std::string const& dbName);
|
||||
|
||||
/**
|
||||
* Open a soci session.
|
||||
@@ -103,48 +104,34 @@ void open(soci::session& s,
|
||||
* see the soci::open documentation for how to use this.
|
||||
*
|
||||
*/
|
||||
void open(soci::session& s,
|
||||
std::string const& beName,
|
||||
std::string const& connectionString);
|
||||
void open (soci::session& s,
|
||||
std::string const& beName,
|
||||
std::string const& connectionString);
|
||||
|
||||
size_t getKBUsedAll (soci::session& s);
|
||||
size_t getKBUsedDB (soci::session& s);
|
||||
|
||||
void convert(soci::blob& from, std::vector<std::uint8_t>& to);
|
||||
void convert(soci::blob& from, std::string& to);
|
||||
void convert(std::vector<std::uint8_t> const& from, soci::blob& to);
|
||||
void convert (soci::blob& from, std::vector<std::uint8_t>& to);
|
||||
void convert (soci::blob& from, std::string& to);
|
||||
void convert (std::vector<std::uint8_t> const& from, soci::blob& to);
|
||||
|
||||
/** Run a thread to checkpoint the write ahead log (wal) for
|
||||
the given soci::session every 1000 pages. This is only implemented
|
||||
for sqlite databases.
|
||||
|
||||
Note: According to: https://www.sqlite.org/wal.html#ckpt this
|
||||
is the default behavior of sqlite. We may be able to remove this
|
||||
class.
|
||||
*/
|
||||
class WALCheckpointer
|
||||
:private beast::Thread
|
||||
class Checkpointer
|
||||
{
|
||||
friend int SqliteWALHook (void* s, sqlite_api::sqlite3*,
|
||||
const char* dbName, int walSize);
|
||||
public:
|
||||
WALCheckpointer (std::shared_ptr<soci::session> const& s,
|
||||
JobQueue* q);
|
||||
~WALCheckpointer ();
|
||||
private:
|
||||
void doHook (const char* db, int walSize);
|
||||
void setupCheckpointing (JobQueue*);
|
||||
void run ();
|
||||
void runWal ();
|
||||
|
||||
std::shared_ptr<soci::session> session_;
|
||||
sqlite_api::sqlite3* conn_ = nullptr;
|
||||
using LockType = std::mutex;
|
||||
using ScopedLockType = std::lock_guard<LockType>;
|
||||
LockType mutex_;
|
||||
JobQueue* q_ = nullptr;
|
||||
bool running_ = false;
|
||||
public:
|
||||
virtual ~Checkpointer() = default;
|
||||
};
|
||||
}
|
||||
|
||||
/** Returns a new checkpointer which start a thread that makes checkpoints of a
|
||||
soci database every checkpointPageCount pages, using a new thread initially
|
||||
and then subsequently scheduling later checkpoints on the job queue.
|
||||
|
||||
TODO: couldn't we do without the additional thread?
|
||||
|
||||
The Checkpointer contains references to the session and job queue
|
||||
and so must outlive them both.
|
||||
*/
|
||||
std::unique_ptr <Checkpointer> makeCheckpointer (soci::session&, JobQueue&);
|
||||
|
||||
} // ripple
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user