Merge pull request #3 from natenichols/book_offers_v2

Book Offers RPC handler
This commit is contained in:
CJ Cobb
2021-05-14 15:41:37 -04:00
committed by GitHub
10 changed files with 429 additions and 388 deletions

View File

@@ -11,7 +11,7 @@ set(CMAKE_VERBOSE_MAKEFILE TRUE)
project(reporting)
cmake_minimum_required(VERSION 3.16)
set (CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread -Wno-narrowing")
set(Boost_USE_STATIC_LIBS ON)
set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME ON)
@@ -36,16 +36,16 @@ add_dependencies(reporting xrpl_core)
add_dependencies(reporting grpc_pbufs)
get_target_property(grpc_includes grpc_pbufs INCLUDE_DIRECTORIES)
#get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES)
get_target_property(proto_includes protobuf_src INCLUDE_DIRECTORIES)
# get_target_property(proto_includes protobuf_src INCLUDE_DIRECTORIES)
message("hi")
message("${grpc_includes}")
message("${proto_includes}")
ExternalProject_Get_Property(protobuf_src SOURCE_DIR)
# ExternalProject_Get_Property(protobuf_src SOURCE_DIR)
message("${SOURCE_DIR}")
INCLUDE_DIRECTORIES(${grpc_includes})
#INCLUDE_DIRECTORIES(${xrpl_core_includes})
INCLUDE_DIRECTORIES(${SOURCE_DIR}/src)
ExternalProject_Get_Property(grpc_src SOURCE_DIR)
# ExternalProject_Get_Property(grpc_src SOURCE_DIR)
INCLUDE_DIRECTORIES(${SOURCE_DIR}/include)
get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES)
message("${xrpl_core_includes}")

View File

