Use soci in some places:

* Brings the soci subtree into rippled.
* Validator, peerfinder, and SHAMapStore use new soci backend.
* Optional postgresql backend for soci (if POSTGRESQL_ROOT env var is set).
This commit is contained in:
seelabs
2015-01-22 15:04:30 -08:00
committed by Vinnie Falco
parent c7cfd23580
commit d0ef2f7dd8
28 changed files with 1412 additions and 663 deletions

View File

@@ -23,6 +23,7 @@
#include <ripple/peerfinder/impl/Logic.h>
#include <ripple/peerfinder/impl/SourceStrings.h>
#include <ripple/peerfinder/impl/StoreSqdb.h>
#include <ripple/app/data/SociDB.h>
#include <boost/asio/io_service.hpp>
#include <boost/optional.hpp>
#include <beast/cxx14/memory.h> // <memory>
@@ -37,33 +38,31 @@ class ManagerImp
public:
boost::asio::io_service &io_service_;
boost::optional <boost::asio::io_service::work> work_;
beast::File m_databaseFile;
clock_type& m_clock;
beast::Journal m_journal;
StoreSqdb m_store;
Checker<boost::asio::ip::tcp> checker_;
Logic <decltype(checker_)> m_logic;
SociConfig m_sociConfig;
//--------------------------------------------------------------------------
ManagerImp (
Stoppable& stoppable,
boost::asio::io_service& io_service,
beast::File const& pathToDbFileOrDirectory,
clock_type& clock,
beast::Journal journal)
beast::Journal journal,
BasicConfig const& config)
: Manager (stoppable)
, io_service_(io_service)
, work_(boost::in_place(std::ref(io_service_)))
, m_databaseFile (pathToDbFileOrDirectory)
, m_clock (clock)
, m_journal (journal)
, m_store (journal)
, checker_ (io_service_)
, m_logic (clock, m_store, checker_, journal)
, m_sociConfig (config, "peerfinder")
{
if (m_databaseFile.isDirectory ())
m_databaseFile = m_databaseFile.getChildFile("peerfinder.sqlite");
}
~ManagerImp()
@@ -214,12 +213,8 @@ public:
void
onPrepare ()
{
beast::Error error (m_store.open (m_databaseFile));
if (error)
m_journal.fatal <<
"Failed to open '" << m_databaseFile.getFullPathName() << "'";
if (! error)
m_logic.load ();
m_store.open (m_sociConfig);
m_logic.load ();
}
void
@@ -255,10 +250,10 @@ Manager::Manager (Stoppable& parent)
std::unique_ptr<Manager>
make_Manager (beast::Stoppable& parent, boost::asio::io_service& io_service,
beast::File const& databaseFile, clock_type& clock, beast::Journal journal)
clock_type& clock, beast::Journal journal, BasicConfig const& config)
{
return std::make_unique<ManagerImp> (
parent, io_service, databaseFile, clock, journal);
parent, io_service, clock, journal, config);
}
}

View File

