mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-20 19:56:00 +00:00
allow different shifts for books and keys. default to 10 and 20 respectively
This commit is contained in:
@@ -2,7 +2,8 @@
|
||||
|
||||
namespace Backend {
|
||||
BackendIndexer::BackendIndexer(boost::json::object const& config)
|
||||
: shift_(config.at("indexer_shift").as_int64())
|
||||
: keyShift_(config.at("indexer_key_shift").as_int64())
|
||||
, bookShift_(config.at("indexer_book_shift").as_int64())
|
||||
{
|
||||
work_.emplace(ioc_);
|
||||
ioThread_ = std::thread{[this]() { ioc_.run(); }};
|
||||
@@ -65,11 +66,59 @@ BackendIndexer::deleteBookOffer(
|
||||
}
|
||||
|
||||
void
|
||||
writeFlagLedger(
|
||||
writeKeyFlagLedger(
|
||||
uint32_t ledgerSequence,
|
||||
uint32_t shift,
|
||||
BackendInterface const& backend,
|
||||
std::unordered_set<ripple::uint256> const& keys)
|
||||
{
|
||||
uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift));
|
||||
ripple::uint256 zero = {};
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__
|
||||
<< " starting. ledgerSequence = " << std::to_string(ledgerSequence)
|
||||
<< " nextFlag = " << std::to_string(nextFlag)
|
||||
<< " keys.size() = " << std::to_string(keys.size());
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto [objects, curCursor, warning] =
|
||||
backend.fetchLedgerPage({}, nextFlag, 1);
|
||||
if (!warning)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " flag ledger already written. sequence = "
|
||||
<< std::to_string(ledgerSequence)
|
||||
<< " next flag = " << std::to_string(nextFlag)
|
||||
<< "returning";
|
||||
return;
|
||||
}
|
||||
break;
|
||||
}
|
||||
catch (DatabaseTimeout& t)
|
||||
{
|
||||
;
|
||||
}
|
||||
}
|
||||
auto start = std::chrono::system_clock::now();
|
||||
|
||||
backend.writeKeys(keys, nextFlag);
|
||||
backend.writeKeys({zero}, nextFlag);
|
||||
auto end = std::chrono::system_clock::now();
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__
|
||||
<< " finished. ledgerSequence = " << std::to_string(ledgerSequence)
|
||||
<< " nextFlag = " << std::to_string(nextFlag)
|
||||
<< " keys.size() = " << std::to_string(keys.size())
|
||||
<< std::chrono::duration_cast<std::chrono::seconds>(end - start)
|
||||
.count();
|
||||
}
|
||||
void
|
||||
writeBookFlagLedger(
|
||||
uint32_t ledgerSequence,
|
||||
uint32_t shift,
|
||||
BackendInterface const& backend,
|
||||
std::unordered_set<ripple::uint256> const& keys,
|
||||
std::unordered_map<
|
||||
ripple::uint256,
|
||||
std::unordered_set<ripple::uint256>> const& books)
|
||||
@@ -81,15 +130,14 @@ writeFlagLedger(
|
||||
<< __func__
|
||||
<< " starting. ledgerSequence = " << std::to_string(ledgerSequence)
|
||||
<< " nextFlag = " << std::to_string(nextFlag)
|
||||
<< " keys.size() = " << std::to_string(keys.size())
|
||||
<< " books.size() = " << std::to_string(books.size());
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto [objects, curCursor, warning] =
|
||||
backend.fetchLedgerPage({}, nextFlag, 1);
|
||||
if (!(warning || objects.size() == 0))
|
||||
backend.fetchBookOffers(zero, nextFlag, 1);
|
||||
if (!warning)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " flag ledger already written. sequence = "
|
||||
@@ -109,16 +157,11 @@ writeFlagLedger(
|
||||
backend.writeBooks(books, nextFlag);
|
||||
backend.writeBooks({{zero, {zero}}}, nextFlag);
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote books. writing keys ...";
|
||||
|
||||
backend.writeKeys(keys, nextFlag);
|
||||
backend.writeKeys({zero}, nextFlag);
|
||||
auto end = std::chrono::system_clock::now();
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__
|
||||
<< " finished. ledgerSequence = " << std::to_string(ledgerSequence)
|
||||
<< " nextFlag = " << std::to_string(nextFlag)
|
||||
<< " keys.size() = " << std::to_string(keys.size())
|
||||
<< " books.size() = " << std::to_string(books.size()) << " time = "
|
||||
<< std::chrono::duration_cast<std::chrono::seconds>(end - start)
|
||||
.count();
|
||||
@@ -132,7 +175,80 @@ BackendIndexer::clearCaches()
|
||||
}
|
||||
|
||||
void
|
||||
BackendIndexer::populateCaches(
|
||||
BackendIndexer::doBooksRepair(
|
||||
BackendInterface const& backend,
|
||||
std::optional<uint32_t> sequence)
|
||||
{
|
||||
if (!sequence)
|
||||
{
|
||||
auto rng = backend.fetchLedgerRangeNoThrow();
|
||||
if (!rng)
|
||||
return;
|
||||
sequence = rng->maxSequence;
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " sequence = " << std::to_string(*sequence);
|
||||
ripple::uint256 zero = {};
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto [objects, cursor, warning] =
|
||||
backend.fetchBookOffers(zero, *sequence, 1);
|
||||
if (!warning)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " flag ledger already written. sequence = "
|
||||
<< std::to_string(*sequence) << "returning";
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
uint32_t lower = (*sequence - 1) >> bookShift_ << bookShift_;
|
||||
doBooksRepair(backend, lower);
|
||||
}
|
||||
break;
|
||||
}
|
||||
catch (DatabaseTimeout& t)
|
||||
{
|
||||
;
|
||||
}
|
||||
}
|
||||
std::optional<ripple::uint256> cursor;
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto [objects, curCursor, warning] =
|
||||
backend.fetchLedgerPage(cursor, *sequence, 2048);
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
|
||||
cursor = curCursor;
|
||||
for (auto& obj : objects)
|
||||
{
|
||||
if (isOffer(obj.blob))
|
||||
{
|
||||
auto book = getBook(obj.blob);
|
||||
booksRepair[book].insert(obj.key);
|
||||
}
|
||||
}
|
||||
if (!cursor)
|
||||
break;
|
||||
}
|
||||
catch (DatabaseTimeout const& e)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " Database timeout fetching keys";
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
}
|
||||
}
|
||||
writeBookFlagLedger(*sequence, bookShift_, backend, booksRepair);
|
||||
booksRepair = {};
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " finished. sequence = " << std::to_string(*sequence);
|
||||
}
|
||||
void
|
||||
BackendIndexer::doKeysRepair(
|
||||
BackendInterface const& backend,
|
||||
std::optional<uint32_t> sequence)
|
||||
{
|
||||
@@ -152,16 +268,65 @@ BackendIndexer::populateCaches(
|
||||
{
|
||||
auto [objects, curCursor, warning] =
|
||||
backend.fetchLedgerPage(cursor, *sequence, 2048);
|
||||
if (warning)
|
||||
// no cursor means this is the first page
|
||||
if (!cursor)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " performing index repair";
|
||||
uint32_t lower = (*sequence - 1) >> shift_ << shift_;
|
||||
populateCaches(backend, lower);
|
||||
writeFlagLedger(
|
||||
lower, shift_, backend, keysCumulative, booksCumulative);
|
||||
clearCaches();
|
||||
// if there is no warning, we don't need to do a repair
|
||||
// warning only shows up on the first page
|
||||
if (!warning)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__
|
||||
<< " flag ledger already written. returning";
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_;
|
||||
doKeysRepair(backend, lower);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
|
||||
cursor = curCursor;
|
||||
for (auto& obj : objects)
|
||||
{
|
||||
keysRepair.insert(obj.key);
|
||||
}
|
||||
if (!cursor)
|
||||
break;
|
||||
}
|
||||
catch (DatabaseTimeout const& e)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " Database timeout fetching keys";
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
}
|
||||
}
|
||||
writeKeyFlagLedger(*sequence, keyShift_, backend, keysRepair);
|
||||
keysRepair = {};
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " finished. sequence = " << std::to_string(*sequence);
|
||||
}
|
||||
|
||||
void
|
||||
BackendIndexer::populateCaches(BackendInterface const& backend)
|
||||
{
|
||||
auto rng = backend.fetchLedgerRangeNoThrow();
|
||||
if (!rng)
|
||||
return;
|
||||
uint32_t sequence = rng->maxSequence;
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " sequence = " << std::to_string(sequence);
|
||||
doBooksRepair(backend, sequence);
|
||||
doKeysRepair(backend, sequence);
|
||||
std::optional<ripple::uint256> cursor;
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto [objects, curCursor, warning] =
|
||||
backend.fetchLedgerPage(cursor, sequence, 2048);
|
||||
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
|
||||
cursor = curCursor;
|
||||
for (auto& obj : objects)
|
||||
@@ -189,7 +354,6 @@ BackendIndexer::populateCaches(
|
||||
std::unique_lock lck(mtx);
|
||||
populatingCacheAsync = false;
|
||||
}
|
||||
auto tip = backend.fetchLatestLedgerSequence();
|
||||
for (auto& key : deletedKeys)
|
||||
{
|
||||
deleteKey(key);
|
||||
@@ -212,9 +376,7 @@ BackendIndexer::populateCaches(
|
||||
<< " finished. keys.size() = " << std::to_string(keysCumulative.size());
|
||||
}
|
||||
void
|
||||
BackendIndexer::populateCachesAsync(
|
||||
BackendInterface const& backend,
|
||||
std::optional<uint32_t> sequence)
|
||||
BackendIndexer::populateCachesAsync(BackendInterface const& backend)
|
||||
{
|
||||
if (keysCumulative.size() > 0)
|
||||
{
|
||||
@@ -226,11 +388,8 @@ BackendIndexer::populateCachesAsync(
|
||||
std::unique_lock lck(mtx);
|
||||
populatingCacheAsync = true;
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " seq = " << (sequence ? std::to_string(*sequence) : "");
|
||||
boost::asio::post(ioc_, [this, sequence, &backend]() {
|
||||
populateCaches(backend, sequence);
|
||||
});
|
||||
BOOST_LOG_TRIVIAL(info) << __func__;
|
||||
boost::asio::post(ioc_, [this, &backend]() { populateCaches(backend); });
|
||||
}
|
||||
|
||||
void
|
||||
@@ -243,7 +402,25 @@ BackendIndexer::waitForCaches()
|
||||
}
|
||||
|
||||
void
|
||||
BackendIndexer::writeFlagLedgerAsync(
|
||||
BackendIndexer::writeKeyFlagLedgerAsync(
|
||||
uint32_t ledgerSequence,
|
||||
BackendInterface const& backend)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__
|
||||
<< " starting. sequence = " << std::to_string(ledgerSequence);
|
||||
|
||||
waitForCaches();
|
||||
auto keysCopy = keysCumulative;
|
||||
boost::asio::post(ioc_, [=, this, &backend]() {
|
||||
writeKeyFlagLedger(ledgerSequence, keyShift_, backend, keysCopy);
|
||||
});
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__
|
||||
<< " finished. sequence = " << std::to_string(ledgerSequence);
|
||||
}
|
||||
void
|
||||
BackendIndexer::writeBookFlagLedgerAsync(
|
||||
uint32_t ledgerSequence,
|
||||
BackendInterface const& backend)
|
||||
{
|
||||
@@ -253,9 +430,8 @@ BackendIndexer::writeFlagLedgerAsync(
|
||||
|
||||
waitForCaches();
|
||||
auto booksCopy = booksCumulative;
|
||||
auto keysCopy = keysCumulative;
|
||||
boost::asio::post(ioc_, [=, this, &backend]() {
|
||||
writeFlagLedger(ledgerSequence, shift_, backend, keysCopy, booksCopy);
|
||||
writeBookFlagLedger(ledgerSequence, bookShift_, backend, booksCopy);
|
||||
});
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__
|
||||
@@ -269,21 +445,23 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
|
||||
<< __func__
|
||||
<< " starting. sequence = " << std::to_string(ledgerSequence);
|
||||
bool isFirst = false;
|
||||
uint32_t index = getIndexOfSeq(ledgerSequence);
|
||||
uint32_t keyIndex = getKeyIndexOfSeq(ledgerSequence);
|
||||
uint32_t bookIndex = getKeyIndexOfSeq(ledgerSequence);
|
||||
auto rng = backend.fetchLedgerRangeNoThrow();
|
||||
if (!rng || rng->minSequence == ledgerSequence)
|
||||
{
|
||||
isFirst = true;
|
||||
index = ledgerSequence;
|
||||
keyIndex = bookIndex = ledgerSequence;
|
||||
}
|
||||
backend.writeKeys(keys, index);
|
||||
backend.writeBooks(books, index);
|
||||
backend.writeKeys(keys, keyIndex);
|
||||
backend.writeBooks(books, bookIndex);
|
||||
if (isFirst)
|
||||
{
|
||||
ripple::uint256 zero = {};
|
||||
backend.writeBooks({{zero, {zero}}}, ledgerSequence);
|
||||
backend.writeKeys({zero}, ledgerSequence);
|
||||
writeFlagLedgerAsync(ledgerSequence, backend);
|
||||
writeBookFlagLedgerAsync(ledgerSequence, backend);
|
||||
writeKeyFlagLedgerAsync(ledgerSequence, backend);
|
||||
}
|
||||
keys = {};
|
||||
books = {};
|
||||
|
||||
Reference in New Issue
Block a user