@@ -43,56 +43,11 @@ ledgerSequenceFromRequest(
return std::optional<std::uint32_t>{index.asInt()};
}
std::vector<ripple::uint256>
loadBookOfferIndexes(
ripple::Book const& book,
std::uint32_t seq,
std::uint32_t limit,
std::shared_ptr<PgPool> const& pool)
{
std::vector<ripple::uint256> hashes = {};
ripple::uint256 bookBase = getBookBase(book);
ripple::uint256 bookEnd = getQualityNext(bookBase);
pg_params dbParams;
char const*& command = dbParams.first;
std::vector<std::optional<std::string>>& values = dbParams.second;
command =
"SELECT offer_indexes FROM books "
"WHERE book_directory >= $1::bytea "
"AND book_directory < $2::bytea "
"AND ledger_index <= $3::bigint "
"LIMIT $4::bigint";
values.resize(4);
values[0] = "\\x" + ripple::strHex(bookBase);
values[1] = "\\x" + ripple::strHex(bookEnd);
values[2] = std::to_string(seq);
values[3] = std::to_string(limit);
auto indexes = PgQuery(pool)(dbParams);
if (!indexes || indexes.isNull())
return {};
for (auto i = 0; i < indexes.ntuples(); ++i)
{
auto unHexed = ripple::strUnHex(indexes.c_str(i) + 2);
if (unHexed)
hashes.push_back(ripple::uint256::fromVoid(unHexed->data()));
}
return hashes;
}
boost::json::object
doBookOffers(
boost::json::object const& request,
BackendInterface const& backend)
{
std::cout << "enter" << std::endl;
boost::json::object response;
auto ledgerSequence = ledgerSequenceFromRequest(request, backend);
@@ -282,24 +237,6 @@ doBookOffers(
return response;
}
boost::optional<ripple::AccountID> takerID;
if (request.contains("taker"))
{
if (!request.at("taker").is_string())
{
response["error"] = "taker should be string";
return response;
}
takerID = ripple::parseBase58<ripple::AccountID>(
request.at("taker").as_string().c_str());
if (!takerID)
{
response["error"] = "Invalid taker";
return response;
}
}
if (pay_currency == get_currency && pay_issuer == get_issuer)
{
response["error"] = "Bad market";
@@ -316,6 +253,24 @@ doBookOffers(
request.at("limit").kind() == boost::json::kind::int64)
limit = request.at("limit").as_int64();
std::optional<ripple::AccountID> takerID = {};
if (request.contains("taker"))
{
if (!request.at("taker").is_string())
{
response["error"] = "Taker account must be string";
return response;
}
takerID =
accountFromStringStrict(request.at("taker").as_string().c_str());
if (!takerID)
{
response["error"] = "Invalid taker account";
return response;
}
}
std::optional<ripple::uint256> cursor;
if (request.contains("cursor"))
{
@@ -328,32 +283,33 @@ doBookOffers(
backend.fetchBookOffers(bookBase, *ledgerSequence, limit, cursor);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(warning) << "Time loading books from Postgres: "
BOOST_LOG_TRIVIAL(warning) << "Time loading books: "
<< ((end - start).count() / 1000000000.0);
if(warning)
response["warning"] = *warning;
response["offers"] = boost::json::value(boost::json::array_kind);
boost::json::array& jsonOffers = response.at("offers").as_array();
start = std::chrono::system_clock::now();
std::transform(
std::move_iterator(offers.begin()),
std::move_iterator(offers.end()),
std::back_inserter(jsonOffers),
[](auto obj) {
try
{
ripple::SerialIter it{obj.blob.data(), obj.blob.size()};
ripple::SLE offer{it, obj.key};
return getJson(offer);
}
catch (std::exception const& e)
{
boost::json::object empty;
empty["missing_key"] = ripple::strHex(obj.key);
empty["data"] = ripple::strHex(obj.blob);
return empty;
}
});
for (auto const& obj : offers)
{
if (jsonOffers.size() == limit)
break;
try
{
ripple::SerialIter it{obj.blob.data(), obj.blob.size()};
ripple::SLE offer{it, obj.key};
ripple::uint256 bookDir = offer.getFieldH256(ripple::sfBookDirectory);
boost::json::object offerJson = getJson(offer);
offerJson["quality"] = ripple::amountFromQuality(getQuality(bookDir)).getText();
jsonOffers.push_back(offerJson);
}
catch (std::exception const& e) {}
}
end = std::chrono::system_clock::now();

View File

@@ -124,7 +124,6 @@ writeBookFlagLedger(
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>> const& books)
{
uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift));
ripple::uint256 zero = {};
@@ -133,33 +132,12 @@ writeBookFlagLedger(
<< " starting. ledgerSequence = " << std::to_string(ledgerSequence)
<< " nextFlag = " << std::to_string(nextFlag)
<< " books.size() = " << std::to_string(books.size());
while (true)
{
try
{
auto [objects, curCursor, warning] =
backend.fetchBookOffers(zero, nextFlag, 1);
if (!warning)
{
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " flag ledger already written. sequence = "
<< std::to_string(ledgerSequence)
<< " next flag = " << std::to_string(nextFlag)
<< "returning";
return;
}
break;
}
catch (DatabaseTimeout& t)
{
;
}
}
auto start = std::chrono::system_clock::now();
backend.writeBooks(books, nextFlag, true);
backend.writeBooks({{zero, {zero}}}, nextFlag, true);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. ledgerSequence = " << std::to_string(ledgerSequence)
@@ -181,15 +159,20 @@ BackendIndexer::doBooksRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng)
return;
if (!sequence)
{
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng)
return;
sequence = rng->maxSequence;
}
if(sequence < rng->minSequence)
sequence = rng->minSequence;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sequence = " << std::to_string(*sequence);
ripple::uint256 zero = {};
while (true)
{
@@ -254,15 +237,20 @@ BackendIndexer::doKeysRepair(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng)
return;
if (!sequence)
{
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng)
return;
sequence = rng->maxSequence;
}
if(sequence < rng->minSequence)
sequence = rng->minSequence;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sequence = " << std::to_string(*sequence);
std::optional<ripple::uint256> cursor;
while (true)
{
@@ -448,7 +436,7 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
<< " starting. sequence = " << std::to_string(ledgerSequence);
bool isFirst = false;
uint32_t keyIndex = getKeyIndexOfSeq(ledgerSequence);
uint32_t bookIndex = getKeyIndexOfSeq(ledgerSequence);
uint32_t bookIndex = getBookIndexOfSeq(ledgerSequence);
auto rng = backend.fetchLedgerRangeNoThrow();
if (!rng || rng->minSequence == ledgerSequence)
{
@@ -464,6 +452,7 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
backend.writeKeys({zero}, ledgerSequence);
writeBookFlagLedgerAsync(ledgerSequence, backend);
writeKeyFlagLedgerAsync(ledgerSequence, backend);
}
keys = {};
books = {};
@@ -471,5 +460,5 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
<< __func__
<< " finished. sequence = " << std::to_string(ledgerSequence);
} // namespace Backend
}
} // namespace Backend

