Implement book offers for async ETL

Nathan Nichols
2021-05-06 23:07:52 -05:00
12 changed files with 534 additions and 347 deletions

View File

@@ -280,16 +280,16 @@ doBookOffers(
    }
    auto start = std::chrono::system_clock::now();
-   std::cout << "getting Book Offers" << std::endl;
-   auto [offers, retCursor] =
    auto [offers, retCursor, warning] =
        backend.fetchBookOffers(bookBase, *ledgerSequence, limit, cursor);
-   std::cout << "got Book Offers" << std::endl;
    auto end = std::chrono::system_clock::now();
-   BOOST_LOG_TRIVIAL(warning) << "Time loading books from Postgres: "
    BOOST_LOG_TRIVIAL(warning) << "Time loading books: "
                               << ((end - start).count() / 1000000000.0);
    if (warning)
        response["warning"] = *warning;
    response["offers"] = boost::json::value(boost::json::array_kind);
    boost::json::array& jsonOffers = response.at("offers").as_array();
@@ -324,6 +324,11 @@ doBookOffers(
        << ((end - start).count() / 1000000000.0);
    if (retCursor)
        response["cursor"] = ripple::strHex(*retCursor);
    if (warning)
        response["warning"] =
            "Periodic database update in progress. Data for this book as of "
            "this ledger may be incomplete. Data should be complete within "
            "one minute";
    return response;
}

View File

@@ -100,6 +100,16 @@ doLedgerData(
response["num_results"] = results.size(); response["num_results"] = results.size();
response["db_time"] = time; response["db_time"] = time;
response["time_per_result"] = time / (results.size() ? results.size() : 1); response["time_per_result"] = time / (results.size() ? results.size() : 1);
if (page.warning)
{
response["warning"] =
"Periodic database update in progress. Data for this ledger may be "
"incomplete. Data should be complete "
"within a few minutes. Other RPC calls are not affected, "
"regardless of ledger. This "
"warning is only present on the first "
"page of the ledger";
}
return response; return response;
} }

View File

@@ -17,13 +17,23 @@ BackendIndexer::~BackendIndexer()
void
BackendIndexer::addKey(ripple::uint256 const& key)
{
    std::unique_lock lck(mtx);
    keys.insert(key);
    keysCumulative.insert(key);
}
void
BackendIndexer::addKeyAsync(ripple::uint256 const& key)
{
    std::unique_lock lck(mtx);
    keysCumulative.insert(key);
}
void
BackendIndexer::deleteKey(ripple::uint256 const& key)
{
    std::unique_lock lck(mtx);
    keysCumulative.erase(key);
    if (populatingCacheAsync)
        deletedKeys.insert(key);
}
void
@@ -31,15 +41,87 @@ BackendIndexer::addBookOffer(
    ripple::uint256 const& book,
    ripple::uint256 const& offerKey)
{
    std::unique_lock lck(mtx);
    books[book].insert(offerKey);
    booksCumulative[book].insert(offerKey);
}
void
BackendIndexer::addBookOfferAsync(
    ripple::uint256 const& book,
    ripple::uint256 const& offerKey)
{
    std::unique_lock lck(mtx);
    booksCumulative[book].insert(offerKey);
}
void
BackendIndexer::deleteBookOffer(
    ripple::uint256 const& book,
    ripple::uint256 const& offerKey)
{
    std::unique_lock lck(mtx);
    booksCumulative[book].erase(offerKey);
    if (populatingCacheAsync)
        deletedBooks[book].insert(offerKey);
}
void
writeFlagLedger(
uint32_t ledgerSequence,
uint32_t shift,
BackendInterface const& backend,
std::unordered_set<ripple::uint256> const& keys,
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books)
{
uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift));
ripple::uint256 zero = {};
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " starting. ledgerSequence = " << std::to_string(ledgerSequence)
<< " nextFlag = " << std::to_string(nextFlag)
<< " keys.size() = " << std::to_string(keys.size())
<< " books.size() = " << std::to_string(books.size());
while (true)
{
try
{
auto [objects, curCursor, warning] =
backend.fetchLedgerPage({}, nextFlag, 1);
if (!(warning || objects.size() == 0))
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " flag ledger already written. sequence = "
<< std::to_string(ledgerSequence)
<< " next flag = " << std::to_string(nextFlag)
<< "returning";
return;
}
break;
}
catch (DatabaseTimeout& t)
{
;
}
}
auto start = std::chrono::system_clock::now();
backend.writeBooks(books, nextFlag);
backend.writeBooks({{zero, {zero}}}, nextFlag);
BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote books. writing keys ...";
backend.writeKeys(keys, nextFlag);
backend.writeKeys({zero}, nextFlag);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. ledgerSequence = " << std::to_string(ledgerSequence)
<< " nextFlag = " << std::to_string(nextFlag)
<< " keys.size() = " << std::to_string(keys.size())
<< " books.size() = " << std::to_string(books.size()) << " time = "
<< std::chrono::duration_cast<std::chrono::seconds>(end - start)
.count();
}
void
@@ -48,34 +130,47 @@ BackendIndexer::clearCaches()
    keysCumulative = {};
    booksCumulative = {};
}
void
-BackendIndexer::populateCaches(BackendInterface const& backend)
BackendIndexer::populateCaches(
    BackendInterface const& backend,
    std::optional<uint32_t> sequence)
{
-   if (keysCumulative.size() > 0)
-   {
-       BOOST_LOG_TRIVIAL(info)
-           << __func__ << " caches already populated. returning";
-       return;
-   }
-   auto tip = backend.fetchLatestLedgerSequence();
-   if (!tip)
-       return;
    if (!sequence)
    {
        auto rng = backend.fetchLedgerRangeNoThrow();
        if (!rng)
            return;
        sequence = rng->maxSequence;
    }
    BOOST_LOG_TRIVIAL(info)
        << __func__ << " sequence = " << std::to_string(*sequence);
    std::optional<ripple::uint256> cursor;
    while (true)
    {
        try
        {
-           auto [objects, curCursor] =
-               backend.fetchLedgerPage(cursor, *tip, 2048);
            auto [objects, curCursor, warning] =
                backend.fetchLedgerPage(cursor, *sequence, 2048);
            if (warning)
            {
                BOOST_LOG_TRIVIAL(warning)
                    << __func__ << " performing index repair";
                uint32_t lower = (*sequence - 1) >> shift_ << shift_;
                populateCaches(backend, lower);
                writeFlagLedger(
                    lower, shift_, backend, keysCumulative, booksCumulative);
                clearCaches();
            }
            BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
            cursor = curCursor;
            for (auto& obj : objects)
            {
-               keysCumulative.insert(obj.key);
                addKeyAsync(obj.key);
                if (isOffer(obj.blob))
                {
                    auto book = getBook(obj.blob);
-                   booksCumulative[book].insert(obj.key);
                    addBookOfferAsync(book, obj.key);
                }
            }
            if (!cursor)
@@ -88,51 +183,113 @@ BackendIndexer::populateCaches(BackendInterface const& backend)
            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    }
    // Do reconciliation. Remove anything from keys or books that shouldn't
// be there
{
std::unique_lock lck(mtx);
populatingCacheAsync = false;
}
auto tip = backend.fetchLatestLedgerSequence();
for (auto& key : deletedKeys)
{
deleteKey(key);
}
for (auto& book : deletedBooks)
{
for (auto& offer : book.second)
{
deleteBookOffer(book.first, offer);
}
}
{
std::unique_lock lck(mtx);
deletedKeys = {};
deletedBooks = {};
cv_.notify_one();
}
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. keys.size() = " << std::to_string(keysCumulative.size());
}
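The reconciliation above is the subtle part of populating the cache in the background: deletions that race with the scan are remembered in a side set and replayed once the scan completes. A minimal, self-contained sketch of the same pattern, with `std::set<int>` standing in for the key caches (the names here are illustrative, not this repo's API):

#include <condition_variable>
#include <mutex>
#include <set>

struct CacheSketch
{
    std::set<int> cache;    // stands in for keysCumulative
    std::set<int> deleted;  // deletions seen while populating
    bool populating = false;
    std::mutex mtx;
    std::condition_variable cv;

    void
    deleteKey(int k)
    {
        std::unique_lock lck(mtx);
        cache.erase(k);
        if (populating)
            deleted.insert(k);  // remember: the scan may re-insert k
    }

    void
    populate(std::set<int> const& snapshot)
    {
        {
            std::unique_lock lck(mtx);
            populating = true;
        }
        for (int k : snapshot)  // slow background scan
        {
            std::unique_lock lck(mtx);
            cache.insert(k);
        }
        std::unique_lock lck(mtx);
        for (int k : deleted)  // reconcile: drop keys deleted mid-scan
            cache.erase(k);
        deleted.clear();
        populating = false;
        cv.notify_one();  // wake anyone blocked in a waitForCaches()-style wait
    }
};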
void
BackendIndexer::populateCachesAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
if (keysCumulative.size() > 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " caches already populated. returning";
return;
}
{
std::unique_lock lck(mtx);
populatingCacheAsync = true;
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " seq = " << (sequence ? std::to_string(*sequence) : "");
boost::asio::post(ioc_, [this, sequence, &backend]() {
populateCaches(backend, sequence);
});
}
void
-BackendIndexer::writeNext(
-    uint32_t ledgerSequence,
-    BackendInterface const& backend)
BackendIndexer::waitForCaches()
{
    std::unique_lock lck(mtx);
    cv_.wait(lck, [this]() {
        return !populatingCacheAsync && deletedKeys.size() == 0;
    });
}
void
BackendIndexer::writeFlagLedgerAsync(
    uint32_t ledgerSequence,
    BackendInterface const& backend)
{
    BOOST_LOG_TRIVIAL(info)
        << __func__
        << " starting. sequence = " << std::to_string(ledgerSequence);
-   bool isFlag = (ledgerSequence % (1 << shift_)) == 0;
-   if (!backend.fetchLedgerRange())
-   {
-       isFlag = true;
-   }
-   if (isFlag)
-   {
-       uint32_t nextSeq =
-           ((ledgerSequence >> shift_ << shift_) + (1 << shift_));
-       BOOST_LOG_TRIVIAL(info)
-           << __func__ << " actually doing the write. keysCumulative.size() = "
-           << std::to_string(keysCumulative.size());
-       backend.writeKeys(keysCumulative, nextSeq);
-       BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys";
-       backend.writeBooks(booksCumulative, nextSeq);
-       BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books";
-   }
    waitForCaches();
    auto booksCopy = booksCumulative;
    auto keysCopy = keysCumulative;
    boost::asio::post(ioc_, [=, this, &backend]() {
        writeFlagLedger(ledgerSequence, shift_, backend, keysCopy, booksCopy);
    });
    BOOST_LOG_TRIVIAL(info)
        << __func__
        << " finished. sequence = " << std::to_string(ledgerSequence);
}
void
BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
{
-   bool isFlag = ledgerSequence % (1 << shift_) == 0;
-   if (!backend.fetchLedgerRange())
-   {
-       isFlag = true;
-   }
-   uint32_t nextSeq = ((ledgerSequence >> shift_ << shift_) + (1 << shift_));
-   uint32_t curSeq = isFlag ? ledgerSequence : nextSeq;
-   backend.writeKeys(keys, curSeq);
-   backend.writeBooks(books, curSeq);
    BOOST_LOG_TRIVIAL(info)
        << __func__
        << " starting. sequence = " << std::to_string(ledgerSequence);
    bool isFirst = false;
    uint32_t index = getIndexOfSeq(ledgerSequence);
    auto rng = backend.fetchLedgerRangeNoThrow();
    if (!rng || rng->minSequence == ledgerSequence)
    {
        isFirst = true;
        index = ledgerSequence;
    }
    backend.writeKeys(keys, index);
    backend.writeBooks(books, index);
    if (isFirst)
    {
        ripple::uint256 zero = {};
        backend.writeBooks({{zero, {zero}}}, ledgerSequence);
        backend.writeKeys({zero}, ledgerSequence);
        writeFlagLedgerAsync(ledgerSequence, backend);
    }
    keys = {};
    books = {};
    BOOST_LOG_TRIVIAL(info)
        << __func__
        << " finished. sequence = " << std::to_string(ledgerSequence);
}
} // namespace Backend

View File

@@ -26,6 +26,13 @@ struct LedgerPage
{
    std::vector<LedgerObject> objects;
    std::optional<ripple::uint256> cursor;
    std::optional<std::string> warning;
};
struct BookOffersPage
{
    std::vector<LedgerObject> offers;
    std::optional<ripple::uint256> cursor;
    std::optional<std::string> warning;
};
struct TransactionAndMetadata
{
@@ -63,28 +70,51 @@ class BackendIndexer
    std::thread ioThread_;
    uint32_t shift_ = 16;
    std::unordered_set<ripple::uint256> keys;
    std::unordered_set<ripple::uint256> keysCumulative;
    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
        books;
    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
        booksCumulative;
    bool populatingCacheAsync = false;
    // These are only used when the cache is being populated asynchronously
    std::unordered_set<ripple::uint256> deletedKeys;
    std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
        deletedBooks;
    std::mutex mtx;
    std::condition_variable cv_;
    void
    addKeyAsync(ripple::uint256 const& key);
    void
    addBookOfferAsync(
        ripple::uint256 const& book,
        ripple::uint256 const& offerKey);
public:
    BackendIndexer(boost::json::object const& config);
    ~BackendIndexer();
    void
    populateCachesAsync(
        BackendInterface const& backend,
        std::optional<uint32_t> sequence = {});
    void
-   populateCaches(BackendInterface const& backend);
    populateCaches(
        BackendInterface const& backend,
        std::optional<uint32_t> sequence = {});
    void
    clearCaches();
    // Blocking, possibly for minutes
    void
    waitForCaches();
    void
    addKey(ripple::uint256 const& key);
    void
    deleteKey(ripple::uint256 const& key);
    void
    addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey);
    void
    deleteBookOffer(
        ripple::uint256 const& book,
@@ -93,12 +123,27 @@ public:
    void
    finish(uint32_t ledgerSequence, BackendInterface const& backend);
    void
-   writeNext(uint32_t ledgerSequence, BackendInterface const& backend);
    writeFlagLedgerAsync(
        uint32_t ledgerSequence,
        BackendInterface const& backend);
    uint32_t
    getShift()
    {
        return shift_;
    }
    uint32_t
    getIndexOfSeq(uint32_t seq) const
    {
        if (isFlagLedger(seq))
            return seq;
        auto incr = (1 << shift_);
        return (seq >> shift_ << shift_) + incr;
    }
    bool
    isFlagLedger(uint32_t ledgerSequence) const
    {
        return (ledgerSequence % (1 << shift_)) == 0;
    }
};
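For reference, the two helpers above implement round-up-to-the-next-flag-ledger arithmetic. A standalone sketch with worked values, assuming the default `shift_ = 16` used in this commit:

#include <cassert>
#include <cstdint>

int
main()
{
    uint32_t shift = 16;
    auto isFlag = [&](uint32_t seq) { return (seq % (1u << shift)) == 0; };
    auto indexOf = [&](uint32_t seq) {
        if (isFlag(seq))
            return seq;
        // clear the low bits, then step to the next multiple of 2^shift
        return (seq >> shift << shift) + (1u << shift);
    };
    assert(isFlag(131072));            // 131072 == 2 * 2^16
    assert(!isFlag(70000));
    assert(indexOf(70000) == 131072);  // rounds up to the next flag ledger
    assert(indexOf(65536) == 65536);   // already a flag ledger
}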
class BackendInterface
@@ -121,15 +166,27 @@ public:
    std::optional<uint32_t>
    getIndexOfSeq(uint32_t seq) const
    {
-       if (!fetchLedgerRange())
-           return {};
-       if (fetchLedgerRange()->minSequence == seq)
-           return seq;
-       uint32_t shift = indexer_.getShift();
-       uint32_t incr = (1 << shift);
-       if ((seq % incr) == 0)
-           return seq;
-       return (seq >> shift << shift) + incr;
        if (indexer_.isFlagLedger(seq))
            return seq;
        auto rng = fetchLedgerRange();
        if (!rng)
            return {};
        if (rng->minSequence == seq)
            return seq;
        return indexer_.getIndexOfSeq(seq);
    }

    bool
    finishWrites(uint32_t ledgerSequence) const
    {
        indexer_.finish(ledgerSequence, *this);
        auto commitRes = doFinishWrites();
        if (commitRes)
        {
            if (indexer_.isFlagLedger(ledgerSequence))
                indexer_.writeFlagLedgerAsync(ledgerSequence, *this);
        }
        return commitRes;
    }
    virtual std::optional<uint32_t>
@@ -141,6 +198,22 @@ public:
    virtual std::optional<LedgerRange>
    fetchLedgerRange() const = 0;
std::optional<LedgerRange>
fetchLedgerRangeNoThrow() const
{
while (true)
{
try
{
return fetchLedgerRange();
}
catch (DatabaseTimeout& t)
{
;
}
}
}
    virtual std::optional<Blob>
    fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0;
@@ -160,7 +233,8 @@ public:
        std::uint32_t ledgerSequence,
        std::uint32_t limit) const = 0;
-   virtual std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
    // TODO add warning for incomplete data
    virtual BookOffersPage
    fetchBookOffers(
        ripple::uint256 const& book,
        uint32_t ledgerSequence,
@@ -243,7 +317,8 @@ public:
    // other database methods
    // Open the database. Set up all of the necessary objects and
    // data structures. After this call completes, the database is ready for
    // use.
    virtual void
    open(bool readOnly) = 0;
@@ -254,13 +329,6 @@ public:
    virtual void
    startWrites() const = 0;
-   bool
-   finishWrites(uint32_t ledgerSequence) const
-   {
-       indexer_.finish(ledgerSequence, *this);
-       indexer_.writeNext(ledgerSequence, *this);
-       return doFinishWrites();
-   }
    virtual bool
    doFinishWrites() const = 0;

View File

@@ -24,24 +24,27 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func)
    auto rc = cass_future_error_code(fut);
    if (rc != CASS_OK)
    {
-       BOOST_LOG_TRIVIAL(error)
-           << "ERROR!!! Cassandra insert error: " << rc << ", "
-           << cass_error_desc(rc) << ", retrying ";
        // exponential backoff with a max wait of 2^10 ms (about 1 second)
        auto wait = std::chrono::milliseconds(
            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
        BOOST_LOG_TRIVIAL(error)
            << "ERROR!!! Cassandra ETL insert error: " << rc << ", "
            << cass_error_desc(rc) << ", retrying in " << wait.count()
            << " milliseconds";
        ++requestParams.currentRetries;
        std::shared_ptr<boost::asio::steady_timer> timer =
            std::make_shared<boost::asio::steady_timer>(
                backend.getIOContext(),
                std::chrono::steady_clock::now() + wait);
-       timer->async_wait([timer, &requestParams, &func](
        timer->async_wait([timer, &requestParams, func](
                              const boost::system::error_code& error) {
            func(requestParams, true);
        });
    }
    else
    {
        BOOST_LOG_TRIVIAL(trace)
            << __func__ << " Successfully inserted a record";
        backend.finishAsyncWrite();
        int remaining = --requestParams.refs;
        if (remaining == 0)
@@ -63,16 +66,6 @@ flatMapWriteCallback(CassFuture* fut, void* cbData)
    processAsyncWriteResponse(requestParams, fut, func);
}
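The async write callbacks in this file all retry failed inserts with capped exponential backoff, as the comments note. A minimal sketch of just the delay computation (the function name is illustrative, not part of the backend):

#include <algorithm>
#include <chrono>
#include <cmath>

std::chrono::milliseconds
backoff(unsigned retries)
{
    // 1ms, 2ms, 4ms, ... capped at 2^10 ms (about a second)
    return std::chrono::milliseconds(
        std::lround(std::pow(2, std::min(10u, retries))));
}
// backoff(0) == 1ms, backoff(3) == 8ms, backoff(10) == backoff(50) == 1024ms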
// void
// flatMapWriteBookCallback(CassFuture* fut, void* cbData)
// {
// CassandraBackend::WriteCallbackData& requestParams =
// *static_cast<CassandraBackend::WriteCallbackData*>(cbData);
// auto func = [](auto& params, bool retry) {
// params.backend->writeBook(params, retry);
// };
// processAsyncWriteResponse(requestParams, fut, func);
// }
/*
void
@@ -286,73 +279,6 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
    return hashes;
}
LedgerPage
CassandraBackend::fetchLedgerPage2(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{
BOOST_LOG_TRIVIAL(trace) << __func__;
std::optional<ripple::uint256> currentCursor = cursor;
std::vector<LedgerObject> objects;
uint32_t curLimit = limit;
while (objects.size() < limit)
{
CassandraStatement statement{selectLedgerPage_};
int64_t intCursor = INT64_MIN;
if (currentCursor)
{
auto token = getToken(currentCursor->data());
if (token)
intCursor = *token;
}
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - cursor = " << std::to_string(intCursor)
<< " , sequence = " << std::to_string(ledgerSequence)
<< ", - limit = " << std::to_string(limit);
statement.bindInt(intCursor);
statement.bindInt(ledgerSequence);
statement.bindUInt(curLimit);
CassandraResult result = executeSyncRead(statement);
if (!!result)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - got keys - size = " << result.numRows();
size_t prevSize = objects.size();
do
{
std::vector<unsigned char> object = result.getBytes();
if (object.size())
{
objects.push_back({result.getUInt256(), std::move(object)});
}
} while (result.nextRow());
size_t prevBatchSize = objects.size() - prevSize;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - added to objects. size = " << objects.size();
if (result.numRows() < curLimit)
{
currentCursor = {};
break;
}
if (objects.size() < limit)
{
curLimit = 2048;
}
assert(objects.size());
currentCursor = objects[objects.size() - 1].key;
}
}
if (objects.size())
return {objects, currentCursor};
return {{}, {}};
}
struct ReadDiffCallbackData
{
    CassandraBackend const& backend;
@@ -477,12 +403,12 @@ CassandraBackend::fetchLedgerPage(
    if (!index)
        return {};
    LedgerPage page;
-   if (cursor)
-       BOOST_LOG_TRIVIAL(debug)
-           << __func__ << " - Cursor = " << ripple::strHex(*cursor);
    BOOST_LOG_TRIVIAL(debug)
        << __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
        << " index = " << std::to_string(*index);
    if (cursor)
        BOOST_LOG_TRIVIAL(debug)
            << __func__ << " - Cursor = " << ripple::strHex(*cursor);
    CassandraStatement statement{selectKeys_};
    statement.bindInt(*index);
    if (cursor)
@@ -494,10 +420,9 @@ CassandraBackend::fetchLedgerPage(
    }
    statement.bindUInt(limit);
    CassandraResult result = executeSyncRead(statement);
-   BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got keys";
    if (!!result)
    {
-       BOOST_LOG_TRIVIAL(debug)
        BOOST_LOG_TRIVIAL(trace)
            << __func__ << " - got keys - size = " << result.numRows();
        std::vector<ripple::uint256> keys;
@@ -505,17 +430,14 @@ CassandraBackend::fetchLedgerPage(
        {
            keys.push_back(result.getUInt256());
        } while (result.nextRow());
-       BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Read keys";
        auto objects = fetchLedgerObjects(keys, ledgerSequence);
-       BOOST_LOG_TRIVIAL(debug)
-           << __func__ << " Using base ledger. Got objects";
        if (objects.size() != keys.size())
            throw std::runtime_error("Mismatch in size of objects and keys");
        if (keys.size() == limit)
            page.cursor = keys[keys.size() - 1];
        if (cursor)
-           BOOST_LOG_TRIVIAL(debug)
            BOOST_LOG_TRIVIAL(trace)
                << __func__ << " Cursor = " << ripple::strHex(*page.cursor);
        for (size_t i = 0; i < objects.size(); ++i)
@@ -527,6 +449,8 @@ CassandraBackend::fetchLedgerPage(
            page.objects.push_back({std::move(key), std::move(obj)});
        }
        if (keys.size() && !cursor && !keys[0].isZero())
            page.warning = "Data may be incomplete";
        return page;
    }
    return {{}, {}};
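The `keys[0].isZero()` test works because the flag-ledger writer always plants a zero key (see `writeKeys({zero}, nextFlag)` in BackendIndexer): keys are read back in ascending order, so a fully written index yields the zero key first, and its absence means the background write has not finished. A small sketch of that check, with `uint64_t` standing in for `ripple::uint256`:

#include <cstdint>
#include <set>

using Key = uint64_t;  // stand-in for ripple::uint256

bool
indexComplete(std::set<Key> const& keysAtFlagLedger)
{
    // the writer plants Key{0} last; keys sort ascending, so a complete
    // index always yields the zero sentinel as its first key
    return !keysAtFlagLedger.empty() && *keysAtFlagLedger.begin() == 0;
}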
@@ -565,39 +489,39 @@ CassandraBackend::fetchLedgerObjects(
<< "Fetched " << numKeys << " records from Cassandra"; << "Fetched " << numKeys << " records from Cassandra";
return results; return results;
} }
-std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
BookOffersPage
CassandraBackend::fetchBookOffers(
    ripple::uint256 const& book,
    uint32_t sequence,
    std::uint32_t limit,
    std::optional<ripple::uint256> const& cursor) const
{
-   CassandraStatement statement{selectBook_};
    auto rng = fetchLedgerRange();
    if (!rng)
        return {{}, {}};
-   std::vector<ripple::uint256> keys;
    uint32_t upper = sequence;
    auto lastPage = rng->maxSequence - (rng->maxSequence % 256);
-   if (sequence != rng->minSequence)
-   {
-       upper = (sequence >> 8) << 8;
-       if (upper != sequence)
-           upper += (1 << 8);
-   }
    auto readBooks = [this](
                         ripple::uint256 const& book,
                         std::uint32_t seq,
                         std::optional<ripple::uint256> const& cursor)
        -> std::vector<std::pair<std::uint64_t, ripple::uint256>>
    {
        CassandraStatement statement{this->selectBook_};
        std::vector<std::pair<std::uint64_t, ripple::uint256>> keys = {};
        statement.bindBytes(book.data(), 24);
-       statement.bindInt(upper);
        statement.bindInt(seq);
-       BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper)
        BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(seq)
            << " book = " << ripple::strHex(std::string((char*)book.data(), 24));
        if (cursor)
        {
-           auto object = fetchLedgerObject(*cursor, sequence);
            auto object = this->fetchLedgerObject(*cursor, seq);
            if (!object)
                return {{}, {}};
@@ -617,7 +541,7 @@ CassandraBackend::fetchBookOffers(
        }
        // statement.bindUInt(limit);
-       CassandraResult result = executeSyncRead(statement);
        CassandraResult result = this->executeSyncRead(statement);
        BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
        if (!result)
@@ -628,32 +552,72 @@ CassandraBackend::fetchBookOffers(
        do
        {
-           auto index = result.getBytesTuple().second;
-           keys.push_back(ripple::uint256::fromVoid(index.data()));
            auto [quality, index] = result.getBytesTuple();
            std::uint64_t q = 0;
            memcpy(&q, quality.data(), 8);
            keys.push_back({q, ripple::uint256::fromVoid(index.data())});
        } while (result.nextRow());
        return keys;
    };
    std::vector<std::pair<std::uint64_t, ripple::uint256>> quality_keys;
    if (sequence != rng->minSequence)
    {
        upper = (sequence >> 8) << 8;
        if (upper != sequence)
            upper += (1 << 8);
        quality_keys = readBooks(book, upper, cursor);
    }
    BOOST_LOG_TRIVIAL(debug)
-       << __func__ << " - populated keys. num keys = " << keys.size();
        << __func__ << " - populated keys. num keys = " << quality_keys.size();
-   std::cout << keys.size() << std::endl;
    std::cout << "KEYS SIZE: " << quality_keys.size() << std::endl;
-   if (!keys.size())
    if (!quality_keys.size())
        return {{}, {}};
    std::optional<std::string> warning = {};
    if (quality_keys[0].second.isZero())
    {
        warning = "Data may be incomplete";
        std::uint32_t lower = (sequence >> 8) << 8;
        auto originalKeys = std::move(quality_keys);
        auto otherKeys = readBooks(book, lower, cursor);
        quality_keys.reserve(originalKeys.size() + otherKeys.size());
        std::merge(
            originalKeys.begin(), originalKeys.end(),
            otherKeys.begin(), otherKeys.end(),
            std::back_inserter(quality_keys),
            [](auto pair1, auto pair2) { return pair1.first < pair2.first; });
    }
    std::vector<ripple::uint256> keys;
    keys.reserve(quality_keys.size());
    std::transform(
        quality_keys.begin(), quality_keys.end(),
        std::back_inserter(keys),
        [](auto pair) { return pair.second; });
    std::vector<LedgerObject> results;
    std::vector<Blob> objs = fetchLedgerObjects(keys, sequence);
    for (size_t i = 0; i < objs.size(); ++i)
    {
        if (objs[i].size() != 0)
        {
            if (results.size() == limit)
                return {results, keys[i], warning};
            results.push_back({keys[i], objs[i]});
        }
    }
    return {results, {}, warning};
}
struct WriteBookCallbackData
{
@@ -662,7 +626,7 @@ struct WriteBookCallbackData
    ripple::uint256 offerKey;
    uint32_t ledgerSequence;
    std::condition_variable& cv;
-   std::atomic_uint32_t& numRemaining;
    std::atomic_uint32_t& numOutstanding;
    std::mutex& mtx;
    uint32_t currentRetries = 0;
    WriteBookCallbackData(
@@ -672,14 +636,14 @@ struct WriteBookCallbackData
        uint32_t ledgerSequence,
        std::condition_variable& cv,
        std::mutex& mtx,
-       std::atomic_uint32_t& numRemaining)
        std::atomic_uint32_t& numOutstanding)
        : backend(backend)
        , book(book)
        , offerKey(offerKey)
        , ledgerSequence(ledgerSequence)
        , cv(cv)
        , mtx(mtx)
-       , numRemaining(numRemaining)
        , numOutstanding(numOutstanding)
    {
    }
@@ -687,7 +651,7 @@ struct WriteBookCallbackData
void
writeBookCallback(CassFuture* fut, void* cbData);
void
-writeBook2(WriteBookCallbackData& cb)
writeBook(WriteBookCallbackData& cb)
{
    CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
    statement.bindBytes(cb.book.data(), 24);
@@ -707,12 +671,13 @@ writeBookCallback(CassFuture* fut, void* cbData)
    auto rc = cass_future_error_code(fut);
    if (rc != CASS_OK)
    {
-       BOOST_LOG_TRIVIAL(error)
-           << "ERROR!!! Cassandra insert key error: " << rc << ", "
-           << cass_error_desc(rc) << ", retrying ";
        // exponential backoff with a max wait of 2^10 ms (about 1 second)
        auto wait = std::chrono::milliseconds(
            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
        BOOST_LOG_TRIVIAL(error)
            << "ERROR!!! Cassandra insert book error: " << rc << ", "
            << cass_error_desc(rc) << ", retrying in " << wait.count()
            << " milliseconds";
        ++requestParams.currentRetries;
        std::shared_ptr<boost::asio::steady_timer> timer =
            std::make_shared<boost::asio::steady_timer>(
@@ -720,15 +685,15 @@ writeBookCallback(CassFuture* fut, void* cbData)
                std::chrono::steady_clock::now() + wait);
        timer->async_wait(
            [timer, &requestParams](const boost::system::error_code& error) {
-               writeBook2(requestParams);
                writeBook(requestParams);
            });
    }
    else
    {
-       BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request";
        BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
        {
            std::lock_guard lck(requestParams.mtx);
-           --requestParams.numRemaining;
            --requestParams.numOutstanding;
            requestParams.cv.notify_one();
        }
    }
@@ -781,12 +746,13 @@ writeKeyCallback(CassFuture* fut, void* cbData)
    auto rc = cass_future_error_code(fut);
    if (rc != CASS_OK)
    {
-       BOOST_LOG_TRIVIAL(error)
-           << "ERROR!!! Cassandra insert key error: " << rc << ", "
-           << cass_error_desc(rc) << ", retrying ";
        // exponential backoff with a max wait of 2^10 ms (about 1 second)
        auto wait = std::chrono::milliseconds(
            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
        BOOST_LOG_TRIVIAL(error)
            << "ERROR!!! Cassandra insert key error: " << rc << ", "
            << cass_error_desc(rc) << ", retrying in " << wait.count()
            << " milliseconds";
        ++requestParams.currentRetries;
        std::shared_ptr<boost::asio::steady_timer> timer =
            std::make_shared<boost::asio::steady_timer>(
@@ -799,7 +765,7 @@ writeKeyCallback(CassFuture* fut, void* cbData)
    }
    else
    {
-       BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request";
        BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a key";
        {
            std::lock_guard lck(requestParams.mtx);
            --requestParams.numRemaining;
@@ -815,13 +781,15 @@ CassandraBackend::writeKeys(
{
    BOOST_LOG_TRIVIAL(info)
        << __func__ << " Ledger = " << std::to_string(ledgerSequence)
-       << " . num keys = " << std::to_string(keys.size());
        << " . num keys = " << std::to_string(keys.size())
        << " . concurrentLimit = "
        << std::to_string(indexerMaxRequestsOutstanding);
    std::atomic_uint32_t numRemaining = keys.size();
    std::condition_variable cv;
    std::mutex mtx;
    std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs;
    cbs.reserve(keys.size());
-   uint32_t concurrentLimit = maxRequestsOutstanding;
    uint32_t concurrentLimit = indexerMaxRequestsOutstanding;
    uint32_t numSubmitted = 0;
    for (auto& key : keys)
    {
@@ -868,7 +836,7 @@ CassandraBackend::writeBooks(
    std::condition_variable cv;
    std::mutex mtx;
    std::vector<std::shared_ptr<WriteBookCallbackData>> cbs;
-   uint32_t concurrentLimit = maxRequestsOutstanding / 2;
    uint32_t concurrentLimit = indexerMaxRequestsOutstanding;
    std::atomic_uint32_t numOutstanding = 0;
    size_t count = 0;
    auto start = std::chrono::system_clock::now();
@@ -886,7 +854,7 @@ CassandraBackend::writeBooks(
            cv,
            mtx,
            numOutstanding));
-       writeBook2(*cbs.back());
        writeBook(*cbs.back());
        BOOST_LOG_TRIVIAL(trace) << __func__ << " Submitted a write request";
        std::unique_lock<std::mutex> lck(mtx);
        BOOST_LOG_TRIVIAL(trace) << __func__ << " Got the mutex";
@@ -1416,14 +1384,14 @@ CassandraBackend::open(bool readOnly)
        continue;
    query.str("");
-   query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2"
    query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
          << " ( book blob, sequence bigint, quality_key tuple<blob, blob>, PRIMARY KEY "
             "((book, sequence), quality_key)) WITH CLUSTERING ORDER BY (quality_key "
             "ASC)";
    if (!executeSimpleStatement(query.str()))
        continue;
    query.str("");
-   query << "SELECT * FROM " << tablePrefix << "books2"
    query << "SELECT * FROM " << tablePrefix << "books"
          << " LIMIT 1";
    if (!executeSimpleStatement(query.str()))
        continue;
@@ -1508,7 +1476,7 @@ CassandraBackend::open(bool readOnly)
        continue;
    query.str("");
-   query << "INSERT INTO " << tablePrefix << "books2"
    query << "INSERT INTO " << tablePrefix << "books"
          << " (book, sequence, quality_key) VALUES (?, ?, (?, ?))";
    if (!insertBook2_.prepareStatement(query, session_.get()))
        continue;
@@ -1516,7 +1484,7 @@ CassandraBackend::open(bool readOnly)
    query.str("");
    query << "SELECT key FROM " << tablePrefix << "keys"
-         << " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?";
          << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?";
    if (!selectKeys_.prepareStatement(query, session_.get()))
        continue;
@@ -1583,7 +1551,7 @@ CassandraBackend::open(bool readOnly)
        continue;
    query.str("");
-   query << "SELECT quality_key FROM " << tablePrefix << "books2 "
    query << "SELECT quality_key FROM " << tablePrefix << "books "
          << " WHERE book = ? AND sequence = ?"
          << " AND quality_key >= (?, ?)"
             " ORDER BY quality_key ASC";
@@ -1690,6 +1658,11 @@ CassandraBackend::open(bool readOnly)
    {
        maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
    }
    if (config_.contains("indexer_max_requests_outstanding"))
    {
        indexerMaxRequestsOutstanding =
            config_["indexer_max_requests_outstanding"].as_int64();
    }
    /*
    if (config_.contains("run_indexer"))
    {

View File

@@ -662,6 +662,9 @@ private:
    // maximum number of concurrent in flight requests. New requests will wait
    // for earlier requests to finish if this limit is exceeded
    uint32_t maxRequestsOutstanding = 10000;
    // we keep this small because the indexer runs in the background, and we
    // don't want the database to be swamped when the indexer is running
    uint32_t indexerMaxRequestsOutstanding = 10;
    mutable std::atomic_uint32_t numRequestsOutstanding_ = 0;
    // mutex and condition_variable to limit the number of concurrent in flight
@@ -826,6 +829,14 @@ public:
    {
        // wait for all other writes to finish
        sync();
        auto rng = fetchLedgerRangeNoThrow();
        if (rng && rng->maxSequence >= ledgerSequence_)
        {
            BOOST_LOG_TRIVIAL(warning)
                << __func__ << " Ledger " << std::to_string(ledgerSequence_)
                << " already written. Returning";
            return false;
        }
        // write range
        if (isFirstLedger_)
        {
@@ -839,7 +850,16 @@ public:
        statement.bindInt(ledgerSequence_);
        statement.bindBoolean(true);
        statement.bindInt(ledgerSequence_ - 1);
-       return executeSyncUpdate(statement);
        if (!executeSyncUpdate(statement))
        {
            BOOST_LOG_TRIVIAL(warning)
                << __func__ << " Update failed for ledger "
                << std::to_string(ledgerSequence_) << ". Returning";
            return false;
        }
        BOOST_LOG_TRIVIAL(debug) << __func__ << " Committed ledger "
                                 << std::to_string(ledgerSequence_);
        return true;
    }
    void
    writeLedger(
@@ -973,11 +993,6 @@ public:
        return {{result.getBytes(), result.getBytes(), result.getUInt32()}};
    }
-   LedgerPage
-   fetchLedgerPage2(
-       std::optional<ripple::uint256> const& cursor,
-       std::uint32_t ledgerSequence,
-       std::uint32_t limit) const;
    LedgerPage
    fetchLedgerPage(
        std::optional<ripple::uint256> const& cursor,
        std::uint32_t ledgerSequence,
@@ -1005,58 +1020,13 @@ public:
        ripple::uint256,
        std::unordered_set<ripple::uint256>> const& books,
        uint32_t ledgerSequence) const override;
-   std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
    BookOffersPage
    fetchBookOffers(
        ripple::uint256 const& book,
        uint32_t sequence,
        std::uint32_t limit,
        std::optional<ripple::uint256> const& cursor) const override;
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
fetchBookOffers2(
ripple::uint256 const& book,
uint32_t sequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
CassandraStatement statement{getBook_};
statement.bindBytes(book);
statement.bindInt(sequence);
statement.bindInt(sequence);
if (cursor)
statement.bindBytes(*cursor);
else
{
ripple::uint256 zero = {};
statement.bindBytes(zero);
}
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
std::vector<ripple::uint256> keys;
if (!result)
return {{}, {}};
do
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - populated keys. num keys = " << keys.size();
if (keys.size())
{
std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchLedgerObjects(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i)
{
results.push_back({keys[i], objs[i]});
}
return {results, results[results.size() - 1].key};
}
return {{}, {}};
}
    bool
    canFetchBatch()
    {
@@ -1213,18 +1183,29 @@ public:
        {
        }
    };
    struct WriteAccountTxCallbackData
    {
        CassandraBackend const* backend;
-       AccountTransactionsData data;
        ripple::AccountID account;
        uint32_t ledgerSequence;
        uint32_t transactionIndex;
        ripple::uint256 txHash;
        uint32_t currentRetries = 0;
-       std::atomic<int> refs;
        std::atomic<int> refs = 1;
        WriteAccountTxCallbackData(
            CassandraBackend const* f,
-           AccountTransactionsData&& in)
-           : backend(f), data(std::move(in)), refs(data.accounts.size())
            ripple::AccountID&& account,
            uint32_t lgrSeq,
            uint32_t txIdx,
            ripple::uint256&& hash)
            : backend(f)
            , account(std::move(account))
            , ledgerSequence(lgrSeq)
            , transactionIndex(txIdx)
            , txHash(std::move(hash))
        {
        }
    };
@@ -1242,55 +1223,6 @@ public:
        }
    }
/*
void
writeDeletedKey(WriteCallbackData& data, bool isRetry) const
{
CassandraStatement statement{insertKey_};
statement.bindBytes(data.key);
statement.bindInt(data.createdSequence);
statement.bindInt(data.sequence);
executeAsyncWrite(statement, flatMapWriteKeyCallback, data, isRetry);
}
void
writeKey(WriteCallbackData& data, bool isRetry) const
{
if (data.isCreated)
{
CassandraStatement statement{insertKey_};
statement.bindBytes(data.key);
statement.bindInt(data.sequence);
statement.bindInt(INT64_MAX);
executeAsyncWrite(
statement, flatMapWriteKeyCallback, data, isRetry);
}
else if (data.isDeleted)
{
CassandraStatement statement{getCreated_};
executeAsyncWrite(
statement, flatMapGetCreatedCallback, data, isRetry);
}
}
*/
// void
// writeBook(WriteCallbackData& data, bool isRetry) const
// {
// assert(data.isCreated or data.isDeleted);
// assert(data.book);
// CassandraStatement statement{
// (data.isCreated ? insertBook_ : deleteBook_)};
// statement.bindBytes(*data.book);
// statement.bindBytes(data.key);
// statement.bindInt(data.sequence);
// if (data.isCreated)
// statement.bindInt(INT64_MAX);
// executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry);
// }
    void
    doWriteLedgerObject(
        std::string&& key,
@@ -1320,26 +1252,30 @@ public:
    {
        for (auto& record : data)
        {
-           WriteAccountTxCallbackData* cbData =
-               new WriteAccountTxCallbackData(this, std::move(record));
-           writeAccountTx(*cbData, false);
            for (auto& account : record.accounts)
            {
                WriteAccountTxCallbackData* cbData =
                    new WriteAccountTxCallbackData(
                        this,
                        std::move(account),
                        record.ledgerSequence,
                        record.transactionIndex,
                        std::move(record.txHash));
                writeAccountTx(*cbData, false);
            }
        }
    }
    void
    writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const
    {
-       for (auto const& account : data.data.accounts)
-       {
-           CassandraStatement statement(insertAccountTx_);
-           statement.bindBytes(account);
-           statement.bindIntTuple(
-               data.data.ledgerSequence, data.data.transactionIndex);
-           statement.bindBytes(data.data.txHash);
-           executeAsyncWrite(
-               statement, flatMapWriteAccountTxCallback, data, isRetry);
-       }
        CassandraStatement statement(insertAccountTx_);
        statement.bindBytes(data.account);
        statement.bindIntTuple(data.ledgerSequence, data.transactionIndex);
        statement.bindBytes(data.txHash);
        executeAsyncWrite(
            statement, flatMapWriteAccountTxCallback, data, isRetry);
    }
    struct WriteTransactionCallbackData
@@ -1562,6 +1498,7 @@ public:
    bool
    executeSyncUpdate(CassandraStatement const& statement) const
    {
        bool timedOut = false;
        CassFuture* fut;
        CassError rc;
        do
@@ -1570,8 +1507,9 @@ public:
            rc = cass_future_error_code(fut);
            if (rc != CASS_OK)
            {
                timedOut = true;
                std::stringstream ss;
-               ss << "Cassandra sync write error";
                ss << "Cassandra sync update error";
                ss << ", retrying";
                ss << ": " << cass_error_desc(rc);
                BOOST_LOG_TRIVIAL(warning) << ss.str();
@@ -1599,7 +1537,15 @@ public:
            return false;
        }
        cass_result_free(res);
-       return success == cass_true;
        if (success != cass_true && timedOut)
        {
            BOOST_LOG_TRIVIAL(warning)
                << __func__ << " Update failed, but timedOut is true";
        }
        // if there was a timeout, the update may have succeeded in the
        // background. We can't differentiate between an async success and
        // another writer, so we just return true here
        return success == cass_true || timedOut;
    }
    CassandraResult

View File

@@ -156,6 +156,20 @@ public:
        cv_.notify_all();
        return ret;
    }
    /// @return element popped from the queue, or an empty optional if the
    /// queue is empty. Unlike pop(), never blocks
std::optional<T>
tryPop()
{
std::unique_lock lck(m_);
if (queue_.empty())
return {};
T ret = std::move(queue_.front());
queue_.pop();
// if queue has a max size, unblock any possible pushers
if (maxSize_)
cv_.notify_all();
return ret;
}
};
/// Partitions the uint256 keyspace into numMarkers partitions, each of equal
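The non-blocking pop matters for shutdown: with a bounded queue, a producer can be parked inside `push()` after the consumer has already exited, and a single `tryPop()` frees a slot to wake it. That is exactly how `runETLPipeline` drains its extractor queues later in this commit. A minimal sketch of the semantics (simplified, without the real class's locking):

#include <optional>
#include <queue>
#include <utility>

template <class T>
std::optional<T>
tryPopSketch(std::queue<T>& q)
{
    if (q.empty())
        return {};  // unlike a blocking pop(), return immediately
    T ret = std::move(q.front());
    q.pop();        // in the real queue, this also notifies blocked pushers
    return ret;
}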

View File

@@ -250,6 +250,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
<< ". Postgres insert error: " << res.msg(); << ". Postgres insert error: " << res.msg();
if (res) if (res)
ss << ". Query status not PGRES_COPY_IN: " << res.status(); ss << ". Query status not PGRES_COPY_IN: " << res.status();
BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
@@ -259,6 +260,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
ss << "bulkInsert to " << table ss << "bulkInsert to " << table
<< ". PQputCopyData error: " << PQerrorMessage(conn_.get()); << ". PQputCopyData error: " << PQerrorMessage(conn_.get());
disconnect(); disconnect();
BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
@@ -268,6 +270,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
ss << "bulkInsert to " << table ss << "bulkInsert to " << table
<< ". PQputCopyEnd error: " << PQerrorMessage(conn_.get()); << ". PQputCopyEnd error: " << PQerrorMessage(conn_.get());
disconnect(); disconnect();
BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
@@ -282,7 +285,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
ss << "bulkInsert to " << table ss << "bulkInsert to " << table
<< ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status; << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status;
disconnect(); disconnect();
BOOST_LOG_TRIVIAL(debug) << __func__ << " " << records; BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
} }

View File

@@ -338,7 +338,7 @@ PostgresBackend::fetchLedgerPage(
sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index); sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index);
if (cursor) if (cursor)
sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'"; sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key DESC LIMIT " << std::to_string(limit); sql << " ORDER BY key ASC LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str(); BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched keys"; BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched keys";
@@ -362,12 +362,14 @@ PostgresBackend::fetchLedgerPage(
results.push_back({keys[i], objs[i]}); results.push_back({keys[i], objs[i]});
} }
} }
if (!cursor && !keys[0].isZero())
return {results, returnCursor, "Data may be incomplete"};
return {results, returnCursor}; return {results, returnCursor};
} }
return {}; return {};
} }
-std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
BookOffersPage
PostgresBackend::fetchBookOffers(
    ripple::uint256 const& book,
    uint32_t ledgerSequence,
@@ -392,6 +394,9 @@ PostgresBackend::fetchBookOffers(
        {
            keys.push_back(res.asUInt256(i, 0));
        }
        std::optional<std::string> warning;
        if (keys[0].isZero())
            warning = "Data may be incomplete";
        std::vector<Blob> blobs = fetchLedgerObjects(keys, ledgerSequence);
        std::vector<LedgerObject> results;
@@ -409,10 +414,10 @@ PostgresBackend::fetchBookOffers(
            BOOST_LOG_TRIVIAL(debug)
                << __func__ << " : " << ripple::strHex(results[0].key) << " : "
                << ripple::strHex(results[results.size() - 1].key);
-           return {results, results[results.size() - 1].key};
            return {results, results[results.size() - 1].key, warning};
        }
        else
-           return {results, {}};
            return {results, {}, warning};
    }
    return {{}, {}};
}

View File

@@ -50,7 +50,7 @@ public:
        std::uint32_t ledgerSequence,
        std::uint32_t limit) const override;
-   std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
    BookOffersPage
    fetchBookOffers(
        ripple::uint256 const& book,
        uint32_t ledgerSequence,

View File

@@ -89,8 +89,8 @@ std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
    // check that database is actually empty
-   auto ledger = flatMapBackend_->fetchLedgerBySequence(startingSequence);
-   if (ledger)
    auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
    if (rng)
    {
        BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
                                 << "Database is not empty";
@@ -156,7 +156,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
    {
        try
        {
-           auto range = flatMapBackend_->fetchLedgerRange();
            auto range = flatMapBackend_->fetchLedgerRangeNoThrow();
            if (!range || range->maxSequence < ledgerSequence)
            {
std::optional<uint32_t> std::optional<uint32_t>
ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
{ {
if (startSequence > finishSequence_) if (finishSequence_ && startSequence > *finishSequence_)
return {}; return {};
/* /*
* Behold, mortals! This function spawns three separate threads, which talk * Behold, mortals! This function spawns three separate threads, which talk
@@ -355,15 +355,16 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< "Starting etl pipeline"; << "Starting etl pipeline";
writing_ = true; writing_ = true;
auto parent = flatMapBackend_->fetchLedgerBySequence(startSequence - 1); auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
if (!parent) if (!rng || rng->maxSequence != startSequence - 1)
{ {
assert(false); assert(false);
throw std::runtime_error("runETLPipeline: parent ledger is null"); throw std::runtime_error("runETLPipeline: parent ledger is null");
} }
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populating caches"; << "Populating caches";
flatMapBackend_->getIndexer().populateCaches(*flatMapBackend_);
flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_);
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populated caches"; << "Populated caches";
@@ -380,19 +381,19 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
        std::cout << std::to_string((sequence - startSequence) % numExtractors);
        return queues[(sequence - startSequence) % numExtractors];
    };
-   std::vector<std::thread> threads;
    std::vector<std::thread> extractors;
    for (size_t i = 0; i < numExtractors; ++i)
    {
        auto transformQueue = std::make_shared<QueueType>(maxQueueSize);
        queues.push_back(transformQueue);
        std::cout << "added to queues";
-       threads.emplace_back([this,
        extractors.emplace_back([this,
                                 &startSequence,
                                 &writeConflict,
                                 transformQueue,
                                 i,
                                 numExtractors]() {
            beast::setCurrentThreadName("rippled: ReportingETL extract");
            uint32_t currentSequence = startSequence + i;
// ETL mechanism should stop. The other stopping condition is if // ETL mechanism should stop. The other stopping condition is if
// the entire server is shutting down. This can be detected in a // the entire server is shutting down. This can be detected in a
// variety of ways. See the comment at the top of the function // variety of ways. See the comment at the top of the function
while (currentSequence <= finishSequence_ && while ((!finishSequence_ || currentSequence <= *finishSequence_) &&
networkValidatedLedgers_.waitUntilValidatedByNetwork( networkValidatedLedgers_.waitUntilValidatedByNetwork(
currentSequence) && currentSequence) &&
!writeConflict && !isStopping()) !writeConflict && !isStopping())
@@ -441,7 +442,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
                transformQueue->push(std::move(fetchResponse));
                currentSequence += numExtractors;
-               if (currentSequence > finishSequence_)
                if (finishSequence_ && currentSequence > *finishSequence_)
                    break;
            }
            // empty optional tells the transformer to shut down
@@ -498,7 +499,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
            lastPublishedSequence = lgrInfo.seq;
        }
        writeConflict = !success;
-       auto range = flatMapBackend_->fetchLedgerRange();
        auto range = flatMapBackend_->fetchLedgerRangeNoThrow();
        if (onlineDeleteInterval_ && !deleting_ &&
            range->maxSequence - range->minSequence >
                *onlineDeleteInterval_)
@@ -515,10 +516,15 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
        }
    }};
-   // wait for all of the threads to stop
-   for (auto& t : threads)
-       t.join();
    transformer.join();
    for (size_t i = 0; i < numExtractors; ++i)
    {
        // pop from each queue that might be blocked on a push
        getNext(i)->tryPop();
    }
    // wait for all of the extractors to stop
    for (auto& t : extractors)
        t.join();
    auto end = std::chrono::system_clock::now();
    BOOST_LOG_TRIVIAL(debug)
        << "Extracted and wrote " << *lastPublishedSequence - startSequence
@@ -593,8 +599,8 @@ ReportingETL::monitor()
    {
        if (startSequence_)
        {
-           throw std::runtime_error(
-               "start sequence specified but db is already populated");
            BOOST_LOG_TRIVIAL(warning)
                << "start sequence specified but db is already populated";
        }
        BOOST_LOG_TRIVIAL(info)
            << __func__ << " : "

View File

@@ -510,7 +510,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1):
                print(json.dumps(x))
            blobs.append(x)
            keys.append(x["index"])
-           if limit != -1 and len(keys) > count:
            if count != -1 and len(keys) > count:
                print("stopping early")
                print(len(keys))
                print("done")
@@ -598,7 +598,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
req["cursor"] = cursor req["cursor"] = cursor
await ws.send(json.dumps(req)) await ws.send(json.dumps(req))
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())
print(json.dumps(res,indent=4,sort_keys=True)) #print(json.dumps(res,indent=4,sort_keys=True))
if "result" in res: if "result" in res:
res = res["result"] res = res["result"]
for x in res["offers"]: for x in res["offers"]: