checkpoint

CJ Cobb
2021-05-24 18:23:36 +00:00
parent f513438a95
commit cadf2fa972
8 changed files with 89 additions and 57 deletions

View File

@@ -438,22 +438,27 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
-    bool isFirst = false;
     auto keyIndex = getKeyIndexOfSeq(ledgerSequence);
     auto bookIndex = getBookIndexOfSeq(ledgerSequence);
-    auto rng = backend.fetchLedgerRangeNoThrow();
-    if (!rng || rng->minSequence == ledgerSequence)
+    if (isFirst_)
     {
+        auto rng = backend.fetchLedgerRangeNoThrow();
+        if (rng && rng->minSequence != ledgerSequence)
+            isFirst_ = false;
+        else
+        {
-        isFirst = true;
             keyIndex = KeyIndex{ledgerSequence};
             bookIndex = BookIndex{ledgerSequence};
+        }
     }
     backend.writeKeys(keys, keyIndex);
     backend.writeBooks(books, bookIndex);
-    if (isFirst)
+    if (isFirst_)
     {
         // write completion record
         ripple::uint256 zero = {};
         backend.writeBooks({{zero, {zero}}}, bookIndex);
         backend.writeKeys({zero}, keyIndex);
     }
+    isFirst_ = false;
     keys = {};
     books = {};
     BOOST_LOG_TRIVIAL(info)
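
The rewritten finish() above trades a per-call fetchLedgerRangeNoThrow() for a cached isFirst_ member: the range is consulted only while the flag is still true, and once any earlier ledger shows up the flag latches to false for the life of the indexer. A minimal self-contained sketch of that latch (the struct and fetchRange() are hypothetical stand-ins, not this codebase's API):

#include <cstdint>
#include <optional>

struct LedgerRange { uint32_t minSequence; uint32_t maxSequence; };

// Hypothetical stand-in for BackendInterface::fetchLedgerRangeNoThrow().
std::optional<LedgerRange> fetchRange();

struct FirstLedgerLatch
{
    mutable bool isFirst_ = true;  // assume first until disproven

    // True only while ledgerSequence could be the first ledger ever
    // written; the database is queried at most until the latch flips,
    // so steady-state calls never touch the backend.
    bool isFirst(uint32_t ledgerSequence) const
    {
        if (isFirst_)
        {
            auto rng = fetchRange();
            if (rng && rng->minSequence != ledgerSequence)
                isFirst_ = false;
        }
        return isFirst_;
    }
};

In the diff itself the flag is additionally cleared at the end of finish(), so only the first successful pass writes the zero-key completion records.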

View File

@@ -100,6 +100,8 @@ class BackendIndexer
     std::mutex mtx;
     std::condition_variable cv_;
+    mutable bool isFirst_ = true;
+
     void
     addKeyAsync(ripple::uint256 const& key);
     void
@@ -200,6 +202,7 @@ class BackendInterface
 {
 protected:
     mutable BackendIndexer indexer_;
+    mutable bool isFirst_ = true;
 
 public:
     // read methods
@@ -245,12 +248,17 @@ public:
         auto commitRes = doFinishWrites();
         if (commitRes)
         {
-            bool isFirst =
-                fetchLedgerRangeNoThrow()->minSequence == ledgerSequence;
-            if (indexer_.isBookFlagLedger(ledgerSequence) || isFirst)
+            if (isFirst_)
+            {
+                auto rng = fetchLedgerRangeNoThrow();
+                if (rng && rng->minSequence != ledgerSequence)
+                    isFirst_ = false;
+            }
+            if (indexer_.isBookFlagLedger(ledgerSequence) || isFirst_)
                 indexer_.writeBookFlagLedgerAsync(ledgerSequence, *this);
-            if (indexer_.isKeyFlagLedger(ledgerSequence) || isFirst)
+            if (indexer_.isKeyFlagLedger(ledgerSequence) || isFirst_)
                 indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this);
+            isFirst_ = false;
         }
         return commitRes;
     }
@@ -267,6 +275,7 @@ public:
     std::optional<LedgerRange>
     fetchLedgerRangeNoThrow() const
     {
+        BOOST_LOG_TRIVIAL(warning) << __func__;
         while (true)
         {
             try
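
fetchLedgerRangeNoThrow(), whose opening lines appear above (now with a temporary warning-level trace of each call), is a retry-until-success wrapper around the throwing fetch. The hunk cuts off at the try; a sketch of the likely remainder, assuming the codebase's DatabaseTimeout exception and a throwing fetchLedgerRange():

std::optional<LedgerRange>
fetchLedgerRangeNoThrow() const
{
    BOOST_LOG_TRIVIAL(warning) << __func__;
    while (true)
    {
        try
        {
            return fetchLedgerRange();  // may throw on transient DB errors
        }
        catch (DatabaseTimeout const&)
        {
            // swallow the timeout, back off briefly, and try again
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    }
}

The 500ms back-off is an assumption; only the loop's opening is shown in the hunk.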

View File

@@ -1527,12 +1527,6 @@ CassandraBackend::open(bool readOnly)
         if (!executeSimpleStatement(query.str()))
             continue;
         query.str("");
-        query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1"
-              << " LIMIT 1";
-        if (!executeSimpleStatement(query.str()))
-            continue;
-        query.str("");
         query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
               << " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
@@ -1822,12 +1816,13 @@ CassandraBackend::open(bool readOnly)
               << " is_latest IN (true, false)";
         if (!selectLedgerRange_.prepareStatement(query, session_.get()))
             continue;
+        /*
         query.str("");
         query << " SELECT key,object FROM " << tablePrefix
               << "objects WHERE sequence = ?";
         if (!selectLedgerDiff_.prepareStatement(query, session_.get()))
             continue;
+        */
         setupPreparedStatements = true;
     }

View File

@@ -841,14 +841,6 @@ public:
     {
         // wait for all other writes to finish
         sync();
-        auto rng = fetchLedgerRangeNoThrow();
-        if (rng && rng->maxSequence >= ledgerSequence_)
-        {
-            BOOST_LOG_TRIVIAL(warning)
-                << __func__ << " Ledger " << std::to_string(ledgerSequence_)
-                << " already written. Returning";
-            return false;
-        }
         // write range
         if (isFirstLedger_)
         {

View File

@@ -790,7 +790,7 @@ CREATE TABLE IF NOT EXISTS objects (
     object bytea
 ) PARTITION BY RANGE (ledger_seq);
-CREATE INDEX objects_idx ON objects USING btree(key, ledger_seq);
+CREATE INDEX objects_idx ON objects USING btree(ledger_seq, key);
 create table if not exists objects1 partition of objects for values from (0) to (10000000);
 create table if not exists objects2 partition of objects for values from (10000000) to (20000000);
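
The schema change above flips the composite index so ledger_seq leads. Under btree(key, ledger_seq) a whole-ledger scan must probe every distinct key, while under btree(ledger_seq, key) all rows of one ledger sit in a single contiguous index range, which is the access pattern a per-ledger diff query needs. A hedged libpq illustration of that query shape (connection setup omitted; the query text is illustrative, not lifted from this repo):

#include <libpq-fe.h>

// Fetch every object written at one ledger sequence: a contiguous range
// scan under btree(ledger_seq, key), but effectively a full-index sweep
// under btree(key, ledger_seq).
PGresult*
fetchLedgerObjects(PGconn* conn, char const* ledgerSeq)
{
    char const* paramValues[1] = {ledgerSeq};
    return PQexecParams(
        conn,
        "SELECT key, object FROM objects WHERE ledger_seq = $1::bigint",
        1,        // one parameter
        nullptr,  // let the server infer the parameter type
        paramValues,
        nullptr,  // text-format params carry no explicit lengths
        nullptr,  // all params in text format
        1);       // ask for binary results
}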

View File

@@ -775,12 +775,22 @@ PostgresBackend::doFinishWrites() const
 {
     if (!abortWrite_)
     {
-        writeConnection_.bulkInsert("transactions", transactionsBuffer_.str());
+        std::string txStr = transactionsBuffer_.str();
+        writeConnection_.bulkInsert("transactions", txStr);
         writeConnection_.bulkInsert(
             "account_transactions", accountTxBuffer_.str());
+        std::string objectsStr = objectsBuffer_.str();
+        if (objectsStr.size())
+            writeConnection_.bulkInsert("objects", objectsStr);
+        BOOST_LOG_TRIVIAL(debug)
+            << __func__ << " objects size = " << objectsStr.size()
+            << " txns size = " << txStr.size();
+        std::string keysStr = keysBuffer_.str();
+        if (keysStr.size())
+            writeConnection_.bulkInsert("keys", keysStr);
+        std::string booksStr = booksBuffer_.str();
+        if (booksStr.size())
+            writeConnection_.bulkInsert("books", booksStr);
     }
     auto res = writeConnection_("COMMIT");
     if (!res || res.status() != PGRES_COMMAND_OK)
@@ -795,6 +805,8 @@ PostgresBackend::doFinishWrites() const
     objectsBuffer_.clear();
     booksBuffer_.str("");
     booksBuffer_.clear();
+    keysBuffer_.str("");
+    keysBuffer_.clear();
     accountTxBuffer_.str("");
     accountTxBuffer_.clear();
     numRowsInObjectsBuffer_ = 0;
@@ -806,33 +818,36 @@ PostgresBackend::writeKeys(
     KeyIndex const& index,
     bool isAsync) const
 {
-    return true;
+    if (isAsync)
+        return true;
     if (abortWrite_)
         return false;
     BOOST_LOG_TRIVIAL(debug) << __func__;
     PgQuery pgQuery(pgPool_);
-    pgQuery("BEGIN");
-    std::stringstream keysBuffer;
+    PgQuery& conn = isAsync ? pgQuery : writeConnection_;
+    if (isAsync)
+        conn("BEGIN");
     size_t numRows = 0;
     for (auto& key : keys)
     {
-        keysBuffer << std::to_string(index.keyIndex) << '\t' << "\\\\x"
+        keysBuffer_ << std::to_string(index.keyIndex) << '\t' << "\\\\x"
                    << ripple::strHex(key) << '\n';
         numRows++;
         // If the buffer gets too large, the insert fails. Not sure why.
         // When writing in the background, we insert after every 10000 rows
         if ((isAsync && numRows == 10000) || numRows == 100000)
         {
-            pgQuery.bulkInsert("keys", keysBuffer.str());
+            conn.bulkInsert("keys", keysBuffer_.str());
             std::stringstream temp;
-            keysBuffer.swap(temp);
+            keysBuffer_.swap(temp);
             numRows = 0;
+            if (isAsync)
+                std::this_thread::sleep_for(std::chrono::seconds(1));
         }
     }
-    if (numRows > 0)
-    {
-        pgQuery.bulkInsert("keys", keysBuffer.str());
-    }
-    pgQuery("COMMIT");
+    if (isAsync)
+        conn("COMMIT");
     return true;
 }
 bool
@@ -843,17 +858,23 @@ PostgresBackend::writeBooks(
     BookIndex const& index,
     bool isAsync) const
 {
-    return true;
+    if (isAsync)
+        return true;
     if (abortWrite_)
         return false;
    BOOST_LOG_TRIVIAL(debug) << __func__;
     PgQuery pgQuery(pgPool_);
-    pgQuery("BEGIN");
-    std::stringstream booksBuffer;
+    PgQuery& conn = isAsync ? pgQuery : writeConnection_;
+    if (isAsync)
+        conn("BEGIN");
     size_t numRows = 0;
     for (auto& book : books)
     {
         for (auto& offer : book.second)
         {
-            booksBuffer << std::to_string(index.bookIndex) << '\t' << "\\\\x"
+            booksBuffer_ << std::to_string(index.bookIndex) << '\t' << "\\\\x"
                         << ripple::strHex(book.first) << '\t' << "\\\\x"
                         << ripple::strHex(offer) << '\n';
             numRows++;
@@ -861,20 +882,17 @@ PostgresBackend::writeBooks(
             // When writing in the background, we insert after every 1000 rows
             if ((isAsync && numRows == 1000) || numRows == 100000)
             {
-                pgQuery.bulkInsert("books", booksBuffer.str());
+                conn.bulkInsert("books", booksBuffer_.str());
                 std::stringstream temp;
-                booksBuffer.swap(temp);
+                booksBuffer_.swap(temp);
                 numRows = 0;
+                if (isAsync)
+                    std::this_thread::sleep_for(std::chrono::seconds(1));
             }
         }
     }
-    if (numRows > 0)
-    {
-        pgQuery.bulkInsert("books", booksBuffer.str());
-    }
-    pgQuery("COMMIT");
+    if (isAsync)
+        conn("COMMIT");
     return true;
 }
 bool
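
Both writeKeys and writeBooks above now share one shape: synchronous callers append to a member buffer that doFinishWrites() flushes inside the main transaction, while async flag-ledger writers keep their own connection, flush every N rows, and sleep between batches so background indexing cannot starve foreground commits (the in-code comment notes that oversized buffers make the insert fail). Distilled into a standalone sketch (bulkInsert is a stand-in for PgQuery::bulkInsert; sizes mirror the keys path):

#include <chrono>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Stand-in for PgQuery::bulkInsert (a COPY-style bulk load).
void bulkInsert(char const* table, std::string const& data);

// Append rows in batches: small batches plus a pause when running in the
// background, one large batch budget when called synchronously.
void
batchedInsert(std::vector<std::string> const& rows, bool isAsync)
{
    size_t const batchSize = isAsync ? 10000 : 100000;
    std::stringstream buffer;
    size_t numRows = 0;
    for (auto const& row : rows)
    {
        buffer << row << '\n';
        if (++numRows == batchSize)
        {
            bulkInsert("keys", buffer.str());
            buffer = std::stringstream{};  // reset the buffer
            numRows = 0;
            if (isAsync)
                std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
    if (numRows > 0)  // flush the final partial batch
        bulkInsert("keys", buffer.str());
}

Note that the diff itself drops the trailing partial-batch flush and relies on doFinishWrites() to drain the member buffers; the sketch keeps the explicit flush so it is self-contained.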

View File

@@ -9,8 +9,9 @@ class PostgresBackend : public BackendInterface
 private:
     mutable size_t numRowsInObjectsBuffer_ = 0;
     mutable std::stringstream objectsBuffer_;
-    mutable std::stringstream transactionsBuffer_;
     mutable std::stringstream booksBuffer_;
+    mutable std::stringstream keysBuffer_;
+    mutable std::stringstream transactionsBuffer_;
     mutable std::stringstream accountTxBuffer_;
     std::shared_ptr<PgPool> pgPool_;
     mutable PgQuery writeConnection_;

View File

@@ -69,7 +69,7 @@ ReportingETL::insertTransactions(
         auto metaSerializer = std::make_shared<ripple::Serializer>(
             txMeta.getAsObject().getSerializer());
-        BOOST_LOG_TRIVIAL(trace)
+        BOOST_LOG_TRIVIAL(debug)
             << __func__ << " : "
             << "Inserting transaction = " << sttx.getTransactionID();
@@ -241,7 +241,7 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
 std::pair<ripple::LedgerInfo, bool>
 ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
 {
-    BOOST_LOG_TRIVIAL(trace) << __func__ << " : "
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
                              << "Beginning ledger update";
     ripple::LedgerInfo lgrInfo =
@@ -252,8 +252,12 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
         << "Deserialized ledger header. " << detail::toString(lgrInfo);
     flatMapBackend_->startWrites();
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
+                             << "started writes";
     flatMapBackend_->writeLedger(
         lgrInfo, std::move(*rawData.mutable_ledger_header()));
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
+                             << "wrote ledger header";
     std::vector<AccountTransactionsData> accountTxData{
         insertTransactions(lgrInfo, rawData)};
@@ -293,7 +297,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
             isDeleted,
             std::move(bookDir));
     }
+    BOOST_LOG_TRIVIAL(debug)
+        << __func__ << " : "
+        << "wrote objects. num objects = "
+        << std::to_string(rawData.ledger_objects().objects_size());
     flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
+                             << "wrote account_tx";
     accumTxns_ += rawData.transactions_list().transactions_size();
     bool success = true;
     if (accumTxns_ >= txnThreshold_)
@@ -361,6 +371,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
         assert(false);
         throw std::runtime_error("runETLPipeline: parent ledger is null");
     }
+    std::atomic<uint32_t> minSequence = rng->minSequence;
     BOOST_LOG_TRIVIAL(info) << __func__ << " : "
                             << "Populating caches";
@@ -451,6 +462,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
     }
     std::thread transformer{[this,
+                             &minSequence,
                              &writeConflict,
                              &startSequence,
                              &getNext,
@@ -499,16 +511,16 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
                 lastPublishedSequence = lgrInfo.seq;
             }
             writeConflict = !success;
-            auto range = flatMapBackend_->fetchLedgerRangeNoThrow();
             if (onlineDeleteInterval_ && !deleting_ &&
-                range->maxSequence - range->minSequence >
-                    *onlineDeleteInterval_)
+                lgrInfo.seq - minSequence > *onlineDeleteInterval_)
             {
                 deleting_ = true;
-                ioContext_.post([this, &range]() {
+                ioContext_.post([this, &minSequence]() {
                     BOOST_LOG_TRIVIAL(info) << "Running online delete";
                     flatMapBackend_->doOnlineDelete(*onlineDeleteInterval_);
                     BOOST_LOG_TRIVIAL(info) << "Finished online delete";
+                    auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
+                    minSequence = rng->minSequence;
                     deleting_ = false;
                 });
             }
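
The runETLPipeline changes above replace a per-iteration fetchLedgerRangeNoThrow() with a cached minSequence that the posted online-delete job refreshes when it finishes. Because the transformer thread reads it while an io_context thread writes it, the cache is a std::atomic; a minimal sketch of that interaction (names beyond those in the diff are illustrative):

#include <atomic>
#include <cstdint>

std::atomic<uint32_t> minSequence{0};
std::atomic<bool> deleting{false};

// Transformer thread: cheap arithmetic on every ledger, no DB round trip.
bool
shouldTriggerOnlineDelete(uint32_t ledgerSeq, uint32_t interval)
{
    return !deleting.load() && ledgerSeq - minSequence.load() > interval;
}

// Online-delete job (runs on an io_context thread): trim old ledgers,
// then publish the new minimum so the transformer's check stays current.
void
onOnlineDeleteFinished(uint32_t newMinSequence)
{
    minSequence.store(newMinSequence);
    deleting.store(false);
}

Note that the posted lambda captures minSequence by reference, which assumes runETLPipeline outlives the online-delete job.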