various fixes. make shift info optional in config. Properly check book flag ledger is complete

This commit is contained in:
CJ Cobb
2021-05-07 20:49:42 +00:00
parent 144ad5eb19
commit f32af6bb2d
6 changed files with 99 additions and 48 deletions

View File

@@ -2,9 +2,11 @@
namespace Backend {
BackendIndexer::BackendIndexer(boost::json::object const& config)
: keyShift_(config.at("indexer_key_shift").as_int64())
, bookShift_(config.at("indexer_book_shift").as_int64())
{
if (config.contains("indexer_key_shift"))
keyShift_ = config.at("indexer_key_shift").as_int64();
if (config.contains("indexer_book_shift"))
bookShift_ = config.at("indexer_book_shift").as_int64();
work_.emplace(ioc_);
ioThread_ = std::thread{[this]() { ioc_.run(); }};
};
@@ -103,8 +105,8 @@ writeKeyFlagLedger(
}
auto start = std::chrono::system_clock::now();
backend.writeKeys(keys, nextFlag);
backend.writeKeys({zero}, nextFlag);
backend.writeKeys(keys, nextFlag, true);
backend.writeKeys({zero}, nextFlag, true);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__
@@ -154,8 +156,8 @@ writeBookFlagLedger(
}
}
auto start = std::chrono::system_clock::now();
backend.writeBooks(books, nextFlag);
backend.writeBooks({{zero, {zero}}}, nextFlag);
backend.writeBooks(books, nextFlag, true);
backend.writeBooks({{zero, {zero}}}, nextFlag, true);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)

View File

