Many changes around ledger_data and book_offers

* populate caches on startup
* implement new key/book lookup algorithms for Postgres
* use one shift interval instead of two
* block ETL at flag ledger until writes are finished
* write one flag ledger ahead
* abandon ledger_diff component of ledger_data
This commit is contained in:
CJ Cobb
2021-04-29 21:30:05 +00:00
parent f1ff81ddc5
commit c0612e740e
7 changed files with 260 additions and 216 deletions

View File

@@ -2,8 +2,7 @@
namespace Backend { namespace Backend {
BackendIndexer::BackendIndexer(boost::json::object const& config) BackendIndexer::BackendIndexer(boost::json::object const& config)
: keyShift_(config.at("keyshift").as_int64()) : shift_(config.at("indexer_shift").as_int64())
, bookShift_(config.at("bookshift").as_int64())
{ {
work_.emplace(ioc_); work_.emplace(ioc_);
ioThread_ = std::thread{[this]() { ioc_.run(); }}; ioThread_ = std::thread{[this]() { ioc_.run(); }};
@@ -19,11 +18,12 @@ void
BackendIndexer::addKey(ripple::uint256 const& key) BackendIndexer::addKey(ripple::uint256 const& key)
{ {
keys.insert(key); keys.insert(key);
keysCumulative.insert(key);
} }
void void
BackendIndexer::deleteKey(ripple::uint256 const& key) BackendIndexer::deleteKey(ripple::uint256 const& key)
{ {
keys.erase(key); keysCumulative.erase(key);
} }
void void
@@ -31,46 +31,108 @@ BackendIndexer::addBookOffer(
ripple::uint256 const& book, ripple::uint256 const& book,
ripple::uint256 const& offerKey) ripple::uint256 const& offerKey)
{ {
booksToOffers[book].insert(offerKey); books[book].insert(offerKey);
booksCumulative[book].insert(offerKey);
} }
void void
BackendIndexer::deleteBookOffer( BackendIndexer::deleteBookOffer(
ripple::uint256 const& book, ripple::uint256 const& book,
ripple::uint256 const& offerKey) ripple::uint256 const& offerKey)
{ {
booksToOffers[book].erase(offerKey); booksCumulative[book].erase(offerKey);
booksToDeletedOffers[book].insert(offerKey); }
void
BackendIndexer::clearCaches()
{
keysCumulative = {};
booksCumulative = {};
}
void
BackendIndexer::populateCaches(BackendInterface const& backend)
{
if (keysCumulative.size() > 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " caches already populated. returning";
return;
}
auto tip = backend.fetchLatestLedgerSequence();
if (!tip)
return;
std::optional<ripple::uint256> cursor;
while (true)
{
try
{
auto [objects, curCursor] =
backend.fetchLedgerPage(cursor, *tip, 2048);
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
for (auto& obj : objects)
{
keysCumulative.insert(obj.key);
if (isOffer(obj.blob))
{
auto book = getBook(obj.blob);
booksCumulative[book].insert(obj.key);
}
}
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
}
void
BackendIndexer::writeNext(
uint32_t ledgerSequence,
BackendInterface const& backend)
{
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. sequence = " << std::to_string(ledgerSequence);
bool isFlag = (ledgerSequence % (1 << shift_)) == 0;
if (!backend.fetchLedgerRange())
{
isFlag = true;
}
if (isFlag)
{
uint32_t nextSeq =
((ledgerSequence >> shift_ << shift_) + (1 << shift_));
BOOST_LOG_TRIVIAL(info)
<< __func__ << " actually doing the write. keysCumulative.size() = "
<< std::to_string(keysCumulative.size());
backend.writeKeys(keysCumulative, nextSeq);
BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys";
backend.writeBooks(booksCumulative, nextSeq);
BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books";
}
} }
void void
BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
{ {
if (ledgerSequence >> keyShift_ << keyShift_ == ledgerSequence) bool isFlag = ledgerSequence % (1 << shift_) == 0;
if (!backend.fetchLedgerRange())
{ {
std::unordered_set<ripple::uint256> keysCopy = keys; isFlag = true;
boost::asio::post(ioc_, [=, &backend]() {
BOOST_LOG_TRIVIAL(info) << "Indexer - writing keys. Ledger = "
<< std::to_string(ledgerSequence);
backend.writeKeys(keysCopy, ledgerSequence);
BOOST_LOG_TRIVIAL(info) << "Indexer - wrote keys. Ledger = "
<< std::to_string(ledgerSequence);
});
} }
if (ledgerSequence >> bookShift_ << bookShift_ == ledgerSequence) uint32_t nextSeq = ((ledgerSequence >> shift_ << shift_) + (1 << shift_));
{ uint32_t curSeq = isFlag ? ledgerSequence : nextSeq;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>> backend.writeKeys(keys, curSeq);
booksToOffersCopy = booksToOffers; keys = {};
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>> backend.writeBooks(books, curSeq);
booksToDeletedOffersCopy = booksToDeletedOffers; books = {};
boost::asio::post(ioc_, [=, &backend]() {
BOOST_LOG_TRIVIAL(info) << "Indexer - writing books. Ledger = " } // namespace Backend
<< std::to_string(ledgerSequence);
backend.writeBooks(booksToOffersCopy, ledgerSequence);
backend.writeBooks(booksToDeletedOffersCopy, ledgerSequence);
BOOST_LOG_TRIVIAL(info) << "Indexer - wrote books. Ledger = "
<< std::to_string(ledgerSequence);
});
booksToDeletedOffers = {};
}
}
} // namespace Backend } // namespace Backend

View File

@@ -61,18 +61,23 @@ class BackendIndexer
std::mutex mutex_; std::mutex mutex_;
std::optional<boost::asio::io_context::work> work_; std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_; std::thread ioThread_;
uint32_t keyShift_ = 16; uint32_t shift_ = 16;
uint32_t bookShift_ = 16;
std::unordered_set<ripple::uint256> keys; std::unordered_set<ripple::uint256> keys;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>> std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
booksToOffers; books;
std::unordered_set<ripple::uint256> keysCumulative;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>> std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
booksToDeletedOffers; booksCumulative;
public: public:
BackendIndexer(boost::json::object const& config); BackendIndexer(boost::json::object const& config);
~BackendIndexer(); ~BackendIndexer();
void
populateCaches(BackendInterface const& backend);
void
clearCaches();
void void
addKey(ripple::uint256 const& key); addKey(ripple::uint256 const& key);
void void
@@ -87,11 +92,18 @@ public:
void void
finish(uint32_t ledgerSequence, BackendInterface const& backend); finish(uint32_t ledgerSequence, BackendInterface const& backend);
void
writeNext(uint32_t ledgerSequence, BackendInterface const& backend);
uint32_t
getShift()
{
return shift_;
}
}; };
class BackendInterface class BackendInterface
{ {
private: protected:
mutable BackendIndexer indexer_; mutable BackendIndexer indexer_;
public: public:
@@ -100,6 +112,26 @@ public:
{ {
} }
BackendIndexer&
getIndexer() const
{
return indexer_;
}
std::optional<uint32_t>
getIndexOfSeq(uint32_t seq) const
{
if (!fetchLedgerRange())
return {};
if (fetchLedgerRange()->minSequence == seq)
return seq;
uint32_t shift = indexer_.getShift();
uint32_t incr = (1 << shift);
if ((seq % incr) == 0)
return seq;
return (seq >> shift << shift) + incr;
}
virtual std::optional<uint32_t> virtual std::optional<uint32_t>
fetchLatestLedgerSequence() const = 0; fetchLatestLedgerSequence() const = 0;
@@ -226,6 +258,7 @@ public:
finishWrites(uint32_t ledgerSequence) const finishWrites(uint32_t ledgerSequence) const
{ {
indexer_.finish(ledgerSequence, *this); indexer_.finish(ledgerSequence, *this);
indexer_.writeNext(ledgerSequence, *this);
return doFinishWrites(); return doFinishWrites();
} }
virtual bool virtual bool

View File

@@ -473,41 +473,18 @@ CassandraBackend::fetchLedgerPage(
std::uint32_t ledgerSequence, std::uint32_t ledgerSequence,
std::uint32_t limit) const std::uint32_t limit) const
{ {
auto rng = fetchLedgerRange(); auto index = getIndexOfSeq(ledgerSequence);
if (!rng) if (!index)
return {{}, {}}; return {};
if (!isIndexed(ledgerSequence))
{
return fetchLedgerPage2(cursor, ledgerSequence, limit);
}
LedgerPage page; LedgerPage page;
bool cursorIsInt = false;
if (cursor && !cursor->isZero())
{
bool foundNonZero = false;
for (size_t i = 0; i < 28 && !foundNonZero; ++i)
{
if (cursor->data()[i] != 0)
foundNonZero = true;
}
cursorIsInt = !foundNonZero;
}
if (cursor) if (cursor)
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Cursor = " << ripple::strHex(*cursor) << __func__ << " - Cursor = " << ripple::strHex(*cursor);
<< " : cursorIsInt = " << std::to_string(cursorIsInt);
if (!cursor || !cursorIsInt)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger";
CassandraStatement statement{selectKeys_};
uint32_t upper = ledgerSequence;
if (upper != rng->minSequence)
upper = (ledgerSequence >> indexerShift_) << indexerShift_;
if (upper != ledgerSequence)
upper += (1 << indexerShift_);
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " upper is " << std::to_string(upper); << __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
statement.bindInt(upper); << " index = " << std::to_string(*index);
CassandraStatement statement{selectKeys_};
statement.bindInt(*index);
if (cursor) if (cursor)
statement.bindBytes(*cursor); statement.bindBytes(*cursor);
else else
@@ -528,18 +505,14 @@ CassandraBackend::fetchLedgerPage(
{ {
keys.push_back(result.getUInt256()); keys.push_back(result.getUInt256());
} while (result.nextRow()); } while (result.nextRow());
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Read keys";
<< __func__ << " Using base ledger. Read keys";
auto objects = fetchLedgerObjects(keys, ledgerSequence); auto objects = fetchLedgerObjects(keys, ledgerSequence);
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Using base ledger. Got objects"; << __func__ << " Using base ledger. Got objects";
if (objects.size() != keys.size()) if (objects.size() != keys.size())
throw std::runtime_error( throw std::runtime_error("Mismatch in size of objects and keys");
"Mismatch in size of objects and keys");
if (keys.size() == limit) if (keys.size() == limit)
page.cursor = keys[keys.size() - 1]; page.cursor = keys[keys.size() - 1];
else if (ledgerSequence < upper)
page.cursor = upper - 1;
if (cursor) if (cursor)
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
@@ -556,47 +529,6 @@ CassandraBackend::fetchLedgerPage(
} }
return page; return page;
} }
}
else
{
uint32_t curSequence = 0;
for (size_t i = 28; i < 32; ++i)
{
uint32_t digit = cursor->data()[i];
digit = digit << (8 * (31 - i));
curSequence += digit;
}
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Using ledger diffs. Sequence = " << curSequence
<< " size_of uint32_t " << std::to_string(sizeof(uint32_t))
<< " cursor = " << ripple::strHex(*cursor);
auto diff = fetchLedgerDiff(curSequence);
BOOST_LOG_TRIVIAL(debug) << __func__ << " diff size = " << diff.size();
std::vector<ripple::uint256> deletedKeys;
for (auto& obj : diff)
{
if (obj.blob.size() == 0)
deletedKeys.push_back(std::move(obj.key));
}
auto objects = fetchLedgerObjects(deletedKeys, ledgerSequence);
if (objects.size() != deletedKeys.size())
throw std::runtime_error("Mismatch in size of objects and keys");
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " deleted keys size = " << deletedKeys.size();
for (size_t i = 0; i < objects.size(); ++i)
{
auto& obj = objects[i];
auto& key = deletedKeys[i];
if (obj.size())
{
page.objects.push_back({std::move(key), std::move(obj)});
}
}
if (curSequence - 1 >= ledgerSequence)
page.cursor = curSequence - 1;
return page;
// do the diff algorithm
}
return {{}, {}}; return {{}, {}};
} }
std::vector<Blob> std::vector<Blob>
@@ -642,17 +574,12 @@ CassandraBackend::fetchBookOffers(
{ {
CassandraStatement statement{selectBook_}; CassandraStatement statement{selectBook_};
statement.bindBytes(book); statement.bindBytes(book);
uint32_t upper = sequence; auto index = getIndexOfSeq(sequence);
auto rng = fetchLedgerRange(); if (!index)
if (rng && sequence != rng->minSequence) return {};
{ BOOST_LOG_TRIVIAL(info) << __func__ << " index = " << std::to_string(*index)
upper = (sequence >> 8) << 8;
if (upper != sequence)
upper += (1 << 8);
}
BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper)
<< " book = " << ripple::strHex(book); << " book = " << ripple::strHex(book);
statement.bindInt(upper); statement.bindInt(*index);
if (cursor) if (cursor)
statement.bindBytes(*cursor); statement.bindBytes(*cursor);
else else
@@ -855,7 +782,7 @@ CassandraBackend::writeKeys(
std::mutex mtx; std::mutex mtx;
std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs; std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs;
cbs.reserve(keys.size()); cbs.reserve(keys.size());
uint32_t concurrentLimit = maxRequestsOutstanding / 2; uint32_t concurrentLimit = maxRequestsOutstanding;
uint32_t numSubmitted = 0; uint32_t numSubmitted = 0;
for (auto& key : keys) for (auto& key : keys)
{ {
@@ -941,6 +868,8 @@ CassandraBackend::writeBooks(
bool bool
CassandraBackend::isIndexed(uint32_t ledgerSequence) const CassandraBackend::isIndexed(uint32_t ledgerSequence) const
{ {
return false;
/*
auto rng = fetchLedgerRange(); auto rng = fetchLedgerRange();
if (!rng) if (!rng)
return false; return false;
@@ -955,11 +884,14 @@ CassandraBackend::isIndexed(uint32_t ledgerSequence) const
statement.bindUInt(1); statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
return !!result; return !!result;
*/
} }
std::optional<uint32_t> std::optional<uint32_t>
CassandraBackend::getNextToIndex() const CassandraBackend::getNextToIndex() const
{ {
return {};
/*
auto rng = fetchLedgerRange(); auto rng = fetchLedgerRange();
if (!rng) if (!rng)
return {}; return {};
@@ -969,6 +901,7 @@ CassandraBackend::getNextToIndex() const
cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_); cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_);
} }
return cur; return cur;
*/
} }
bool bool
@@ -1748,6 +1681,7 @@ CassandraBackend::open(bool readOnly)
{ {
maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64(); maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
} }
/*
if (config_.contains("run_indexer")) if (config_.contains("run_indexer"))
{ {
if (config_["run_indexer"].as_bool()) if (config_["run_indexer"].as_bool())
@@ -1768,6 +1702,7 @@ CassandraBackend::open(bool readOnly)
}}; }};
} }
} }
*/
work_.emplace(ioContext_); work_.emplace(ioContext_);
ioThread_ = std::thread{[this]() { ioContext_.run(); }}; ioThread_ = std::thread{[this]() { ioContext_.run(); }};

View File

@@ -631,9 +631,6 @@ private:
std::optional<boost::asio::io_context::work> work_; std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_; std::thread ioThread_;
std::thread indexer_;
uint32_t indexerShift_ = 16;
// maximum number of concurrent in flight requests. New requests will wait // maximum number of concurrent in flight requests. New requests will wait
// for earlier requests to finish if this limit is exceeded // for earlier requests to finish if this limit is exceeded
uint32_t maxRequestsOutstanding = 10000; uint32_t maxRequestsOutstanding = 10000;
@@ -693,8 +690,6 @@ public:
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
work_.reset(); work_.reset();
ioThread_.join(); ioThread_.join();
if (indexer_.joinable())
indexer_.join();
} }
open_ = false; open_ = false;
} }

View File

@@ -748,7 +748,7 @@ CREATE TABLE IF NOT EXISTS objects (
key bytea NOT NULL, key bytea NOT NULL,
ledger_seq bigint NOT NULL, ledger_seq bigint NOT NULL,
object bytea, object bytea,
PRIMARY KEY(ledger_seq, key) PRIMARY KEY(key, ledger_seq)
) PARTITION BY RANGE (ledger_seq); ) PARTITION BY RANGE (ledger_seq);
create table if not exists objects1 partition of objects for values from (0) to (10000000); create table if not exists objects1 partition of objects for values from (0) to (10000000);
@@ -805,16 +805,18 @@ create table if not exists account_transactions7 partition of account_transactio
CREATE TABLE IF NOT EXISTS books ( CREATE TABLE IF NOT EXISTS books (
ledger_seq bigint NOT NULL, ledger_seq bigint NOT NULL,
book bytea NOT NULL, book bytea NOT NULL,
offer_key bytea NOT NULL, offer_key bytea NOT NULL
PRIMARY KEY(ledger_seq, book, offer_key)
); );
CREATE INDEX book_idx ON books using btree(ledger_seq, book, offer_key);
CREATE TABLE IF NOT EXISTS keys ( CREATE TABLE IF NOT EXISTS keys (
ledger_seq bigint NOT NULL, ledger_seq bigint NOT NULL,
key bytea NOT NULL, key bytea NOT NULL
PRIMARY KEY(ledger_seq, key)
); );
CREATE INDEX key_idx ON keys USING btree(ledger_seq, key);
-- account_tx() RPC helper. From the rippled reporting process, only the -- account_tx() RPC helper. From the rippled reporting process, only the
-- parameters without defaults are required. For the parameters with -- parameters without defaults are required. For the parameters with
-- defaults, validation should be done by rippled, such as: -- defaults, validation should be done by rippled, such as:

View File

@@ -329,30 +329,40 @@ PostgresBackend::fetchLedgerPage(
std::uint32_t ledgerSequence, std::uint32_t ledgerSequence,
std::uint32_t limit) const std::uint32_t limit) const
{ {
auto index = getIndexOfSeq(ledgerSequence);
if (!index)
return {};
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000"); pgQuery("SET statement_timeout TO 10000");
std::stringstream sql; std::stringstream sql;
sql << "SELECT key,object FROM" sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index);
<< " (SELECT DISTINCT ON (key) * FROM objects"
<< " WHERE ledger_seq <= " << std::to_string(ledgerSequence);
if (cursor) if (cursor)
sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'"; sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key DESC, ledger_seq DESC) sub" sql << " ORDER BY key DESC LIMIT " << std::to_string(limit);
<< " WHERE object != \'\\x\'"
<< " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 2)) BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched keys";
std::optional<ripple::uint256> returnCursor;
if (size_t numRows = checkResult(res, 1))
{ {
std::vector<LedgerObject> objects; std::vector<ripple::uint256> keys;
for (size_t i = 0; i < numRows; ++i) for (size_t i = 0; i < numRows; ++i)
{ {
objects.push_back({res.asUInt256(i, 0), res.asUnHexedBlob(i, 1)}); keys.push_back({res.asUInt256(i, 0)});
} }
if (numRows == limit) if (numRows == limit)
return {objects, objects[objects.size() - 1].key}; returnCursor = keys.back();
else
return {objects, {}}; auto objs = fetchLedgerObjects(keys, ledgerSequence);
std::vector<LedgerObject> results;
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size())
{
results.push_back({keys[i], objs[i]});
}
}
return {results, returnCursor};
} }
return {}; return {};
} }
@@ -366,15 +376,12 @@ PostgresBackend::fetchBookOffers(
{ {
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
std::stringstream sql; std::stringstream sql;
sql << "SELECT offer_key FROM" sql << "SELECT offer_key FROM books WHERE book = "
<< " (SELECT DISTINCT ON (offer_key) * FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(book) << "\'\\x" << ripple::strHex(book)
<< "\' AND ledger_seq <= " << std::to_string(ledgerSequence); << "\' AND ledger_seq = " << std::to_string(ledgerSequence);
if (cursor) if (cursor)
sql << " AND offer_key < \'\\x" << ripple::strHex(*cursor) << "\'"; sql << " AND offer_key < \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY offer_key DESC, ledger_seq DESC)" sql << " ORDER BY offer_key DESC, ledger_seq DESC"
<< " sub WHERE NOT deleted"
<< " ORDER BY offer_key DESC "
<< " LIMIT " << std::to_string(limit); << " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << sql.str(); BOOST_LOG_TRIVIAL(debug) << sql.str();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
@@ -421,8 +428,6 @@ PostgresBackend::fetchTransactions(
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0; auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info) << __func__ << " created threadpool. took "
<< std::to_string(duration);
results.resize(hashes.size()); results.resize(hashes.size());
std::condition_variable cv; std::condition_variable cv;
std::mutex mtx; std::mutex mtx;
@@ -507,40 +512,47 @@ PostgresBackend::fetchLedgerObjects(
{ {
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000"); pgQuery("SET statement_timeout TO 10000");
std::vector<Blob> results;
results.resize(keys.size());
std::condition_variable cv;
std::mutex mtx;
std::atomic_uint numRemaining = keys.size();
auto start = std::chrono::system_clock::now();
for (size_t i = 0; i < keys.size(); ++i)
{
auto const& key = keys[i];
boost::asio::post(
pool_,
[this, &key, &results, &numRemaining, &cv, &mtx, i, sequence]() {
PgQuery pgQuery(pgPool_);
std::stringstream sql; std::stringstream sql;
sql << "SELECT DISTINCT ON(key) object FROM objects WHERE"; sql << "SELECT object FROM "
"objects "
bool first = true; "WHERE key = \'\\x"
for (auto const& key : keys) << ripple::strHex(key) << "\'"
{
if (!first)
{
sql << " OR ";
}
else
{
sql << " ( ";
first = false;
}
sql << " key = "
<< "\'\\x" << ripple::strHex(key) << "\'";
}
sql << " ) "
<< " AND ledger_seq <= " << std::to_string(sequence) << " AND ledger_seq <= " << std::to_string(sequence)
<< " ORDER BY key DESC, ledger_seq DESC"; << " ORDER BY ledger_seq DESC LIMIT 1";
BOOST_LOG_TRIVIAL(trace) << sql.str();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1)) if (size_t numRows = checkResult(res, 1))
{ {
std::vector<Blob> results; results[i] = res.asUnHexedBlob(0, 0);
for (size_t i = 0; i < numRows; ++i) }
if (--numRemaining == 0)
{ {
results.push_back(res.asUnHexedBlob(i, 0)); std::unique_lock lck(mtx);
cv.notify_one();
} }
});
}
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(keys.size())
<< " objects with threadpool. took " << std::to_string(duration);
return results; return results;
}
return {};
} }
std::pair< std::pair<

View File

@@ -365,6 +365,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
assert(false); assert(false);
throw std::runtime_error("runETLPipeline: parent ledger is null"); throw std::runtime_error("runETLPipeline: parent ledger is null");
} }
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populating caches";
flatMapBackend_->getIndexer().populateCaches(*flatMapBackend_);
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populated caches";
std::atomic_bool writeConflict = false; std::atomic_bool writeConflict = false;
std::optional<uint32_t> lastPublishedSequence; std::optional<uint32_t> lastPublishedSequence;
@@ -523,6 +528,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< "Extracted and wrote " << *lastPublishedSequence - startSequence << "Extracted and wrote " << *lastPublishedSequence - startSequence
<< " in " << ((end - begin).count()) / 1000000000.0; << " in " << ((end - begin).count()) / 1000000000.0;
writing_ = false; writing_ = false;
flatMapBackend_->getIndexer().clearCaches();
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Stopping etl pipeline"; << "Stopping etl pipeline";
@@ -607,7 +613,6 @@ ReportingETL::monitor()
} }
else else
{ {
// publishLedger(ledger);
} }
uint32_t nextSequence = latestSequence.value() + 1; uint32_t nextSequence = latestSequence.value() + 1;