Iterate through diffs; don't write anything.

This commit is contained in:
CJ Cobb
2021-03-30 11:13:51 -04:00
parent 168283f0aa
commit d1f47b490a
4 changed files with 274 additions and 57 deletions

View File

@@ -1,4 +1,18 @@
#include <functional>
#include <reporting/CassandraBackend.h>
#include <reporting/DBHelpers.h>
#include <unordered_map>
namespace std {
// Hash support so ripple::uint256 can be used as a key in std::unordered_map
// and std::unordered_set. Delegates to boost::hash_range over the 256-bit
// byte buffer, so the hash value is exactly boost's range hash of the key.
template <>
struct hash<ripple::uint256>
{
    std::size_t
    operator()(const ripple::uint256& key) const noexcept
    {
        return boost::hash_range(key.begin(), key.end());
    }
};
}  // namespace std
namespace Backend {
template <class T, class F>
void
@@ -585,55 +599,17 @@ writeKeyCallback(CassFuture* fut, void* cbData)
}
}
}
bool
CassandraBackend::writeKeys(uint32_t ledgerSequence) const
CassandraBackend::writeKeys(
std::unordered_set<ripple::uint256>& keys,
uint32_t ledgerSequence) const
{
CassandraStatement statement{selectKeys_};
statement.bindInt(ledgerSequence);
ripple::uint256 zero;
statement.bindBytes(zero);
statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement);
if (!!result)
{
BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence)
<< " already indexed. Returning";
return false;
}
auto start = std::chrono::system_clock::now();
constexpr uint32_t limit = 2048;
std::vector<ripple::uint256> keys;
std::optional<ripple::uint256> cursor;
while (true)
{
try
{
auto [objects, curCursor] =
fetchLedgerPage(cursor, ledgerSequence, limit);
cursor = curCursor;
for (auto& obj : objects)
{
keys.push_back(std::move(obj.key));
if (keys.size() % 100000 == 0)
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Fetched "
<< std::to_string(keys.size()) << "keys";
}
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " . Took " << (mid - start).count() / 1000000000.0;
<< __func__ << " Ledger = " << std::to_string(ledgerSequence)
<< " . num keys = " << std::to_string(keys.size());
return true;
/*
std::atomic_uint32_t numRemaining = keys.size();
std::condition_variable cv;
std::mutex mtx;
@@ -667,12 +643,219 @@ CassandraBackend::writeKeys(uint32_t ledgerSequence) const
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
*/
}
bool
CassandraBackend::writeBooks(
    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>&
        books,
    uint32_t ledgerSequence) const
{
    // Tally and log the number of offers per book directory, plus the grand
    // total, for the given ledger sequence.
    // NOTE(review): despite the name, this function does not persist anything
    // yet -- it only logs counts and reports success. Confirm against the
    // insertBook2_ statement once writes are wired up.
    std::unordered_map<ripple::uint256, uint32_t> sizes;
    size_t numOffers = 0;
    for (auto const& [bookDir, offers] : books)
    {
        // BUG FIX: the original tested sizes.count(offer) -- an OFFER key --
        // before incrementing sizes[book.first] -- a BOOK key -- so the
        // lookup essentially never hit and every book's count stayed at 1.
        // The per-book count is simply the size of its offer set.
        sizes[bookDir] = static_cast<uint32_t>(offers.size());
        numOffers += offers.size();
    }
    BOOST_LOG_TRIVIAL(info)
        << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence)
        << " . total offers = " << std::to_string(numOffers);
    for (auto const& [bookDir, size] : sizes)
    {
        BOOST_LOG_TRIVIAL(info)
            << __func__ << " Book = " << ripple::strHex(bookDir)
            << " . num offers = " << size;
    }
    return true;
}
bool
CassandraBackend::runIndexer(uint32_t ledgerSequence) const
{
CassandraStatement statement{selectKeys_};
statement.bindInt(ledgerSequence);
ripple::uint256 zero;
statement.bindBytes(zero);
statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement);
if (!!result)
{
BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence)
<< " already indexed. Returning";
return false;
}
auto start = std::chrono::system_clock::now();
constexpr uint32_t limit = 2048;
std::unordered_set<ripple::uint256> keys;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
books;
std::optional<ripple::uint256> cursor;
size_t numOffers = 0;
while (true)
{
try
{
auto [objects, curCursor] =
fetchLedgerPage2(cursor, ledgerSequence, limit);
cursor = curCursor;
for (auto& obj : objects)
{
if (isOffer(obj.blob))
{
auto bookDir = getBook(obj.blob);
books[bookDir].insert(obj.key);
++numOffers;
}
keys.insert(std::move(obj.key));
if (keys.size() % 100000 == 0)
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Fetched "
<< std::to_string(keys.size()) << "keys";
}
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " num books = " << books.size() << " num offers = " << numOffers
<< " . Took " << (mid - start).count() / 1000000000.0;
writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " . Took " << (end - mid).count() / 1000000000.0
<< ". Entire operation took " << (end - start).count() / 1000000000.0;
uint32_t prevLedgerSequence = ledgerSequence;
uint32_t nextLedgerSequence =
((prevLedgerSequence >> indexerShift_) << indexerShift_) +
(1 << indexerShift_);
if (nextLedgerSequence = prevLedgerSequence)
{
nextLedgerSequence += (1 << indexerShift_);
}
while (true)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Processing diffs. nextLedger = "
<< std::to_string(nextLedgerSequence);
auto rng = fetchLedgerRange();
if (rng->maxSequence < nextLedgerSequence)
break;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
nextBooks;
size_t nextOffers = 0;
start = std::chrono::system_clock::now();
for (size_t i = ledgerSequence + 1; i < nextLedgerSequence; ++i)
{
// Get the diff and update keys
auto objs = fetchLedgerDiff(i);
std::unordered_set<ripple::uint256> deleted;
for (auto const& obj : objs)
{
// remove deleted keys
if (obj.blob.size() == 0)
{
keys.erase(obj.key);
deleted.insert(obj.key);
}
else
{
// insert other keys. keys is a set, so this is a noop if
// obj.key is already in keys
keys.insert(obj.key);
// if the object is an offer, add to nextBooks
if (isOffer(obj.blob))
{
auto book = getBook(obj.blob);
if (nextBooks[book].insert(obj.key).second)
++nextOffers;
}
}
}
BOOST_LOG_TRIVIAL(info) << __func__;
// For any deleted keys, check if they are offer objects
std::vector<ripple::uint256> deletedKeys{
deleted.begin(), deleted.end()};
auto deletedObjs = fetchLedgerObjects(deletedKeys, i - 1);
for (size_t j = 0; j < deletedObjs.size(); ++j)
{
auto& obj = deletedObjs[j];
auto& key = deletedKeys[j];
if (!obj.size())
{
BOOST_LOG_TRIVIAL(error)
<< __func__
<< " Deleted object is deleted in prior ledger. "
<< ripple::strHex(key) << " " << std::to_string(i - 1);
throw std::runtime_error("Empty object");
}
// For any deleted keys, check if they are offer objects
// Add key to nextBooks if is offer
if (isOffer(obj))
{
auto book = getBook(obj);
if (nextBooks[book].insert(key).second)
++nextOffers;
}
}
}
end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all from diffs "
<< std::to_string(nextLedgerSequence)
<< " shift width = " << std::to_string(indexerShift_)
<< ". num keys = " << keys.size() << " . Took "
<< (end - start).count() / 1000000000.0;
// Iterate through books from previous flag ledger, copying over any
// that still exist
for (auto& book : books)
{
std::vector<ripple::uint256> offerKeys;
for (auto& offerKey : book.second)
{
offerKeys.push_back(offerKey);
}
auto offers = fetchLedgerObjects(offerKeys, prevLedgerSequence);
for (size_t i = 0; i < offerKeys.size(); ++i)
{
auto& offer = offers[i];
// if the offer was deleted prior to prevLedgerSequence, don't
// copy
if (offer.size() != 0)
{
auto book = getBook(offerKeys[i]);
if (nextBooks[book].insert(offerKeys[i]).second)
++nextOffers;
}
}
}
writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence);
prevLedgerSequence = nextLedgerSequence;
nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_);
books = nextBooks;
}
return true;
}
@@ -972,6 +1155,17 @@ CassandraBackend::open()
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2"
<< " ( book blob, sequence bigint, key blob, PRIMARY KEY "
"((book, sequence), key)) WITH CLUSTERING ORDER BY (key ASC)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "SELECT * FROM " << tablePrefix << "books2"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
<< " ( account blob, seq_idx tuple<bigint, bigint>, "
" hash blob, "
@@ -1057,6 +1251,11 @@ CassandraBackend::open()
if (!insertBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tablePrefix << "books2"
<< " (book, sequence, key) VALUES (?, ?, ?)";
if (!insertBook2_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, key, deleted_at) VALUES (?, ?, ?)";
if (!deleteBook_.prepareStatement(query, session_.get()))
@@ -1242,18 +1441,22 @@ CassandraBackend::open()
if (config_.contains("run_indexer"))
{
if (config_["run_indexer"].as_bool())
{
if (config_.contains("indexer_shift"))
{
indexerShift_ = config_["indexer_shift"].as_int64();
}
indexer_ = std::thread{[this]() {
auto seq = fetchLatestLedgerSequence();
if (seq)
auto rng = fetchLedgerRange();
if (rng)
{
auto base = (*seq >> indexerShift_) << indexerShift_;
BOOST_LOG_TRIVIAL(info)
<< "Running indexer. Ledger = " << std::to_string(base)
<< " latest = " << std::to_string(*seq);
writeKeys(base);
BOOST_LOG_TRIVIAL(info) << "Running indexer. Ledger = "
<< std::to_string(rng->minSequence);
runIndexer(rng->minSequence);
BOOST_LOG_TRIVIAL(info) << "Ran indexer";
}
}};
}
}
work_.emplace(ioContext_);

View File

@@ -605,6 +605,7 @@ private:
CassandraPreparedStatement selectKeys_;
CassandraPreparedStatement getBook_;
CassandraPreparedStatement insertBook_;
CassandraPreparedStatement insertBook2_;
CassandraPreparedStatement deleteBook_;
CassandraPreparedStatement insertAccountTx_;
CassandraPreparedStatement selectAccountTx_;
@@ -623,7 +624,7 @@ private:
std::thread ioThread_;
std::thread indexer_;
static constexpr uint32_t indexerShift_ = 8;
uint32_t indexerShift_ = 8;
// maximum number of concurrent in flight requests. New requests will wait
// for earlier requests to finish if this limit is exceeded
@@ -942,7 +943,18 @@ public:
fetchLedgerDiff(uint32_t ledgerSequence) const;
bool
writeKeys(uint32_t ledgerSequence) const;
runIndexer(uint32_t ledgerSequence) const;
bool
writeKeys(
std::unordered_set<ripple::uint256>& keys,
uint32_t ledgerSequence) const;
bool
writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>>& books,
uint32_t ledgerSequence) const;
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
fetchBookOffers(

View File

@@ -46,8 +46,9 @@ struct AccountTransactionsData
}
};
template <class T>
inline bool
isOffer(std::string const& object)
isOffer(T const& object)
{
short offer_bytes = (object[1] << 8) | object[2];
return offer_bytes == 0x006f;

View File

@@ -433,6 +433,7 @@ async def ledger(ip, port, ledger, binary, transactions, expand):
await ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":bool(binary), "transactions":bool(transactions),"expand":bool(expand)}))
res = json.loads(await ws.recv())
print(json.dumps(res,indent=4,sort_keys=True))
print(bool(binary))
return res
except websockets.exceptions.connectionclosederror as e:
@@ -478,7 +479,7 @@ parser.add_argument('--minLedger',default=-1)
parser.add_argument('--maxLedger',default=-1)
parser.add_argument('--filename',default=None)
parser.add_argument('--index')
parser.add_argument('--cursor',"0000000000000000000000000000000000000000000000000000000000000000")
parser.add_argument('--cursor',default='0000000000000000000000000000000000000000000000000000000000000000')