Use soci in more places:

* Validator, peerfinder, SHAMapStore,
  RpcDB, TxnDB, LedgerDB, WalletDB use soci backend.
This commit is contained in:
seelabs
2015-01-22 15:04:30 -08:00
committed by Vinnie Falco
parent d37802a42f
commit 97623d20c5
32 changed files with 1474 additions and 1122 deletions

View File

@@ -17,14 +17,6 @@
*/
//==============================================================================
/** An embedded database wrapper with an intuitive, type-safe interface.
This collection of classes let's you access embedded SQLite databases
using C++ syntax that is very similar to regular SQL.
This module requires the @ref beast_sqlite external module.
*/
#include <BeastConfig.h>
#include <ripple/core/ConfigSections.h>
@@ -32,9 +24,6 @@
#include <ripple/core/Config.h>
#include <beast/cxx14/memory.h> // <memory>
#include <backends/sqlite3/soci-sqlite3.h>
#if ENABLE_SOCI_POSTGRESQL
#include <backends/postgresql/soci-postgresql.h>
#endif
#include <boost/filesystem.hpp>
namespace ripple {
@@ -57,69 +46,12 @@ getSociSqliteInit (std::string const& name,
return std::make_pair (file.string (), std::ref(soci::sqlite3));
}
#if ENABLE_SOCI_POSTGRESQL
std::pair<std::string, soci::backend_factory const&>
getSociPostgresqlInit (Section const& configSection,
std::string const& name)
{
if (name.empty ())
{
throw std::runtime_error (
"Missing required value for postgresql backend: database name");
}
std::string const host(get <std::string> (configSection, "host", ""));
if (!host.empty())
{
throw std::runtime_error (
"Missing required value in config for postgresql backend: host");
}
std::string const user(get <std::string> (configSection, "user", ""));
if (user.empty ())
{
throw std::runtime_error (
"Missing required value in config for postgresql backend: user");
}
int const port = [&configSection]
{
std::string const portAsString (
get <std::string> (configSection, "port", ""));
if (portAsString.empty ())
{
throw std::runtime_error (
"Missing required value in config for postgresql backend: "
"user");
}
try
{
return std::stoi (portAsString);
}
catch (...)
{
throw std::runtime_error (
"The port value in the config for the postgresql backend must "
"be an integer. Got: " +
portAsString);
}
}();
std::stringstream s;
s << "host=" << host << " port=" << port << " dbname=" << name
<< " user=" << user;
return std::make_pair (s.str (), std::ref(soci::postgresql));
}
#endif // ENABLE_SOCI_POSTGRESQL
std::pair<std::string, soci::backend_factory const&>
getSociInit (BasicConfig const& config,
std::string const& dbName)
{
static const std::string sectionName ("sqdb");
static const std::string keyName ("backend");
auto const& section = config.section (sectionName);
std::string const backendName(get(section, keyName, std::string("sqlite")));
auto const& section = config.section ("sqdb");
std::string const backendName(get(section, "backend", "sqlite"));
if (backendName == "sqlite")
{
@@ -129,12 +61,6 @@ getSociInit (BasicConfig const& config,
: ".db";
return detail::getSociSqliteInit(dbName, path, ext);
}
#if ENABLE_SOCI_POSTGRESQL
else if (backendName == "postgresql")
{
return detail::getSociPostgresqlInit(section, dbName);
}
#endif
else
{
throw std::runtime_error ("Unsupported soci backend: " + backendName);
@@ -142,13 +68,14 @@ getSociInit (BasicConfig const& config,
}
} // detail
SociConfig::SociConfig(std::pair<std::string, soci::backend_factory const&> init)
:connectionString_(std::move(init.first)),
backendFactory_(init.second){}
SociConfig::SociConfig (std::pair<std::string, soci::backend_factory const&> init)
: connectionString_ (std::move (init.first)),
backendFactory_ (init.second)
{
}
SociConfig::SociConfig(BasicConfig const& config,
std::string const& dbName)
: SociConfig(detail::getSociInit(config, dbName))
SociConfig::SociConfig (BasicConfig const& config, std::string const& dbName)
: SociConfig (detail::getSociInit (config, dbName))
{
}
@@ -169,5 +96,160 @@ void open(soci::session& s,
SociConfig c(config, dbName);
c.open(s);
}
void open(soci::session& s,
std::string const& beName,
std::string const& connectionString)
{
if (beName == "sqlite")
{
s.open(soci::sqlite3, connectionString);
return;
}
else
{
throw std::runtime_error ("Unsupported soci backend: " + beName);
}
}
size_t getKBUsedAll (soci::session& s)
{
auto be = dynamic_cast<soci::sqlite3_session_backend*>(s.get_backend ());
assert (be); // Make sure the backend is sqlite
return static_cast<int>(sqlite_api::sqlite3_memory_used () / 1024);
}
size_t getKBUsedDB (soci::session& s)
{
// This function will have to be customized when other backends are added
auto be = dynamic_cast<soci::sqlite3_session_backend*>(s.get_backend ());
assert (be);
int cur = 0, hiw = 0;
sqlite_api::sqlite3_db_status (
be->conn_, SQLITE_DBSTATUS_CACHE_USED, &cur, &hiw, 0);
return cur / 1024;
}
void convert(soci::blob& from, std::vector<std::uint8_t>& to)
{
to.resize (from.get_len ());
if (to.empty ())
return;
from.read (0, reinterpret_cast<char*>(&to[0]), from.get_len ());
}
void convert(soci::blob& from, std::string& to)
{
std::vector<std::uint8_t> tmp;
convert(from, tmp);
to.assign(tmp.begin (), tmp.end());
}
void convert(std::vector<std::uint8_t> const& from, soci::blob& to)
{
if (!from.empty ())
to.write (0, reinterpret_cast<char const*>(&from[0]), from.size ());
}
int SqliteWALHook (void* s,
sqlite_api::sqlite3*,
const char* dbName,
int walSize)
{
(reinterpret_cast<WALCheckpointer*>(s))->doHook (dbName, walSize);
return SQLITE_OK;
}
WALCheckpointer::WALCheckpointer (std::shared_ptr<soci::session> const& s,
JobQueue* q)
: Thread ("sqlitedb"), session_(s)
{
if (auto session =
dynamic_cast<soci::sqlite3_session_backend*>(s->get_backend ()))
conn_ = session->conn_;
if (!conn_) return;
startThread ();
setupCheckpointing (q);
}
WALCheckpointer::~WALCheckpointer ()
{
if (!conn_) return;
stopThread ();
}
void WALCheckpointer::setupCheckpointing (JobQueue* q)
{
if (!conn_) return;
q_ = q;
sqlite_api::sqlite3_wal_hook (conn_, SqliteWALHook, this);
}
void WALCheckpointer::doHook (const char* db, int pages)
{
if (!conn_) return;
if (pages < 1000)
return;
{
ScopedLockType sl (mutex_);
if (running_)
return;
running_ = true;
}
if (q_)
q_->addJob (jtWAL,
std::string ("WAL"),
std::bind (&WALCheckpointer::runWal, this));
else
notify ();
}
void WALCheckpointer::run ()
{
if (!conn_) return;
// Simple thread loop runs Wal every time it wakes up via
// the call to Thread::notify, unless Thread::threadShouldExit returns
// true in which case we simply break.
//
for (;;)
{
wait ();
if (threadShouldExit ())
break;
runWal ();
}
}
void WALCheckpointer::runWal ()
{
if (!conn_) return;
int log = 0, ckpt = 0;
int ret = sqlite3_wal_checkpoint_v2 (
conn_, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
if (ret != SQLITE_OK)
{
WriteLog ((ret == SQLITE_LOCKED) ? lsTRACE : lsWARNING, WALCheckpointer)
<< "WAL(" << sqlite3_db_filename (conn_, "main") << "): error "
<< ret;
}
else
WriteLog (lsTRACE, WALCheckpointer)
<< "WAL(" << sqlite3_db_filename (conn_, "main")
<< "): frames=" << log << ", written=" << ckpt;
{
ScopedLockType sl (mutex_);
running_ = false;
}
}
}