fix up some issues with async indexer

This commit is contained in:
CJ Cobb
2021-05-05 20:22:57 +00:00
parent 736e0a675f
commit 20b8059151
8 changed files with 241 additions and 85 deletions

View File

@@ -64,6 +64,66 @@ BackendIndexer::deleteBookOffer(
deletedBooks[book].insert(offerKey);
}
void
writeFlagLedger(
uint32_t ledgerSequence,
uint32_t shift,
BackendInterface const& backend,
std::unordered_set<ripple::uint256> const& keys,
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books)
{
uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift));
ripple::uint256 zero = {};
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. ledgerSequence = " << std::to_string(ledgerSequence)
<< " nextFlag = " << std::to_string(nextFlag)
<< " keys.size() = " << std::to_string(keys.size())
<< " books.size() = " << std::to_string(books.size());
while (true)
{
try
{
auto [objects, curCursor, warning] =
backend.fetchLedgerPage({}, nextFlag, 1);
if (!(warning || objects.size() == 0))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " flag ledger already written. sequence = "
<< std::to_string(ledgerSequence)
<< " next flag = " << std::to_string(nextFlag)
<< "returning";
return;
}
break;
}
catch (DatabaseTimeout& t)
{
;
}
}
auto start = std::chrono::system_clock::now();
backend.writeBooks(books, nextFlag);
backend.writeBooks({{zero, {zero}}}, nextFlag);
BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote books. writing keys ...";
backend.writeKeys(keys, nextFlag);
backend.writeKeys({zero}, nextFlag);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. ledgerSequence = " << std::to_string(ledgerSequence)
<< " nextFlag = " << std::to_string(nextFlag)
<< " keys.size() = " << std::to_string(keys.size())
<< " books.size() = " << std::to_string(books.size()) << " time = "
<< std::chrono::duration_cast<std::chrono::seconds>(end - start)
.count();
}
void
BackendIndexer::clearCaches()
{
@@ -77,9 +137,12 @@ BackendIndexer::populateCaches(
std::optional<uint32_t> sequence)
{
if (!sequence)
sequence = backend.fetchLatestLedgerSequence();
if (!sequence)
{
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng)
return;
sequence = rng->maxSequence;
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sequence = " << std::to_string(*sequence);
std::optional<ripple::uint256> cursor;
@@ -95,9 +158,9 @@ BackendIndexer::populateCaches(
<< __func__ << " performing index repair";
uint32_t lower = (*sequence - 1) >> shift_ << shift_;
populateCaches(backend, lower);
writeNext(lower, backend);
writeFlagLedger(
lower, shift_, backend, keysCumulative, booksCumulative);
clearCaches();
continue;
}
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
@@ -120,8 +183,8 @@ BackendIndexer::populateCaches(
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
// Do reconcilation. Remove anything from keys or books that shouldn't be
// there
// Do reconcilation. Remove anything from keys or books that shouldn't
// be there
{
std::unique_lock lck(mtx);
populatingCacheAsync = false;
@@ -163,7 +226,8 @@ BackendIndexer::populateCachesAsync(
std::unique_lock lck(mtx);
populatingCacheAsync = true;
}
BOOST_LOG_TRIVIAL(info) << __func__;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " seq = " << (sequence ? std::to_string(*sequence) : "");
boost::asio::post(ioc_, [this, sequence, &backend]() {
populateCaches(backend, sequence);
});
@@ -179,56 +243,53 @@ BackendIndexer::waitForCaches()
}
void
BackendIndexer::writeNext(
BackendIndexer::writeFlagLedgerAsync(
uint32_t ledgerSequence,
BackendInterface const& backend)
{
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. sequence = " << std::to_string(ledgerSequence);
bool isFlag = (ledgerSequence % (1 << shift_)) == 0;
if (!backend.fetchLedgerRange())
{
isFlag = true;
}
if (isFlag)
{
waitForCaches();
auto booksCopy = booksCumulative;
auto keysCopy = keysCumulative;
boost::asio::post(ioc_, [=, &backend]() {
uint32_t nextSeq =
((ledgerSequence >> shift_ << shift_) + (1 << shift_));
ripple::uint256 zero = {};
BOOST_LOG_TRIVIAL(info) << __func__ << " booksCumulative.size() = "
<< std::to_string(booksCumulative.size());
backend.writeBooks(booksCopy, nextSeq);
backend.writeBooks({{zero, {zero}}}, nextSeq);
BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books";
BOOST_LOG_TRIVIAL(info) << __func__ << " keysCumulative.size() = "
<< std::to_string(keysCumulative.size());
backend.writeKeys(keysCopy, nextSeq);
backend.writeKeys({zero}, nextSeq);
BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys";
boost::asio::post(ioc_, [=, this, &backend]() {
writeFlagLedger(ledgerSequence, shift_, backend, keysCopy, booksCopy);
});
}
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. sequence = " << std::to_string(ledgerSequence);
}
void
BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
{
bool isFlag = ledgerSequence % (1 << shift_) == 0;
if (!backend.fetchLedgerRange())
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. sequence = " << std::to_string(ledgerSequence);
bool isFirst = false;
uint32_t index = getIndexOfSeq(ledgerSequence);
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng || rng->minSequence == ledgerSequence)
{
isFlag = true;
isFirst = true;
index = ledgerSequence;
}
backend.writeKeys(keys, index);
backend.writeBooks(books, index);
if (isFirst)
{
ripple::uint256 zero = {};
backend.writeBooks({{zero, {zero}}}, ledgerSequence);
backend.writeKeys({zero}, ledgerSequence);
writeFlagLedgerAsync(ledgerSequence, backend);
}
uint32_t nextSeq = ((ledgerSequence >> shift_ << shift_) + (1 << shift_));
uint32_t curSeq = isFlag ? ledgerSequence : nextSeq;
backend.writeKeys(keys, curSeq);
keys = {};
backend.writeBooks(books, curSeq);
books = {};
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. sequence = " << std::to_string(ledgerSequence);
} // namespace Backend
} // namespace Backend

View File

@@ -123,12 +123,27 @@ public:
void
finish(uint32_t ledgerSequence, BackendInterface const& backend);
void
writeNext(uint32_t ledgerSequence, BackendInterface const& backend);
writeFlagLedgerAsync(
uint32_t ledgerSequence,
BackendInterface const& backend);
uint32_t
getShift()
{
return shift_;
}
uint32_t
getIndexOfSeq(uint32_t seq) const
{
if (isFlagLedger(seq))
return seq;
auto incr = (1 << shift_);
return (seq >> shift_ << shift_) + incr;
}
bool
isFlagLedger(uint32_t ledgerSequence) const
{
return (ledgerSequence % (1 << shift_)) == 0;
}
};
class BackendInterface
@@ -151,15 +166,27 @@ public:
std::optional<uint32_t>
getIndexOfSeq(uint32_t seq) const
{
if (!fetchLedgerRange())
if (indexer_.isFlagLedger(seq))
return seq;
auto rng = fetchLedgerRange();
if (!rng)
return {};
if (fetchLedgerRange()->minSequence == seq)
if (rng->minSequence == seq)
return seq;
uint32_t shift = indexer_.getShift();
uint32_t incr = (1 << shift);
if ((seq % incr) == 0)
return seq;
return (seq >> shift << shift) + incr;
return indexer_.getIndexOfSeq(seq);
}
bool
finishWrites(uint32_t ledgerSequence) const
{
indexer_.finish(ledgerSequence, *this);
auto commitRes = doFinishWrites();
if (commitRes)
{
if (indexer_.isFlagLedger(ledgerSequence))
indexer_.writeFlagLedgerAsync(ledgerSequence, *this);
}
return commitRes;
}
virtual std::optional<uint32_t>
@@ -171,6 +198,22 @@ public:
virtual std::optional<LedgerRange>
fetchLedgerRange() const = 0;
std::optional<LedgerRange>
fetchLedgerRangeNoThrow() const
{
while (true)
{
try
{
return fetchLedgerRange();
}
catch (DatabaseTimeout& t)
{
;
}
}
}
virtual std::optional<Blob>
fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;
@@ -286,13 +329,6 @@ public:
virtual void
startWrites() const = 0;
bool
finishWrites(uint32_t ledgerSequence) const
{
indexer_.finish(ledgerSequence, *this);
indexer_.writeNext(ledgerSequence, *this);
return doFinishWrites();
}
virtual bool
doFinishWrites() const = 0;

View File

@@ -708,13 +708,15 @@ CassandraBackend::writeKeys(
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Ledger = " << std::to_string(ledgerSequence)
<< " . num keys = " << std::to_string(keys.size());
<< " . num keys = " << std::to_string(keys.size())
<< " . concurrentLimit = "
<< std::to_string(indexerMaxRequestsOutstanding);
std::atomic_uint32_t numRemaining = keys.size();
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs;
cbs.reserve(keys.size());
uint32_t concurrentLimit = maxRequestsOutstanding;
uint32_t concurrentLimit = indexerMaxRequestsOutstanding;
uint32_t numSubmitted = 0;
for (auto& key : keys)
{
@@ -761,7 +763,7 @@ CassandraBackend::writeBooks(
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<WriteBookCallbackData>> cbs;
uint32_t concurrentLimit = maxRequestsOutstanding / 2;
uint32_t concurrentLimit = indexerMaxRequestsOutstanding;
std::atomic_uint32_t numOutstanding = 0;
size_t count = 0;
auto start = std::chrono::system_clock::now();
@@ -1429,7 +1431,7 @@ CassandraBackend::open(bool readOnly)
query = {};
query << "SELECT key FROM " << tablePrefix << "keys"
<< " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?";
<< " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?";
if (!selectKeys_.prepareStatement(query, session_.get()))
continue;
@@ -1613,6 +1615,11 @@ CassandraBackend::open(bool readOnly)
{
maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
}
if (config_.contains("indexer_max_requests_outstanding"))
{
indexerMaxRequestsOutstanding =
config_["indexer_max_requests_outstanding"].as_int64();
}
/*
if (config_.contains("run_indexer"))
{

View File

@@ -634,6 +634,9 @@ private:
// maximum number of concurrent in flight requests. New requests will wait
// for earlier requests to finish if this limit is exceeded
uint32_t maxRequestsOutstanding = 10000;
// we keep this small because the indexer runs in the background, and we
// don't want the database to be swamped when the indexer is running
uint32_t indexerMaxRequestsOutstanding = 10;
mutable std::atomic_uint32_t numRequestsOutstanding_ = 0;
// mutex and condition_variable to limit the number of concurrent in flight
@@ -798,6 +801,14 @@ public:
{
// wait for all other writes to finish
sync();
auto rng = fetchLedgerRangeNoThrow();
if (rng && rng->maxSequence >= ledgerSequence_)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Ledger " << std::to_string(ledgerSequence_)
<< " already written. Returning";
return false;
}
// write range
if (isFirstLedger_)
{
@@ -811,7 +822,16 @@ public:
statement.bindInt(ledgerSequence_);
statement.bindBoolean(true);
statement.bindInt(ledgerSequence_ - 1);
return executeSyncUpdate(statement);
if (!executeSyncUpdate(statement))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Update failed for ledger "
<< std::to_string(ledgerSequence_) << ". Returning";
return false;
}
BOOST_LOG_TRIVIAL(debug) << __func__ << " Committed ledger "
<< std::to_string(ledgerSequence_);
return true;
}
void
writeLedger(
@@ -1495,6 +1515,7 @@ public:
bool
executeSyncUpdate(CassandraStatement const& statement) const
{
bool timedOut = false;
CassFuture* fut;
CassError rc;
do
@@ -1503,8 +1524,9 @@ public:
rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
timedOut = true;
std::stringstream ss;
ss << "Cassandra sync write error";
ss << "Cassandra sync update error";
ss << ", retrying";
ss << ": " << cass_error_desc(rc);
BOOST_LOG_TRIVIAL(warning) << ss.str();
@@ -1532,7 +1554,15 @@ public:
return false;
}
cass_result_free(res);
return success == cass_true;
if (success != cass_true && timedOut)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Update failed, but timedOut is true";
}
// if there was a timeout, the update may have succeeded in the
// background. We can't differentiate between an async success and
// another writer, so we just return true here
return success == cass_true || timedOut;
}
CassandraResult

View File

@@ -156,6 +156,20 @@ public:
cv_.notify_all();
return ret;
}
/// @return element popped from queue. Will block until queue is non-empty
std::optional<T>
tryPop()
{
std::unique_lock lck(m_);
if (queue_.empty())
return {};
T ret = std::move(queue_.front());
queue_.pop();
// if queue has a max size, unblock any possible pushers
if (maxSize_)
cv_.notify_all();
return ret;
}
};
/// Parititions the uint256 keyspace into numMarkers partitions, each of equal

View File

@@ -250,6 +250,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
<< ". Postgres insert error: " << res.msg();
if (res)
ss << ". Query status not PGRES_COPY_IN: " << res.status();
BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str());
}
@@ -259,6 +260,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
ss << "bulkInsert to " << table
<< ". PQputCopyData error: " << PQerrorMessage(conn_.get());
disconnect();
BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str());
}
@@ -268,6 +270,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
ss << "bulkInsert to " << table
<< ". PQputCopyEnd error: " << PQerrorMessage(conn_.get());
disconnect();
BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str());
}
@@ -282,7 +285,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
ss << "bulkInsert to " << table
<< ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status;
disconnect();
BOOST_LOG_TRIVIAL(debug) << __func__ << " " << records;
BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str());
}
}

View File

@@ -89,8 +89,8 @@ std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
// check that database is actually empty
auto ledger = flatMapBackend_->fetchLedgerBySequence(startingSequence);
if (ledger)
auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
if (rng)
{
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
<< "Database is not empty";
@@ -156,7 +156,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
{
try
{
auto range = flatMapBackend_->fetchLedgerRange();
auto range = flatMapBackend_->fetchLedgerRangeNoThrow();
if (!range || range->maxSequence < ledgerSequence)
{
@@ -359,8 +359,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< "Starting etl pipeline";
writing_ = true;
auto parent = flatMapBackend_->fetchLedgerBySequence(startSequence - 1);
if (!parent)
auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
if (!rng || rng->maxSequence != startSequence - 1)
{
assert(false);
throw std::runtime_error("runETLPipeline: parent ledger is null");
@@ -385,14 +385,14 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
std::cout << std::to_string((sequence - startSequence) % numExtractors);
return queues[(sequence - startSequence) % numExtractors];
};
std::vector<std::thread> threads;
std::vector<std::thread> extractors;
for (size_t i = 0; i < numExtractors; ++i)
{
auto transformQueue = std::make_shared<QueueType>(maxQueueSize);
queues.push_back(transformQueue);
std::cout << "added to queues";
threads.emplace_back([this,
extractors.emplace_back([this,
&startSequence,
&writeConflict,
transformQueue,
@@ -503,7 +503,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
lastPublishedSequence = lgrInfo.seq;
}
writeConflict = !success;
auto range = flatMapBackend_->fetchLedgerRange();
auto range = flatMapBackend_->fetchLedgerRangeNoThrow();
if (onlineDeleteInterval_ && !deleting_ &&
range->maxSequence - range->minSequence >
*onlineDeleteInterval_)
@@ -520,10 +520,15 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
}
}};
// wait for all of the threads to stop
for (auto& t : threads)
t.join();
transformer.join();
for (size_t i = 0; i < numExtractors; ++i)
{
// pop from each queue that might be blocked on a push
getNext(i)->tryPop();
}
// wait for all of the extractors to stop
for (auto& t : extractors)
t.join();
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< "Extracted and wrote " << *lastPublishedSequence - startSequence
@@ -598,8 +603,8 @@ ReportingETL::monitor()
{
if (startSequence_)
{
throw std::runtime_error(
"start sequence specified but db is already populated");
BOOST_LOG_TRIVIAL(warning)
<< "start sequence specified but db is already populated";
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : "

View File

@@ -510,7 +510,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1):
print(json.dumps(x))
blobs.append(x)
keys.append(x["index"])
if limit != -1 and len(keys) > count:
if count != -1 and len(keys) > count:
print("stopping early")
print(len(keys))
print("done")
@@ -598,7 +598,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
req["cursor"] = cursor
await ws.send(json.dumps(req))
res = json.loads(await ws.recv())
print(json.dumps(res,indent=4,sort_keys=True))
#print(json.dumps(res,indent=4,sort_keys=True))
if "result" in res:
res = res["result"]
for x in res["offers"]: