Order book offers by quality

This commit is contained in:
Nathan Nichols
2021-04-21 23:27:14 -05:00
parent d9a8ff5399
commit 0bcf3a4601
10 changed files with 232 additions and 239 deletions

View File

@@ -11,7 +11,7 @@ set(CMAKE_VERBOSE_MAKEFILE TRUE)
project(reporting)
cmake_minimum_required(VERSION 3.16)
set (CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread -Wno-narrowing")
set(Boost_USE_STATIC_LIBS ON)
set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME ON)
@@ -36,16 +36,16 @@ add_dependencies(reporting xrpl_core)
add_dependencies(reporting grpc_pbufs)
get_target_property(grpc_includes grpc_pbufs INCLUDE_DIRECTORIES)
#get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES)
get_target_property(proto_includes protobuf_src INCLUDE_DIRECTORIES)
# get_target_property(proto_includes protobuf_src INCLUDE_DIRECTORIES)
message("hi")
message("${grpc_includes}")
message("${proto_includes}")
ExternalProject_Get_Property(protobuf_src SOURCE_DIR)
# ExternalProject_Get_Property(protobuf_src SOURCE_DIR)
message("${SOURCE_DIR}")
INCLUDE_DIRECTORIES(${grpc_includes})
#INCLUDE_DIRECTORIES(${xrpl_core_includes})
INCLUDE_DIRECTORIES(${SOURCE_DIR}/src)
ExternalProject_Get_Property(grpc_src SOURCE_DIR)
# ExternalProject_Get_Property(grpc_src SOURCE_DIR)
INCLUDE_DIRECTORIES(${SOURCE_DIR}/include)
get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES)
message("${xrpl_core_includes}")

View File

