Tests working for Postgres and Cassandra. Removed books index. Need to
implement book_offers.
CJ Cobb
2021-05-28 21:21:24 -04:00
parent 572b072271
commit 562f96a30f
8 changed files with 915 additions and 901 deletions

View File

@@ -5,101 +5,20 @@ BackendIndexer::BackendIndexer(boost::json::object const& config)
{
    if (config.contains("indexer_key_shift"))
        keyShift_ = config.at("indexer_key_shift").as_int64();
-    if (config.contains("indexer_book_shift"))
-        bookShift_ = config.at("indexer_book_shift").as_int64();
    work_.emplace(ioc_);
    ioThread_ = std::thread{[this]() { ioc_.run(); }};
-    updateThread_ = std::thread{[this]() { ioc_.run(); }};
};
BackendIndexer::~BackendIndexer()
{
    std::unique_lock lck(mutex_);
    work_.reset();
    ioThread_.join();
-    updateThread_.join();
}
void
BackendIndexer::writeLedgerObject(
ripple::uint256&& key,
std::optional<ripple::uint256>&& book,
bool isCreated,
bool isDeleted)
{
++updatesOutstanding_;
boost::asio::post(
ioc_,
[this,
key = std::move(key),
isCreated,
isDeleted,
book = std::move(book)]() {
if (isCreated)
addKey(key);
if (isDeleted)
deleteKey(key);
if (book)
{
if (isCreated)
addBookOffer(*book, key);
if (isDeleted)
deleteBookOffer(*book, key);
}
--updatesOutstanding_;
{
std::unique_lock lck(mtx);
updateCv_.notify_one();
}
});
}
void
-BackendIndexer::addKey(ripple::uint256 const& key)
+BackendIndexer::addKey(ripple::uint256&& key)
{
-    std::unique_lock lck(mtx);
-    keys.insert(key);
-    keysCumulative.insert(key);
-}
-void
-BackendIndexer::addKeyAsync(ripple::uint256 const& key)
-{
-    std::unique_lock lck(mtx);
-    keysCumulative.insert(key);
-}
-void
-BackendIndexer::deleteKey(ripple::uint256 const& key)
-{
-    std::unique_lock lck(mtx);
-    keysCumulative.erase(key);
-    if (populatingCacheAsync)
-        deletedKeys.insert(key);
-}
-void
-BackendIndexer::addBookOffer(
-    ripple::uint256 const& book,
-    ripple::uint256 const& offerKey)
-{
-    std::unique_lock lck(mtx);
-    books[book].insert(offerKey);
-    booksCumulative[book].insert(offerKey);
-}
-void
-BackendIndexer::addBookOfferAsync(
-    ripple::uint256 const& book,
-    ripple::uint256 const& offerKey)
-{
-    std::unique_lock lck(mtx);
-    booksCumulative[book].insert(offerKey);
-}
-void
-BackendIndexer::deleteBookOffer(
-    ripple::uint256 const& book,
-    ripple::uint256 const& offerKey)
-{
-    std::unique_lock lck(mtx);
-    booksCumulative[book].erase(offerKey);
-    if (populatingCacheAsync)
-        deletedBooks[book].insert(offerKey);
+    keys.insert(std::move(key));
}
void
@@ -152,123 +71,6 @@ writeKeyFlagLedger(
            .count();
}
void
writeBookFlagLedger(
uint32_t ledgerSequence,
uint32_t shift,
BackendInterface const& backend,
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books)
{
uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift));
ripple::uint256 zero = {};
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. ledgerSequence = " << std::to_string(ledgerSequence)
<< " nextFlag = " << std::to_string(nextFlag)
<< " books.size() = " << std::to_string(books.size());
auto start = std::chrono::system_clock::now();
backend.writeBooks(books, BookIndex{nextFlag}, true);
backend.writeBooks({{zero, {zero}}}, BookIndex{nextFlag}, true);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. ledgerSequence = " << std::to_string(ledgerSequence)
<< " nextFlag = " << std::to_string(nextFlag)
<< " books.size() = " << std::to_string(books.size()) << " time = "
<< std::chrono::duration_cast<std::chrono::seconds>(end - start)
.count();
}
void
BackendIndexer::clearCaches()
{
keysCumulative = {};
booksCumulative = {};
}
void
BackendIndexer::doBooksRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng)
return;
if (!sequence)
sequence = rng->maxSequence;
if (sequence < rng->minSequence)
sequence = rng->minSequence;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sequence = " << std::to_string(*sequence);
ripple::uint256 zero = {};
while (true)
{
try
{
auto [objects, cursor, warning] =
backend.fetchBookOffers(zero, *sequence, 1);
if (!warning)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " flag ledger already written. sequence = "
<< std::to_string(*sequence) << "returning";
return;
}
else
{
uint32_t lower = (*sequence - 1) >> bookShift_ << bookShift_;
doBooksRepair(backend, lower);
}
break;
}
catch (DatabaseTimeout& t)
{
;
}
}
std::optional<ripple::uint256> cursor;
while (true)
{
try
{
auto [objects, curCursor, warning] =
backend.fetchLedgerPage(cursor, *sequence, 2048);
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
for (auto& obj : objects)
{
if (isOffer(obj.blob))
{
auto book = getBook(obj.blob);
booksRepair[book].insert(obj.key);
}
}
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
writeBookFlagLedger(*sequence, bookShift_, backend, booksRepair);
booksRepair = {};
BOOST_LOG_TRIVIAL(info)
<< __func__ << " finished. sequence = " << std::to_string(*sequence);
}
void
BackendIndexer::doKeysRepair(
    BackendInterface const& backend,
    std::optional<uint32_t> sequence)
@@ -293,34 +95,23 @@ BackendIndexer::doKeysRepair(
        try
        {
            auto [objects, curCursor, warning] =
-                backend.fetchLedgerPage(cursor, *sequence, 2048);
+                backend.fetchLedgerPage({}, *sequence, 1);
            // no cursor means this is the first page
-            if (!cursor)
+            // if there is no warning, we don't need to do a repair
+            // warning only shows up on the first page
+            if (!warning)
            {
-                // if there is no warning, we don't need to do a repair
-                // warning only shows up on the first page
-                if (!warning)
-                {
-                    BOOST_LOG_TRIVIAL(info)
-                        << __func__
-                        << " flag ledger already written. returning";
-                    return;
-                }
-                else
-                {
-                    uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_;
-                    doKeysRepair(backend, lower);
-                }
+                BOOST_LOG_TRIVIAL(debug)
+                    << __func__ << " flag ledger already written. returning";
+                return;
            }
            else
-                BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
-            cursor = curCursor;
-            for (auto& obj : objects)
            {
-                keysRepair.insert(obj.key);
+                uint32_t lower = (*sequence - 1) >> keyShift_ << keyShift_;
+                doKeysRepair(backend, lower);
+                writeKeyFlagLedgerAsync(lower, backend);
+                return;
            }
-            if (!cursor)
-                break;
        }
        catch (DatabaseTimeout const& e)
        {
@@ -329,104 +120,10 @@ BackendIndexer::doKeysRepair(
            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    }
-    writeKeyFlagLedger(*sequence, keyShift_, backend, keysRepair);
-    keysRepair = {};
    BOOST_LOG_TRIVIAL(info)
        << __func__ << " finished. sequence = " << std::to_string(*sequence);
}
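Note on the repair path above: doKeysRepair now probes a flag ledger with a single-object page fetch, and a warning on that first page means the flag ledger was never fully written, so the repair recurses to the previous flag ledger before scheduling the missing one. A minimal sketch of that recursion with storage abstracted away (flagLedgerWritten and scheduleFlagLedgerWrite are hypothetical stand-ins for the fetchLedgerPage probe and writeKeyFlagLedgerAsync):

    #include <cstdint>

    // Hypothetical stand-ins for the backend probe and the async writer.
    bool flagLedgerWritten(uint32_t sequence);
    void scheduleFlagLedgerWrite(uint32_t sequence);

    void
    repairKeys(uint32_t sequence, uint32_t keyShift)
    {
        if (flagLedgerWritten(sequence))
            return;  // complete flag ledger found; nothing to repair
        // step back one flag-ledger interval, exactly like
        // (*sequence - 1) >> keyShift_ << keyShift_ above
        uint32_t lower = (sequence - 1) >> keyShift << keyShift;
        repairKeys(lower, keyShift);     // make sure the predecessor exists
        scheduleFlagLedgerWrite(lower);  // then rebuild from it
    }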
void
BackendIndexer::populateCaches(BackendInterface const& backend)
{
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng)
return;
uint32_t sequence = rng->maxSequence;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sequence = " << std::to_string(sequence);
doBooksRepair(backend, sequence);
doKeysRepair(backend, sequence);
std::optional<ripple::uint256> cursor;
while (true)
{
try
{
auto [objects, curCursor, warning] =
backend.fetchLedgerPage(cursor, sequence, 2048);
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
for (auto& obj : objects)
{
addKeyAsync(obj.key);
if (isOffer(obj.blob))
{
auto book = getBook(obj.blob);
addBookOfferAsync(book, obj.key);
}
}
if (!cursor)
break;
}
catch (DatabaseTimeout const& e)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " Database timeout fetching keys";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
// Do reconcilation. Remove anything from keys or books that shouldn't
// be there
{
std::unique_lock lck(mtx);
populatingCacheAsync = false;
}
for (auto& key : deletedKeys)
{
deleteKey(key);
}
for (auto& book : deletedBooks)
{
for (auto& offer : book.second)
{
deleteBookOffer(book.first, offer);
}
}
{
std::unique_lock lck(mtx);
deletedKeys = {};
deletedBooks = {};
cacheCv_.notify_one();
}
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. keys.size() = " << std::to_string(keysCumulative.size());
}
void
BackendIndexer::populateCachesAsync(BackendInterface const& backend)
{
if (keysCumulative.size() > 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " caches already populated. returning";
return;
}
{
std::unique_lock lck(mtx);
populatingCacheAsync = true;
}
BOOST_LOG_TRIVIAL(info) << __func__;
boost::asio::post(ioc_, [this, &backend]() { populateCaches(backend); });
}
void
BackendIndexer::waitForCaches()
{
std::unique_lock lck(mtx);
cacheCv_.wait(lck, [this]() {
return !populatingCacheAsync && deletedKeys.size() == 0;
});
}
void
BackendIndexer::writeKeyFlagLedgerAsync(
    uint32_t ledgerSequence,
@@ -436,28 +133,82 @@ BackendIndexer::writeKeyFlagLedgerAsync(
        << __func__
        << " starting. sequence = " << std::to_string(ledgerSequence);
-    waitForCaches();
-    auto keysCopy = keysCumulative;
-    boost::asio::post(ioc_, [=, this, &backend]() {
-        writeKeyFlagLedger(ledgerSequence, keyShift_, backend, keysCopy);
-    });
-    BOOST_LOG_TRIVIAL(info)
-        << __func__
-        << " finished. sequence = " << std::to_string(ledgerSequence);
-}
-void
-BackendIndexer::writeBookFlagLedgerAsync(
-    uint32_t ledgerSequence,
-    BackendInterface const& backend)
-{
-    BOOST_LOG_TRIVIAL(info)
-        << __func__
-        << " starting. sequence = " << std::to_string(ledgerSequence);
-    waitForCaches();
-    auto booksCopy = booksCumulative;
-    boost::asio::post(ioc_, [=, this, &backend]() {
-        writeBookFlagLedger(ledgerSequence, bookShift_, backend, booksCopy);
+    boost::asio::post(ioc_, [this, ledgerSequence, &backend]() {
+        std::unordered_set<ripple::uint256> keys;
+        auto nextFlag = getKeyIndexOfSeq(ledgerSequence + 1);
+        BOOST_LOG_TRIVIAL(info)
+            << "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex)
+            << " starting";
+        ripple::uint256 zero = {};
+        std::optional<ripple::uint256> cursor;
+        size_t numKeys = 0;
+        auto begin = std::chrono::system_clock::now();
+        while (true)
+        {
+            try
+            {
+                auto start = std::chrono::system_clock::now();
+                auto [objects, curCursor, warning] =
+                    backend.fetchLedgerPage(cursor, ledgerSequence, 2048);
+                auto mid = std::chrono::system_clock::now();
+                // no cursor means this is the first page
+                if (!cursor)
+                {
+                    // if there is no warning, we don't need to do a repair
+                    // warning only shows up on the first page
+                    if (warning)
+                    {
+                        BOOST_LOG_TRIVIAL(error)
+                            << "writeKeyFlagLedger - "
+                            << " prev flag ledger not written "
+                            << std::to_string(nextFlag.keyIndex) << " : "
+                            << std::to_string(ledgerSequence);
+                        assert(false);
+                        throw std::runtime_error("Missing prev flag");
+                    }
+                }
+                cursor = curCursor;
+                for (auto& obj : objects)
+                {
+                    keys.insert(obj.key);
+                }
+                backend.writeKeys(keys, nextFlag, true);
+                auto end = std::chrono::system_clock::now();
+                BOOST_LOG_TRIVIAL(debug)
+                    << "writeKeyFlagLedger - "
+                    << std::to_string(nextFlag.keyIndex) << " fetched a page "
+                    << " cursor = "
+                    << (cursor.has_value() ? ripple::strHex(*cursor)
+                                           : std::string{})
+                    << " num keys = " << std::to_string(numKeys)
+                    << " fetch time = "
+                    << std::chrono::duration_cast<std::chrono::milliseconds>(
+                           mid - start)
+                           .count()
+                    << " write time = "
+                    << std::chrono::duration_cast<std::chrono::milliseconds>(
+                           end - mid)
+                           .count();
+                if (!cursor)
+                    break;
+            }
+            catch (DatabaseTimeout const& e)
+            {
+                BOOST_LOG_TRIVIAL(warning)
+                    << __func__ << " Database timeout fetching keys";
+                std::this_thread::sleep_for(std::chrono::seconds(2));
+            }
+        }
+        backend.writeKeys({zero}, nextFlag, true);
+        auto end = std::chrono::system_clock::now();
+        BOOST_LOG_TRIVIAL(info)
+            << "writeKeyFlagLedger - " << std::to_string(nextFlag.keyIndex)
+            << " finished. "
+            << " num keys = " << std::to_string(numKeys) << " total time = "
+            << std::chrono::duration_cast<std::chrono::milliseconds>(
+                   end - begin)
+                   .count();
    });
    BOOST_LOG_TRIVIAL(info)
        << __func__
@@ -472,7 +223,6 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
        << " starting. sequence = " << std::to_string(ledgerSequence);
    bool isFirst = false;
    auto keyIndex = getKeyIndexOfSeq(ledgerSequence);
-    auto bookIndex = getBookIndexOfSeq(ledgerSequence);
    if (isFirst_)
    {
        auto rng = backend.fetchLedgerRangeNoThrow();
@@ -481,26 +231,18 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
        else
        {
            keyIndex = KeyIndex{ledgerSequence};
-            bookIndex = BookIndex{ledgerSequence};
        }
    }
-    {
-        std::unique_lock lck(mtx);
-        updateCv_.wait(lck, [this]() { return updatesOutstanding_ == 0; });
-    }
    backend.writeKeys(keys, keyIndex);
-    backend.writeBooks(books, bookIndex);
    if (isFirst_)
    {
        // write completion record
        ripple::uint256 zero = {};
-        backend.writeBooks({{zero, {zero}}}, bookIndex);
        backend.writeKeys({zero}, keyIndex);
    }
    isFirst_ = false;
    keys = {};
-    books = {};
    BOOST_LOG_TRIVIAL(info)
        << __func__
        << " finished. sequence = " << std::to_string(ledgerSequence);
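Note on the completion record in finish above: writeKeys persists keys page by page, so a flag ledger's key list is only trusted once the all-zero key has been written for it, which happens last. A minimal sketch of that convention, assuming ripple::uint256 from rippled (the helper name is hypothetical):

    #include <ripple/basics/base_uint.h>
    #include <unordered_set>

    // The zero key is written only after every real key has been persisted,
    // so its presence marks the flag ledger's key list as complete.
    bool
    flagLedgerComplete(std::unordered_set<ripple::uint256> const& storedKeys)
    {
        ripple::uint256 zero = {};
        return storedKeys.count(zero) > 0;
    }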

View File

@@ -83,70 +83,18 @@ class BackendIndexer
    std::mutex mutex_;
    std::optional<boost::asio::io_context::work> work_;
    std::thread ioThread_;
-    std::thread updateThread_;
-    std::atomic_uint32_t updatesOutstanding_ = 0;
-    std::condition_variable updateCv_;
    uint32_t keyShift_ = 20;
-    uint32_t bookShift_ = 10;
    std::unordered_set<ripple::uint256> keys;
-    std::unordered_set<ripple::uint256> keysCumulative;
-    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
-        books;
-    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
-        booksCumulative;
-    bool populatingCacheAsync = false;
-    // These are only used when the cache is being populated asynchronously
-    std::unordered_set<ripple::uint256> deletedKeys;
-    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
-        deletedBooks;
-    std::unordered_set<ripple::uint256> keysRepair;
-    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
-        booksRepair;
-    std::mutex mtx;
-    std::condition_variable cacheCv_;
    mutable bool isFirst_ = true;
-    void
-    addKeyAsync(ripple::uint256 const& key);
-    void
-    addBookOfferAsync(
-        ripple::uint256 const& book,
-        ripple::uint256 const& offerKey);
public:
    BackendIndexer(boost::json::object const& config);
    ~BackendIndexer();
    void
-    populateCachesAsync(BackendInterface const& backend);
-    void
-    populateCaches(BackendInterface const& backend);
-    void
-    clearCaches();
-    // Blocking, possibly for minutes
-    void
-    waitForCaches();
-    void
-    writeLedgerObject(
-        ripple::uint256&& key,
-        std::optional<ripple::uint256>&& book,
-        bool isCreated,
-        bool isDeleted);
-    void
-    addKey(ripple::uint256 const& key);
-    void
-    deleteKey(ripple::uint256 const& key);
-    void
-    addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey);
-    void
-    deleteBookOffer(
-        ripple::uint256 const& book,
-        ripple::uint256 const& offerKey);
+    addKey(ripple::uint256&& key);
    void
    finish(uint32_t ledgerSequence, BackendInterface const& backend);
@@ -155,22 +103,9 @@ public:
        uint32_t ledgerSequence,
        BackendInterface const& backend);
    void
-    writeBookFlagLedgerAsync(
-        uint32_t ledgerSequence,
-        BackendInterface const& backend);
-    void
    doKeysRepair(
        BackendInterface const& backend,
        std::optional<uint32_t> sequence);
-    void
-    doBooksRepair(
-        BackendInterface const& backend,
-        std::optional<uint32_t> sequence);
-    uint32_t
-    getBookShift()
-    {
-        return bookShift_;
-    }
    uint32_t
    getKeyShift()
    {
@@ -191,24 +126,6 @@ public:
    {
        return (ledgerSequence % (1 << keyShift_)) == 0;
    }
BookIndex
getBookIndexOfSeq(uint32_t seq) const
{
if (isBookFlagLedger(seq))
return BookIndex{seq};
auto incr = (1 << bookShift_);
BookIndex index{(seq >> bookShift_ << bookShift_) + incr};
assert(isBookFlagLedger(index.bookIndex));
assert(
bookShift_ == keyShift_ || !isKeyFlagLedger(index.bookIndex) ||
!isKeyFlagLedger(index.bookIndex + incr));
return index;
}
bool
isBookFlagLedger(uint32_t ledgerSequence) const
{
return (ledgerSequence % (1 << bookShift_)) == 0;
}
}; };
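To make the flag-ledger arithmetic in the header concrete: with the default keyShift_ of 20, every 1,048,576th ledger is a key flag ledger, and any other sequence rounds up to the next one. A small self-contained sketch (free functions mirroring isKeyFlagLedger and getKeyIndexOfSeq, not the class members themselves):

    #include <cassert>
    #include <cstdint>

    bool
    isKeyFlagLedger(uint32_t ledgerSequence, uint32_t keyShift = 20)
    {
        return (ledgerSequence % (1u << keyShift)) == 0;
    }

    uint32_t
    nextKeyFlagLedger(uint32_t seq, uint32_t keyShift = 20)
    {
        if (isKeyFlagLedger(seq, keyShift))
            return seq;
        // round down to the enclosing interval, then step one interval up
        return (seq >> keyShift << keyShift) + (1u << keyShift);
    }

    int
    main()
    {
        assert(isKeyFlagLedger(2u << 20));
        assert(nextKeyFlagLedger((1u << 20) + 5) == (2u << 20));
    }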
class BackendInterface class BackendInterface
@@ -241,18 +158,6 @@ public:
            return KeyIndex{seq};
        return indexer_.getKeyIndexOfSeq(seq);
    }
std::optional<BookIndex>
getBookIndexOfSeq(uint32_t seq) const
{
if (indexer_.isBookFlagLedger(seq))
return BookIndex{seq};
auto rng = fetchLedgerRange();
if (!rng)
return {};
if (rng->minSequence == seq)
return BookIndex{seq};
return indexer_.getBookIndexOfSeq(seq);
}
    bool
    finishWrites(uint32_t ledgerSequence) const
@@ -266,9 +171,8 @@ public:
        auto rng = fetchLedgerRangeNoThrow();
        if (rng && rng->minSequence != ledgerSequence)
            isFirst_ = false;
-        indexer_.doKeysRepair(*this, ledgerSequence);
    }
-    if (indexer_.isBookFlagLedger(ledgerSequence) || isFirst_)
-        indexer_.writeBookFlagLedgerAsync(ledgerSequence, *this);
    if (indexer_.isKeyFlagLedger(ledgerSequence) || isFirst_)
        indexer_.writeKeyFlagLedgerAsync(ledgerSequence, *this);
    isFirst_ = false;
@@ -315,8 +219,76 @@ public:
    virtual std::vector<ripple::uint256>
    fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const = 0;
-    virtual LedgerPage
+    LedgerPage
    fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{
assert(limit != 0);
bool incomplete = false;
{
auto check = doFetchLedgerPage({}, ledgerSequence, 1);
incomplete = check.warning.has_value();
}
uint32_t adjustedLimit = limit;
LedgerPage page;
page.cursor = cursor;
do
{
adjustedLimit = adjustedLimit > 2048 ? 2048 : adjustedLimit * 2;
auto partial =
doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit);
page.objects.insert(
page.objects.end(),
partial.objects.begin(),
partial.objects.end());
page.cursor = partial.cursor;
} while (page.objects.size() < limit && page.cursor);
if (incomplete)
{
std::cout << "checking lower" << std::endl;
uint32_t lowerSequence = ledgerSequence >> indexer_.getKeyShift()
<< indexer_.getKeyShift();
auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit);
std::vector<ripple::uint256> keys;
std::transform(
std::move_iterator(lowerPage.objects.begin()),
std::move_iterator(lowerPage.objects.end()),
std::back_inserter(keys),
[](auto&& elt) { return std::move(elt.key); });
auto objs = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < keys.size(); ++i)
{
auto& obj = objs[i];
auto& key = keys[i];
if (obj.size())
page.objects.push_back({std::move(key), std::move(obj)});
}
std::sort(
page.objects.begin(), page.objects.end(), [](auto a, auto b) {
return a.key < b.key;
});
page.warning = "Data may be incomplete";
}
if (page.objects.size() >= limit)
{
page.objects.resize(limit);
page.cursor = page.objects.back().key;
}
return page;
}
std::optional<LedgerObject>
fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence)
{
auto page = fetchLedgerPage({++key}, ledgerSequence, 1);
if (page.objects.size())
return page.objects[0];
return {};
}
virtual LedgerPage
doFetchLedgerPage(
        std::optional<ripple::uint256> const& cursor,
        std::uint32_t ledgerSequence,
        std::uint32_t limit) const = 0;
@@ -363,8 +335,7 @@ public:
        std::optional<ripple::uint256>&& book) const
    {
        ripple::uint256 key256 = ripple::uint256::fromVoid(key.data());
-        indexer_.writeLedgerObject(
-            std::move(key256), std::move(book), isCreated, isDeleted);
+        indexer_.addKey(std::move(key256));
        doWriteLedgerObject(
            std::move(key),
            seq,
@@ -418,13 +389,6 @@ public:
        std::unordered_set<ripple::uint256> const& keys,
        KeyIndex const& index,
        bool isAsync = false) const = 0;
virtual bool
writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
BookIndex const& index,
bool isAsync = false) const = 0;
    virtual ~BackendInterface()
    {

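To make the new split concrete: callers keep using the non-virtual fetchLedgerPage and fetchSuccessor wrappers, while each backend now implements only doFetchLedgerPage. A hedged usage sketch (walkLedger is a hypothetical helper, not part of this diff):

    // Walks a ledger's objects in key order; each fetchSuccessor call pages
    // one object via fetchLedgerPage({++key}, ledgerSequence, 1).
    void
    walkLedger(Backend::BackendInterface& backend, uint32_t ledgerSequence)
    {
        ripple::uint256 key = {};  // successor of zero = first real object
        while (auto obj = backend.fetchSuccessor(key, ledgerSequence))
        {
            // inspect obj->key / obj->blob here
            key = obj->key;
        }
    }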
View File

@@ -394,7 +394,7 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
    return objects;
}
LedgerPage
-CassandraBackend::fetchLedgerPage(
+CassandraBackend::doFetchLedgerPage(
    std::optional<ripple::uint256> const& cursor,
    std::uint32_t ledgerSequence,
    std::uint32_t limit) const
@@ -433,7 +433,7 @@ CassandraBackend::fetchLedgerPage(
    if (keys.size() && keys.size() == limit)
    {
        page.cursor = keys.back();
-        keys.pop_back();
+        ++(*page.cursor);
    }
    auto objects = fetchLedgerObjects(keys, ledgerSequence);
    if (objects.size() != keys.size())
@@ -501,124 +501,7 @@ CassandraBackend::fetchBookOffers(
    std::uint32_t limit,
    std::optional<ripple::uint256> const& cursor) const
{
-    auto rng = fetchLedgerRange();
+    return {};
auto limitTuningFactor = 50;
if (!rng)
return {{}, {}};
auto readBooks =
[this, &book, &limit, &limitTuningFactor](std::uint32_t sequence)
-> std::pair<
bool,
std::vector<std::pair<std::uint64_t, ripple::uint256>>> {
CassandraStatement completeQuery{completeBook_};
completeQuery.bindInt(sequence);
CassandraResult completeResult = executeSyncRead(completeQuery);
bool complete = completeResult.hasResult();
CassandraStatement statement{selectBook_};
std::vector<std::pair<std::uint64_t, ripple::uint256>> keys = {};
statement.bindBytes(book.data(), 24);
statement.bindInt(sequence);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " upper = " << std::to_string(sequence) << " book = "
<< ripple::strHex(std::string((char*)book.data(), 24));
ripple::uint256 zero = beast::zero;
statement.bindBytes(zero.data(), 8);
statement.bindBytes(zero);
statement.bindUInt(limit * limitTuningFactor);
auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info) << "Book directory fetch took "
<< std::to_string(duration) << " seconds.";
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
if (!result)
{
return {false, {{}, {}}};
}
do
{
auto [quality, index] = result.getBytesTuple();
std::uint64_t q = 0;
memcpy(&q, quality.data(), 8);
keys.push_back({q, ripple::uint256::fromVoid(index.data())});
} while (result.nextRow());
return {complete, keys};
};
auto upper = getBookIndexOfSeq(ledgerSequence);
auto [complete, quality_keys] = readBooks(upper->bookIndex);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - populated keys. num keys = " << quality_keys.size();
std::optional<std::string> warning = {};
if (!complete)
{
warning = "Data may be incomplete";
BOOST_LOG_TRIVIAL(info) << "May be incomplete. Fetching other page";
auto bookShift = indexer_.getBookShift();
std::uint32_t lower = upper->bookIndex - (1 << bookShift);
auto originalKeys = std::move(quality_keys);
auto [lowerComplete, otherKeys] = readBooks(lower);
assert(lowerComplete);
std::vector<std::pair<std::uint64_t, ripple::uint256>> merged_keys;
merged_keys.reserve(originalKeys.size() + otherKeys.size());
std::merge(
originalKeys.begin(),
originalKeys.end(),
otherKeys.begin(),
otherKeys.end(),
std::back_inserter(merged_keys),
[](auto pair1, auto pair2) { return pair1.first < pair2.first; });
}
std::vector<ripple::uint256> merged(quality_keys.size());
std::transform(
quality_keys.begin(),
quality_keys.end(),
std::back_inserter(merged),
[](auto pair) { return pair.second; });
auto uniqEnd = std::unique(merged.begin(), merged.end());
std::vector<ripple::uint256> keys{merged.begin(), uniqEnd};
std::cout << keys.size() << std::endl;
auto start = std::chrono::system_clock::now();
std::vector<Blob> objs = fetchLedgerObjects(keys, ledgerSequence);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< "Book object fetch took " << std::to_string(duration) << " seconds.";
std::vector<LedgerObject> results;
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size() != 0)
results.push_back({keys[i], objs[i]});
}
return {results, {}, warning};
} // namespace Backend
struct WriteBookCallbackData
{
@@ -907,57 +790,6 @@ CassandraBackend::writeKeys(
    return true;
}
bool
CassandraBackend::writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
BookIndex const& index,
bool isAsync) const
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Ledger = " << std::to_string(index.bookIndex)
<< " . num books = " << std::to_string(books.size());
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<WriteBookCallbackData>> cbs;
uint32_t concurrentLimit =
isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding;
std::atomic_uint32_t numOutstanding = 0;
size_t count = 0;
auto start = std::chrono::system_clock::now();
for (auto& book : books)
{
for (auto& offer : book.second)
{
++numOutstanding;
++count;
cbs.push_back(std::make_shared<WriteBookCallbackData>(
*this,
book.first,
offer,
index.bookIndex,
cv,
mtx,
numOutstanding));
writeBook(*cbs.back());
BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
std::unique_lock<std::mutex> lck(mtx);
BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
cv.wait(lck, [&numOutstanding, concurrentLimit]() {
return numOutstanding < concurrentLimit;
});
}
}
BOOST_LOG_TRIVIAL(info) << __func__
<< "Submitted all book writes. Waiting for them to "
"finish. num submitted = "
<< std::to_string(count);
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
BOOST_LOG_TRIVIAL(info) << __func__ << "Finished writing books";
return true;
}
bool
CassandraBackend::isIndexed(uint32_t ledgerSequence) const
{
@@ -1445,18 +1277,17 @@ CassandraBackend::open(bool readOnly)
    cass_cluster_set_connect_timeout(cluster, 10000);
    int ttl = getInt("ttl") ? *getInt("ttl") * 2 : 0;
-    int keysTtl,
-        keysIncr = ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0;
+    int keysTtl = (ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0);
+    int incr = keysTtl;
    while (keysTtl < ttl)
    {
-        keysTtl += keysIncr;
+        keysTtl += incr;
    }
-    int booksTtl,
-        booksIncr = ttl != 0 ? pow(2, indexer_.getBookShift()) * 4 * 2 : 0;
-    while (booksTtl < ttl)
-    {
-        booksTtl += booksIncr;
-    }
+    int booksTtl = 0;
+    BOOST_LOG_TRIVIAL(info)
+        << __func__ << " setting ttl to " << std::to_string(ttl)
+        << " , books ttl to " << std::to_string(booksTtl) << " , keys ttl to "
+        << std::to_string(keysTtl);
    auto executeSimpleStatement = [this](std::string const& query) {
        CassStatement* statement = makeStatement(query.c_str(), 0);
@@ -1529,7 +1360,7 @@ CassandraBackend::open(bool readOnly)
              << " ( key blob, sequence bigint, object blob, PRIMARY "
                 "KEY(key, "
                 "sequence)) WITH CLUSTERING ORDER BY (sequence DESC) AND"
-              << " default_time_to_live = " << ttl;
+              << " default_time_to_live = " << std::to_string(ttl);
        if (!executeSimpleStatement(query.str()))
            continue;
@@ -1544,7 +1375,7 @@ CassandraBackend::open(bool readOnly)
              << " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
                 "transaction "
                 "blob, metadata blob)"
-              << " WITH default_time_to_live = " << ttl;
+              << " WITH default_time_to_live = " << std::to_string(ttl);
        if (!executeSimpleStatement(query.str()))
            continue;
@@ -1571,7 +1402,7 @@ CassandraBackend::open(bool readOnly)
              << " ( sequence bigint, key blob, PRIMARY KEY "
                 "(sequence, key))"
                 " WITH default_time_to_live = "
-              << keysTtl;
+              << std::to_string(keysTtl);
        if (!executeSimpleStatement(query.str()))
            continue;
@@ -1582,28 +1413,13 @@ CassandraBackend::open(bool readOnly)
            continue;
        query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
<< " ( book blob, sequence bigint, quality_key tuple<blob, "
"blob>, PRIMARY KEY "
"((book, sequence), quality_key)) WITH CLUSTERING ORDER BY "
"(quality_key "
"ASC) AND default_time_to_live = "
<< booksTtl;
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "books"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
        query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
              << " ( account blob, seq_idx tuple<bigint, bigint>, "
                 " hash blob, "
                 "PRIMARY KEY "
                 "(account, seq_idx)) WITH "
                 "CLUSTERING ORDER BY (seq_idx desc)"
-              << " AND default_time_to_live = " << ttl;
+              << " AND default_time_to_live = " << std::to_string(ttl);
        if (!executeSimpleStatement(query.str()))
            continue;
@@ -1617,7 +1433,7 @@ CassandraBackend::open(bool readOnly)
        query.str("");
        query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers"
              << " ( sequence bigint PRIMARY KEY, header blob )"
-              << " WITH default_time_to_live = " << ttl;
+              << " WITH default_time_to_live = " << std::to_string(ttl);
        if (!executeSimpleStatement(query.str()))
            continue;
@@ -1630,7 +1446,7 @@ CassandraBackend::open(bool readOnly)
        query.str("");
        query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes"
              << " (hash blob PRIMARY KEY, sequence bigint)"
-              << " WITH default_time_to_live = " << ttl;
+              << " WITH default_time_to_live = " << std::to_string(ttl);
        if (!executeSimpleStatement(query.str()))
            continue;
@@ -1680,13 +1496,6 @@ CassandraBackend::open(bool readOnly)
        if (!insertKey_.prepareStatement(query, session_.get()))
            continue;
query.str("");
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, sequence, quality_key) VALUES (?, ?, (?, ?))";
if (!insertBook2_.prepareStatement(query, session_.get()))
continue;
query.str("");
        query.str("");
        query << "SELECT key FROM " << tablePrefix << "keys"
              << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?";
@@ -1755,23 +1564,6 @@ CassandraBackend::open(bool readOnly)
        if (!getToken_.prepareStatement(query, session_.get()))
            continue;
query.str("");
query << "SELECT quality_key FROM " << tablePrefix << "books "
<< " WHERE book = ? AND sequence = ?"
<< " AND quality_key >= (?, ?)"
" ORDER BY quality_key ASC "
" LIMIT ?";
if (!selectBook_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << "SELECT * FROM " << tablePrefix << "books "
<< "WHERE book = "
<< "0x000000000000000000000000000000000000000000000000"
<< " AND sequence = ?";
if (!completeBook_.prepareStatement(query, session_.get()))
continue;
        query.str("");
        query << " INSERT INTO " << tablePrefix << "account_tx"
              << " (account, seq_idx, hash) "

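Note on the TTL arithmetic in CassandraBackend::open: the keys table's TTL starts at one flag-ledger interval (2^keyShift ledgers at roughly 4 seconds each, doubled for safety) and is rounded up to a multiple of that interval, so a flag ledger's key list never partially expires. A standalone sketch of the same computation (the function name is hypothetical):

    #include <cmath>
    #include <cstdint>
    #include <iostream>

    int
    computeKeysTtl(int ttl, uint32_t keyShift)
    {
        // matches pow(2, indexer_.getKeyShift()) * 4 * 2 above
        int keysTtl = ttl != 0 ? std::pow(2, keyShift) * 4 * 2 : 0;
        int incr = keysTtl;
        while (keysTtl < ttl)  // round up to a multiple of the interval
            keysTtl += incr;
        return keysTtl;
    }

    int
    main()
    {
        // with the test config's indexer_key_shift of 2: 2^2 * 4 * 2 = 32,
        // rounded up past a ttl of 100 gives 128
        std::cout << computeKeysTtl(100, 2) << "\n";
    }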
View File

@@ -957,7 +957,7 @@ public:
        CassandraResult result = executeSyncRead(statement);
        if (!result)
        {
-            BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows";
+            BOOST_LOG_TRIVIAL(debug) << __func__ << " - no rows";
            return {};
        }
        return result.getBytes();
@@ -997,7 +997,7 @@ public:
        return {{result.getBytes(), result.getBytes(), result.getUInt32()}};
    }
    LedgerPage
-    fetchLedgerPage(
+    doFetchLedgerPage(
        std::optional<ripple::uint256> const& cursor,
        std::uint32_t ledgerSequence,
        std::uint32_t limit) const override;
@@ -1019,13 +1019,6 @@ public:
        std::unordered_set<ripple::uint256> const& keys,
        KeyIndex const& index,
        bool isAsync = false) const override;
bool
writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
BookIndex const& index,
bool isAsync = false) const override;
    BookOffersPage
    fetchBookOffers(
        ripple::uint256 const& book,

View File

@@ -324,7 +324,7 @@ PostgresBackend::fetchAllTransactionHashesInLedger(
}
LedgerPage
-PostgresBackend::fetchLedgerPage(
+PostgresBackend::doFetchLedgerPage(
    std::optional<ripple::uint256> const& cursor,
    std::uint32_t ledgerSequence,
    std::uint32_t limit) const
@@ -338,7 +338,7 @@ PostgresBackend::fetchLedgerPage(
    sql << "SELECT key FROM keys WHERE ledger_seq = "
        << std::to_string(index->keyIndex);
    if (cursor)
-        sql << " AND key > \'\\x" << ripple::strHex(*cursor) << "\'";
+        sql << " AND key >= \'\\x" << ripple::strHex(*cursor) << "\'";
    sql << " ORDER BY key ASC LIMIT " << std::to_string(limit);
    BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str();
    auto res = pgQuery(sql.str().data());
@@ -352,7 +352,10 @@ PostgresBackend::fetchLedgerPage(
        keys.push_back({res.asUInt256(i, 0)});
    }
    if (numRows == limit)
+    {
        returnCursor = keys.back();
+        ++(*returnCursor);
+    }
    auto objs = fetchLedgerObjects(keys, ledgerSequence);
    std::vector<LedgerObject> results;
@@ -379,155 +382,7 @@ PostgresBackend::fetchBookOffers(
    std::uint32_t limit,
    std::optional<ripple::uint256> const& cursor) const
{
-    auto rng = fetchLedgerRange();
+    return {};
auto limitTuningFactor = 50;
if (!rng)
return {{}, {}};
ripple::uint256 bookBase =
ripple::keylet::quality({ripple::ltDIR_NODE, book}, 0).key;
ripple::uint256 bookEnd = ripple::getQualityNext(bookBase);
using bookKeyPair = std::pair<ripple::uint256, ripple::uint256>;
auto getBooks = [this, &bookBase, &bookEnd, &limit, &limitTuningFactor](
std::uint32_t sequence)
-> std::pair<bool, std::vector<bookKeyPair>> {
BOOST_LOG_TRIVIAL(info) << __func__ << ": Fetching books between "
<< "0x" << ripple::strHex(bookBase) << " and "
<< "0x" << ripple::strHex(bookEnd)
<< "at ledger " << std::to_string(sequence);
auto start = std::chrono::system_clock::now();
std::stringstream sql;
sql << "SELECT COUNT(*) FROM books WHERE "
<< "book = \'\\x" << ripple::strHex(ripple::uint256(beast::zero))
<< "\' AND ledger_seq = " << std::to_string(sequence);
bool complete;
PgQuery pgQuery(this->pgPool_);
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1))
complete = res.asInt(0, 0) != 0;
else
return {false, {}};
sql.str("");
sql << "SELECT book, offer_key FROM books "
<< "WHERE ledger_seq = " << std::to_string(sequence)
<< " AND book >= "
<< "\'\\x" << ripple::strHex(bookBase) << "\' "
<< "AND book < "
<< "\'\\x" << ripple::strHex(bookEnd) << "\' "
<< "ORDER BY book ASC "
<< "LIMIT " << std::to_string(limit * limitTuningFactor);
BOOST_LOG_TRIVIAL(debug) << sql.str();
res = pgQuery(sql.str().data());
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info) << "Postgres book key fetch took "
<< std::to_string(duration) << " seconds";
if (size_t numRows = checkResult(res, 2))
{
std::vector<bookKeyPair> results(numRows);
for (size_t i = 0; i < numRows; ++i)
{
auto book = res.asUInt256(i, 0);
auto key = res.asUInt256(i, 1);
results.push_back({std::move(book), std::move(key)});
}
return {complete, results};
}
return {complete, {}};
};
auto fetchObjects =
[this](
std::vector<bookKeyPair> const& pairs,
std::uint32_t sequence,
std::uint32_t limit,
std::optional<std::string> warning) -> BookOffersPage {
std::vector<ripple::uint256> allKeys(pairs.size());
for (auto const& pair : pairs)
allKeys.push_back(pair.second);
auto uniqEnd = std::unique(allKeys.begin(), allKeys.end());
std::vector<ripple::uint256> keys{allKeys.begin(), uniqEnd};
auto start = std::chrono::system_clock::now();
auto ledgerEntries = fetchLedgerObjects(keys, sequence);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< "Postgres book objects fetch took " << std::to_string(duration)
<< " seconds. "
<< "Fetched " << std::to_string(ledgerEntries.size())
<< " ledger entries";
std::vector<LedgerObject> objects;
for (auto i = 0; i < ledgerEntries.size(); ++i)
{
if (ledgerEntries[i].size() != 0)
objects.push_back(LedgerObject{keys[i], ledgerEntries[i]});
}
return {objects, {}, warning};
};
std::uint32_t bookShift = indexer_.getBookShift();
auto upper = getBookIndexOfSeq(ledgerSequence);
auto [upperComplete, upperResults] = getBooks(upper->bookIndex);
BOOST_LOG_TRIVIAL(info) << __func__ << ": Upper results found "
<< upperResults.size() << " books.";
if (upperComplete)
{
BOOST_LOG_TRIVIAL(info) << "Upper book page is complete";
return fetchObjects(upperResults, ledgerSequence, limit, {});
}
BOOST_LOG_TRIVIAL(info) << "Upper book page is not complete "
<< "fetching again";
auto lower = upper->bookIndex - (1 << bookShift);
if (lower < rng->minSequence)
lower = rng->minSequence;
auto [lowerComplete, lowerResults] = getBooks(lower);
BOOST_LOG_TRIVIAL(info) << __func__ << ": Lower results found "
<< lowerResults.size() << " books.";
assert(lowerComplete);
std::vector<bookKeyPair> pairs;
pairs.reserve(upperResults.size() + lowerResults.size());
std::merge(
upperResults.begin(),
upperResults.end(),
lowerResults.begin(),
lowerResults.end(),
std::back_inserter(pairs),
[](bookKeyPair pair1, bookKeyPair pair2) -> bool {
return pair1.first < pair2.first;
});
std::optional<std::string> warning = "book data may be incomplete";
return fetchObjects(pairs, ledgerSequence, limit, warning);
}
std::vector<TransactionAndMetadata>
@@ -841,8 +696,6 @@ PostgresBackend::writeKeys(
            std::stringstream temp;
            buffer.swap(temp);
            numRows = 0;
if (isAsync)
std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
    if (isAsync)
@@ -854,54 +707,6 @@ PostgresBackend::writeKeys(
    return true;
}
bool
PostgresBackend::writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
BookIndex const& index,
bool isAsync) const
{
if (abortWrite_)
return false;
BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_);
PgQuery& conn = isAsync ? pgQuery : writeConnection_;
std::stringstream asyncBuffer;
std::stringstream& buffer = isAsync ? asyncBuffer : booksBuffer_;
if (isAsync)
conn("BEGIN");
size_t numRows = 0;
for (auto& book : books)
{
for (auto& offer : book.second)
{
buffer << std::to_string(index.bookIndex) << '\t' << "\\\\x"
<< ripple::strHex(book.first) << '\t' << "\\\\x"
<< ripple::strHex(offer) << '\n';
numRows++;
// If the buffer gets too large, the insert fails. Not sure why.
// When writing in the background, we insert after every 10 rows
if ((isAsync && numRows == 1000) || numRows == 100000)
{
conn.bulkInsert("books", buffer.str());
std::stringstream temp;
buffer.swap(temp);
numRows = 0;
if (isAsync)
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
if (isAsync)
{
if (numRows > 0)
conn.bulkInsert("books", buffer.str());
conn("COMMIT");
}
return true;
}
bool
PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
{
    auto rng = fetchLedgerRangeNoThrow();

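Note on the cursor change in doFetchLedgerPage above: the key query is now inclusive (key >=) and the returned cursor is pre-incremented past the last delivered key, so the next page starts exactly where the previous one ended, with no duplicated or skipped key. A toy sketch of that contract over plain integers (fetchPage is hypothetical; ripple::uint256 keys behave the same way under ++):

    #include <cstdint>
    #include <optional>
    #include <vector>

    struct Page
    {
        std::vector<uint64_t> keys;      // stand-in for ripple::uint256 keys
        std::optional<uint64_t> cursor;  // where the next page should start
    };

    Page
    fetchPage(
        std::vector<uint64_t> const& sortedKeys,
        std::optional<uint64_t> cursor,
        size_t limit)
    {
        Page page;
        for (auto k : sortedKeys)  // WHERE key >= cursor ORDER BY key LIMIT n
            if ((!cursor || k >= *cursor) && page.keys.size() < limit)
                page.keys.push_back(k);
        if (page.keys.size() == limit)
            page.cursor = page.keys.back() + 1;  // mirrors ++(*returnCursor)
        return page;
    }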
View File

@@ -16,7 +16,7 @@ private:
    std::shared_ptr<PgPool> pgPool_;
    mutable PgQuery writeConnection_;
    mutable bool abortWrite_ = false;
-    mutable boost::asio::thread_pool pool_{200};
+    mutable boost::asio::thread_pool pool_{16};
    uint32_t writeInterval_ = 1000000;
public:
@@ -46,7 +46,7 @@ public:
    fetchAllTransactionHashesInLedger(uint32_t ledgerSequence) const override;
    LedgerPage
-    fetchLedgerPage(
+    doFetchLedgerPage(
        std::optional<ripple::uint256> const& cursor,
        std::uint32_t ledgerSequence,
        std::uint32_t limit) const override;
@@ -120,13 +120,6 @@ public:
        std::unordered_set<ripple::uint256> const& keys,
        KeyIndex const& index,
        bool isAsync = false) const override;
bool
writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
BookIndex const& index,
bool isAsync = false) const override;
};
} // namespace Backend
#endif

View File

@@ -375,7 +375,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
    BOOST_LOG_TRIVIAL(info) << __func__ << " : "
                            << "Populating caches";
-    flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_);
    BOOST_LOG_TRIVIAL(info) << __func__ << " : "
                            << "Populated caches";
@@ -541,7 +540,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
        << "Extracted and wrote " << *lastPublishedSequence - startSequence
        << " in " << ((end - begin).count()) / 1000000000.0;
    writing_ = false;
-    flatMapBackend_->getIndexer().clearCaches();
    BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
                            << "Stopping etl pipeline";

727
unittests/main.cpp Normal file
View File

@@ -0,0 +1,727 @@
#include <algorithm>
#include <gtest/gtest.h>
#include <handlers/RPCHelpers.h>
#include <reporting/DBHelpers.h>
#include <boost/log/core.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/trivial.hpp>
#include <reporting/BackendFactory.h>
#include <reporting/BackendInterface.h>
// Demonstrate some basic assertions.
TEST(BackendTest, Basic)
{
boost::log::core::get()->set_filter(
boost::log::trivial::severity >= boost::log::trivial::warning);
std::string keyspace =
"oceand_test_" +
std::to_string(
std::chrono::system_clock::now().time_since_epoch().count());
boost::json::object cassandraConfig{
{"database",
{{"type", "cassandra"},
{"cassandra",
{{"contact_points", "127.0.0.1"},
{"port", 9042},
{"keyspace", keyspace.c_str()},
{"replication_factor", 1},
{"table_prefix", ""},
{"max_requests_outstanding", 1000},
{"indexer_key_shift", 2},
{"threads", 8}}}}}};
boost::json::object postgresConfig{
{"database",
{{"type", "postgres"},
{"postgres",
{{"contact_point", "127.0.0.1"},
{"username", "postgres"},
{"database", keyspace.c_str()},
{"password", "postgres"},
{"indexer_key_shift", 2},
{"threads", 8}}}}}};
std::vector<boost::json::object> configs = {
cassandraConfig, postgresConfig};
for (auto& config : configs)
{
std::cout << keyspace << std::endl;
auto backend = Backend::makeBackend(config);
backend->open(false);
std::string rawHeader =
"03C3141A01633CD656F91B4EBB5EB89B791BD34DBC8A04BB6F407C5335BC54351E"
"DD73"
"3898497E809E04074D14D271E4832D7888754F9230800761563A292FA2315A6DB6"
"FE30"
"CC5909B285080FCD6773CC883F9FE0EE4D439340AC592AADB973ED3CF53E2232B3"
"3EF5"
"7CECAC2816E3122816E31A0A00F8377CD95DFA484CFAE282656A58CE5AA29652EF"
"FD80"
"AC59CD91416E4E13DBBE";
auto hexStringToBinaryString = [](auto const& hex) {
auto blob = ripple::strUnHex(hex);
std::string strBlob;
for (auto c : *blob)
{
strBlob += c;
}
return strBlob;
};
auto binaryStringToUint256 = [](auto const& bin) -> ripple::uint256 {
ripple::uint256 uint;
return uint.fromVoid((void const*)bin.data());
};
auto ledgerInfoToBinaryString = [](auto const& info) {
auto blob = ledgerInfoToBlob(info);
std::string strBlob;
for (auto c : blob)
{
strBlob += c;
}
return strBlob;
};
std::string rawHeaderBlob = hexStringToBinaryString(rawHeader);
ripple::LedgerInfo lgrInfo =
deserializeHeader(ripple::makeSlice(rawHeaderBlob));
backend->startWrites();
backend->writeLedger(lgrInfo, std::move(rawHeaderBlob), true);
ASSERT_TRUE(backend->finishWrites(lgrInfo.seq));
{
auto rng = backend->fetchLedgerRange();
EXPECT_TRUE(rng.has_value());
EXPECT_EQ(rng->minSequence, rng->maxSequence);
EXPECT_EQ(rng->maxSequence, lgrInfo.seq);
}
{
auto seq = backend->fetchLatestLedgerSequence();
EXPECT_TRUE(seq.has_value());
EXPECT_EQ(*seq, lgrInfo.seq);
}
{
auto retLgr = backend->fetchLedgerBySequence(lgrInfo.seq);
EXPECT_TRUE(retLgr.has_value());
EXPECT_EQ(retLgr->seq, lgrInfo.seq);
EXPECT_EQ(ledgerInfoToBlob(lgrInfo), ledgerInfoToBlob(*retLgr));
}
EXPECT_FALSE(
backend->fetchLedgerBySequence(lgrInfo.seq + 1).has_value());
auto lgrInfoOld = lgrInfo;
auto lgrInfoNext = lgrInfo;
lgrInfoNext.seq = lgrInfo.seq + 1;
lgrInfoNext.parentHash = lgrInfo.hash;
lgrInfoNext.hash++;
lgrInfoNext.accountHash = ~lgrInfo.accountHash;
{
std::string rawHeaderBlob = ledgerInfoToBinaryString(lgrInfoNext);
backend->startWrites();
backend->writeLedger(lgrInfoNext, std::move(rawHeaderBlob));
ASSERT_TRUE(backend->finishWrites(lgrInfoNext.seq));
}
{
auto rng = backend->fetchLedgerRange();
EXPECT_TRUE(rng.has_value());
EXPECT_EQ(rng->minSequence, lgrInfoOld.seq);
EXPECT_EQ(rng->maxSequence, lgrInfoNext.seq);
}
{
auto seq = backend->fetchLatestLedgerSequence();
EXPECT_EQ(seq, lgrInfoNext.seq);
}
{
auto retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq);
EXPECT_TRUE(retLgr.has_value());
EXPECT_EQ(retLgr->seq, lgrInfoNext.seq);
EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext));
EXPECT_NE(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoOld));
retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq - 1);
EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoOld));
EXPECT_NE(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext));
retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq - 2);
EXPECT_FALSE(backend->fetchLedgerBySequence(lgrInfoNext.seq - 2)
.has_value());
auto txns = backend->fetchAllTransactionsInLedger(lgrInfoNext.seq);
EXPECT_EQ(txns.size(), 0);
auto hashes =
backend->fetchAllTransactionHashesInLedger(lgrInfoNext.seq);
EXPECT_EQ(hashes.size(), 0);
}
// the below dummy data is not expected to be consistent. The metadata
// string does represent valid metadata. Don't assume though that the
// transaction or its hash correspond to the metadata, or anything like
// that. These tests are purely binary tests to make sure the same data
// that goes in, comes back out
std::string metaHex =
"201C0000001AF8E411006F560A3E08122A05AC91DEFA87052B0554E4A29B46"
"3A27642EBB060B6052196592EEE72200000000240480FDB52503CE1A863300"
"000000000000003400000000000000005529983CBAED30F547471452921C3C"
"6B9F9685F292F6291000EED0A44413AF18C250101AC09600F4B502C8F7F830"
"F80B616DCB6F3970CB79AB70975A05ED5B66860B9564400000001FE217CB65"
"D54B640B31521B05000000000000000000000000434E5900000000000360E3"
"E0751BD9A566CD03FA6CAFC78118B82BA081142252F328CF91263417762570"
"D67220CCB33B1370E1E1E3110064561AC09600F4B502C8F7F830F80B616DCB"
"6F3970CB79AB70975A05ED33DF783681E8365A05ED33DF783681581AC09600"
"F4B502C8F7F830F80B616DCB6F3970CB79AB70975A05ED33DF783681031100"
"0000000000000000000000434E59000000000004110360E3E0751BD9A566CD"
"03FA6CAFC78118B82BA0E1E1E4110064561AC09600F4B502C8F7F830F80B61"
"6DCB6F3970CB79AB70975A05ED5B66860B95E72200000000365A05ED5B6686"
"0B95581AC09600F4B502C8F7F830F80B616DCB6F3970CB79AB70975A05ED5B"
"66860B95011100000000000000000000000000000000000000000211000000"
"00000000000000000000000000000000000311000000000000000000000000"
"434E59000000000004110360E3E0751BD9A566CD03FA6CAFC78118B82BA0E1"
"E1E311006F5647B05E66DE9F3DF2689E8F4CE6126D3136B6C5E79587F9D24B"
"D71A952B0852BAE8240480FDB950101AC09600F4B502C8F7F830F80B616DCB"
"6F3970CB79AB70975A05ED33DF78368164400000033C83A95F65D59D9A6291"
"9C2D18000000000000000000000000434E5900000000000360E3E0751BD9A5"
"66CD03FA6CAFC78118B82BA081142252F328CF91263417762570D67220CCB3"
"3B1370E1E1E511006456AEA3074F10FE15DAC592F8A0405C61FB7D4C98F588"
"C2D55C84718FAFBBD2604AE722000000003100000000000000003200000000"
"0000000058AEA3074F10FE15DAC592F8A0405C61FB7D4C98F588C2D55C8471"
"8FAFBBD2604A82142252F328CF91263417762570D67220CCB33B1370E1E1E5"
"1100612503CE1A8755CE935137F8C6C8DEF26B5CD93BE18105CA83F65E1E90"
"CEC546F562D25957DC0856E0311EB450B6177F969B94DBDDA83E99B7A0576A"
"CD9079573876F16C0C004F06E6240480FDB9624000000005FF0E2BE1E72200"
"000000240480FDBA2D00000005624000000005FF0E1F81142252F328CF9126"
"3417762570D67220CCB33B1370E1E1F1031000";
std::string txnHex =
"1200072200000000240480FDB920190480FDB5201B03CE1A8964400000033C"
"83A95F65D59D9A62919C2D18000000000000000000000000434E5900000000"
"000360E3E0751BD9A566CD03FA6CAFC78118B82BA068400000000000000C73"
"21022D40673B44C82DEE1DDB8B9BB53DCCE4F97B27404DB850F068DD91D685"
"E337EA7446304402202EA6B702B48B39F2197112382838F92D4C02948E9911"
"FE6B2DEBCF9183A426BC022005DAC06CD4517E86C2548A80996019F3AC60A0"
"9EED153BF60C992930D68F09F981142252F328CF91263417762570D67220CC"
"B33B1370";
std::string hashHex =
"0A81FB3D6324C2DCF73131505C6E4DC67981D7FC39F5E9574CEC4B1F22D28BF7";
// this account is not related to the above transaction and metadata
std::string accountHex =
"1100612200000000240480FDBC2503CE1A872D0000000555516931B2AD018EFFBE"
"17C5"
"C9DCCF872F36837C2C6136ACF80F2A24079CF81FD0624000000005FF0E07811422"
"52F3"
"28CF91263417762570D67220CCB33B1370";
std::string accountIndexHex =
"E0311EB450B6177F969B94DBDDA83E99B7A0576ACD9079573876F16C0C004F06";
std::string metaBlob = hexStringToBinaryString(metaHex);
std::string txnBlob = hexStringToBinaryString(txnHex);
std::string hashBlob = hexStringToBinaryString(hashHex);
std::string accountBlob = hexStringToBinaryString(accountHex);
std::string accountIndexBlob = hexStringToBinaryString(accountIndexHex);
std::vector<ripple::AccountID> affectedAccounts;
{
backend->startWrites();
lgrInfoNext.seq = lgrInfoNext.seq + 1;
lgrInfoNext.txHash = ~lgrInfo.txHash;
lgrInfoNext.accountHash =
lgrInfoNext.accountHash ^ lgrInfoNext.txHash;
lgrInfoNext.parentHash = lgrInfoNext.hash;
lgrInfoNext.hash++;
ripple::uint256 hash256;
EXPECT_TRUE(hash256.parseHex(hashHex));
ripple::TxMeta txMeta{hash256, lgrInfoNext.seq, metaBlob};
auto journal = ripple::debugLog();
auto accountsSet = txMeta.getAffectedAccounts(journal);
for (auto& a : accountsSet)
{
affectedAccounts.push_back(a);
}
std::vector<AccountTransactionsData> accountTxData;
accountTxData.emplace_back(txMeta, hash256, journal);
backend->writeLedger(
lgrInfoNext, std::move(ledgerInfoToBinaryString(lgrInfoNext)));
backend->writeTransaction(
std::move(std::string{hashBlob}),
lgrInfoNext.seq,
std::move(std::string{txnBlob}),
std::move(std::string{metaBlob}));
backend->writeAccountTransactions(std::move(accountTxData));
backend->writeLedgerObject(
std::move(std::string{accountIndexBlob}),
lgrInfoNext.seq,
std::move(std::string{accountBlob}),
true,
false,
{});
ASSERT_TRUE(backend->finishWrites(lgrInfoNext.seq));
}
{
auto rng = backend->fetchLedgerRange();
EXPECT_TRUE(rng);
EXPECT_EQ(rng->minSequence, lgrInfoOld.seq);
EXPECT_EQ(rng->maxSequence, lgrInfoNext.seq);
auto retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq);
EXPECT_TRUE(retLgr);
EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext));
auto txns = backend->fetchAllTransactionsInLedger(lgrInfoNext.seq);
EXPECT_EQ(txns.size(), 1);
EXPECT_STREQ(
(const char*)txns[0].transaction.data(),
(const char*)txnBlob.data());
EXPECT_STREQ(
(const char*)txns[0].metadata.data(),
(const char*)metaBlob.data());
auto hashes =
backend->fetchAllTransactionHashesInLedger(lgrInfoNext.seq);
EXPECT_EQ(hashes.size(), 1);
EXPECT_EQ(ripple::strHex(hashes[0]), hashHex);
for (auto& a : affectedAccounts)
{
auto accountTxns = backend->fetchAccountTransactions(a, 100);
EXPECT_EQ(accountTxns.first.size(), 1);
EXPECT_EQ(accountTxns.first[0], txns[0]);
EXPECT_FALSE(accountTxns.second);
}
ripple::uint256 key256;
EXPECT_TRUE(key256.parseHex(accountIndexHex));
auto obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq);
EXPECT_TRUE(obj);
EXPECT_STREQ(
(const char*)obj->data(), (const char*)accountBlob.data());
obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq + 1);
EXPECT_TRUE(obj);
EXPECT_STREQ(
(const char*)obj->data(), (const char*)accountBlob.data());
obj = backend->fetchLedgerObject(key256, lgrInfoOld.seq - 1);
EXPECT_FALSE(obj);
}
// obtain a time-based seed:
unsigned seed =
std::chrono::system_clock::now().time_since_epoch().count();
std::string accountBlobOld = accountBlob;
{
backend->startWrites();
lgrInfoNext.seq = lgrInfoNext.seq + 1;
lgrInfoNext.parentHash = lgrInfoNext.hash;
lgrInfoNext.hash++;
lgrInfoNext.txHash = lgrInfoNext.txHash ^ lgrInfoNext.accountHash;
lgrInfoNext.accountHash =
~(lgrInfoNext.accountHash ^ lgrInfoNext.txHash);
        backend->writeLedger(
            lgrInfoNext, ledgerInfoToBinaryString(lgrInfoNext));
        std::shuffle(
            accountBlob.begin(),
            accountBlob.end(),
            std::default_random_engine(seed));
        backend->writeLedgerObject(
            std::string{accountIndexBlob},
            lgrInfoNext.seq,
            std::string{accountBlob},
            true,
            false,
            {});
ASSERT_TRUE(backend->finishWrites(lgrInfoNext.seq));
}
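    // The updated blob should be visible at the new sequence and after
    // it; one sequence back the old blob should still be returned.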
{
auto rng = backend->fetchLedgerRange();
EXPECT_TRUE(rng);
EXPECT_EQ(rng->minSequence, lgrInfoOld.seq);
EXPECT_EQ(rng->maxSequence, lgrInfoNext.seq);
auto retLgr = backend->fetchLedgerBySequence(lgrInfoNext.seq);
EXPECT_TRUE(retLgr);
EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfoNext));
auto txns = backend->fetchAllTransactionsInLedger(lgrInfoNext.seq);
EXPECT_EQ(txns.size(), 0);
ripple::uint256 key256;
EXPECT_TRUE(key256.parseHex(accountIndexHex));
auto obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq);
EXPECT_TRUE(obj);
        EXPECT_EQ(std::string(obj->begin(), obj->end()), accountBlob);
        obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq + 1);
        EXPECT_TRUE(obj);
        EXPECT_EQ(std::string(obj->begin(), obj->end()), accountBlob);
        obj = backend->fetchLedgerObject(key256, lgrInfoNext.seq - 1);
        EXPECT_TRUE(obj);
        EXPECT_EQ(std::string(obj->begin(), obj->end()), accountBlobOld);
obj = backend->fetchLedgerObject(key256, lgrInfoOld.seq - 1);
EXPECT_FALSE(obj);
}
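    // Helpers for the bulk phase below. Keys, hashes, and accounts are
    // derived deterministically from the ledger sequence so the expected
    // state can be reconstructed when validating.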
    auto generateObjects = [](size_t numObjects, uint32_t ledgerSequence) {
std::vector<std::pair<std::string, std::string>> res{numObjects};
ripple::uint256 key;
key = ledgerSequence * 100000;
for (auto& blob : res)
{
++key;
std::string keyStr{(const char*)key.data(), key.size()};
blob.first = keyStr;
blob.second = std::to_string(ledgerSequence) + keyStr;
}
return res;
};
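    // Simulates an update by prepending the sequence to each blob.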
auto updateObjects = [](uint32_t ledgerSequence, auto objs) {
for (auto& [key, obj] : objs)
{
obj = std::to_string(ledgerSequence) + obj;
}
return objs;
};
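    // Produces (hash, txn, meta) tuples whose hashes are derived from the
    // ledger sequence.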
    auto generateTxns = [](size_t numTxns, uint32_t ledgerSequence) {
std::vector<std::tuple<std::string, std::string, std::string>> res{
numTxns};
ripple::uint256 base;
base = ledgerSequence * 100000;
for (auto& blob : res)
{
++base;
std::string hashStr{(const char*)base.data(), base.size()};
std::string txnStr =
"tx" + std::to_string(ledgerSequence) + hashStr;
std::string metaStr =
"meta" + std::to_string(ledgerSequence) + hashStr;
blob = std::make_tuple(hashStr, txnStr, metaStr);
}
return res;
};
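    // Derives a deterministic set of AccountIDs for a ledger sequence.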
auto generateAccounts = [](uint32_t ledgerSequence,
uint32_t numAccounts) {
std::vector<ripple::AccountID> accounts;
ripple::AccountID base;
base = ledgerSequence * 998765;
for (size_t i = 0; i < numAccounts; ++i)
{
++base;
accounts.push_back(base);
}
return accounts;
};
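    // Assigns each transaction to three pseudo-randomly chosen accounts,
    // producing the records the account_tx index is written from.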
auto generateAccountTx = [&](uint32_t ledgerSequence, auto txns) {
std::vector<AccountTransactionsData> ret;
auto accounts = generateAccounts(ledgerSequence, 10);
std::srand(std::time(nullptr));
uint32_t idx = 0;
for (auto& [hash, txn, meta] : txns)
{
AccountTransactionsData data;
data.ledgerSequence = ledgerSequence;
data.transactionIndex = idx;
data.txHash = hash;
for (size_t i = 0; i < 3; ++i)
{
data.accounts.insert(
accounts[std::rand() % accounts.size()]);
}
++idx;
ret.push_back(data);
}
return ret;
};
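    // Advances the header: links parentHash to the previous hash and
    // permutes the hash fields with the fixed seed.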
auto generateNextLedger = [seed](auto lgrInfo) {
++lgrInfo.seq;
lgrInfo.parentHash = lgrInfo.hash;
std::shuffle(
lgrInfo.txHash.begin(),
lgrInfo.txHash.end(),
std::default_random_engine(seed));
std::shuffle(
lgrInfo.accountHash.begin(),
lgrInfo.accountHash.end(),
std::default_random_engine(seed));
std::shuffle(
lgrInfo.hash.begin(),
lgrInfo.hash.end(),
std::default_random_engine(seed));
return lgrInfo;
};
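    // Writes a complete ledger: header, transactions, objects (with the
    // book directory for offers), and account-transaction records.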
auto writeLedger =
[&](auto lgrInfo, auto txns, auto objs, auto accountTx) {
std::cout << "writing ledger = " << std::to_string(lgrInfo.seq);
backend->startWrites();
backend->writeLedger(
lgrInfo, std::move(ledgerInfoToBinaryString(lgrInfo)));
for (auto [hash, txn, meta] : txns)
{
backend->writeTransaction(
std::move(hash),
lgrInfo.seq,
std::move(txn),
std::move(meta));
}
for (auto [key, obj] : objs)
{
std::optional<ripple::uint256> bookDir;
if (isOffer(obj.data()))
bookDir = getBook(obj);
backend->writeLedgerObject(
std::move(key),
lgrInfo.seq,
std::move(obj),
true,
false,
std::move(bookDir));
}
backend->writeAccountTransactions(std::move(accountTx));
ASSERT_TRUE(backend->finishWrites(lgrInfo.seq));
};
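    // Reads a ledger back and asserts that the header, transactions,
    // account_tx pages, individual objects, and paged object iteration
    // all match what was written.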
auto checkLedger = [&](auto lgrInfo,
auto txns,
auto objs,
auto accountTx) {
auto rng = backend->fetchLedgerRange();
auto seq = lgrInfo.seq;
EXPECT_TRUE(rng);
EXPECT_EQ(rng->minSequence, lgrInfoOld.seq);
EXPECT_GE(rng->maxSequence, seq);
auto retLgr = backend->fetchLedgerBySequence(seq);
EXPECT_TRUE(retLgr);
EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfo));
// retLgr = backend->fetchLedgerByHash(lgrInfo.hash);
// EXPECT_TRUE(retLgr);
// EXPECT_EQ(ledgerInfoToBlob(*retLgr), ledgerInfoToBlob(lgrInfo));
auto retTxns = backend->fetchAllTransactionsInLedger(seq);
for (auto [hash, txn, meta] : txns)
{
            bool found = false;
            for (auto const& [retTxn, retMeta, retSeq] : retTxns)
            {
                // strncmp stops at the first null byte; these blobs are
                // binary, so compare sizes and raw bytes instead.
                if (retTxn.size() == txn.size() &&
                    std::memcmp(retTxn.data(), txn.data(), txn.size()) ==
                        0 &&
                    retMeta.size() == meta.size() &&
                    std::memcmp(retMeta.data(), meta.data(), meta.size()) ==
                        0)
                {
                    found = true;
                    break;
                }
            }
            ASSERT_TRUE(found);
}
for (auto [account, data] : accountTx)
{
std::vector<Backend::TransactionAndMetadata> retData;
std::optional<Backend::AccountTransactionsCursor> cursor;
do
{
uint32_t limit = 10;
auto res = backend->fetchAccountTransactions(
account, limit, cursor);
if (res.second)
EXPECT_EQ(res.first.size(), limit);
retData.insert(
retData.end(), res.first.begin(), res.first.end());
cursor = res.second;
} while (cursor);
EXPECT_EQ(retData.size(), data.size());
for (size_t i = 0; i < retData.size(); ++i)
{
auto [txn, meta, seq] = retData[i];
auto [hash, expTxn, expMeta] = data[i];
                EXPECT_EQ(std::string(txn.begin(), txn.end()), expTxn);
                EXPECT_EQ(std::string(meta.begin(), meta.end()), expMeta);
}
}
for (auto [key, obj] : objs)
{
auto retObj =
backend->fetchLedgerObject(binaryStringToUint256(key), seq);
if (obj.size())
{
ASSERT_TRUE(retObj.has_value());
                EXPECT_EQ(
                    std::string(retObj->begin(), retObj->end()), obj);
}
else
{
ASSERT_FALSE(retObj.has_value());
}
}
Backend::LedgerPage page;
std::vector<Backend::LedgerObject> retObjs;
do
{
uint32_t limit = 10;
page = backend->fetchLedgerPage(page.cursor, seq, limit);
if (page.cursor)
EXPECT_EQ(page.objects.size(), limit);
retObjs.insert(
retObjs.end(), page.objects.begin(), page.objects.end());
ASSERT_FALSE(page.warning.has_value());
} while (page.cursor);
        for (auto const& obj : objs)
        {
            bool found = false;
            for (auto const& retObj : retObjs)
            {
                if (ripple::strHex(obj.first) == ripple::strHex(retObj.key))
                {
                    found = true;
                    ASSERT_EQ(
                        ripple::strHex(obj.second),
                        ripple::strHex(retObj.blob));
                }
            }
            // Keys flattened to an empty blob must not appear in the page.
            ASSERT_EQ(found, obj.second.size() != 0);
        }
};
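    // Bookkeeping for the bulk phase: per-sequence object diffs, all
    // transactions, and per-account transaction hashes, used to rebuild
    // the expected state for every sequence at the end.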
std::map<uint32_t, std::vector<std::pair<std::string, std::string>>>
state;
std::map<
uint32_t,
std::vector<std::tuple<std::string, std::string, std::string>>>
allTxns;
std::unordered_map<std::string, std::pair<std::string, std::string>>
allTxnsMap;
    std::map<uint32_t, std::map<ripple::AccountID, std::vector<std::string>>>
        allAccountTx;
std::map<uint32_t, ripple::LedgerInfo> lgrInfos;
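    // Phase one: ten ledgers, each introducing 25 brand-new objects.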
for (size_t i = 0; i < 10; ++i)
{
lgrInfoNext = generateNextLedger(lgrInfoNext);
auto objs = generateObjects(25, lgrInfoNext.seq);
auto txns = generateTxns(10, lgrInfoNext.seq);
auto accountTx = generateAccountTx(lgrInfoNext.seq, txns);
for (auto rec : accountTx)
{
for (auto account : rec.accounts)
{
allAccountTx[lgrInfoNext.seq][account].push_back(
std::string{
(const char*)rec.txHash.data(), rec.txHash.size()});
}
}
EXPECT_EQ(objs.size(), 25);
EXPECT_NE(objs[0], objs[1]);
EXPECT_EQ(txns.size(), 10);
EXPECT_NE(txns[0], txns[1]);
writeLedger(lgrInfoNext, txns, objs, accountTx);
state[lgrInfoNext.seq] = objs;
allTxns[lgrInfoNext.seq] = txns;
lgrInfos[lgrInfoNext.seq] = lgrInfoNext;
for (auto& [hash, txn, meta] : txns)
{
allTxnsMap[hash] = std::make_pair(txn, meta);
}
}
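    // Phase two: ten more ledgers that repeatedly update the same 25
    // objects, giving each key a history of versions.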
std::vector<std::pair<std::string, std::string>> objs;
for (size_t i = 0; i < 10; ++i)
{
lgrInfoNext = generateNextLedger(lgrInfoNext);
        if (objs.empty())
objs = generateObjects(25, lgrInfoNext.seq);
else
objs = updateObjects(lgrInfoNext.seq, objs);
auto txns = generateTxns(10, lgrInfoNext.seq);
auto accountTx = generateAccountTx(lgrInfoNext.seq, txns);
for (auto rec : accountTx)
{
for (auto account : rec.accounts)
{
allAccountTx[lgrInfoNext.seq][account].push_back(
std::string{
(const char*)rec.txHash.data(), rec.txHash.size()});
}
}
EXPECT_EQ(objs.size(), 25);
EXPECT_NE(objs[0], objs[1]);
EXPECT_EQ(txns.size(), 10);
EXPECT_NE(txns[0], txns[1]);
writeLedger(lgrInfoNext, txns, objs, accountTx);
state[lgrInfoNext.seq] = objs;
allTxns[lgrInfoNext.seq] = txns;
lgrInfos[lgrInfoNext.seq] = lgrInfoNext;
for (auto& [hash, txn, meta] : txns)
{
allTxnsMap[hash] = std::make_pair(txn, meta);
}
}
std::cout << "WROTE ALL OBJECTS" << std::endl;
auto flatten = [&](uint32_t max) {
std::vector<std::pair<std::string, std::string>> flat;
std::map<std::string, std::string> objs;
        for (auto const& [seq, diff] : state)
{
for (auto [k, v] : diff)
{
if (seq > max)
{
if (objs.count(k) == 0)
objs[k] = "";
}
else
{
objs[k] = v;
}
}
}
for (auto [key, value] : objs)
{
flat.push_back(std::make_pair(key, value));
}
return flat;
};
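    // Rebuilds each account's expected transaction list up to a given
    // sequence; the reversal puts it newest-first, the order the backend
    // pages account transactions back.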
auto flattenAccountTx = [&](uint32_t max) {
std::unordered_map<
ripple::AccountID,
std::vector<std::tuple<std::string, std::string, std::string>>>
accountTx;
        for (auto const& [seq, map] : allAccountTx)
{
if (seq > max)
break;
for (auto& [account, hashes] : map)
{
for (auto& hash : hashes)
{
auto& [txn, meta] = allTxnsMap[hash];
accountTx[account].push_back(
std::make_tuple(hash, txn, meta));
}
}
}
for (auto& [account, data] : accountTx)
std::reverse(data.begin(), data.end());
return accountTx;
};
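    // Replay every written sequence and verify the reconstructed state
    // against the backend.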
    for (auto const& [seq, diff] : state)
    {
        std::cout << "flattening" << std::endl;
        auto flat = flatten(seq);
        std::cout << "flattened" << std::endl;
        checkLedger(
            lgrInfos[seq], allTxns[seq], flat, flattenAccountTx(seq));
        std::cout << "checked" << std::endl;
    }
}
}