View File

@@ -497,72 +497,125 @@ CassandraBackend::fetchLedgerObjects(
BookOffersPage
CassandraBackend::fetchBookOffers(
ripple::uint256 const& book,
uint32_t sequence,
uint32_t ledgerSequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
auto index = getBookIndexOfSeq(sequence);
if (!index)
return {};
BOOST_LOG_TRIVIAL(info) << __func__ << " index = " << std::to_string(*index)
<< " book = " << ripple::strHex(book);
BookOffersPage page;
ripple::uint256 zero = {};
{
CassandraStatement statement{selectBook_};
statement.bindBytes(zero);
statement.bindInt(*index);
statement.bindBytes(zero);
statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement);
if (!result)
page.warning = "Data may be incomplete";
else
{
auto key = result.getUInt256();
if (!key.isZero())
page.warning = "Data may be incomplete";
}
}
CassandraStatement statement{selectBook_};
statement.bindBytes(book);
statement.bindInt(*index);
if (cursor)
statement.bindBytes(*cursor);
else
{
statement.bindBytes(zero);
}
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
auto rng = fetchLedgerRange();
auto limitTuningFactor = 50;
if(!rng)
return {{},{}};
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
std::vector<ripple::uint256> keys;
if (!result)
return page;
do
auto readBooks =
[this, &book, &limit, &limitTuningFactor]
(std::uint32_t sequence)
-> std::pair<bool, std::vector<std::pair<std::uint64_t, ripple::uint256>>>
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
if (keys.size() && keys.size() == limit)
{
page.cursor = keys.back();
keys.pop_back();
}
CassandraStatement completeQuery{completeBook_};
completeQuery.bindInt(sequence);
CassandraResult completeResult = executeSyncRead(completeQuery);
bool complete = completeResult.hasResult();
CassandraStatement statement{selectBook_};
std::vector<std::pair<std::uint64_t, ripple::uint256>> keys = {};
statement.bindBytes(book.data(), 24);
statement.bindInt(sequence);
BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(sequence)
<< " book = " << ripple::strHex(std::string((char*)book.data(), 24));
ripple::uint256 zero = beast::zero;
statement.bindBytes(zero.data(), 8);
statement.bindBytes(zero);
statement.bindUInt(limit * limitTuningFactor);
auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info) << "Book directory fetch took "
<< std::to_string(duration) << " seconds.";
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
if (!result)
{
return {false, {{}, {}}};
}
do
{
auto [quality, index] = result.getBytesTuple();
std::uint64_t q = 0;
memcpy(&q, quality.data(), 8);
keys.push_back({q, ripple::uint256::fromVoid(index.data())});
} while (result.nextRow());
return {complete, keys};
};
auto upper = indexer_.getBookIndexOfSeq(ledgerSequence);
auto [complete, quality_keys] = readBooks(upper);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - populated keys. num keys = " << keys.size();
if (keys.size())
<< __func__ << " - populated keys. num keys = " << quality_keys.size();
std::optional<std::string> warning = {};
if (!complete)
{
std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchLedgerObjects(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size() != 0)
page.offers.push_back({keys[i], objs[i]});
}
warning = "Data may be incomplete";
BOOST_LOG_TRIVIAL(info) << "May be incomplete. Fetching other page";
auto bookShift = indexer_.getBookShift();
std::uint32_t lower = upper - (1 << bookShift);
auto originalKeys = std::move(quality_keys);
auto [lowerComplete, otherKeys] = readBooks(lower);
assert(lowerComplete);
std::vector<std::pair<std::uint64_t, ripple::uint256>> merged_keys;
merged_keys.reserve(originalKeys.size() + otherKeys.size());
std::merge(originalKeys.begin(), originalKeys.end(),
otherKeys.begin(), otherKeys.end(),
std::back_inserter(merged_keys),
[](auto pair1, auto pair2)
{
return pair1.first < pair2.first;
});
}
return page;
std::vector<ripple::uint256> merged(quality_keys.size());
std::transform(quality_keys.begin(), quality_keys.end(),
std::back_inserter(merged),
[](auto pair) { return pair.second; });
auto uniqEnd = std::unique(merged.begin(), merged.end());
std::vector<ripple::uint256> keys{merged.begin(), uniqEnd};
std::cout << keys.size() << std::endl;
auto start = std::chrono::system_clock::now();
std::vector<Blob> objs = fetchLedgerObjects(keys, ledgerSequence);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info) << "Book object fetch took "
<< std::to_string(duration) << " seconds.";
std::vector<LedgerObject> results;
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size() != 0)
results.push_back({keys[i], objs[i]});
}
return {results, {}, warning};
}
struct WriteBookCallbackData
{
@@ -599,8 +652,9 @@ void
writeBook(WriteBookCallbackData& cb)
{
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
statement.bindBytes(cb.book);
statement.bindBytes(cb.book.data(), 24);
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.book.data()+24, 8);
statement.bindBytes(cb.offerKey);
// Passing isRetry as true bypasses incrementing numOutstanding
cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true);
@@ -1055,7 +1109,6 @@ CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
void
CassandraBackend::open(bool readOnly)
{
std::cout << config_ << std::endl;
auto getString = [this](std::string const& field) -> std::string {
if (config_.contains(field))
{
@@ -1275,24 +1328,24 @@ CassandraBackend::open(bool readOnly)
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "objects"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE INDEX ON " << tablePrefix << "objects(sequence)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
"transaction "
@@ -1300,61 +1353,50 @@ CassandraBackend::open(bool readOnly)
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "transactions"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE INDEX ON " << tablePrefix
<< "transactions(ledger_sequence)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix
<< "transactions WHERE ledger_sequence = 1"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
<< " ( sequence bigint, key blob, PRIMARY KEY "
"(sequence, key))";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "keys"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
<< " ( book blob, sequence bigint, key blob, deleted_at "
"bigint, PRIMARY KEY "
"(book, key)) WITH CLUSTERING ORDER BY (key ASC)";
<< " ( book blob, sequence bigint, quality_key tuple<blob, blob>, PRIMARY KEY "
"((book, sequence), quality_key)) WITH CLUSTERING ORDER BY (quality_key "
"ASC)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "books"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2"
<< " ( book blob, sequence bigint, key blob, PRIMARY KEY "
"((book, sequence), key)) WITH CLUSTERING ORDER BY (key "
"ASC)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "SELECT * FROM " << tablePrefix << "books2"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
<< " ( account blob, seq_idx tuple<bigint, bigint>, "
" hash blob, "
@@ -1364,43 +1406,43 @@ CassandraBackend::open(bool readOnly)
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "account_tx"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers"
<< " ( sequence bigint PRIMARY KEY, header blob )";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledgers"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes"
<< " (hash blob PRIMARY KEY, sequence bigint)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledger_hashes"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_range"
<< " (is_latest boolean PRIMARY KEY, sequence bigint)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledger_range"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
@@ -1420,7 +1462,7 @@ CassandraBackend::open(bool readOnly)
if (!insertObject_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "INSERT INTO " << tablePrefix << "transactions"
<< " (hash, ledger_sequence, transaction, metadata) VALUES "
"(?, ?, "
@@ -1428,35 +1470,26 @@ CassandraBackend::open(bool readOnly)
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "INSERT INTO " << tablePrefix << "keys"
<< " (sequence, key) VALUES (?, ?)";
if (!insertKey_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
if (!insertBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tablePrefix << "books2"
<< " (book, sequence, key) VALUES (?, ?, ?)";
<< " (book, sequence, quality_key) VALUES (?, ?, (?, ?))";
if (!insertBook2_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, key, deleted_at) VALUES (?, ?, ?)";
if (!deleteBook_.prepareStatement(query, session_.get()))
continue;
query.str("");
query = {};
query.str("");
query << "SELECT key FROM " << tablePrefix << "keys"
<< " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?";
if (!selectKeys_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT object, sequence FROM " << tablePrefix << "objects"
<< " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
"LIMIT 1";
@@ -1464,28 +1497,28 @@ CassandraBackend::open(bool readOnly)
if (!selectObject_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT transaction, metadata, ledger_sequence FROM "
<< tablePrefix << "transactions"
<< " WHERE hash = ?";
if (!selectTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT transaction, metadata, ledger_sequence FROM "
<< tablePrefix << "transactions"
<< " WHERE ledger_sequence = ?";
if (!selectAllTransactionsInLedger_.prepareStatement(
query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT hash FROM " << tablePrefix << "transactions"
<< " WHERE ledger_sequence = ?";
if (!selectAllTransactionHashesInLedger_.prepareStatement(
query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT key FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ?"
@@ -1493,7 +1526,7 @@ CassandraBackend::open(bool readOnly)
if (!selectLedgerPageKeys_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT object,key FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING";
@@ -1502,7 +1535,7 @@ CassandraBackend::open(bool readOnly)
continue;
/*
query = {};
query.str("");
query << "SELECT filterempty(key,object) FROM " << tablePrefix <<
"objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
@@ -1511,80 +1544,83 @@ CassandraBackend::open(bool readOnly)
if (!upperBound2_.prepareStatement(query, session_.get()))
continue;
*/
query = {};
query.str("");
query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects "
<< " WHERE key = ? LIMIT 1";
if (!getToken_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tablePrefix << "books "
<< " WHERE book = ? AND sequence <= ? AND deleted_at > ? AND"
" key > ? "
" ORDER BY key ASC LIMIT ? ALLOW FILTERING";
if (!getBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tablePrefix << "books2 "
<< " WHERE book = ? AND sequence = ? AND "
" key >= ? "
" ORDER BY key ASC LIMIT ?";
query.str("");
query << "SELECT quality_key FROM " << tablePrefix << "books "
<< " WHERE book = ? AND sequence = ?"
<< " AND quality_key >= (?, ?)"
" ORDER BY quality_key ASC "
" LIMIT ?";
if (!selectBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "books "
<< "WHERE book = "
<< "0x000000000000000000000000000000000000000000000000"
<< " AND sequence = ?";
if (!completeBook_.prepareStatement(query, session_.get()))
continue;
query.str("");
query << " INSERT INTO " << tablePrefix << "account_tx"
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
<< " WHERE account = ? "
<< " AND seq_idx < ? LIMIT ?";
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " INSERT INTO " << tablePrefix << "ledgers "
<< " (sequence, header) VALUES(?,?)";
if (!insertLedgerHeader_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " INSERT INTO " << tablePrefix << "ledger_hashes"
<< " (hash, sequence) VALUES(?,?)";
if (!insertLedgerHash_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence in "
"(?,null)";
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " select header from " << tablePrefix
<< "ledgers where sequence = ?";
if (!selectLedgerBySeq_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " select sequence from " << tablePrefix
<< "ledger_range where is_latest = true";
if (!selectLatestLedger_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " SELECT sequence FROM " << tablePrefix
<< "ledger_range WHERE "
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " SELECT key,object FROM " << tablePrefix
<< "objects WHERE sequence = ?";
if (!selectLedgerDiff_.prepareStatement(query, session_.get()))
@@ -1603,30 +1639,27 @@ CassandraBackend::open(bool readOnly)
query << "TRUNCATE TABLE " << tablePrefix << "ledger_range";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "ledgers";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "ledger_hashes";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "objects";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "transactions";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "account_tx";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "TRUNCATE TABLE " << tablePrefix << "books";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
}
break;
}

View File

@@ -483,6 +483,34 @@ public:
return {first, second};
}
std::pair<Blob, Blob>
getBytesTuple()
{
cass_byte_t const* buf;
std::size_t bufSize;
if (!row_)
throw std::runtime_error(
"CassandraResult::getBytesTuple - no result");
CassValue const* tuple = cass_row_get_column(row_, curGetIndex_);
CassIterator* tupleIter = cass_iterator_from_tuple(tuple);
if (!cass_iterator_next(tupleIter))
throw std::runtime_error(
"CassandraResult::getBytesTuple - failed to iterate tuple");
CassValue const* value = cass_iterator_get_value(tupleIter);
cass_value_get_bytes(value, &buf, &bufSize);
Blob first{buf, buf + bufSize};
if (!cass_iterator_next(tupleIter))
throw std::runtime_error(
"CassandraResult::getBytesTuple - failed to iterate tuple");
value = cass_iterator_get_value(tupleIter);
cass_value_get_bytes(value, &buf, &bufSize);
Blob second{buf, buf + bufSize};
++curGetIndex_;
return {first, second};
}
~CassandraResult()
{
if (result_ != nullptr)
@@ -612,6 +640,7 @@ private:
CassandraPreparedStatement selectKeys_;
CassandraPreparedStatement getBook_;
CassandraPreparedStatement selectBook_;
CassandraPreparedStatement completeBook_;
CassandraPreparedStatement insertBook_;
CassandraPreparedStatement insertBook2_;
CassandraPreparedStatement deleteBook_;
@@ -1001,51 +1030,6 @@ public:
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const override;
std::pair<std::vector<LedgerObject>, std::optional<ripple::uint256>>
fetchBookOffers2(
ripple::uint256 const& book,
uint32_t sequence,
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
CassandraStatement statement{getBook_};
statement.bindBytes(book);
statement.bindInt(sequence);
statement.bindInt(sequence);
if (cursor)
statement.bindBytes(*cursor);
else
{
ripple::uint256 zero = {};
statement.bindBytes(zero);
}
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
std::vector<ripple::uint256> keys;
if (!result)
return {{}, {}};
do
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - populated keys. num keys = " << keys.size();
if (keys.size())
{
std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchLedgerObjects(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i)
{
results.push_back({keys[i], objs[i]});
}
return {results, results[results.size() - 1].key};
}
return {{}, {}};
}
bool
canFetchBatch()
{

View File

@@ -73,10 +73,6 @@ getBook(T const& offer)
ripple::SerialIter it{offer.data(), offer.size()};
ripple::SLE sle{it, {}};
ripple::uint256 book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book.data()[book.size() - 1 - i] = 0x00;
}
return book;
}

View File

@@ -444,7 +444,7 @@ public:
BOOST_LOG_TRIVIAL(trace) << "Writing objects";
for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects()))
{
std::optional<ripple::uint256> book;
std::optional<ripple::uint256> book = {};
short offer_bytes = (obj.data()[1] << 8) | obj.data()[2];
if (offer_bytes == 0x006f)
@@ -452,10 +452,6 @@ public:
ripple::SerialIter it{obj.data().data(), obj.data().size()};
ripple::SLE sle{it, {}};
book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book->data()[book->size() - 1 - i] = 0x00;
}
}
backend.writeLedgerObject(
std::move(*obj.mutable_key()),

View File

@@ -47,11 +47,12 @@
#include <thread>
#include <utility>
#include <vector>
#include <signal.h>
static void
noticeReceiver(void* arg, PGresult const* res)
{
BOOST_LOG_TRIVIAL(info) << "server message: " << PQresultErrorMessage(res);
BOOST_LOG_TRIVIAL(debug) << "server message: " << PQresultErrorMessage(res);
}
//-----------------------------------------------------------------------------

View File

@@ -80,7 +80,7 @@ PostgresBackend::doWriteLedgerObject(
<< numRowsInObjectsBuffer_;
writeConnection_.bulkInsert("objects", objectsBuffer_.str());
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
objectsBuffer_ = {};
objectsBuffer_.str("");
}
}
@@ -378,70 +378,159 @@ PostgresBackend::fetchBookOffers(
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
auto index = getBookIndexOfSeq(ledgerSequence);
if (!index)
return {};
PgQuery pgQuery(pgPool_);
ripple::uint256 zero = {};
std::optional<std::string> warning;
auto rng = fetchLedgerRange();
auto limitTuningFactor = 50;
if(!rng)
return {{},{}};
ripple::uint256 bookBase =
ripple::keylet::quality({ripple::ltDIR_NODE, book}, 0).key;
ripple::uint256 bookEnd = ripple::getQualityNext(bookBase);
using bookKeyPair = std::pair<ripple::uint256, ripple::uint256>;
auto getBooks =
[this, &bookBase, &bookEnd, &limit, &limitTuningFactor]
(std::uint32_t sequence)
-> std::pair<bool, std::vector<bookKeyPair>>
{
BOOST_LOG_TRIVIAL(info) << __func__ << ": Fetching books between "
<< "0x" << ripple::strHex(bookBase) << " and "
<< "0x" << ripple::strHex(bookEnd) << "at ledger "
<< std::to_string(sequence);
auto start = std::chrono::system_clock::now();
std::stringstream sql;
sql << "SELECT offer_key FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(zero)
<< "\' AND ledger_seq = " << std::to_string(*index);
sql << "SELECT COUNT(*) FROM books WHERE "
<< "book = \'\\x" << ripple::strHex(ripple::uint256(beast::zero))
<< "\' AND ledger_seq = " << std::to_string(sequence);
bool complete;
PgQuery pgQuery(this->pgPool_);
auto res = pgQuery(sql.str().data());
sql << " ORDER BY offer_key ASC"
<< " LIMIT " << std::to_string(limit);
if (size_t numRows = checkResult(res, 1))
{
auto key = res.asUInt256(0, 0);
if (!key.isZero())
warning = "Data may be incomplete";
}
else
warning = "Data may be incomplete";
}
complete = res.asInt(0, 0) != 0;
else
return {false, {}};
std::stringstream sql;
sql << "SELECT offer_key FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(book)
<< "\' AND ledger_seq = " << std::to_string(*index);
if (cursor)
sql << " AND offer_key > \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY offer_key ASC"
<< " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << sql.str();
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1))
sql.str("");
sql << "SELECT book, offer_key FROM books "
<< "WHERE ledger_seq = " << std::to_string(sequence)
<< " AND book >= "
<< "\'\\x" << ripple::strHex(bookBase) << "\' "
<< "AND book < "
<< "\'\\x" << ripple::strHex(bookEnd) << "\' "
<< "ORDER BY book ASC "
<< "LIMIT " << std::to_string(limit * limitTuningFactor);
BOOST_LOG_TRIVIAL(debug) << sql.str();
res = pgQuery(sql.str().data());
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info) << "Postgres book key fetch took "
<< std::to_string(duration)
<< " seconds";
if (size_t numRows = checkResult(res, 2))
{
std::vector<bookKeyPair> results(numRows);
for (size_t i = 0; i < numRows; ++i)
{
auto book = res.asUInt256(i, 0);
auto key = res.asUInt256(i, 1);
results.push_back({std::move(book), std::move(key)});
}
return {complete, results};
}
return {complete, {}};
};
auto fetchObjects =
[this]
(std::vector<bookKeyPair> const& pairs,
std::uint32_t sequence,
std::uint32_t limit,
std::optional<std::string> warning)
-> BookOffersPage
{
std::vector<ripple::uint256> keys;
for (size_t i = 0; i < numRows; ++i)
{
keys.push_back(res.asUInt256(i, 0));
}
std::vector<Blob> blobs = fetchLedgerObjects(keys, ledgerSequence);
std::vector<ripple::uint256> allKeys(pairs.size());
for (auto const& pair : pairs)
allKeys.push_back(pair.second);
auto uniqEnd = std::unique(allKeys.begin(), allKeys.end());
std::vector<ripple::uint256> keys{allKeys.begin(), uniqEnd};
std::vector<LedgerObject> results;
std::transform(
blobs.begin(),
blobs.end(),
keys.begin(),
std::back_inserter(results),
[](auto& blob, auto& key) {
return LedgerObject{std::move(key), std::move(blob)};
});
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << results.size();
if (results.size() == limit)
auto start = std::chrono::system_clock::now();
auto ledgerEntries = fetchLedgerObjects(keys, sequence);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info) << "Postgres book objects fetch took "
<< std::to_string(duration)
<< " seconds. "
<< "Fetched "
<< std::to_string(ledgerEntries.size())
<< " ledger entries";
std::vector<LedgerObject> objects;
for (auto i = 0; i < ledgerEntries.size(); ++i)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << ripple::strHex(results[0].key) << " : "
<< ripple::strHex(results[results.size() - 1].key);
return {results, results[results.size() - 1].key, warning};
if(ledgerEntries[i].size() != 0)
objects.push_back(LedgerObject{keys[i], ledgerEntries[i]});
}
else
return {results, {}, warning};
return {objects, {}, warning};
};
std::uint32_t bookShift = indexer_.getBookShift();
auto upper = indexer_.getBookIndexOfSeq(ledgerSequence);
auto [upperComplete, upperResults] = getBooks(upper);
BOOST_LOG_TRIVIAL(info) << __func__ << ": Upper results found "
<< upperResults.size() << " books.";
if (upperComplete)
{
BOOST_LOG_TRIVIAL(info) << "Upper book page is complete";
return fetchObjects(upperResults, ledgerSequence, limit, {});
}
return {{}, {}, warning};
BOOST_LOG_TRIVIAL(info) << "Upper book page is not complete "
<< "fetching again";
auto lower = upper - (1 << bookShift);
if (lower < rng->minSequence)
lower = rng->minSequence;
auto [lowerComplete, lowerResults] = getBooks(lower);
BOOST_LOG_TRIVIAL(info) << __func__ << ": Lower results found "
<< lowerResults.size() << " books.";
assert(lowerComplete);
std::vector<bookKeyPair> pairs;
pairs.reserve(upperResults.size() + lowerResults.size());
std::merge(upperResults.begin(), upperResults.end(),
lowerResults.begin(), lowerResults.end(),
std::back_inserter(pairs),
[](bookKeyPair pair1, bookKeyPair pair2) -> bool
{
return pair1.first < pair2.first;
});
std::optional<std::string> warning = "book data may be incomplete";
return fetchObjects(pairs, ledgerSequence, limit, warning);
}
std::vector<TransactionAndMetadata>
@@ -563,7 +652,7 @@ PostgresBackend::fetchLedgerObjects(
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1))
{
results[i] = res.asUnHexedBlob(0, 0);
results[i] = res.asUnHexedBlob();
}
if (--numRemaining == 0)
{
@@ -732,7 +821,7 @@ PostgresBackend::writeKeys(
numRows++;
// If the buffer gets too large, the insert fails. Not sure why. So we
// insert after 1 million records
if (numRows == 1000000)
if (numRows == 100000)
{
pgQuery.bulkInsert("keys", keysBuffer.str());
std::stringstream temp;
@@ -756,6 +845,7 @@ PostgresBackend::writeBooks(
bool isAsync) const
{
BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_);
pgQuery("BEGIN");
std::stringstream booksBuffer;

View File

@@ -282,10 +282,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
{
bookDir =
ripple::uint256::fromVoid(obj.book_of_deleted_offer().data());
for (size_t i = 0; i < 8; ++i)
{
bookDir->data()[bookDir->size() - 1 - i] = 0x00;
}
}
assert(not(isCreated and isDeleted));
@@ -331,7 +327,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
std::optional<uint32_t>
ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
{
if (startSequence > finishSequence_)
if (finishSequence_ && startSequence > *finishSequence_)
return {};
/*
* Behold, mortals! This function spawns three separate threads, which talk
@@ -408,7 +404,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
// ETL mechanism should stop. The other stopping condition is if
// the entire server is shutting down. This can be detected in a
// variety of ways. See the comment at the top of the function
while (currentSequence <= finishSequence_ &&
while ((!finishSequence_ || currentSequence <= *finishSequence_) &&
networkValidatedLedgers_.waitUntilValidatedByNetwork(
currentSequence) &&
!writeConflict && !isStopping())
@@ -446,7 +442,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
transformQueue->push(std::move(fetchResponse));
currentSequence += numExtractors;
if (currentSequence > finishSequence_)
if (finishSequence_ && currentSequence > *finishSequence_)
break;
}
// empty optional tells the transformer to shut down