lots of refactor and bug fix

This commit is contained in:
CJ Cobb
2021-06-02 17:47:14 +00:00
parent 9edb743dcf
commit 2299d59fda
10 changed files with 176 additions and 304 deletions

View File

@@ -71,6 +71,7 @@ target_sources(reporting PRIVATE
reporting/CassandraBackend.cpp
reporting/PostgresBackend.cpp
reporting/BackendIndexer.cpp
reporting/BackendInterface.cpp
reporting/Pg.cpp
reporting/DBHelpers.cpp
reporting/ReportingETL.cpp

View File

@@ -35,9 +35,8 @@ doServerInfo(
auto keyIndex = backend.getKeyIndexOfSeq(cur);
assert(keyIndex.has_value());
cur = keyIndex->keyIndex;
auto page = backend.fetchLedgerPage({}, cur, 1);
boost::json::object entry;
entry["complete"] = !page.warning.has_value();
entry["complete"] = backend.isLedgerIndexed(cur);
entry["sequence"] = cur;
indexes.emplace_back(entry);
cur = cur + 1;

View File

@@ -2,6 +2,7 @@
namespace Backend {
BackendIndexer::BackendIndexer(boost::json::object const& config)
: strand_(ioc_)
{
if (config.contains("indexer_key_shift"))
keyShift_ = config.at("indexer_key_shift").as_int64();
@@ -44,12 +45,7 @@ BackendIndexer::doKeysRepair(
{
try
{
auto [objects, curCursor, warning] =
backend.fetchLedgerPage({}, *sequence, 1);
// no cursor means this is the first page
// if there is no warning, we don't need to do a repair
// warning only shows up on the first page
if (!warning)
if (backend.isLedgerIndexed(*sequence))
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " - " << std::to_string(*sequence)
@@ -64,9 +60,11 @@ BackendIndexer::doKeysRepair(
uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_;
doKeysRepair(backend, lower);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " - " << std::to_string(*sequence)
<< __func__ << " - "
<< " sequence = " << std::to_string(*sequence)
<< " lower = " << std::to_string(lower)
<< " finished recursing. submitting repair ";
writeKeyFlagLedgerAsync(*sequence, backend);
writeKeyFlagLedger(lower, backend);
return;
}
}
@@ -85,26 +83,22 @@ BackendIndexer::doKeysRepairAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
boost::asio::post(ioc_, [this, sequence, &backend]() {
boost::asio::post(strand_, [this, sequence, &backend]() {
doKeysRepair(backend, sequence);
});
}
void
BackendIndexer::writeKeyFlagLedgerAsync(
BackendIndexer::writeKeyFlagLedger(
uint32_t ledgerSequence,
BackendInterface const& backend)
{
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. sequence = " << std::to_string(ledgerSequence);
boost::asio::post(ioc_, [this, ledgerSequence, &backend]() {
std::unordered_set<ripple::uint256> keys;
auto nextFlag = getKeyIndexOfSeq(ledgerSequence + 1);
uint32_t lower = ledgerSequence >> keyShift_ << keyShift_;
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex)
<< " starting";
<< "writeKeyFlagLedger - "
<< "next flag = " << std::to_string(nextFlag.keyIndex)
<< "lower = " << std::to_string(lower)
<< "ledgerSequence = " << std::to_string(ledgerSequence) << " starting";
ripple::uint256 zero = {};
std::optional<ripple::uint256> cursor;
size_t numKeys = 0;
@@ -116,9 +110,7 @@ BackendIndexer::writeKeyFlagLedgerAsync(
{
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - checking for complete...";
auto page =
backend.fetchLedgerPage({}, nextFlag.keyIndex, 1);
if (!page.warning)
if (backend.isLedgerIndexed(nextFlag.keyIndex))
{
BOOST_LOG_TRIVIAL(warning)
<< "writeKeyFlagLedger - "
@@ -134,7 +126,7 @@ BackendIndexer::writeKeyFlagLedgerAsync(
indexing_ = nextFlag.keyIndex;
auto start = std::chrono::system_clock::now();
auto [objects, curCursor, warning] =
backend.fetchLedgerPage(cursor, ledgerSequence, 2048);
backend.fetchLedgerPage(cursor, lower, 2048);
auto mid = std::chrono::system_clock::now();
// no cursor means this is the first page
if (!cursor)
@@ -152,6 +144,7 @@ BackendIndexer::writeKeyFlagLedgerAsync(
}
cursor = curCursor;
std::unordered_set<ripple::uint256> keys;
for (auto& obj : objects)
{
keys.insert(obj.key);
@@ -159,13 +152,12 @@ BackendIndexer::writeKeyFlagLedgerAsync(
backend.writeKeys(keys, nextFlag, true);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "writeKeyFlagLedger - "
<< std::to_string(nextFlag.keyIndex) << " fetched a page "
<< "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex)
<< " fetched a page "
<< " cursor = "
<< (cursor.has_value() ? ripple::strHex(*cursor)
: std::string{})
<< " num keys = " << std::to_string(numKeys)
<< " fetch time = "
<< " num keys = " << std::to_string(numKeys) << " fetch time = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
mid - start)
.count()
@@ -189,10 +181,21 @@ BackendIndexer::writeKeyFlagLedgerAsync(
<< "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex)
<< " finished. "
<< " num keys = " << std::to_string(numKeys) << " total time = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
end - begin)
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - begin)
.count();
indexing_ = 0;
}
void
BackendIndexer::writeKeyFlagLedgerAsync(
uint32_t ledgerSequence,
BackendInterface const& backend)
{
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. sequence = " << std::to_string(ledgerSequence);
boost::asio::post(strand_, [this, ledgerSequence, &backend]() {
writeKeyFlagLedger(ledgerSequence, backend);
});
BOOST_LOG_TRIVIAL(info)
<< __func__

View File

@@ -80,6 +80,7 @@ class BackendInterface;
class BackendIndexer
{
boost::asio::io_context ioc_;
boost::asio::io_context::strand strand_;
std::mutex mutex_;
std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_;
@@ -94,6 +95,10 @@ class BackendIndexer
doKeysRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence);
void
writeKeyFlagLedger(
uint32_t ledgerSequence,
BackendInterface const& backend);
public:
BackendIndexer(boost::json::object const& config);
@@ -160,41 +165,14 @@ public:
return indexer_;
}
void
checkFlagLedgers() const;
std::optional<KeyIndex>
getKeyIndexOfSeq(uint32_t seq) const
{
if (indexer_.isKeyFlagLedger(seq))
return KeyIndex{seq};
auto rng = fetchLedgerRange();
if (!rng)
return {};
if (rng->minSequence == seq)
return KeyIndex{seq};
return indexer_.getKeyIndexOfSeq(seq);
}
getKeyIndexOfSeq(uint32_t seq) const;
bool
finishWrites(uint32_t ledgerSequence) const
{
indexer_.finish(ledgerSequence, *this);
auto commitRes = doFinishWrites();
if (commitRes)
{
if (isFirst_)
indexer_.doKeysRepairAsync(*this, ledgerSequence);
if (indexer_.isKeyFlagLedger(ledgerSequence))
indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this);
isFirst_ = false;
}
else
{
// if commitRes is false, we are relinquishing control of ETL. We
// reset isFirst_ to true so that way if we later regain control of
// ETL, we trigger the index repair
isFirst_ = true;
}
return commitRes;
}
finishWrites(uint32_t ledgerSequence) const;
virtual std::optional<uint32_t>
fetchLatestLedgerSequence() const = 0;
@@ -206,21 +184,7 @@ public:
fetchLedgerRange() const = 0;
std::optional<LedgerRange>
fetchLedgerRangeNoThrow() const
{
BOOST_LOG_TRIVIAL(warning) << __func__;
while (true)
{
try
{
return fetchLedgerRange();
}
catch (DatabaseTimeout& t)
{
;
}
}
}
fetchLedgerRangeNoThrow() const;
virtual std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;
@@ -239,87 +203,14 @@ public:
fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{
assert(limit != 0);
bool incomplete = false;
{
auto check = doFetchLedgerPage({}, ledgerSequence, 1);
incomplete = check.warning.has_value();
}
uint32_t adjustedLimit = limit;
LedgerPage page;
page.cursor = cursor;
do
{
adjustedLimit = adjustedLimit > 2048 ? 2048 : adjustedLimit * 2;
auto partial =
doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit);
page.objects.insert(
page.objects.end(),
partial.objects.begin(),
partial.objects.end());
page.cursor = partial.cursor;
} while (page.objects.size() < limit && page.cursor);
if (incomplete)
{
auto rng = fetchLedgerRange();
if (!rng)
return page;
if (rng->minSequence == ledgerSequence)
{
BOOST_LOG_TRIVIAL(fatal)
<< __func__
<< " Database is populated but first flag ledger is "
"incomplete. This should never happen";
assert(false);
throw std::runtime_error("Missing base flag ledger");
}
uint32_t lowerSequence = (ledgerSequence - 1) >>
indexer_.getKeyShift() << indexer_.getKeyShift();
if (lowerSequence < rng->minSequence)
lowerSequence = rng->minSequence;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " recursing. ledgerSequence = "
<< std::to_string(ledgerSequence)
<< " , lowerSequence = " << std::to_string(lowerSequence);
auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit);
std::vector<ripple::uint256> keys;
std::transform(
std::move_iterator(lowerPage.objects.begin()),
std::move_iterator(lowerPage.objects.end()),
std::back_inserter(keys),
[](auto&& elt) { return std::move(elt.key); });
auto objs = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < keys.size(); ++i)
{
auto& obj = objs[i];
auto& key = keys[i];
if (obj.size())
page.objects.push_back({std::move(key), std::move(obj)});
}
std::sort(
page.objects.begin(), page.objects.end(), [](auto a, auto b) {
return a.key < b.key;
});
page.warning = "Data may be incomplete";
}
if (page.objects.size() >= limit)
{
page.objects.resize(limit);
page.cursor = page.objects.back().key;
}
return page;
}
std::uint32_t limit) const;
bool
isLedgerIndexed(std::uint32_t ledgerSequence) const;
std::optional<LedgerObject>
fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence)
{
auto page = fetchLedgerPage({++key}, ledgerSequence, 1);
if (page.objects.size())
return page.objects[0];
return {};
}
fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence) const;
virtual LedgerPage
doFetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
@@ -327,12 +218,12 @@ public:
std::uint32_t limit) const = 0;
// TODO add warning for incomplete data
virtual BookOffersPage
BookOffersPage
fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor = {}) const = 0;
std::optional<ripple::uint256> const& cursor = {}) const;
virtual std::vector<TransactionAndMetadata>
fetchTransactions(std::vector<ripple::uint256> const& hashes) const = 0;
@@ -365,18 +256,8 @@ public:
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& book) const
{
ripple::uint256 key256 = ripple::uint256::fromVoid(key.data());
indexer_.addKey(std::move(key256));
doWriteLedgerObject(
std::move(key),
seq,
std::move(blob),
isCreated,
isDeleted,
std::move(book));
}
std::optional<ripple::uint256>&& book) const;
virtual void
doWriteLedgerObject(
std::string&& key,

View File

@@ -494,15 +494,6 @@ CassandraBackend::fetchLedgerObjects(
<< "Fetched " << numKeys << " records from Cassandra";
return results;
}
BookOffersPage
CassandraBackend::fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
return {};
} // namespace Backend
struct WriteBookCallbackData
{
CassandraBackend const& backend;

View File

@@ -1019,12 +1019,6 @@ public:
std::unordered_set<ripple::uint256> const& keys,
KeyIndex const& index,
bool isAsync = false) const override;
BookOffersPage
fetchBookOffers(
ripple::uint256 const& book,
uint32_t sequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const override;
bool
canFetchBatch()

View File

@@ -476,6 +476,10 @@ public:
pool_->checkin(pg_);
}
// TODO. add sendQuery and getResult, for sending the query and getting the
// result asynchronously. This could be useful for sending a bunch of
// requests concurrently
/** Execute postgres query with parameters.
*
* @param dbParams Database command with parameters.

View File

@@ -375,16 +375,6 @@ PostgresBackend::doFetchLedgerPage(
return {};
}
BookOffersPage
PostgresBackend::fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
return {};
}
std::vector<TransactionAndMetadata>
PostgresBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const
@@ -665,46 +655,61 @@ PostgresBackend::writeKeys(
{
if (abortWrite_)
return false;
BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_);
PgQuery& conn = isAsync ? pgQuery : writeConnection_;
std::stringstream asyncBuffer;
std::stringstream& buffer = isAsync ? asyncBuffer : keysBuffer_;
std::string tableName = isAsync ? "keys_temp_async" : "keys_temp";
if (isAsync)
conn("BEGIN");
conn(std::string{
"CREATE TABLE " + tableName + " AS SELECT * FROM keys WITH NO DATA"}
.c_str());
std::stringstream sql;
size_t numRows = 0;
for (auto& key : keys)
{
buffer << std::to_string(index.keyIndex) << '\t' << "\\\\x"
<< ripple::strHex(key) << '\n';
numRows++;
// If the buffer gets too large, the insert fails. Not sure why.
// When writing in the background, we insert after every 10000 rows
if ((isAsync && numRows == 10000) || numRows == 100000)
sql << "INSERT INTO keys (ledger_seq, key) VALUES ("
<< std::to_string(index.keyIndex) << ", \'\\x"
<< ripple::strHex(key) << "\') ON CONFLICT DO NOTHING; ";
if (numRows > 10000)
{
conn.bulkInsert(tableName.c_str(), buffer.str());
std::stringstream temp;
buffer.swap(temp);
conn(sql.str().c_str());
sql.str("");
sql.clear();
numRows = 0;
}
}
if (numRows > 0)
conn.bulkInsert(tableName.c_str(), buffer.str());
conn(std::string{
"INSERT INTO keys SELECT * FROM " + tableName +
" ON CONFLICT DO NOTHING"}
.c_str());
conn(std::string{"DROP TABLE " + tableName}.c_str());
if (isAsync)
conn(sql.str().c_str());
return true;
/*
BOOST_LOG_TRIVIAL(debug) << __func__;
std::condition_variable cv;
std::mutex mtx;
std::atomic_uint numRemaining = keys.size();
auto start = std::chrono::system_clock::now();
for (auto& key : keys)
{
conn("COMMIT");
boost::asio::post(
pool_, [this, key, &numRemaining, &cv, &mtx, &index]() {
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "INSERT INTO keys (ledger_seq, key) VALUES ("
<< std::to_string(index.keyIndex) << ", \'\\x"
<< ripple::strHex(key) << "\') ON CONFLICT DO NOTHING";
auto res = pgQuery(sql.str().data());
if (--numRemaining == 0)
{
std::unique_lock lck(mtx);
cv.notify_one();
}
std::stringstream temp;
buffer.swap(temp);
});
}
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
BOOST_LOG_TRIVIAL(info)
<< __func__ << " wrote " << std::to_string(keys.size())
<< " keys with threadpool. took " << std::to_string(duration);
*/
return true;
}
bool

View File

@@ -50,13 +50,6 @@ public:
std::uint32_t ledgerSequence,
std::uint32_t limit) const override;
BookOffersPage
fetchBookOffers(
ripple::uint256 const& book,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const override;
std::vector<TransactionAndMetadata>
fetchTransactions(
std::vector<ripple::uint256> const& hashes) const override;

View File

@@ -754,5 +754,6 @@ ReportingETL::ReportingETL(
if (config.contains("txn_threshold"))
txnThreshold_ = config.at("txn_threshold").as_int64();
flatMapBackend_->open(readOnly_);
flatMapBackend_->checkFlagLedgers();
}