@@ -20,7 +20,7 @@
#ifndef RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED
#define RIPPLE_PEERFINDER_STORESQDB_H_INCLUDED
#include <beast/module/sqdb/sqdb.h>
#include <ripple/app/data/SociDB.h>
#include <beast/utility/Debug.h>
namespace ripple {
@@ -32,8 +32,7 @@ class StoreSqdb
{
private:
beast::Journal m_journal;
beast::sqdb::session m_session;
soci::session m_session;
public:
enum
{
@@ -50,19 +49,15 @@ public:
{
}
beast::Error open (beast::File const& file)
void open (SociConfig const& sociConfig)
{
beast::Error error (m_session.open (file.getFullPathName ()));
sociConfig.open (m_session);
m_journal.info << "Opening database at '" << file.getFullPathName() << "'";
m_journal.info << "Opening database at '" << sociConfig.connectionString ()
<< "'";
if (! error)
error = init ();
if (! error)
error = update ();
return error;
init ();
update ();
}
// Loads the bootstrap cache, calling the callback for each entry
@@ -70,42 +65,34 @@ public:
std::size_t load (load_callback const& cb)
{
std::size_t n (0);
beast::Error error;
std::string s;
int valence;
beast::sqdb::statement st = (m_session.prepare <<
soci::statement st = (m_session.prepare <<
"SELECT "
" address, "
" valence "
"FROM PeerFinder_BootstrapCache "
, beast::sqdb::into (s)
, beast::sqdb::into (valence)
, soci::into (s)
, soci::into (valence)
);
if (st.execute_and_fetch (error))
st.execute ();
while (st.fetch ())
{
do
beast::IP::Endpoint const endpoint (
beast::IP::Endpoint::from_string (s));
if (!is_unspecified (endpoint))
{
beast::IP::Endpoint const endpoint (
beast::IP::Endpoint::from_string (s));
if (! is_unspecified (endpoint))
{
cb (endpoint, valence);
++n;
}
else
{
m_journal.error <<
"Bad address string '" << s << "' in Bootcache table";
}
cb (endpoint, valence);
++n;
}
else
{
m_journal.error <<
"Bad address string '" << s << "' in Bootcache table";
}
while (st.fetch (error));
}
if (error)
report (error, __FILE__, __LINE__);
return n;
}
@@ -113,306 +100,237 @@ public:
//
void save (std::vector <Entry> const& v)
{
beast::Error error;
beast::sqdb::transaction tr (m_session);
m_session.once (error) <<
soci::transaction tr (m_session);
m_session <<
"DELETE FROM PeerFinder_BootstrapCache";
if (! error)
std::string s;
int valence;
soci::statement st = (m_session.prepare <<
"INSERT INTO PeerFinder_BootstrapCache ( "
" address, "
" valence "
") VALUES ( "
" :s, :valence "
");"
, soci::use (s)
, soci::use (valence)
);
for (auto const& e : v)
{
std::string s;
int valence;
beast::sqdb::statement st = (m_session.prepare <<
"INSERT INTO PeerFinder_BootstrapCache ( "
" address, "
" valence "
") VALUES ( "
" ?, ? "
");"
, beast::sqdb::use (s)
, beast::sqdb::use (valence)
);
for (auto const& e : v)
{
s = to_string (e.endpoint);
valence = e.valence;
st.execute_and_fetch (error);
if (error)
break;
}
s = to_string (e.endpoint);
valence = e.valence;
st.execute ();
st.fetch ();
}
if (! error)
error = tr.commit();
if (error)
{
tr.rollback ();
report (error, __FILE__, __LINE__);
}
tr.commit ();
}
// Convert any existing entries from an older schema to the
// current one, if appropriate.
//
beast::Error update ()
void update ()
{
beast::Error error;
beast::sqdb::transaction tr (m_session);
soci::transaction tr (m_session);
// get version
int version (0);
if (! error)
{
m_session.once (error) <<
m_session <<
"SELECT "
" version "
"FROM SchemaVersion WHERE "
" name = 'PeerFinder'"
,beast::sqdb::into (version)
, soci::into (version)
;
if (! error)
{
if (!m_session.got_data())
version = 0;
if (!m_session.got_data ())
version = 0;
m_journal.info <<
"Opened version " << version << " database";
}
m_journal.info <<
"Opened version " << version << " database";
}
if (!error)
{
if (version < currentSchemaVersion)
m_journal.info <<
"Updating database to version " << currentSchemaVersion;
else if (version > currentSchemaVersion)
error.fail (__FILE__, __LINE__,
{
throw std::runtime_error (
"The PeerFinder database version is higher than expected");
}
}
if (! error && version < 4)
if (version < 4)
{
//
// Remove the "uptime" column from the bootstrap table
//
if (! error)
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache_Next ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" address TEXT UNIQUE NOT NULL, "
" valence INTEGER"
");"
;
if (! error)
m_session.once (error) <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Next_Index ON "
" PeerFinder_BootstrapCache_Next "
" ( address ); "
;
std::size_t count;
if (! error)
m_session.once (error) <<
"SELECT COUNT(*) FROM PeerFinder_BootstrapCache "
,beast::sqdb::into (count)
;
std::vector <Store::Entry> list;
if (! error)
{
list.reserve (count);
std::string s;
int valence;
beast::sqdb::statement st = (m_session.prepare <<
"SELECT "
" address, "
" valence "
"FROM PeerFinder_BootstrapCache "
, beast::sqdb::into (s)
, beast::sqdb::into (valence)
);
if (st.execute_and_fetch (error))
{
do
{
Store::Entry entry;
entry.endpoint = beast::IP::Endpoint::from_string (s);
if (! is_unspecified (entry.endpoint))
{
entry.valence = valence;
list.push_back (entry);
}
else
{
m_journal.error <<
"Bad address string '" << s << "' in Bootcache table";
}
}
while (st.fetch (error));
}
}
if (! error)
{
std::string s;
int valence;
beast::sqdb::statement st = (m_session.prepare <<
"INSERT INTO PeerFinder_BootstrapCache_Next ( "
" address, "
" valence "
") VALUES ( "
" ?, ?"
");"
, beast::sqdb::use (s)
, beast::sqdb::use (valence)
);
for (auto iter (list.cbegin());
!error && iter != list.cend(); ++iter)
{
s = to_string (iter->endpoint);
valence = iter->valence;
st.execute_and_fetch (error);
}
}
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_BootstrapCache";
if (! error)
m_session.once (error) <<
"DROP INDEX IF EXISTS PeerFinder_BootstrapCache_Index";
if (! error)
m_session.once (error) <<
"ALTER TABLE PeerFinder_BootstrapCache_Next "
" RENAME TO PeerFinder_BootstrapCache";
if (! error)
m_session.once (error) <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Index ON PeerFinder_BootstrapCache "
" ( "
" address "
" ); "
;
}
if (! error && version < 3)
{
//
// Remove legacy endpoints from the schema
//
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS LegacyEndpoints";
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinderLegacyEndpoints";
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints";
if (! error)
m_session.once (error) <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints_Index";
}
if (! error)
{
int const version (currentSchemaVersion);
m_session.once (error) <<
"INSERT OR REPLACE INTO SchemaVersion ("
" name "
" ,version "
") VALUES ( "
" 'PeerFinder', ? "
")"
,beast::sqdb::use(version);
}
if (! error)
error = tr.commit();
if (error)
{
tr.rollback();
report (error, __FILE__, __LINE__);
}
return error;
}
private:
beast::Error init ()
{
beast::Error error;
beast::sqdb::transaction tr (m_session);
if (! error)
m_session.once (error) <<
"PRAGMA encoding=\"UTF-8\"";
if (! error)
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS SchemaVersion ( "
" name TEXT PRIMARY KEY, "
" version INTEGER"
");"
;
if (! error)
m_session.once (error) <<
"CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache ( "
m_session <<
"CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache_Next ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" address TEXT UNIQUE NOT NULL, "
" valence INTEGER"
");"
;
if (! error)
m_session.once (error) <<
m_session <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Index ON PeerFinder_BootstrapCache "
" PeerFinder_BootstrapCache_Next_Index ON "
" PeerFinder_BootstrapCache_Next "
" ( address ); "
;
std::size_t count;
m_session <<
"SELECT COUNT(*) FROM PeerFinder_BootstrapCache "
, soci::into (count)
;
std::vector <Store::Entry> list;
{
list.reserve (count);
std::string s;
int valence;
soci::statement st = (m_session.prepare <<
"SELECT "
" address, "
" valence "
"FROM PeerFinder_BootstrapCache "
, soci::into (s)
, soci::into (valence)
);
st.execute ();
while (st.fetch ())
{
Store::Entry entry;
entry.endpoint = beast::IP::Endpoint::from_string (s);
if (!is_unspecified (entry.endpoint))
{
entry.valence = valence;
list.push_back (entry);
}
else
{
m_journal.error <<
"Bad address string '" << s << "' in Bootcache table";
}
}
}
{
std::string s;
int valence;
soci::statement st = (m_session.prepare <<
"INSERT INTO PeerFinder_BootstrapCache_Next ( "
" address, "
" valence "
") VALUES ( "
" :s, :valence"
");"
, soci::use (s)
, soci::use (valence)
);
for (auto iter (list.cbegin ());
iter != list.cend (); ++iter)
{
s = to_string (iter->endpoint);
valence = iter->valence;
st.execute ();
st.fetch ();
}
}
m_session <<
"DROP TABLE IF EXISTS PeerFinder_BootstrapCache";
m_session <<
"DROP INDEX IF EXISTS PeerFinder_BootstrapCache_Index";
m_session <<
"ALTER TABLE PeerFinder_BootstrapCache_Next "
" RENAME TO PeerFinder_BootstrapCache";
m_session <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Index ON "
"PeerFinder_BootstrapCache "
" ( "
" address "
" ); "
;
if (! error)
error = tr.commit();
if (error)
{
tr.rollback ();
report (error, __FILE__, __LINE__);
}
return error;
if (version < 3)
{
//
// Remove legacy endpoints from the schema
//
m_session <<
"DROP TABLE IF EXISTS LegacyEndpoints";
m_session <<
"DROP TABLE IF EXISTS PeerFinderLegacyEndpoints";
m_session <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints";
m_session <<
"DROP TABLE IF EXISTS PeerFinder_LegacyEndpoints_Index";
}
{
int const version (currentSchemaVersion);
m_session <<
"INSERT OR REPLACE INTO SchemaVersion ("
" name "
" ,version "
") VALUES ( "
" 'PeerFinder', :version "
")"
, soci::use (version);
}
tr.commit ();
}
void report (beast::Error const& error, char const* fileName, int lineNumber)
private:
void init ()
{
if (error)
{
m_journal.error <<
"Failure: '"<< error.getReasonText() << "' " <<
" at " << beast::Debug::getSourceLocation (fileName, lineNumber);
}
soci::transaction tr (m_session);
m_session << "PRAGMA encoding=\"UTF-8\"";
m_session <<
"CREATE TABLE IF NOT EXISTS SchemaVersion ( "
" name TEXT PRIMARY KEY, "
" version INTEGER"
");"
;
m_session <<
"CREATE TABLE IF NOT EXISTS PeerFinder_BootstrapCache ( "
" id INTEGER PRIMARY KEY AUTOINCREMENT, "
" address TEXT UNIQUE NOT NULL, "
" valence INTEGER"
");"
;
m_session <<
"CREATE INDEX IF NOT EXISTS "
" PeerFinder_BootstrapCache_Index ON "
"PeerFinder_BootstrapCache "
" ( "
" address "
" ); "
;
tr.commit ();
}
};