new book offers algorithm, seems to work

This commit is contained in:
CJ Cobb
2021-03-31 19:22:53 +00:00
parent d27b53e4f7
commit db37c05b7b
3 changed files with 255 additions and 79 deletions

View File

@@ -369,9 +369,11 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
objects.push_back({result.getUInt256(), result.getBytes()}); objects.push_back({result.getUInt256(), result.getBytes()});
} while (result.nextRow()); } while (result.nextRow());
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << __func__ << " Fetched diff. Fetch time = " BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Fetched diff. Fetch time = "
<< std::to_string((mid - start).count() / 1000000000.0) << std::to_string((mid - start).count() / 1000000000.0)
<< " . total time = " << std::to_string((end-start).count() / 1000000000.0); << " . total time = "
<< std::to_string((end - start).count() / 1000000000.0);
return objects; return objects;
} }
LedgerPage LedgerPage
@@ -531,6 +533,131 @@ CassandraBackend::fetchLedgerObjects(
<< "Fetched " << numKeys << " records from Cassandra"; << "Fetched " << numKeys << " records from Cassandra";
return results; return results;
} }
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
CassandraBackend::fetchBookOffers(
ripple::uint256 const& book,
uint32_t sequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
CassandraStatement statement{selectBook_};
statement.bindBytes(book);
uint32_t upper = (sequence >> 8) << 8;
if (upper != sequence)
upper += (1 << 8);
statement.bindInt(upper);
if (cursor)
statement.bindBytes(*cursor);
else
{
ripple::uint256 zero = {};
statement.bindBytes(zero);
}
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
std::vector<ripple::uint256> keys;
if (!result)
return {{}, {}};
do
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - populated keys. num keys = " << keys.size();
if (keys.size())
{
std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchLedgerObjects(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size() != 0)
results.push_back({keys[i], objs[i]});
}
return {results, results[results.size() - 1].key};
}
return {{}, {}};
}
struct WriteBookCallbackData
{
CassandraBackend const& backend;
ripple::uint256 book;
ripple::uint256 offerKey;
uint32_t ledgerSequence;
std::condition_variable& cv;
std::atomic_uint32_t& numRemaining;
std::mutex& mtx;
uint32_t currentRetries = 0;
WriteBookCallbackData(
CassandraBackend const& backend,
ripple::uint256 const& book,
ripple::uint256 const& offerKey,
uint32_t ledgerSequence,
std::condition_variable& cv,
std::mutex& mtx,
std::atomic_uint32_t& numRemaining)
: backend(backend)
, book(book)
, offerKey(offerKey)
, ledgerSequence(ledgerSequence)
, cv(cv)
, mtx(mtx)
, numRemaining(numRemaining)
{
}
};
void
writeBookCallback(CassFuture* fut, void* cbData);
void
writeBook2(WriteBookCallbackData& cb)
{
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
statement.bindBytes(cb.book);
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.offerKey);
// Passing isRetry as true bypasses incrementing numOutstanding
cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true);
}
void
writeBookCallback(CassFuture* fut, void* cbData)
{
WriteBookCallbackData& requestParams =
*static_cast<WriteBookCallbackData*>(cbData);
CassandraBackend const& backend = requestParams.backend;
auto rc = cass_future_error_code(fut);
if (rc != CASS_OK)
{
BOOST_LOG_TRIVIAL(error)
<< "ERROR!!! Cassandra insert key error: " << rc << ", "
<< cass_error_desc(rc) << ", retrying ";
// exponential backoff with a max wait of 2^10 ms (about 1 second)
auto wait = std::chrono::milliseconds(
lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
++requestParams.currentRetries;
std::shared_ptr<boost::asio::steady_timer> timer =
std::make_shared<boost::asio::steady_timer>(
backend.getIOContext(),
std::chrono::steady_clock::now() + wait);
timer->async_wait(
[timer, &requestParams](const boost::system::error_code& error) {
writeBook2(requestParams);
});
}
else
{
BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request";
{
std::lock_guard lck(requestParams.mtx);
--requestParams.numRemaining;
requestParams.cv.notify_one();
}
}
}
struct WriteKeyCallbackData struct WriteKeyCallbackData
{ {
@@ -631,14 +758,15 @@ CassandraBackend::writeKeys(
std::unique_lock<std::mutex> lck(mtx); std::unique_lock<std::mutex> lck(mtx);
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() { cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() {
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " "
<< std::to_string(numSubmitted) << " " << std::to_string(numRemaining) << std::to_string(numRemaining) << " "
<< " " << std::to_string(keys.size()) << " " << std::to_string(keys.size()) << " "
<< std::to_string(concurrentLimit); << std::to_string(concurrentLimit);
// keys.size() - i is number submitted. keys.size() - // keys.size() - i is number submitted. keys.size() -
// numRemaining is number completed Difference is num // numRemaining is number completed Difference is num
// outstanding // outstanding
return (numSubmitted - (keys.size() - numRemaining)) < concurrentLimit; return (numSubmitted - (keys.size() - numRemaining)) <
concurrentLimit;
}); });
if (numSubmitted % 100000 == 0) if (numSubmitted % 100000 == 0)
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
@@ -656,35 +784,61 @@ bool
CassandraBackend::writeBooks( CassandraBackend::writeBooks(
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>& std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>&
books, books,
uint32_t ledgerSequence) const uint32_t ledgerSequence,
uint32_t numOffers) const
{ {
std::unordered_map<ripple::uint256, uint32_t> sizes; BOOST_LOG_TRIVIAL(info)
size_t numOffers = 0; << __func__ << " Ledger = " << std::to_string(ledgerSequence)
<< " . num books = " << std::to_string(books.size());
std::atomic_uint32_t numRemaining = numOffers;
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<WriteBookCallbackData>> cbs;
uint32_t concurrentLimit = maxRequestsOutstanding / 2;
uint32_t numSubmitted = 0;
auto start = std::chrono::system_clock::now();
for (auto& book : books) for (auto& book : books)
{ {
for (auto& offer : book.second) for (auto& offer : book.second)
{ {
if (sizes.count(book.first) > 0) cbs.push_back(std::make_shared<WriteBookCallbackData>(
sizes[book.first]++; *this,
else book.first,
sizes[book.first] = 1; offer,
++numOffers; ledgerSequence,
} cv,
} mtx,
size_t maxSize = 0; numRemaining));
for (auto& book : sizes) writeBook2(*cbs.back());
{ ++numSubmitted;
if(book.second > maxSize) BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
maxSize = book.second; std::unique_lock<std::mutex> lck(mtx);
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
<< __func__ << " Book = " << ripple::strHex(book.first) cv.wait(
<< " . num offers = " << book.second; lck,
} [&numRemaining, numSubmitted, concurrentLimit, numOffers]() {
BOOST_LOG_TRIVIAL(trace)
<< std::to_string(numSubmitted) << " "
<< std::to_string(numRemaining) << " "
<< std::to_string(numOffers) << " "
<< std::to_string(concurrentLimit);
return (numSubmitted - (numOffers - numRemaining)) <
concurrentLimit;
});
if (numSubmitted % 1000 == 0)
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) << __func__ << " Submitted " << std::to_string(numSubmitted)
<< " . total offers = " << std::to_string(numOffers) << " write requests. Completed "
<< " . total books = " << std::to_string(books.size()) << (numOffers - numRemaining);
<< " . max book size = " << std::to_string(maxSize); }
}
BOOST_LOG_TRIVIAL(info) << __func__
<< "Submitted all book writes. Waiting for them to "
"finish. num submitted = "
<< std::to_string(numSubmitted);
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
BOOST_LOG_TRIVIAL(info) << __func__ << "Finished writing books";
return true; return true;
} }
@@ -749,7 +903,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
<< " num books = " << books.size() << " num offers = " << numOffers << " num books = " << books.size() << " num offers = " << numOffers
<< " . Took " << (mid - start).count() / 1000000000.0; << " . Took " << (mid - start).count() / 1000000000.0;
writeKeys(keys, ledgerSequence); writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence); writeBooks(books, ledgerSequence, numOffers);
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote all keys from ledger " << __func__ << "Wrote all keys from ledger "
@@ -761,9 +915,11 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
uint32_t prevBooksLedgerSequence = ledgerSequence; uint32_t prevBooksLedgerSequence = ledgerSequence;
uint32_t nextLedgerSequence = uint32_t nextLedgerSequence =
((prevLedgerSequence >> indexerShift_) << indexerShift_); ((prevLedgerSequence >> indexerShift_) << indexerShift_);
BOOST_LOG_TRIVIAL(info) << __func__ << " next base = " << std::to_string(nextLedgerSequence); BOOST_LOG_TRIVIAL(info)
<< __func__ << " next base = " << std::to_string(nextLedgerSequence);
nextLedgerSequence += (1 << indexerShift_); nextLedgerSequence += (1 << indexerShift_);
BOOST_LOG_TRIVIAL(info) << __func__ << " next = " << std::to_string(nextLedgerSequence); BOOST_LOG_TRIVIAL(info)
<< __func__ << " next = " << std::to_string(nextLedgerSequence);
if (nextLedgerSequence == prevLedgerSequence) if (nextLedgerSequence == prevLedgerSequence)
{ {
nextLedgerSequence += (1 << indexerShift_); nextLedgerSequence += (1 << indexerShift_);
@@ -833,11 +989,10 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
} }
} }
// books are written every 256 ledgers // books are written every 256 ledgers
if(i % 256 == 0) if (i % 256 == 0)
{ {
// Iterate through books from previous flag ledger, copying over
// Iterate through books from previous flag ledger, copying over any // any that still exist
// that still exist
for (auto& book : books) for (auto& book : books)
{ {
std::vector<ripple::uint256> offerKeys; std::vector<ripple::uint256> offerKeys;
@@ -846,12 +1001,13 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
offerKeys.push_back(offerKey); offerKeys.push_back(offerKey);
} }
auto offers = fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); auto offers =
fetchLedgerObjects(offerKeys, prevBooksLedgerSequence);
for (size_t i = 0; i < offerKeys.size(); ++i) for (size_t i = 0; i < offerKeys.size(); ++i)
{ {
auto& offer = offers[i]; auto& offer = offers[i];
// if the offer was deleted prior to prevLedgerSequence, don't // if the offer was deleted prior to prevLedgerSequence,
// copy // don't copy
if (offer.size() != 0) if (offer.size() != 0)
{ {
auto book = getBook(offer); auto book = getBook(offer);
@@ -860,15 +1016,16 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
} }
else else
{ {
BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping deleted offer"; BOOST_LOG_TRIVIAL(debug)
<< __func__ << " skipping deleted offer";
} }
} }
} }
writeBooks(nextBooks, i); writeBooks(nextBooks, i, nextOffers);
prevBooksLedgerSequence = i; prevBooksLedgerSequence = i;
books = std::move(nextBooks); books = std::move(nextBooks);
nextBooks = {}; nextBooks = {};
nextOffers = 0;
} }
} }
end = std::chrono::system_clock::now(); end = std::chrono::system_clock::now();
@@ -878,8 +1035,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
<< " shift width = " << std::to_string(indexerShift_) << " shift width = " << std::to_string(indexerShift_)
<< ". num keys = " << keys.size() << " . Took " << ". num keys = " << keys.size() << " . Took "
<< (end - start).count() / 1000000000.0 << (end - start).count() / 1000000000.0
<< " prev ledger = " << " prev ledger = " << std::to_string(prevLedgerSequence);
<< std::to_string(prevLedgerSequence);
writeKeys(keys, nextLedgerSequence); writeKeys(keys, nextLedgerSequence);
prevLedgerSequence = nextLedgerSequence; prevLedgerSequence = nextLedgerSequence;
nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_);
@@ -1364,6 +1520,13 @@ CassandraBackend::open()
" ORDER BY key ASC LIMIT ? ALLOW FILTERING"; " ORDER BY key ASC LIMIT ? ALLOW FILTERING";
if (!getBook_.prepareStatement(query, session_.get())) if (!getBook_.prepareStatement(query, session_.get()))
continue; continue;
query = {};
query << "SELECT key FROM " << tablePrefix << "books2 "
<< " WHERE book = ? AND sequence = ? AND "
" key > ? "
" ORDER BY key ASC LIMIT ?";
if (!selectBook_.prepareStatement(query, session_.get()))
continue;
query = {}; query = {};
query << " INSERT INTO " << tablePrefix << "account_tx" query << " INSERT INTO " << tablePrefix << "account_tx"