@@ -43,50 +43,6 @@ ledgerSequenceFromRequest(
return std::optional<std::uint32_t>{index.asInt()};
}
std::vector<ripple::uint256>
loadBookOfferIndexes(
ripple::Book const& book,
std::uint32_t seq,
std::uint32_t limit,
std::shared_ptr<PgPool> const& pool)
{
std::vector<ripple::uint256> hashes = {};
ripple::uint256 bookBase = getBookBase(book);
ripple::uint256 bookEnd = getQualityNext(bookBase);
pg_params dbParams;
char const*& command = dbParams.first;
std::vector<std::optional<std::string>>& values = dbParams.second;
command =
"SELECT offer_indexes FROM books "
"WHERE book_directory >= $1::bytea "
"AND book_directory < $2::bytea "
"AND ledger_index <= $3::bigint "
"LIMIT $4::bigint";
values.resize(4);
values[0] = "\\x" + ripple::strHex(bookBase);
values[1] = "\\x" + ripple::strHex(bookEnd);
values[2] = std::to_string(seq);
values[3] = std::to_string(limit);
auto indexes = PgQuery(pool)(dbParams);
if (!indexes || indexes.isNull())
return {};
for (auto i = 0; i < indexes.ntuples(); ++i)
{
auto unHexed = ripple::strUnHex(indexes.c_str(i) + 2);
if (unHexed)
hashes.push_back(ripple::uint256::fromVoid(unHexed->data()));
}
return hashes;
}
boost::json::object
doBookOffers(
boost::json::object const& request,
@@ -330,7 +286,11 @@ doBookOffers(
{
ripple::SerialIter it{obj.blob.data(), obj.blob.size()};
ripple::SLE offer{it, obj.key};
return getJson(offer);
ripple::uint256 bookDir = offer.getFieldH256(ripple::sfBookDirectory);
boost::json::object offerJson = getJson(offer);
offerJson["quality"] = ripple::amountFromQuality(getQuality(bookDir)).getText();
return offerJson;
}
catch (std::exception const& e)
{

View File

@@ -5,6 +5,12 @@ BackendIndexer::BackendIndexer(boost::json::object const& config)
: keyShift_(config.at("keyshift").as_int64())
, bookShift_(config.at("bookshift").as_int64())
{
BOOST_LOG_TRIVIAL(info) << "Indexer - starting with keyShift_ = "
<< std::to_string(keyShift_);
BOOST_LOG_TRIVIAL(info) << "Indexer - starting with keyShift_ = "
<< std::to_string(bookShift_);
work_.emplace(ioc_);
ioThread_ = std::thread{[this]() { ioc_.run(); }};
};
@@ -42,6 +48,25 @@ BackendIndexer::deleteBookOffer(
booksToDeletedOffers[book].insert(offerKey);
}
std::vector<ripple::uint256>
BackendIndexer::getCurrentOffers(ripple::uint256 const& book)
{
std::vector<ripple::uint256> offers;
offers.reserve(booksToOffers[book].size() + booksToOffers[book].size());
for (auto const& offer : booksToOffers[book])
{
offers.push_back(offer);
}
for(auto const& offer : booksToDeletedOffers[book])
{
offers.push_back(offer);
}
return offers;
}
void
BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
{

View File

@@ -78,6 +78,9 @@ public:
void
deleteKey(ripple::uint256 const& key);
std::vector<ripple::uint256>
getCurrentOffers(ripple::uint256 const& book);
void
addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey);
void
@@ -91,7 +94,7 @@ public:
class BackendInterface
{
private:
protected:
mutable BackendIndexer indexer_;
public:

View File

@@ -63,16 +63,16 @@ flatMapWriteCallback(CassFuture* fut, void* cbData)
processAsyncWriteResponse(requestParams, fut, func);
}
void
flatMapWriteBookCallback(CassFuture* fut, void* cbData)
{
CassandraBackend::WriteCallbackData& requestParams =
*static_cast<CassandraBackend::WriteCallbackData*>(cbData);
auto func = [](auto& params, bool retry) {
params.backend->writeBook(params, retry);
};
processAsyncWriteResponse(requestParams, fut, func);
}
// void
// flatMapWriteBookCallback(CassFuture* fut, void* cbData)
// {
// CassandraBackend::WriteCallbackData& requestParams =
// *static_cast<CassandraBackend::WriteCallbackData*>(cbData);
// auto func = [](auto& params, bool retry) {
// params.backend->writeBook(params, retry);
// };
// processAsyncWriteResponse(requestParams, fut, func);
// }
/*
void
@@ -641,54 +641,74 @@ CassandraBackend::fetchBookOffers(
std::optional<ripple::uint256> const& cursor) const
{
CassandraStatement statement{selectBook_};
statement.bindBytes(book);
uint32_t upper = sequence;
auto rng = fetchLedgerRange();
if (rng && sequence != rng->minSequence)
if(!rng)
return {{},{}};
std::vector<ripple::uint256> keys;
uint32_t upper = sequence;
auto lastPage = rng->maxSequence - (rng->maxSequence % 256);
if (lastPage < sequence) {
keys = indexer_.getCurrentOffers(book);
}
else if (sequence != rng->minSequence)
{
upper = (sequence >> 8) << 8;
if (upper != sequence)
upper += (1 << 8);
}
BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper)
<< " book = " << ripple::strHex(book);
statement.bindInt(upper);
if (cursor)
statement.bindBytes(*cursor);
else
{
ripple::uint256 zero = {};
statement.bindBytes(zero);
}
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
statement.bindBytes(book.data(), 24);
statement.bindInt(upper);
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
std::vector<ripple::uint256> keys;
if (!result)
return {{}, {}};
do
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper)
<< " book = " << ripple::strHex(std::string((char*)book.data(), 24));
// ripple::uint256 zero = {};
// statement.bindBytes(zero.data(), 8);
// if (cursor)
// statement.bindBytes(*cursor);
// else
// {
// statement.bindBytes(zero);
// }
// statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys";
if (!result)
{
return {{}, {}};
}
do
{
auto index = result.getBytesTuple().second;
keys.push_back(ripple::uint256::fromVoid(index.data()));
} while (result.nextRow());
}
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - populated keys. num keys = " << keys.size();
if (keys.size())
{
std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchLedgerObjects(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i)
{
if (objs[i].size() != 0)
results.push_back({keys[i], objs[i]});
}
if (keys.size())
return {results, keys[keys.size() - 1]};
if (!keys.size())
return {{}, {}};
std::vector<LedgerObject> results;
std::vector<Blob> objs = fetchLedgerObjects(keys, sequence);
for (size_t i = 0; i < objs.size(); ++i)
{
if (results.size() == limit)
return {results, keys[i]};
if (objs[i].size() != 0)
results.push_back({keys[i], objs[i]});
}
return {{}, {}};
return {results, {}};
}
struct WriteBookCallbackData
{
@@ -725,8 +745,9 @@ void
writeBook2(WriteBookCallbackData& cb)
{
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
statement.bindBytes(cb.book);
statement.bindBytes(cb.book.data(), 24);
statement.bindInt(cb.ledgerSequence);
statement.bindBytes(cb.book.data()+24, 8);
statement.bindBytes(cb.offerKey);
// Passing isRetry as true bypasses incrementing numOutstanding
cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true);
@@ -1387,24 +1408,24 @@ CassandraBackend::open()
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "objects"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE INDEX ON " << tablePrefix << "objects(sequence)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
"transaction "
@@ -1412,61 +1433,50 @@ CassandraBackend::open()
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "transactions"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE INDEX ON " << tablePrefix
<< "transactions(ledger_sequence)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix
<< "transactions WHERE ledger_sequence = 1"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys"
<< " ( sequence bigint, key blob, PRIMARY KEY "
"(sequence, key))";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "keys"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
<< " ( book blob, sequence bigint, key blob, deleted_at "
"bigint, PRIMARY KEY "
"(book, key)) WITH CLUSTERING ORDER BY (key ASC)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "SELECT * FROM " << tablePrefix << "books"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2"
<< " ( book blob, sequence bigint, key blob, PRIMARY KEY "
"((book, sequence), key)) WITH CLUSTERING ORDER BY (key "
<< " ( book blob, sequence bigint, quality_key tuple<blob, blob>, PRIMARY KEY "
"((book, sequence), quality_key)) WITH CLUSTERING ORDER BY (quality_key "
"ASC)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "books2"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx"
<< " ( account blob, seq_idx tuple<bigint, bigint>, "
" hash blob, "
@@ -1476,43 +1486,43 @@ CassandraBackend::open()
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "account_tx"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers"
<< " ( sequence bigint PRIMARY KEY, header blob )";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledgers"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes"
<< " (hash blob PRIMARY KEY, sequence bigint)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledger_hashes"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_range"
<< " (is_latest boolean PRIMARY KEY, sequence bigint)";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "SELECT * FROM " << tablePrefix << "ledger_range"
<< " LIMIT 1";
if (!executeSimpleStatement(query.str()))
@@ -1532,7 +1542,7 @@ CassandraBackend::open()
if (!insertObject_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "INSERT INTO " << tablePrefix << "transactions"
<< " (hash, ledger_sequence, transaction, metadata) VALUES "
"(?, ?, "
@@ -1540,35 +1550,26 @@ CassandraBackend::open()
if (!insertTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "INSERT INTO " << tablePrefix << "keys"
<< " (sequence, key) VALUES (?, ?)";
if (!insertKey_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
if (!insertBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "INSERT INTO " << tablePrefix << "books2"
<< " (book, sequence, key) VALUES (?, ?, ?)";
<< " (book, sequence, quality_key) VALUES (?, ?, (?, ?))";
if (!insertBook2_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "INSERT INTO " << tablePrefix << "books"
<< " (book, key, deleted_at) VALUES (?, ?, ?)";
if (!deleteBook_.prepareStatement(query, session_.get()))
continue;
query.str("");
query = {};
query.str("");
query << "SELECT key FROM " << tablePrefix << "keys"
<< " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?";
if (!selectKeys_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT object, sequence FROM " << tablePrefix << "objects"
<< " WHERE key = ? AND sequence <= ? ORDER BY sequence DESC "
"LIMIT 1";
@@ -1576,28 +1577,28 @@ CassandraBackend::open()
if (!selectObject_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT transaction, metadata, ledger_sequence FROM "
<< tablePrefix << "transactions"
<< " WHERE hash = ?";
if (!selectTransaction_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT transaction, metadata, ledger_sequence FROM "
<< tablePrefix << "transactions"
<< " WHERE ledger_sequence = ?";
if (!selectAllTransactionsInLedger_.prepareStatement(
query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT hash FROM " << tablePrefix << "transactions"
<< " WHERE ledger_sequence = ?";
if (!selectAllTransactionHashesInLedger_.prepareStatement(
query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT key FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ?"
@@ -1605,7 +1606,7 @@ CassandraBackend::open()
if (!selectLedgerPageKeys_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << "SELECT object,key FROM " << tablePrefix << "objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ? "
<< " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING";
@@ -1614,7 +1615,7 @@ CassandraBackend::open()
continue;
/*
query = {};
query.str("");
query << "SELECT filterempty(key,object) FROM " << tablePrefix <<
"objects "
<< " WHERE TOKEN(key) >= ? and sequence <= ?"
@@ -1623,80 +1624,72 @@ CassandraBackend::open()
if (!upperBound2_.prepareStatement(query, session_.get()))
continue;
*/
query = {};
query.str("");
query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects "
<< " WHERE key = ? LIMIT 1";
if (!getToken_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tablePrefix << "books "
<< " WHERE book = ? AND sequence <= ? AND deleted_at > ? AND"
" key > ? "
" ORDER BY key ASC LIMIT ? ALLOW FILTERING";
if (!getBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query << "SELECT key FROM " << tablePrefix << "books2 "
<< " WHERE book = ? AND sequence = ? AND "
" key > ? "
" ORDER BY key ASC LIMIT ?";
query.str("");
query << "SELECT quality_key FROM " << tablePrefix << "books2 "
<< " WHERE book = ? AND sequence = ?"
" ORDER BY quality_key ASC";
if (!selectBook_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " INSERT INTO " << tablePrefix << "account_tx"
<< " (account, seq_idx, hash) "
<< " VALUES (?,?,?)";
if (!insertAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
<< " WHERE account = ? "
<< " AND seq_idx < ? LIMIT ?";
if (!selectAccountTx_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " INSERT INTO " << tablePrefix << "ledgers "
<< " (sequence, header) VALUES(?,?)";
if (!insertLedgerHeader_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " INSERT INTO " << tablePrefix << "ledger_hashes"
<< " (hash, sequence) VALUES(?,?)";
if (!insertLedgerHash_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = ? if sequence in "
"(?,null)";
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " select header from " << tablePrefix
<< "ledgers where sequence = ?";
if (!selectLedgerBySeq_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " select sequence from " << tablePrefix
<< "ledger_range where is_latest = true";
if (!selectLatestLedger_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " SELECT sequence FROM " << tablePrefix
<< "ledger_range WHERE "
<< " is_latest IN (true, false)";
if (!selectLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " SELECT key,object FROM " << tablePrefix
<< "objects WHERE sequence = ?";
if (!selectLedgerDiff_.prepareStatement(query, session_.get()))
@@ -1714,30 +1707,27 @@ CassandraBackend::open()
query << "TRUNCATE TABLE " << tablePrefix << "ledger_range";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "ledgers";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "ledger_hashes";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "objects";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "transactions";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "TRUNCATE TABLE " << tablePrefix << "account_tx";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query << "TRUNCATE TABLE " << tablePrefix << "books";
if (!executeSimpleStatement(query.str()))
continue;
query.str("");
}
break;
}
@@ -1746,26 +1736,26 @@ CassandraBackend::open()
{
maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64();
}
if (config_.contains("run_indexer"))
{
if (config_["run_indexer"].as_bool())
{
if (config_.contains("indexer_shift"))
{
indexerShift_ = config_["indexer_shift"].as_int64();
}
indexer_ = std::thread{[this]() {
auto seq = getNextToIndex();
if (seq)
{
BOOST_LOG_TRIVIAL(info)
<< "Running indexer. Ledger = " << std::to_string(*seq);
runIndexer(*seq);
BOOST_LOG_TRIVIAL(info) << "Ran indexer";
}
}};
}
}
// if (config_.contains("run_indexer"))
// {
// if (config_["run_indexer"].as_bool())
// {
// if (config_.contains("indexer_shift"))
// {
// indexerShift_ = config_["indexer_shift"].as_int64();
// }
// indexer_ = std::thread{[this]() {
// auto seq = getNextToIndex();
// if (seq)
// {
// BOOST_LOG_TRIVIAL(info)
// << "Running indexer. Ledger = " << std::to_string(*seq);
// runIndexer(*seq);
// BOOST_LOG_TRIVIAL(info) << "Ran indexer";
// }
// }};
// }
// }
work_.emplace(ioContext_);
ioThread_ = std::thread{[this]() { ioContext_.run(); }};

View File

@@ -483,6 +483,34 @@ public:
return {first, second};
}
std::pair<Blob, Blob>
getBytesTuple()
{
cass_byte_t const* buf;
std::size_t bufSize;
if (!row_)
throw std::runtime_error(
"CassandraResult::getBytesTuple - no result");
CassValue const* tuple = cass_row_get_column(row_, curGetIndex_);
CassIterator* tupleIter = cass_iterator_from_tuple(tuple);
if (!cass_iterator_next(tupleIter))
throw std::runtime_error(
"CassandraResult::getBytesTuple - failed to iterate tuple");
CassValue const* value = cass_iterator_get_value(tupleIter);
cass_value_get_bytes(value, &buf, &bufSize);
Blob first{buf, buf + bufSize};
if (!cass_iterator_next(tupleIter))
throw std::runtime_error(
"CassandraResult::getBytesTuple - failed to iterate tuple");
value = cass_iterator_get_value(tupleIter);
cass_value_get_bytes(value, &buf, &bufSize);
Blob second{buf, buf + bufSize};
++curGetIndex_;
return {first, second};
}
~CassandraResult()
{
if (result_ != nullptr)
@@ -631,7 +659,7 @@ private:
std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_;
std::thread indexer_;
// std::thread indexer_;
uint32_t indexerShift_ = 16;
// maximum number of concurrent in flight requests. New requests will wait
@@ -693,8 +721,8 @@ public:
std::lock_guard<std::mutex> lock(mutex_);
work_.reset();
ioThread_.join();
if (indexer_.joinable())
indexer_.join();
// if (indexer_.joinable())
// indexer_.join();
}
open_ = false;
}
@@ -975,7 +1003,7 @@ public:
bool
writeKeys(
std::unordered_set<ripple::uint256> const& keys,
uint32_t ledgerSequence) const;
uint32_t ledgerSequence) const override;
bool
writeBooks(
std::unordered_map<
@@ -1255,20 +1283,21 @@ public:
}
*/
void
writeBook(WriteCallbackData& data, bool isRetry) const
{
assert(data.isCreated or data.isDeleted);
assert(data.book);
CassandraStatement statement{
(data.isCreated ? insertBook_ : deleteBook_)};
statement.bindBytes(*data.book);
statement.bindBytes(data.key);
statement.bindInt(data.sequence);
if (data.isCreated)
statement.bindInt(INT64_MAX);
executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry);
}
// void
// writeBook(WriteCallbackData& data, bool isRetry) const
// {
// assert(data.isCreated or data.isDeleted);
// assert(data.book);
// CassandraStatement statement{
// (data.isCreated ? insertBook_ : deleteBook_)};
// statement.bindBytes(*data.book);
// statement.bindBytes(data.key);
// statement.bindInt(data.sequence);
// if (data.isCreated)
// statement.bindInt(INT64_MAX);
// executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry);
// }
void
doWriteLedgerObject(
std::string&& key,
@@ -1290,8 +1319,6 @@ public:
std::move(book));
write(*data, false);
if (hasBook)
writeBook(*data, false);
}
void

View File

@@ -73,10 +73,6 @@ getBook(T const& offer)
ripple::SerialIter it{offer.data(), offer.size()};
ripple::SLE sle{it, {}};
ripple::uint256 book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book.data()[book.size() - 1 - i] = 0x00;
}
return book;
}

View File

@@ -444,7 +444,7 @@ public:
BOOST_LOG_TRIVIAL(trace) << "Writing objects";
for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects()))
{
std::optional<ripple::uint256> book;
std::optional<ripple::uint256> book = {};
short offer_bytes = (obj.data()[1] << 8) | obj.data()[2];
if (offer_bytes == 0x006f)
@@ -452,10 +452,6 @@ public:
ripple::SerialIter it{obj.data().data(), obj.data().size()};
ripple::SLE sle{it, {}};
book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book->data()[book->size() - 1 - i] = 0x00;
}
}
backend.writeLedgerObject(
std::move(*obj.mutable_key()),

View File

@@ -71,7 +71,7 @@ PostgresBackend::doWriteLedgerObject(
if (numRowsInObjectsBuffer_ % 1000000 == 0)
{
writeConnection_.bulkInsert("objects", objectsBuffer_.str());
objectsBuffer_ = {};
objectsBuffer_.str("");
}
if (book)
@@ -603,7 +603,7 @@ PostgresBackend::writeKeys(
if (numRows == 1000000)
{
pgQuery.bulkInsert("keys", keysBuffer.str());
keysBuffer = {};
keysBuffer.str("");
numRows = 0;
}
}
@@ -635,7 +635,7 @@ PostgresBackend::writeBooks(
if (numRows == 1000000)
{
pgQuery.bulkInsert("books", booksBuffer.str());
booksBuffer = {};
booksBuffer.str("");
numRows = 0;
}
}

View File

@@ -282,10 +282,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
{
bookDir =
ripple::uint256::fromVoid(obj.book_of_deleted_offer().data());
for (size_t i = 0; i < 8; ++i)
{
bookDir->data()[bookDir->size() - 1 - i] = 0x00;
}
}
assert(not(isCreated and isDeleted));