This commit is contained in:
CJ Cobb
2021-07-09 23:45:01 +00:00
parent 8684dba694
commit 607ea0a76e
2 changed files with 65 additions and 928 deletions

View File

@@ -55,134 +55,31 @@ processAsyncWrite(CassFuture* fut, void* cbData)
T& requestParams = *static_cast<T*>(cbData);
processAsyncWriteResponse(requestParams, fut, requestParams.retry);
}
/*
// Process the result of an asynchronous write. Retry on error
// @param fut cassandra future associated with the write
// @param cbData struct that holds the request parameters
template <class T>
void
flatMapWriteCallback(CassFuture* fut, void* cbData)
processAsyncRead(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->write(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
*/
/*
void
retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry)
{
auto const& backend = *requestParams.backend;
if (requestParams.isDeleted)
backend.writeDeletedKey(requestParams, true);
else
backend.writeKey(requestParams, true);
}
void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
processAsyncWriteResponse(requestParams, fut, retryWriteKey);
}
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
CassandraBackend const& backend = *requestParams.backend;
auto rc = cass_future_error_code(fut);
T& requestParams = *static_cast<T*>(cbData);
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
BOOST_LOG_TRIVIAL(info) << __func__;
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.ioContext_, std::chrono::steady_clock::now() + wait);
timer->async_wait([timer, &requestParams, &backend](
const boost::system::error_code& error) {
backend.writeKey(requestParams, true);
});
requestParams.result = {};
}
else
{
auto finish = [&backend]() {
--(backend.numRequestsOutstanding_);
CassandraResult result =
std::move(CassandraResult(cass_future_get_result(fut)));
requestParams.populate(result);
backend.throttleCv_.notify_all();
if (backend.numRequestsOutstanding_ == 0)
backend.syncCv_.notify_all();
};
CassandraResult result{cass_future_get_result(fut)};
if (!result)
{
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc
<< ", " << cass_error_desc(rc);
finish();
return;
}
requestParams.createdSequence = result.getUInt32();
backend.writeDeletedKey(requestParams, false);
std::lock_guard lck(requestParams.mtx);
size_t batchSize = requestParams.batchSize;
if (++(requestParams_.numFinished) == batchSize)
requestParams_.cv.notify_all();
}
}
*/
/*
void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteTransactionCallbackData& requestParams =
*static_cast<CassandraBackend::WriteTransactionCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeTransaction(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
void
flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteAccountTxCallbackData& requestParams =
*static_cast<CassandraBackend::WriteAccountTxCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeAccountTx(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHeaderCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHeaderCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeLedgerHeader(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteLedgerHashCallbackData& requestParams =
*static_cast<CassandraBackend::WriteLedgerHashCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeLedgerHash(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
*/
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
@@ -224,6 +121,26 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData)
}
}
/*
template <class T, class Result>
struct ReadCallbackData
{
using Finisher = std::function<Result(CassandraResult)>;
T data;
CassandraBackend const* backend;
Finisher finish;
ReadCallbackData(CassandraBackend const* backend, T&& d, Finisher f)
: backend(b), data(d), finish(f)
{
}
void
finish(CassandraResult& res)
{
finish(res)
}
};
*/
template <class T, class B>
struct CallbackData
{
@@ -292,11 +209,10 @@ struct BulkWriteCallbackData : public CallbackData<T, B>
void
finish() override
{
{
std::lock_guard lck(mtx);
--numRemaining;
cv.notify_one();
}
// TODO: it would be nice to avoid this lock.
std::lock_guard lck(mtx);
--numRemaining;
cv.notify_one();
}
~BulkWriteCallbackData()
{
@@ -531,120 +447,6 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
return hashes;
}
struct ReadDiffCallbackData
{
CassandraBackend const& backend;
uint32_t sequence;
std::vector<LedgerObject>& result;
std::condition_variable& cv;
std::atomic_uint32_t& numFinished;
size_t batchSize;
ReadDiffCallbackData(
CassandraBackend const& backend,
uint32_t sequence,
std::vector<LedgerObject>& result,
std::condition_variable& cv,
std::atomic_uint32_t& numFinished,
size_t batchSize)
: backend(backend)
, sequence(sequence)
, result(result)
, cv(cv)
, numFinished(numFinished)
, batchSize(batchSize)
{
}
};
void
flatMapReadDiffCallback(CassFuture* fut, void* cbData);
void
readDiff(ReadDiffCallbackData& data)
{
CassandraStatement statement{
data.backend.getSelectLedgerDiffPreparedStatement()};
statement.bindInt(data.sequence);
data.backend.executeAsyncRead(statement, flatMapReadDiffCallback, data);
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadDiffCallback(CassFuture* fut, void* cbData)
{
ReadDiffCallbackData& requestParams =
*static_cast<ReadDiffCallbackData*>(cbData);
auto func = [](auto& params) { readDiff(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func, true};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
do
{
requestParams.result.push_back(
{result.getUInt256(), result.getBytes()});
} while (result.nextRow());
}
}
std::map<uint32_t, std::vector<LedgerObject>>
CassandraBackend::fetchLedgerDiffs(std::vector<uint32_t> const& sequences) const
{
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::map<uint32_t, std::vector<LedgerObject>> results;
std::vector<std::shared_ptr<ReadDiffCallbackData>> cbs;
cbs.reserve(sequences.size());
for (std::size_t i = 0; i < sequences.size(); ++i)
{
cbs.push_back(std::make_shared<ReadDiffCallbackData>(
*this,
sequences[i],
results[sequences[i]],
cv,
numFinished,
sequences.size()));
readDiff(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numFinished, &sequences]() {
return numFinished == sequences.size();
});
return results;
}
std::vector<LedgerObject>
CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
{
CassandraStatement statement{selectLedgerDiff_};
statement.bindInt(ledgerSequence);
auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement);
auto mid = std::chrono::system_clock::now();
if (!result)
return {};
std::vector<LedgerObject> objects;
do
{
objects.push_back({result.getUInt256(), result.getBytes()});
} while (result.nextRow());
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Fetched diff. Fetch time = "
<< std::to_string((mid - start).count() / 1000000000.0)
<< " . total time = "
<< std::to_string((end - start).count() / 1000000000.0);
return objects;
}
LedgerPage
CassandraBackend::doFetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
@@ -746,241 +548,6 @@ CassandraBackend::fetchLedgerObjects(
<< "Fetched " << numKeys << " records from Cassandra";
return results;
}
struct WriteBookCallbackData
{
CassandraBackend const& backend;
ripple::uint256 book;
ripple::uint256 offerKey;
uint32_t ledgerSequence;
std::condition_variable& cv;
std::atomic_uint32_t& numOutstanding;
std::mutex& mtx;
uint32_t currentRetries = 0;
WriteBookCallbackData(
CassandraBackend const& backend,
ripple::uint256 const& book,
ripple::uint256 const& offerKey,
uint32_t ledgerSequence,
std::condition_variable& cv,
std::mutex& mtx,
std::atomic_uint32_t& numOutstanding)
: backend(backend)
, book(book)
, offerKey(offerKey)
, ledgerSequence(ledgerSequence)
, cv(cv)
, mtx(mtx)
, numOutstanding(numOutstanding)
{
}
};
void
writeBookCallback(CassFuture* fut, void* cbData);
void
writeBook(WriteBookCallbackData& cb)
{
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
statement.bindBytes(cb.book.data(), 24);
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.book.data() + 24, 8);
statement.bindBytes(cb.offerKey);
// Passing isRetry as true bypasses incrementing numOutstanding
cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true);
}
void
writeBookCallback(CassFuture* fut, void* cbData)
{
WriteBookCallbackData& requestParams =
*static_cast<WriteBookCallbackData*>(cbData);
CassandraBackend const& backend = requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert book error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait(
[timer, &requestParams](const boost::system::error_code& error) {
writeBook(requestParams);
});
}
else
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
{
std::lock_guard lck(requestParams.mtx);
--requestParams.numOutstanding;
requestParams.cv.notify_one();
}
}
}
struct WriteKeyCallbackData
{
CassandraBackend const& backend;
ripple::uint256 key;
uint32_t ledgerSequence;
std::condition_variable& cv;
std::atomic_uint32_t& numRemaining;
std::mutex& mtx;
uint32_t currentRetries = 0;
WriteKeyCallbackData(
CassandraBackend const& backend,
ripple::uint256 const& key,
uint32_t ledgerSequence,
std::condition_variable& cv,
std::mutex& mtx,
std::atomic_uint32_t& numRemaining)
: backend(backend)
, key(key)
, ledgerSequence(ledgerSequence)
, cv(cv)
, mtx(mtx)
, numRemaining(numRemaining)
{
}
};
struct OnlineDeleteCallbackData
{
CassandraBackend const& backend;
ripple::uint256 key;
uint32_t ledgerSequence;
std::vector<unsigned char> object;
std::condition_variable& cv;
std::atomic_uint32_t& numOutstanding;
std::mutex& mtx;
uint32_t currentRetries = 0;
OnlineDeleteCallbackData(
CassandraBackend const& backend,
ripple::uint256&& key,
uint32_t ledgerSequence,
std::vector<unsigned char>&& object,
std::condition_variable& cv,
std::mutex& mtx,
std::atomic_uint32_t& numOutstanding)
: backend(backend)
, key(std::move(key))
, ledgerSequence(ledgerSequence)
, object(std::move(object))
, cv(cv)
, mtx(mtx)
, numOutstanding(numOutstanding)
{
}
};
void
onlineDeleteCallback(CassFuture* fut, void* cbData);
void
onlineDelete(OnlineDeleteCallbackData& cb)
{
{
CassandraStatement statement{
cb.backend.getInsertObjectPreparedStatement()};
statement.bindBytes(cb.key);
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.object);
cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true);
}
}
void
onlineDeleteCallback(CassFuture* fut, void* cbData)
{
OnlineDeleteCallbackData& requestParams =
*static_cast<OnlineDeleteCallbackData*>(cbData);
CassandraBackend const& backend = requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert book error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait(
[timer, &requestParams](const boost::system::error_code& error) {
onlineDelete(requestParams);
});
}
else
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
{
std::lock_guard lck(requestParams.mtx);
--requestParams.numOutstanding;
requestParams.cv.notify_one();
}
}
}
void
writeKeyCallback(CassFuture* fut, void* cbData);
void
writeKey(WriteKeyCallbackData& cb)
{
CassandraStatement statement{cb.backend.getInsertKeyPreparedStatement()};
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.key);
// Passing isRetry as true bypasses incrementing numOutstanding
cb.backend.executeAsyncWrite(statement, writeKeyCallback, cb, true);
}
void
writeKeyCallback(CassFuture* fut, void* cbData)
{
WriteKeyCallbackData& requestParams =
*static_cast<WriteKeyCallbackData*>(cbData);
CassandraBackend const& backend = requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert key error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying in " << wait.count()
<< " milliseconds";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait(
[timer, &requestParams](const boost::system::error_code& error) {
writeKey(requestParams);
});
}
else
{
BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a key";
{
std::lock_guard lck(requestParams.mtx);
--requestParams.numRemaining;
requestParams.cv.notify_one();
}
}
}
bool
CassandraBackend::writeKeys(
@@ -995,7 +562,7 @@ CassandraBackend::writeKeys(
statement.bindBytes(key);
return statement;
};
std::atomic_int numRemaining = keys.size();
std::atomic_int numOutstanding = keys.size();
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<BulkWriteCallbackData<
@@ -1017,261 +584,28 @@ CassandraBackend::writeKeys(
this,
std::make_pair(index.keyIndex, std::move(key)),
bind,
numRemaining,
numOutstanding,
mtx,
cv));
++numOutstanding;
++numSubmitted;
BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
std::unique_lock<std::mutex> lck(mtx);
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() {
BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " "
<< std::to_string(numRemaining) << " "
<< std::to_string(keys.size()) << " "
<< std::to_string(concurrentLimit);
cv.wait(lck, [&numOutstanding, concurrentLimit, &keys]() {
// keys.size() - i is number submitted. keys.size() -
// numRemaining is number completed Difference is num
// outstanding
return (numSubmitted - (keys.size() - numRemaining)) <
concurrentLimit;
return numOutstanding < concurrentLimit;
});
if (numSubmitted % 100000 == 0)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Submitted " << std::to_string(numSubmitted)
<< " write requests. Completed "
<< (keys.size() - numRemaining);
<< __func__ << " Submitted " << std::to_string(numSubmitted);
}
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
return true;
}
bool
CassandraBackend::isIndexed(uint32_t ledgerSequence) const
{
return false;
/*
auto rng = fetchLedgerRange();
if (!rng)
return false;
if (ledgerSequence != rng->minSequence &&
ledgerSequence != (ledgerSequence >> indexerShift_ << indexerShift_))
ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) +
(1 << indexerShift_);
CassandraStatement statement{selectKeys_};
statement.bindInt(ledgerSequence);
ripple::uint256 zero;
statement.bindBytes(zero);
statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement);
return !!result;
*/
}
std::optional<uint32_t>
CassandraBackend::getNextToIndex() const
{
return {};
/*
auto rng = fetchLedgerRange();
if (!rng)
return {};
uint32_t cur = rng->minSequence;
while (isIndexed(cur))
{
cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_);
}
return cur;
*/
}
bool
CassandraBackend::runIndexer(uint32_t ledgerSequence) const
{
return false;
/*
auto start = std::chrono::system_clock::now();
constexpr uint32_t limit = 2048;
std::unordered_set<ripple::uint256> keys;
std::unordered_map<ripple::uint256, ripple::uint256> offers;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
books;
std::optional<ripple::uint256> cursor;
size_t numOffers = 0;
uint32_t base = ledgerSequence;
auto rng = fetchLedgerRange();
if (base != rng->minSequence)
{
base = (base >> indexerShift_) << indexerShift_;
base -= (1 << indexerShift_);
if (base < rng->minSequence)
base = rng->minSequence;
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " base = " << std::to_string(base)
<< " next to index = " << std::to_string(ledgerSequence);
while (true)
{
try
{
auto [objects, curCursor] = fetchLedgerPage(cursor, base, limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
for (auto& obj : objects)
{
if (isOffer(obj.blob))
{
auto bookDir = getBook(obj.blob);
books[bookDir].insert(obj.key);
offers[obj.key] = bookDir;
++numOffers;
}
keys.insert(std::move(obj.key));
if (keys.size() % 100000 == 0)
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Fetched "
<< std::to_string(keys.size()) << "keys";
}
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all keys from ledger " << std::to_string(base)
<< " . num keys = " << keys.size() << " num books = " << books.size()
<< " num offers = " << numOffers << " . Took "
<< (mid - start).count() / 1000000000.0;
if (base == ledgerSequence)
{
BOOST_LOG_TRIVIAL(info) << __func__ << "Writing keys";
writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence, numOffers);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " . Took " << (end - mid).count() / 1000000000.0
<< ". Entire operation took "
<< (end - start).count() / 1000000000.0;
}
else
{
writeBooks(books, base, numOffers);
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote books. Skipping writing keys";
}
uint32_t prevLedgerSequence = base;
uint32_t nextLedgerSequence =
((prevLedgerSequence >> indexerShift_) << indexerShift_);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " next base = " << std::to_string(nextLedgerSequence);
nextLedgerSequence += (1 << indexerShift_);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " next = " << std::to_string(nextLedgerSequence);
while (true)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Processing diffs. nextLedger = "
<< std::to_string(nextLedgerSequence);
auto rng = fetchLedgerRange();
if (rng->maxSequence < nextLedgerSequence)
break;
start = std::chrono::system_clock::now();
for (size_t i = prevLedgerSequence; i <= nextLedgerSequence; i += 256)
{
auto start2 = std::chrono::system_clock::now();
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>>
booksDeleted;
size_t numOffersDeleted = 0;
// Get the diff and update keys
std::vector<LedgerObject> objs;
std::vector<uint32_t> sequences(256, 0);
std::iota(sequences.begin(), sequences.end(), i + 1);
auto diffs = fetchLedgerDiffs(sequences);
for (auto const& diff : diffs)
{
for (auto const& obj : diff.second)
{
// remove deleted keys
if (obj.blob.size() == 0)
{
keys.erase(obj.key);
if (offers.count(obj.key) > 0)
{
auto book = offers[obj.key];
if (booksDeleted[book].insert(obj.key).second)
++numOffersDeleted;
offers.erase(obj.key);
}
}
else
{
// insert other keys. keys is a set, so this is a
// noop if obj.key is already in keys
keys.insert(obj.key);
// if the object is an offer, add to books
if (isOffer(obj.blob))
{
auto book = getBook(obj.blob);
if (books[book].insert(obj.key).second)
++numOffers;
offers[obj.key] = book;
}
}
}
}
if (sequences.back() % 256 != 0)
{
BOOST_LOG_TRIVIAL(error)
<< __func__
<< " back : " << std::to_string(sequences.back())
<< " front : " << std::to_string(sequences.front())
<< " size : " << std::to_string(sequences.size());
throw std::runtime_error(
"Last sequence is not divisible by 256");
}
for (auto& book : booksDeleted)
{
for (auto& offerKey : book.second)
{
if (books[book.first].erase(offerKey))
--numOffers;
}
}
writeBooks(books, sequences.back(), numOffers);
writeBooks(booksDeleted, sequences.back(), numOffersDeleted);
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info) << __func__ << " Fetched 256 diffs. Took "
<< (mid - start2).count() / 1000000000.0;
}
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all from diffs "
<< std::to_string(nextLedgerSequence)
<< " shift width = " << std::to_string(indexerShift_)
<< ". num keys = " << keys.size() << " . Took "
<< (end - start).count() / 1000000000.0
<< " prev ledger = " << std::to_string(prevLedgerSequence);
writeKeys(keys, nextLedgerSequence);
prevLedgerSequence = nextLedgerSequence;
nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_);
}
return true;
*/
}
bool
CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
{
@@ -1285,11 +619,22 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
if (minLedger <= rng->minSequence)
return false;
auto bind = [this](auto& params) {
auto& [key, seq, obj] = params.data;
CassandraStatement statement{insertObject_};
statement.bindBytes(key);
statement.bindInt(seq);
statement.bindBytes(obj);
return statement;
};
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<OnlineDeleteCallbackData>> cbs;
std::vector<std::shared_ptr<BulkWriteCallbackData<
std::tuple<ripple::uint256, uint32_t, Blob>,
typename std::remove_reference<decltype(bind)>::type>>>
cbs;
uint32_t concurrentLimit = 10;
std::atomic_uint32_t numOutstanding = 0;
std::atomic_int numOutstanding = 0;
// iterate through latest ledger, updating TTL
std::optional<ripple::uint256> cursor;
@@ -1311,16 +656,15 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
for (auto& obj : objects)
{
++numOutstanding;
cbs.push_back(std::make_shared<OnlineDeleteCallbackData>(
*this,
std::move(obj.key),
minLedger,
std::move(obj.blob),
cv,
cbs.push_back(makeAndExecuteBulkAsyncWrite(
this,
std::make_tuple(
std::move(obj.key), minLedger, std::move(obj.blob)),
bind,
numOutstanding,
mtx,
numOutstanding));
cv));
onlineDelete(*cbs.back());
std::unique_lock<std::mutex> lck(mtx);
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
cv.wait(lck, [&numOutstanding, concurrentLimit]() {

View File

@@ -36,26 +36,12 @@
namespace Backend {
void
flatMapWriteCallback(CassFuture* fut, void* cbData);
void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
void
flatMapWriteBookCallback(CassFuture* fut, void* cbData);
void
flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData);
void
flatMapReadCallback(CassFuture* fut, void* cbData);
void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
class CassandraPreparedStatement
{
@@ -571,7 +557,7 @@ public:
~CassandraAsyncResult()
{
if (result_.isOk() or timedOut_)
if (result_.isOk() || timedOut_)
{
BOOST_LOG_TRIVIAL(trace) << "finished a request";
size_t batchSize = requestParams_.batchSize;
@@ -644,11 +630,6 @@ private:
CassandraPreparedStatement insertKey_;
CassandraPreparedStatement selectKeys_;
CassandraPreparedStatement getBook_;
CassandraPreparedStatement selectBook_;
CassandraPreparedStatement completeBook_;
CassandraPreparedStatement insertBook_;
CassandraPreparedStatement insertBook2_;
CassandraPreparedStatement deleteBook_;
CassandraPreparedStatement insertAccountTx_;
CassandraPreparedStatement selectAccountTx_;
CassandraPreparedStatement insertLedgerHeader_;
@@ -659,7 +640,6 @@ private:
CassandraPreparedStatement selectLedgerBySeq_;
CassandraPreparedStatement selectLatestLedger_;
CassandraPreparedStatement selectLedgerRange_;
CassandraPreparedStatement selectLedgerDiff_;
// io_context used for exponential backoff for write retries
mutable boost::asio::io_context ioContext_;
@@ -732,27 +712,11 @@ public:
open_ = false;
}
CassandraPreparedStatement const&
getInsertKeyPreparedStatement() const
{
return insertKey_;
}
CassandraPreparedStatement const&
getInsertBookPreparedStatement() const
{
return insertBook2_;
}
CassandraPreparedStatement const&
getInsertObjectPreparedStatement() const
{
return insertObject_;
}
CassandraPreparedStatement const&
getSelectLedgerDiffPreparedStatement() const
{
return selectLedgerDiff_;
}
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
@@ -803,39 +767,6 @@ public:
return {{}, {}};
}
struct WriteLedgerHeaderCallbackData
{
CassandraBackend const* backend;
uint32_t sequence;
std::string header;
uint32_t currentRetries = 0;
std::atomic<int> refs = 1;
WriteLedgerHeaderCallbackData(
CassandraBackend const* f,
uint32_t sequence,
std::string&& header)
: backend(f), sequence(sequence), header(std::move(header))
{
}
};
struct WriteLedgerHashCallbackData
{
CassandraBackend const* backend;
ripple::uint256 hash;
uint32_t sequence;
uint32_t currentRetries = 0;
std::atomic<int> refs = 1;
WriteLedgerHashCallbackData(
CassandraBackend const* f,
ripple::uint256 hash,
uint32_t sequence)
: backend(f), hash(hash), sequence(sequence)
{
}
};
bool
doFinishWrites() const override
{
@@ -870,25 +801,6 @@ public:
ripple::LedgerInfo const& ledgerInfo,
std::string&& header,
bool isFirst = false) const override;
void
writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const
{
CassandraStatement statement{insertLedgerHash_};
statement.bindBytes(cb.hash);
statement.bindInt(cb.sequence);
executeAsyncWrite(
statement, flatMapWriteLedgerHashCallback, cb, isRetry);
}
void
writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const
{
CassandraStatement statement{insertLedgerHeader_};
statement.bindInt(cb.sequence);
statement.bindBytes(cb.header);
executeAsyncWrite(
statement, flatMapWriteLedgerHeaderCallback, cb, isRetry);
}
std::optional<uint32_t>
fetchLatestLedgerSequence() const override
@@ -1104,79 +1016,6 @@ public:
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const override;
struct WriteCallbackData
{
CassandraBackend const* backend;
std::string key;
uint32_t sequence;
uint32_t createdSequence = 0;
std::string blob;
bool isCreated;
bool isDeleted;
std::optional<ripple::uint256> book;
uint32_t currentRetries = 0;
std::atomic<int> refs = 1;
WriteCallbackData(
CassandraBackend const* f,
std::string&& key,
uint32_t sequence,
std::string&& blob,
bool isCreated,
bool isDeleted,
std::optional<ripple::uint256>&& inBook)
: backend(f)
, key(std::move(key))
, sequence(sequence)
, blob(std::move(blob))
, isCreated(isCreated)
, isDeleted(isDeleted)
, book(std::move(inBook))
{
}
};
struct WriteAccountTxCallbackData
{
CassandraBackend const* backend;
ripple::AccountID account;
uint32_t ledgerSequence;
uint32_t transactionIndex;
ripple::uint256 txHash;
uint32_t currentRetries = 0;
std::atomic<int> refs = 1;
WriteAccountTxCallbackData(
CassandraBackend const* f,
ripple::AccountID&& account,
uint32_t lgrSeq,
uint32_t txIdx,
ripple::uint256&& hash)
: backend(f)
, account(std::move(account))
, ledgerSequence(lgrSeq)
, transactionIndex(txIdx)
, txHash(std::move(hash))
{
}
};
/*
void
write(WriteCallbackData& data, bool isRetry) const
{
{
CassandraStatement statement{insertObject_};
statement.bindBytes(data.key);
statement.bindInt(data.sequence);
statement.bindBytes(data.blob);
executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry);
}
}*/
void
doWriteLedgerObject(
std::string&& key,
@@ -1190,35 +1029,6 @@ public:
writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const override;
struct WriteTransactionCallbackData
{
CassandraBackend const* backend;
// The shared pointer to the node object must exist until it's
// confirmed persisted. Otherwise, it can become deleted
// prematurely if other copies are removed from caches.
std::string hash;
uint32_t sequence;
std::string transaction;
std::string metadata;
uint32_t currentRetries = 0;
std::atomic<int> refs = 1;
WriteTransactionCallbackData(
CassandraBackend const* f,
std::string&& hash,
uint32_t sequence,
std::string&& transaction,
std::string&& metadata)
: backend(f)
, hash(std::move(hash))
, sequence(sequence)
, transaction(std::move(transaction))
, metadata(std::move(metadata))
{
}
};
void
writeTransaction(
std::string&& hash,
@@ -1247,27 +1057,10 @@ public:
return ioContext_;
}
friend void
flatMapWriteCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteKeyCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteTransactionCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteBookCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
friend void
flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
friend void
flatMapReadCallback(CassFuture* fut, void* cbData);
friend void
flatMapReadObjectCallback(CassFuture* fut, void* cbData);
friend void
flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
inline void
incremementOutstandingRequestCount() const