View File

@@ -604,6 +604,7 @@ private:
CassandraPreparedStatement insertKey_; CassandraPreparedStatement insertKey_;
CassandraPreparedStatement selectKeys_; CassandraPreparedStatement selectKeys_;
CassandraPreparedStatement getBook_; CassandraPreparedStatement getBook_;
CassandraPreparedStatement selectBook_;
CassandraPreparedStatement insertBook_; CassandraPreparedStatement insertBook_;
CassandraPreparedStatement insertBook2_; CassandraPreparedStatement insertBook2_;
CassandraPreparedStatement deleteBook_; CassandraPreparedStatement deleteBook_;
@@ -694,6 +695,11 @@ public:
{ {
return insertKey_; return insertKey_;
} }
CassandraPreparedStatement const&
getInsertBookPreparedStatement() const
{
return insertBook2_;
}
std::pair< std::pair<
std::vector<TransactionAndMetadata>, std::vector<TransactionAndMetadata>,
@@ -954,14 +960,21 @@ public:
std::unordered_map< std::unordered_map<
ripple::uint256, ripple::uint256,
std::unordered_set<ripple::uint256>>& books, std::unordered_set<ripple::uint256>>& books,
uint32_t ledgerSequence) const; uint32_t ledgerSequence,
uint32_t numOffers) const;
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>> std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
fetchBookOffers( fetchBookOffers(
ripple::uint256 const& book, ripple::uint256 const& book,
uint32_t sequence, uint32_t sequence,
std::uint32_t limit, std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const override std::optional<ripple::uint256> const& cursor) const override;
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
fetchBookOffers2(
ripple::uint256 const& book,
uint32_t sequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{ {
CassandraStatement statement{getBook_}; CassandraStatement statement{getBook_};
statement.bindBytes(book); statement.bindBytes(book);

View File

@@ -547,7 +547,7 @@ def run(args):
print(compareAccountTx(res,res2)) print(compareAccountTx(res,res2))
elif args.action == "ledger_data": elif args.action == "ledger_data":
res = asyncio.get_event_loop().run_until_complete( res = asyncio.get_event_loop().run_until_complete(
ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary)) ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary, args.cursor))
if args.verify: if args.verify:
writeLedgerData(res,args.filename) writeLedgerData(res,args.filename)
elif args.action == "ledger_data_full": elif args.action == "ledger_data_full":