@@ -381,13 +381,15 @@ public:
virtual bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
uint32_t ledgerSequence) const = 0;
uint32_t ledgerSequence,
bool isAsync = false) const = 0;
virtual bool
writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
uint32_t ledgerSequence) const = 0;
uint32_t ledgerSequence,
bool isAsync = false) const = 0;
virtual ~BackendInterface()
{

View File

@@ -418,7 +418,7 @@ CassandraBackend::fetchLedgerPage(
ripple::uint256 zero;
statement.bindBytes(zero);
}
statement.bindUInt(limit);
statement.bindUInt(limit + 1);
CassandraResult result = executeSyncRead(statement);
if (!!result)
{
@@ -430,11 +430,14 @@ CassandraBackend::fetchLedgerPage(
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
if (keys.size() && keys.size() == limit)
{
page.cursor = keys.back();
keys.pop_back();
}
auto objects = fetchLedgerObjects(keys, ledgerSequence);
if (objects.size() != keys.size())
throw std::runtime_error("Mismatch in size of objects and keys");
if (keys.size() == limit)
page.cursor = keys[keys.size() - 1];
if (cursor)
BOOST_LOG_TRIVIAL(trace)
@@ -449,11 +452,13 @@ CassandraBackend::fetchLedgerPage(
page.objects.push_back({std::move(key), std::move(obj)});
}
}
if (!keys.size() || (!cursor && !keys[0].isZero()))
if (!cursor && (!keys.size() || !keys[0].isZero()))
page.warning = "Data may be incomplete";
return page;
}
return {{}, {}, "Data may be incomplete"};
if (!cursor)
return {{}, {}, "Data may be incomplete"};
return {};
}
std::vector<Blob>
CassandraBackend::fetchLedgerObjects(
@@ -496,19 +501,36 @@ CassandraBackend::fetchBookOffers(
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
CassandraStatement statement{selectBook_};
statement.bindBytes(book);
auto index = getBookIndexOfSeq(sequence);
if (!index)
return {};
BOOST_LOG_TRIVIAL(info) << __func__ << " index = " << std::to_string(*index)
<< " book = " << ripple::strHex(book);
BookOffersPage page;
ripple::uint256 zero = {};
{
CassandraStatement statement{selectBook_};
statement.bindBytes(zero);
statement.bindInt(*index);
statement.bindBytes(zero);
statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement);
if (!result)
page.warning = "Data may be incomplete";
else
{
auto key = result.getUInt256();
if (!key.isZero())
page.warning = "Data may be incomplete";
}
}
CassandraStatement statement{selectBook_};
statement.bindBytes(book);
statement.bindInt(*index);
if (cursor)
statement.bindBytes(*cursor);
else
{
ripple::uint256 zero = {};
statement.bindBytes(zero);
}
statement.bindUInt(limit);
@@ -517,11 +539,16 @@ CassandraBackend::fetchBookOffers(
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
std::vector<ripple::uint256> keys;
if (!result)
return {{}, {}, "Data may be incomplete"};
return page;
do
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
if (keys.size() && keys.size() == limit)
{
page.cursor = keys.back();
keys.pop_back();
}
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - populated keys. num keys = " << keys.size();
@@ -532,20 +559,10 @@ CassandraBackend::fetchBookOffers(
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size() != 0)
results.push_back({keys[i], objs[i]});
page.offers.push_back({keys[i], objs[i]});
}
std::optional<std::string> warning;
if (!cursor && !keys[0].isZero())
warning = "Data may be incomplete";
if (keys.size() == limit)
return {results, keys[keys.size() - 1], warning};
else
return {results, {}, warning};
}
else if (!cursor)
return {{}, {}, "Data may be incomplete"};
return {};
return page;
}
struct WriteBookCallbackData
{
@@ -704,7 +721,8 @@ writeKeyCallback(CassFuture* fut, void* cbData)
bool
CassandraBackend::writeKeys(
std::unordered_set<ripple::uint256> const& keys,
uint32_t ledgerSequence) const
uint32_t ledgerSequence,
bool isAsync) const
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Ledger = " << std::to_string(ledgerSequence)
@@ -716,7 +734,8 @@ CassandraBackend::writeKeys(
std::mutex mtx;
std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs;
cbs.reserve(keys.size());
uint32_t concurrentLimit = indexerMaxRequestsOutstanding;
uint32_t concurrentLimit =
isAsync ? indexerMaxRequestsOutstanding : keys.size();
uint32_t numSubmitted = 0;
for (auto& key : keys)
{
@@ -755,7 +774,8 @@ CassandraBackend::writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
uint32_t ledgerSequence) const
uint32_t ledgerSequence,
bool isAsync) const
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Ledger = " << std::to_string(ledgerSequence)
@@ -763,7 +783,8 @@ CassandraBackend::writeBooks(
std::condition_variable cv;
std::mutex mtx;
std::vector<std::shared_ptr<WriteBookCallbackData>> cbs;
uint32_t concurrentLimit = indexerMaxRequestsOutstanding;
uint32_t concurrentLimit =
isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding;
std::atomic_uint32_t numOutstanding = 0;
size_t count = 0;
auto start = std::chrono::system_clock::now();
@@ -1507,7 +1528,7 @@ CassandraBackend::open(bool readOnly)
query = {};
query << "SELECT key FROM " << tablePrefix << "books2 "
<< " WHERE book = ? AND sequence = ? AND "
" key > ? "
" key >= ? "
" ORDER BY key ASC LIMIT ?";
if (!selectBook_.prepareStatement(query, session_.get()))
continue;

View File

@@ -985,13 +985,15 @@ public:
bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
uint32_t ledgerSequence) const;
uint32_t ledgerSequence,
bool isAsync = false) const;
bool
writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
uint32_t ledgerSequence) const override;
uint32_t ledgerSequence,
bool isAsync = false) const override;
BookOffersPage
fetchBookOffers(
ripple::uint256 const& book,

View File

@@ -337,7 +337,7 @@ PostgresBackend::fetchLedgerPage(
std::stringstream sql;
sql << "SELECT key FROM keys WHERE ledger_seq = " << std::to_string(*index);
if (cursor)
sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " AND key > \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key ASC LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str();
auto res = pgQuery(sql.str().data());
@@ -378,14 +378,37 @@ PostgresBackend::fetchBookOffers(
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
auto index = getBookIndexOfSeq(ledgerSequence);
if (!index)
return {};
PgQuery pgQuery(pgPool_);
ripple::uint256 zero = {};
std::optional<std::string> warning;
{
std::stringstream sql;
sql << "SELECT offer_key FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(zero)
<< "\' AND ledger_seq = " << std::to_string(*index);
auto res = pgQuery(sql.str().data());
sql << " ORDER BY offer_key ASC"
<< " LIMIT " << std::to_string(limit);
if (size_t numRows = checkResult(res, 1))
{
auto key = res.asUInt256(0, 0);
if (!key.isZero())
warning = "Data may be incomplete";
}
else
warning = "Data may be incomplete";
}
std::stringstream sql;
sql << "SELECT offer_key FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(book)
<< "\' AND ledger_seq = " << std::to_string(ledgerSequence);
<< "\' AND ledger_seq = " << std::to_string(*index);
if (cursor)
sql << " AND offer_key < \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY offer_key DESC, ledger_seq DESC"
sql << " AND offer_key > \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY offer_key ASC"
<< " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << sql.str();
auto res = pgQuery(sql.str().data());
@@ -396,9 +419,6 @@ PostgresBackend::fetchBookOffers(
{
keys.push_back(res.asUInt256(i, 0));
}
std::optional<std::string> warning;
if (keys[0].isZero())
warning = "Data may be incomplete";
std::vector<Blob> blobs = fetchLedgerObjects(keys, ledgerSequence);
std::vector<LedgerObject> results;
@@ -421,7 +441,7 @@ PostgresBackend::fetchBookOffers(
else
return {results, {}, warning};
}
return {{}, {}};
return {{}, {}, warning};
}
std::vector<TransactionAndMetadata>
@@ -697,7 +717,8 @@ PostgresBackend::doFinishWrites() const
bool
PostgresBackend::writeKeys(
std::unordered_set<ripple::uint256> const& keys,
uint32_t ledgerSequence) const
uint32_t ledgerSequence,
bool isAsync) const
{
BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_);
@@ -731,7 +752,8 @@ PostgresBackend::writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
uint32_t ledgerSequence) const
uint32_t ledgerSequence,
bool isAsync) const
{
BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_);

View File

@@ -117,13 +117,15 @@ public:
bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
uint32_t ledgerSequence) const override;
uint32_t ledgerSequence,
bool isAsync = false) const override;
bool
writeBooks(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books,
uint32_t ledgerSequence) const override;
uint32_t ledgerSequence,
bool isAsync = false) const override;
};
} // namespace Backend
#endif