Implement Shard SQLite support

Miguel Portilla
2019-07-19 18:29:27 -04:00
committed by Nik Bougalis
parent 008fc5155a
commit 66fad62e66
27 changed files with 1217 additions and 986 deletions


@@ -19,27 +19,31 @@
#include <ripple/nodestore/impl/Shard.h>
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/app/main/DBInit.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/impl/DatabaseShardImp.h>
#include <ripple/nodestore/Manager.h>
#include <boost/algorithm/string.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <fstream>
namespace ripple {
namespace NodeStore {
Shard::Shard(DatabaseShard const& db, std::uint32_t index,
int cacheSz, std::chrono::seconds cacheAge, beast::Journal& j)
: index_(index)
Shard::Shard(
Application& app,
DatabaseShard const& db,
std::uint32_t index,
beast::Journal& j)
: app_(app)
, index_(index)
, firstSeq_(db.firstLedgerSeq(index))
, lastSeq_(std::max(firstSeq_, db.lastLedgerSeq(index)))
, maxLedgers_(index == db.earliestShardIndex() ?
lastSeq_ - firstSeq_ + 1 : db.ledgersPerShard())
, pCache_(std::make_shared<PCache>(
"shard " + std::to_string(index_),
cacheSz, cacheAge, stopwatch(), j))
, nCache_(std::make_shared<NCache>(
"shard " + std::to_string(index_),
stopwatch(), cacheSz, cacheAge))
, dir_(db.getRootDir() / std::to_string(index_))
, control_(dir_ / controlFileName)
, j_(j)
@@ -49,38 +53,47 @@ Shard::Shard(DatabaseShard const& db, std::uint32_t index,
}
bool
Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx)
Shard::open(Scheduler& scheduler, nudb::context& ctx)
{
assert(!backend_);
using namespace boost::filesystem;
using namespace boost::beast::detail;
std::string const type (get<std::string>(config, "type", "nudb"));
Config const& config {app_.config()};
Section section {config.section(ConfigSection::shardDatabase())};
std::string const type (get<std::string>(section, "type", "nudb"));
auto factory {Manager::instance().find(type)};
if (!factory)
{
JLOG(j_.error()) <<
"shard " << index_ <<
": failed to create shard store type " << type;
" failed to create backend type " << type;
return false;
}
config.set("path", dir_.string());
section.set("path", dir_.string());
backend_ = factory->createInstance(
NodeObject::keyBytes, config, scheduler, ctx, j_);
NodeObject::keyBytes, section, scheduler, ctx, j_);
auto const preexist {exists(dir_)};
auto fail = [&](std::string msg)
auto fail = [this, preexist](std::string const& msg)
{
pCache_.reset();
nCache_.reset();
backend_.reset();
lgrSQLiteDB_.reset();
txSQLiteDB_.reset();
storedSeqs_.clear();
lastStored_.reset();
if (!preexist)
removeAll(dir_, j_);
if (!msg.empty())
{
JLOG(j_.error()) <<
"shard " << index_ << ": " << msg;
"shard " << index_ << " " << msg;
}
if (backend_)
backend_->close();
if (!preexist)
removeAll(dir_, j_);
return false;
};
@@ -112,100 +125,84 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx)
if (boost::icl::first(storedSeqs_) < firstSeq_ ||
boost::icl::last(storedSeqs_) > lastSeq_)
{
return fail("invalid control file");
return fail("has an invalid control file");
}
if (boost::icl::length(storedSeqs_) >= maxLedgers_)
{
JLOG(j_.error()) <<
JLOG(j_.warn()) <<
"shard " << index_ <<
": found control file for complete shard";
storedSeqs_.clear();
complete_ = true;
" has a control file for complete shard";
setComplete();
remove_all(control_);
}
}
}
else
complete_ = true;
setComplete();
// Calculate file footprint of backend files
for (auto const& p : recursive_directory_iterator(dir_))
if (!is_directory(p))
fileSize_ += file_size(p);
setCache();
if (!initSQLite() || !setFileStats())
return fail({});
}
catch (std::exception const& e)
{
return fail(e.what());
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
return true;
}
bool
Shard::setStored(std::shared_ptr<Ledger const> const& l)
Shard::setStored(std::shared_ptr<Ledger const> const& ledger)
{
assert(backend_&& !complete_);
if (boost::icl::contains(storedSeqs_, l->info().seq))
if (boost::icl::contains(storedSeqs_, ledger->info().seq))
{
JLOG(j_.debug()) <<
"shard " << index_ <<
" ledger seq " << l->info().seq <<
" already stored";
" has ledger sequence " << ledger->info().seq << " already stored";
return false;
}
if (!setSQLiteStored(ledger))
return false;
// Check if the shard is complete
if (boost::icl::length(storedSeqs_) >= maxLedgers_ - 1)
{
setComplete();
if (backend_->backed())
{
if (!removeAll(control_, j_))
return false;
// Update file footprint of backend files
using namespace boost::filesystem;
std::uint64_t sz {0};
try
{
for (auto const& p : recursive_directory_iterator(dir_))
if (!is_directory(p))
sz += file_size(p);
}
catch (const filesystem_error& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
fileSize_ = std::max(fileSize_, sz);
setCache();
if (!initSQLite() || !setFileStats())
return false;
}
fileSize_ = sz;
}
complete_ = true;
storedSeqs_.clear();
JLOG(j_.debug()) <<
"shard " << index_ <<
" ledger seq " << l->info().seq <<
" stored. Shard complete";
}
else
{
storedSeqs_.insert(l->info().seq);
lastStored_ = l;
storedSeqs_.insert(ledger->info().seq);
if (backend_->backed() && !saveControl())
return false;
JLOG(j_.debug()) <<
"shard " << index_ <<
" ledger seq " << l->info().seq <<
" stored";
}
JLOG(j_.debug()) <<
"shard " << index_ <<
" stored ledger sequence " << ledger->info().seq <<
(complete_ ? " and is complete" : "");
lastStored_ = ledger;
return true;
}
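
The storedSeqs_ bookkeeping above is a boost::icl::interval_set: contains() catches ledger sequences that were already stored, and contiguous sequences collapse into a single interval, which is what lets a full shard be recognized cheaply. A minimal standalone sketch of that container behavior (the sequence numbers are made up and this is not code from the commit):

// Sketch only: illustrates the interval_set operations used for storedSeqs_.
#include <boost/icl/interval_set.hpp>
#include <cstdint>
#include <iostream>

int main()
{
    boost::icl::interval_set<std::uint32_t> storedSeqs;

    storedSeqs.insert(100);
    storedSeqs.insert(101);  // adjacent to 100, joins into one interval
    storedSeqs.insert(103);  // leaves a gap at 102

    std::cout << std::boolalpha
        << boost::icl::contains(storedSeqs, 101) << '\n'   // true: duplicate
        << boost::icl::contains(storedSeqs, 102) << '\n'   // false: missing
        << storedSeqs.iterative_size() << '\n';            // 2 intervals
}
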
boost::optional<std::uint32_t>
Shard::prepare()
{
assert(backend_);
if (storedSeqs_.empty())
return lastSeq_;
return prevMissing(storedSeqs_, 1 + lastSeq_, firstSeq_);
@@ -214,6 +211,7 @@ Shard::prepare()
bool
Shard::contains(std::uint32_t seq) const
{
assert(backend_);
if (seq < firstSeq_ || seq > lastSeq_)
return false;
if (complete_)
@@ -221,44 +219,53 @@ Shard::contains(std::uint32_t seq) const
return boost::icl::contains(storedSeqs_, seq);
}
void
Shard::sweep()
{
assert(backend_);
pCache_->sweep();
nCache_->sweep();
}
bool
Shard::validate(Application& app)
Shard::validate()
{
uint256 hash;
std::uint32_t seq;
std::shared_ptr<Ledger> l;
std::shared_ptr<Ledger> ledger;
auto fail = [this](std::string const& msg)
{
JLOG(j_.error()) << "shard " << index_ << " " << msg;
return false;
};
// Find the hash of the last ledger in this shard
{
std::tie(l, seq, hash) = loadLedgerHelper(
std::tie(ledger, seq, hash) = loadLedgerHelper(
"WHERE LedgerSeq >= " + std::to_string(lastSeq_) +
" order by LedgerSeq desc limit 1", app, false);
if (!l)
{
JLOG(j_.error()) <<
"shard " << index_ <<
" unable to validate. No lookup data";
return false;
}
" order by LedgerSeq desc limit 1", app_, false);
if (!ledger)
return fail("is unable to validate due to lacking lookup data");
if (seq != lastSeq_)
{
l->setImmutable(app.config());
ledger->setImmutable(app_.config());
boost::optional<uint256> h;
try
{
h = hashOfSeq(*l, lastSeq_, j_);
h = hashOfSeq(*ledger, lastSeq_, j_);
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
return false;
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
if (!h)
{
JLOG(j_.error()) <<
"shard " << index_ <<
" No hash for last ledger seq " << lastSeq_;
return false;
return fail("is missing hash for last ledger sequence " +
std::to_string(lastSeq_));
}
hash = *h;
seq = lastSeq_;
@@ -266,9 +273,8 @@ Shard::validate(Application& app)
}
JLOG(j_.debug()) <<
"Validating shard " << index_ <<
" ledgers " << firstSeq_ <<
"-" << lastSeq_;
"shard " << index_ <<
" has ledger sequences " << firstSeq_ << "-" << lastSeq_;
// Use a short age to keep memory consumption low
auto const savedAge {pCache_->getTargetAge()};
@@ -282,44 +288,45 @@ Shard::validate(Application& app)
auto nObj = valFetch(hash);
if (!nObj)
break;
l = std::make_shared<Ledger>(
ledger = std::make_shared<Ledger>(
InboundLedger::deserializeHeader(makeSlice(nObj->getData()),
true), app.config(), *app.shardFamily());
if (l->info().hash != hash || l->info().seq != seq)
true), app_.config(), *app_.shardFamily());
if (ledger->info().seq != seq)
{
JLOG(j_.error()) <<
"ledger seq " << seq <<
" hash " << hash <<
" cannot be a ledger";
fail("encountered invalid ledger sequence " + std::to_string(seq));
break;
}
l->stateMap().setLedgerSeq(seq);
l->txMap().setLedgerSeq(seq);
l->setImmutable(app.config());
if (!l->stateMap().fetchRoot(
SHAMapHash {l->info().accountHash}, nullptr))
if (ledger->info().hash != hash)
{
JLOG(j_.error()) <<
"ledger seq " << seq <<
" missing Account State root";
fail("encountered invalid ledger hash " + to_string(hash) +
" on sequence " + std::to_string(seq));
break;
}
if (l->info().txHash.isNonZero())
ledger->stateMap().setLedgerSeq(seq);
ledger->txMap().setLedgerSeq(seq);
ledger->setImmutable(app_.config());
if (!ledger->stateMap().fetchRoot(
SHAMapHash {ledger->info().accountHash}, nullptr))
{
if (!l->txMap().fetchRoot(
SHAMapHash {l->info().txHash}, nullptr))
fail("is missing root STATE node on sequence " +
std::to_string(seq));
break;
}
if (ledger->info().txHash.isNonZero())
{
if (!ledger->txMap().fetchRoot(
SHAMapHash {ledger->info().txHash}, nullptr))
{
JLOG(j_.error()) <<
"ledger seq " << seq <<
" missing TX root";
fail("is missing root TXN node on sequence " +
std::to_string(seq));
break;
}
}
if (!valLedger(l, next))
if (!valLedger(ledger, next))
break;
hash = l->info().parentHash;
hash = ledger->info().parentHash;
--seq;
next = l;
next = ledger;
if (seq % 128 == 0)
pCache_->sweep();
}
@@ -330,79 +337,87 @@ Shard::validate(Application& app)
if (seq >= firstSeq_)
{
JLOG(j_.error()) <<
"shard " << index_ <<
(complete_ ? " is invalid, failed" : " is incomplete, stopped") <<
" at seq " << seq <<
" hash " << hash;
return false;
return fail(std::string(" is ") +
(complete_ ? "invalid, failed" : "incomplete, stopped") +
" on hash " + to_string(hash) + " on sequence " +
std::to_string(seq));
}
JLOG(j_.debug()) <<
"shard " << index_ <<
" is complete.";
"shard " << index_ << " is valid and complete";
return true;
}
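
validate() above walks the shard backwards: starting from the hash of the last ledger, each fetched ledger must carry the expected sequence, its hash must match the hash that was requested, and its parentHash names the next ledger to visit until the walk drops below firstSeq_. A toy standalone model of that walk, using strings for hashes and a std::map in place of the node store (all names and values are made up, not from the commit):

// Sketch only: models the backward parent-hash walk in Shard::validate().
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

struct LedgerInfo
{
    std::uint32_t seq;
    std::string parentHash;
};

int main()
{
    // Made-up three-ledger "shard" keyed by ledger hash.
    std::map<std::string, LedgerInfo> store {
        {"h1", {1, "h0"}},
        {"h2", {2, "h1"}},
        {"h3", {3, "h2"}}};

    std::uint32_t const firstSeq = 1;
    std::uint32_t seq = 3;          // sequence of the last ledger
    std::string hash = "h3";        // hash of the last ledger

    while (seq >= firstSeq)
    {
        auto const it = store.find(hash);
        if (it == store.end() || it->second.seq != seq)
            break;                      // missing or inconsistent ledger
        hash = it->second.parentHash;   // step to the parent
        --seq;
    }

    std::cout << (seq < firstSeq ? "shard is valid" : "validation stopped")
        << '\n';
}
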
bool
Shard::valLedger(std::shared_ptr<Ledger const> const& l,
Shard::valLedger(std::shared_ptr<Ledger const> const& ledger,
std::shared_ptr<Ledger const> const& next)
{
if (l->info().hash.isZero() || l->info().accountHash.isZero())
auto fail = [this](std::string const& msg)
{
JLOG(j_.error()) <<
"invalid ledger";
JLOG(j_.error()) << "shard " << index_ << " " << msg;
return false;
};
if (ledger->info().hash.isZero())
{
return fail("encountered a zero ledger hash on sequence " +
std::to_string(ledger->info().seq));
}
if (ledger->info().accountHash.isZero())
{
return fail("encountered a zero account hash on sequence " +
std::to_string(ledger->info().seq));
}
bool error {false};
auto f = [&, this](SHAMapAbstractNode& node) {
auto f = [this, &error](SHAMapAbstractNode& node)
{
if (!valFetch(node.getNodeHash().as_uint256()))
error = true;
return !error;
};
// Validate the state map
if (l->stateMap().getHash().isNonZero())
if (ledger->stateMap().getHash().isNonZero())
{
if (!l->stateMap().isValid())
if (!ledger->stateMap().isValid())
{
JLOG(j_.error()) <<
"invalid state map";
return false;
return fail("has an invalid state map on sequence " +
std::to_string(ledger->info().seq));
}
try
{
if (next && next->info().parentHash == l->info().hash)
l->stateMap().visitDifferences(&next->stateMap(), f);
if (next && next->info().parentHash == ledger->info().hash)
ledger->stateMap().visitDifferences(&next->stateMap(), f);
else
l->stateMap().visitNodes(f);
ledger->stateMap().visitNodes(f);
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
return false;
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
if (error)
return false;
}
// Validate the tx map
if (l->info().txHash.isNonZero())
// Validate the transaction map
if (ledger->info().txHash.isNonZero())
{
if (!l->txMap().isValid())
if (!ledger->txMap().isValid())
{
JLOG(j_.error()) <<
"invalid transaction map";
return false;
return fail("has an invalid transaction map on sequence " +
std::to_string(ledger->info().seq));
}
try
{
l->txMap().visitNodes(f);
ledger->txMap().visitNodes(f);
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
return false;
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
if (error)
return false;
@@ -415,6 +430,11 @@ Shard::valFetch(uint256 const& hash)
{
assert(backend_);
std::shared_ptr<NodeObject> nObj;
auto fail = [this](std::string const& msg)
{
JLOG(j_.error()) << "shard " << index_ << " " << msg;
};
try
{
switch (backend_->fetch(hash.begin(), &nObj))
@@ -423,29 +443,324 @@ Shard::valFetch(uint256 const& hash)
break;
case notFound:
{
JLOG(j_.error()) <<
"NodeObject not found. hash " << hash;
fail("is missing node object on hash " + to_string(hash));
break;
}
case dataCorrupt:
{
JLOG(j_.error()) <<
"NodeObject is corrupt. hash " << hash;
fail("has a corrupt node object on hash " + to_string(hash));
break;
}
default:
{
JLOG(j_.error()) <<
"unknown error. hash " << hash;
fail("encountered unknown error on hash " + to_string(hash));
}
}
catch (std::exception const& e)
{
fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
return nObj;
}
void
Shard::setComplete()
{
storedSeqs_.clear();
complete_ = true;
}
void
Shard::setCache()
{
// complete shards use the smallest cache and
// fastest expiration to reduce memory consumption.
// The incomplete shard is set according to configuration.
if (!pCache_)
{
auto const name {"shard " + std::to_string(index_)};
auto const sz {complete_ ?
Config::getSize(siNodeCacheSize, 0) :
app_.config().getSize(siNodeCacheSize)};
auto const age {std::chrono::seconds{complete_ ?
Config::getSize(siNodeCacheAge, 0) :
app_.config().getSize(siNodeCacheAge)}};
pCache_ = std::make_shared<PCache>(name, sz, age, stopwatch(), j_);
nCache_ = std::make_shared<NCache>(name, stopwatch(), sz, age);
}
else
{
auto const sz {Config::getSize(siNodeCacheSize, 0)};
pCache_->setTargetSize(sz);
nCache_->setTargetSize(sz);
auto const age {std::chrono::seconds{
Config::getSize(siNodeCacheAge, 0)}};
pCache_->setTargetAge(age);
nCache_->setTargetAge(age);
}
}
bool
Shard::initSQLite()
{
Config const& config {app_.config()};
DatabaseCon::Setup setup;
setup.startUp = config.START_UP;
setup.standAlone = config.standalone();
setup.dataDir = dir_;
try
{
if (complete_)
{
using namespace boost::filesystem;
// Remove WAL files if they exist
for (auto const& d : directory_iterator(dir_))
{
if (is_regular_file(d) &&
boost::iends_with(extension(d), "-wal"))
{
// Closing the session forces a checkpoint
if (!lgrSQLiteDB_)
{
lgrSQLiteDB_ = std::make_unique <DatabaseCon>(
setup,
LgrDBName,
LgrDBPragma,
LgrDBInit);
}
lgrSQLiteDB_->getSession().close();
if (!txSQLiteDB_)
{
txSQLiteDB_ = std::make_unique <DatabaseCon>(
setup,
TxDBName,
TxDBPragma,
TxDBInit);
}
txSQLiteDB_->getSession().close();
break;
}
}
lgrSQLiteDB_ = std::make_unique <DatabaseCon>(
setup,
LgrDBName,
CompleteShardDBPragma,
LgrDBInit);
lgrSQLiteDB_->getSession() <<
boost::str(boost::format("PRAGMA cache_size=-%d;") %
kilobytes(Config::getSize(siLgrDBCache, 0)));
txSQLiteDB_ = std::make_unique <DatabaseCon>(
setup,
TxDBName,
CompleteShardDBPragma,
TxDBInit);
txSQLiteDB_->getSession() <<
boost::str(boost::format("PRAGMA cache_size=-%d;") %
kilobytes(Config::getSize(siTxnDBCache, 0)));
}
else
{
// The incomplete shard uses a Write Ahead Log for performance
lgrSQLiteDB_ = std::make_unique <DatabaseCon>(
setup,
LgrDBName,
LgrDBPragma,
LgrDBInit);
lgrSQLiteDB_->getSession() <<
boost::str(boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getSize(siLgrDBCache)));
lgrSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs());
txSQLiteDB_ = std::make_unique <DatabaseCon>(
setup,
TxDBName,
TxDBPragma,
TxDBInit);
txSQLiteDB_->getSession() <<
boost::str(boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getSize(siTxnDBCache)));
txSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs());
}
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
"shard " << index_ <<
" exception " << e.what() <<
" in function " << __func__;
return false;
}
return nObj;
return true;
}
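
Both branches above size the SQLite page cache with "PRAGMA cache_size=-N", and SQLite treats a negative N as a budget in kibibytes rather than a page count. A standalone sketch of how that pragma string is assembled (the 16 MiB figure is a made-up stand-in for kilobytes(Config::getSize(...)), not a value from the commit):

// Sketch only: builds the negative cache_size pragma used above.
#include <boost/format.hpp>
#include <iostream>

int main()
{
    int const cacheKiB = 16 * 1024;  // hypothetical 16 MiB cache budget
    std::cout << boost::str(
        boost::format("PRAGMA cache_size=-%d;") % cacheKiB) << '\n';
    // prints: PRAGMA cache_size=-16384;
}
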
bool
Shard::setSQLiteStored(std::shared_ptr<Ledger const> const& ledger)
{
auto const seq {ledger->info().seq};
assert(backend_ && !complete_);
assert(!boost::icl::contains(storedSeqs_, seq));
try
{
{
auto& session {txSQLiteDB_->getSession()};
soci::transaction tr(session);
session <<
"DELETE FROM Transactions WHERE LedgerSeq = :seq;"
, soci::use(seq);
session <<
"DELETE FROM AccountTransactions WHERE LedgerSeq = :seq;"
, soci::use(seq);
if (ledger->info().txHash.isNonZero())
{
auto const sSeq {std::to_string(seq)};
if (!ledger->txMap().isValid())
{
JLOG(j_.error()) <<
"shard " << index_ <<
" has an invalid transaction map" <<
" on sequence " << sSeq;
return false;
}
for (auto const& item : ledger->txs)
{
auto const txID {item.first->getTransactionID()};
auto const sTxID {to_string(txID)};
auto const txMeta {std::make_shared<TxMeta>(
txID, ledger->seq(), *item.second)};
session <<
"DELETE FROM AccountTransactions WHERE TransID = :txID;"
, soci::use(sTxID);
auto const& accounts = txMeta->getAffectedAccounts(j_);
if (!accounts.empty())
{
auto const s(boost::str(boost::format(
"('%s','%s',%s,%s)")
% sTxID
% "%s"
% sSeq
% std::to_string(txMeta->getIndex())));
std::string sql;
sql.reserve((accounts.size() + 1) * 128);
sql = "INSERT INTO AccountTransactions "
"(TransID, Account, LedgerSeq, TxnSeq) VALUES ";
sql += boost::algorithm::join(
accounts | boost::adaptors::transformed(
[&](AccountID const& accountID)
{
return boost::str(boost::format(s)
% ripple::toBase58(accountID));
}),
",");
sql += ';';
session << sql;
JLOG(j_.trace()) <<
"shard " << index_ <<
" account transaction: " << sql;
}
else
{
JLOG(j_.warn()) <<
"shard " << index_ <<
" transaction in ledger " << sSeq <<
" affects no accounts";
}
Serializer s;
item.second->add(s);
session <<
(STTx::getMetaSQLInsertReplaceHeader() +
item.first->getMetaSQL(
seq,
sqlEscape(std::move(s.modData())))
+ ';');
}
}
tr.commit ();
}
auto& session {lgrSQLiteDB_->getSession()};
soci::transaction tr(session);
session <<
"DELETE FROM Ledgers WHERE LedgerSeq = :seq;"
, soci::use(seq);
session <<
"INSERT OR REPLACE INTO Ledgers ("
"LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime,"
"PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash,"
"TransSetHash)"
"VALUES ("
":ledgerHash, :ledgerSeq, :prevHash, :totalCoins, :closingTime,"
":prevClosingTime, :closeTimeRes, :closeFlags, :accountSetHash,"
":transSetHash);",
soci::use(to_string(ledger->info().hash)),
soci::use(seq),
soci::use(to_string(ledger->info().parentHash)),
soci::use(to_string(ledger->info().drops)),
soci::use(ledger->info().closeTime.time_since_epoch().count()),
soci::use(ledger->info().parentCloseTime.time_since_epoch().count()),
soci::use(ledger->info().closeTimeResolution.count()),
soci::use(ledger->info().closeFlags),
soci::use(to_string(ledger->info().accountHash)),
soci::use(to_string(ledger->info().txHash));
tr.commit();
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"shard " << index_ <<
" exception " << e.what() <<
" in function " << __func__;
return false;
}
return true;
}
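
The AccountTransactions insert above builds one multi-row statement with a two-stage boost::format: the first pass fills in everything except the account and leaves a literal "%s" behind, then a second pass formats that template once per affected account before boost::algorithm::join glues the rows together. A standalone sketch of the same trick with made-up transaction and account strings (not code from the commit):

// Sketch only: the two-stage format/join used to batch AccountTransactions rows.
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    std::string const txID = "ABC123";  // hypothetical transaction ID
    int const ledgerSeq = 7;
    int const txnSeq = 0;

    // Stage one: fill everything but the account; "%s" survives verbatim.
    auto const rowTemplate = boost::str(boost::format("('%s','%s',%d,%d)")
        % txID % "%s" % ledgerSeq % txnSeq);

    std::vector<std::string> const accounts {"rAlice", "rBob"};  // made up

    // Stage two: one row per account, joined into a single INSERT.
    std::string sql = "INSERT INTO AccountTransactions "
        "(TransID, Account, LedgerSeq, TxnSeq) VALUES ";
    sql += boost::algorithm::join(
        accounts | boost::adaptors::transformed(
            [&rowTemplate](std::string const& account)
            {
                return boost::str(boost::format(rowTemplate) % account);
            }),
        ",");
    sql += ';';

    std::cout << sql << '\n';
}
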
bool
Shard::setFileStats()
{
fileSz_ = 0;
fdRequired_ = 0;
if (backend_->backed())
{
try
{
using namespace boost::filesystem;
for (auto const& d : directory_iterator(dir_))
{
if (is_regular_file(d))
{
fileSz_ += file_size(d);
++fdRequired_;
}
}
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"shard " << index_ <<
" exception " << e.what() <<
" in function " << __func__;
return false;
}
}
return true;
}
bool
@@ -455,10 +770,10 @@ Shard::saveControl()
if (!ofs.is_open())
{
JLOG(j_.fatal()) <<
"shard " << index_ <<
" unable to save control file";
"shard " << index_ << " is unable to save control file";
return false;
}
boost::archive::text_oarchive ar(ofs);
ar & storedSeqs_;
return true;