From 0bcf3a460167f9c693bdf64c564aa032dd5a1086 Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Wed, 21 Apr 2021 23:27:14 -0500 Subject: [PATCH] Order book offers by quality --- CMakeLists.txt | 8 +- handlers/BookOffers.cpp | 50 +----- reporting/BackendIndexer.cpp | 25 +++ reporting/BackendInterface.h | 5 +- reporting/CassandraBackend.cpp | 296 ++++++++++++++++----------------- reporting/CassandraBackend.h | 67 +++++--- reporting/DBHelpers.h | 4 - reporting/ETLSource.cpp | 6 +- reporting/PostgresBackend.cpp | 6 +- reporting/ReportingETL.cpp | 4 - 10 files changed, 232 insertions(+), 239 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index debf83c0..28b74856 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,7 +11,7 @@ set(CMAKE_VERBOSE_MAKEFILE TRUE) project(reporting) cmake_minimum_required(VERSION 3.16) set (CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread -Wno-narrowing") set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_MULTITHREADED ON) set(Boost_USE_STATIC_RUNTIME ON) @@ -36,16 +36,16 @@ add_dependencies(reporting xrpl_core) add_dependencies(reporting grpc_pbufs) get_target_property(grpc_includes grpc_pbufs INCLUDE_DIRECTORIES) #get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES) -get_target_property(proto_includes protobuf_src INCLUDE_DIRECTORIES) +# get_target_property(proto_includes protobuf_src INCLUDE_DIRECTORIES) message("hi") message("${grpc_includes}") message("${proto_includes}") -ExternalProject_Get_Property(protobuf_src SOURCE_DIR) +# ExternalProject_Get_Property(protobuf_src SOURCE_DIR) message("${SOURCE_DIR}") INCLUDE_DIRECTORIES(${grpc_includes}) #INCLUDE_DIRECTORIES(${xrpl_core_includes}) INCLUDE_DIRECTORIES(${SOURCE_DIR}/src) -ExternalProject_Get_Property(grpc_src SOURCE_DIR) +# ExternalProject_Get_Property(grpc_src SOURCE_DIR) INCLUDE_DIRECTORIES(${SOURCE_DIR}/include) get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES) 
message("${xrpl_core_includes}") diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 3b93ce96..94078956 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -43,50 +43,6 @@ ledgerSequenceFromRequest( return std::optional{index.asInt()}; } -std::vector -loadBookOfferIndexes( - ripple::Book const& book, - std::uint32_t seq, - std::uint32_t limit, - std::shared_ptr const& pool) -{ - std::vector hashes = {}; - - ripple::uint256 bookBase = getBookBase(book); - ripple::uint256 bookEnd = getQualityNext(bookBase); - - pg_params dbParams; - - char const*& command = dbParams.first; - std::vector>& values = dbParams.second; - - command = - "SELECT offer_indexes FROM books " - "WHERE book_directory >= $1::bytea " - "AND book_directory < $2::bytea " - "AND ledger_index <= $3::bigint " - "LIMIT $4::bigint"; - - values.resize(4); - values[0] = "\\x" + ripple::strHex(bookBase); - values[1] = "\\x" + ripple::strHex(bookEnd); - values[2] = std::to_string(seq); - values[3] = std::to_string(limit); - - auto indexes = PgQuery(pool)(dbParams); - if (!indexes || indexes.isNull()) - return {}; - - for (auto i = 0; i < indexes.ntuples(); ++i) - { - auto unHexed = ripple::strUnHex(indexes.c_str(i) + 2); - if (unHexed) - hashes.push_back(ripple::uint256::fromVoid(unHexed->data())); - } - - return hashes; -} - boost::json::object doBookOffers( boost::json::object const& request, @@ -330,7 +286,11 @@ doBookOffers( { ripple::SerialIter it{obj.blob.data(), obj.blob.size()}; ripple::SLE offer{it, obj.key}; - return getJson(offer); + ripple::uint256 bookDir = offer.getFieldH256(ripple::sfBookDirectory); + + boost::json::object offerJson = getJson(offer); + offerJson["quality"] = ripple::amountFromQuality(getQuality(bookDir)).getText(); + return offerJson; } catch (std::exception const& e) { diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index 9a1eb92d..f66c4067 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp 
@@ -5,6 +5,12 @@ BackendIndexer::BackendIndexer(boost::json::object const& config) : keyShift_(config.at("keyshift").as_int64()) , bookShift_(config.at("bookshift").as_int64()) { + BOOST_LOG_TRIVIAL(info) << "Indexer - starting with keyShift_ = " + << std::to_string(keyShift_); + + BOOST_LOG_TRIVIAL(info) << "Indexer - starting with bookShift_ = " + << std::to_string(bookShift_); + work_.emplace(ioc_); ioThread_ = std::thread{[this]() { ioc_.run(); }}; }; @@ -42,6 +48,25 @@ BackendIndexer::deleteBookOffer( booksToDeletedOffers[book].insert(offerKey); } +std::vector +BackendIndexer::getCurrentOffers(ripple::uint256 const& book) +{ + std::vector offers; + offers.reserve(booksToOffers[book].size() + booksToDeletedOffers[book].size()); + + for (auto const& offer : booksToOffers[book]) + { + offers.push_back(offer); + } + + for(auto const& offer : booksToDeletedOffers[book]) + { + offers.push_back(offer); + } + + return offers; +} + void BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) { diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index b924cf75..987ed4be 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -78,6 +78,9 @@ public: void deleteKey(ripple::uint256 const& key); + std::vector + getCurrentOffers(ripple::uint256 const& book); + void addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey); void @@ -91,7 +94,7 @@ public: class BackendInterface { -private: +protected: mutable BackendIndexer indexer_; public: diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 7d1f7cab..60dfe4f6 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -63,16 +63,16 @@ flatMapWriteCallback(CassFuture* fut, void* cbData) processAsyncWriteResponse(requestParams, fut, func); } -void -flatMapWriteBookCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - auto func 
= [](auto& params, bool retry) { - params.backend->writeBook(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} +// void +// flatMapWriteBookCallback(CassFuture* fut, void* cbData) +// { +// CassandraBackend::WriteCallbackData& requestParams = +// *static_cast(cbData); +// auto func = [](auto& params, bool retry) { +// params.backend->writeBook(params, retry); +// }; +// processAsyncWriteResponse(requestParams, fut, func); +// } /* void @@ -641,54 +641,74 @@ CassandraBackend::fetchBookOffers( std::optional const& cursor) const { CassandraStatement statement{selectBook_}; - statement.bindBytes(book); - uint32_t upper = sequence; + auto rng = fetchLedgerRange(); - if (rng && sequence != rng->minSequence) + + if(!rng) + return {{},{}}; + + std::vector keys; + uint32_t upper = sequence; + auto lastPage = rng->maxSequence - (rng->maxSequence % 256); + + if (lastPage < sequence) { + keys = indexer_.getCurrentOffers(book); + } + else if (sequence != rng->minSequence) { upper = (sequence >> 8) << 8; if (upper != sequence) upper += (1 << 8); - } - BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper) - << " book = " << ripple::strHex(book); - statement.bindInt(upper); - if (cursor) - statement.bindBytes(*cursor); - else - { - ripple::uint256 zero = {}; - statement.bindBytes(zero); - } - statement.bindUInt(limit); - CassandraResult result = executeSyncRead(statement); + + statement.bindBytes(book.data(), 24); + statement.bindInt(upper); - BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; - std::vector keys; - if (!result) - return {{}, {}}; - do - { - keys.push_back(result.getUInt256()); - } while (result.nextRow()); + BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper) + << " book = " << ripple::strHex(std::string((char*)book.data(), 24)); + + // ripple::uint256 zero = {}; + // statement.bindBytes(zero.data(), 8); + // if (cursor) + // statement.bindBytes(*cursor); + // else + // { + // 
statement.bindBytes(zero); + // } + + // statement.bindUInt(limit); + CassandraResult result = executeSyncRead(statement); + + BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; + if (!result) + { + return {{}, {}}; + } + + do + { + auto index = result.getBytesTuple().second; + keys.push_back(ripple::uint256::fromVoid(index.data())); + } while (result.nextRow()); + } BOOST_LOG_TRIVIAL(debug) << __func__ << " - populated keys. num keys = " << keys.size(); - if (keys.size()) - { - std::vector results; - std::vector objs = fetchLedgerObjects(keys, sequence); - for (size_t i = 0; i < objs.size(); ++i) - { - if (objs[i].size() != 0) - results.push_back({keys[i], objs[i]}); - } - if (keys.size()) - return {results, keys[keys.size() - 1]}; + + if (!keys.size()) return {{}, {}}; + + std::vector results; + std::vector objs = fetchLedgerObjects(keys, sequence); + for (size_t i = 0; i < objs.size(); ++i) + { + if (results.size() == limit) + return {results, keys[i]}; + + if (objs[i].size() != 0) + results.push_back({keys[i], objs[i]}); } - return {{}, {}}; + return {results, {}}; } struct WriteBookCallbackData { @@ -725,8 +745,9 @@ void writeBook2(WriteBookCallbackData& cb) { CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()}; - statement.bindBytes(cb.book); + statement.bindBytes(cb.book.data(), 24); statement.bindInt(cb.ledgerSequence); + statement.bindBytes(cb.book.data()+24, 8); statement.bindBytes(cb.offerKey); // Passing isRetry as true bypasses incrementing numOutstanding cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true); @@ -1387,24 +1408,24 @@ CassandraBackend::open() if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "objects" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE INDEX ON " << tablePrefix << "objects(sequence)"; if (!executeSimpleStatement(query.str())) continue; - query 
= {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" << " ( hash blob PRIMARY KEY, ledger_sequence bigint, " "transaction " @@ -1412,61 +1433,50 @@ CassandraBackend::open() if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "transactions" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE INDEX ON " << tablePrefix << "transactions(ledger_sequence)"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "transactions WHERE ledger_sequence = 1" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys" << " ( sequence bigint, key blob, PRIMARY KEY " "(sequence, key))"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "keys" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; - query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books" - << " ( book blob, sequence bigint, key blob, deleted_at " - "bigint, PRIMARY KEY " - "(book, key)) WITH CLUSTERING ORDER BY (key ASC)"; - if (!executeSimpleStatement(query.str())) - continue; - query = {}; - query << "SELECT * FROM " << tablePrefix << "books" - << " LIMIT 1"; - if (!executeSimpleStatement(query.str())) - continue; - query = {}; + + query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2" - << " ( book blob, sequence bigint, key blob, PRIMARY KEY " - "((book, sequence), key)) WITH CLUSTERING ORDER BY (key " + << " ( book blob, sequence bigint, quality_key tuple, 
PRIMARY KEY " + "((book, sequence), quality_key)) WITH CLUSTERING ORDER BY (quality_key " "ASC)"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "books2" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "account_tx" << " ( account blob, seq_idx tuple, " " hash blob, " @@ -1476,43 +1486,43 @@ CassandraBackend::open() if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "account_tx" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers" << " ( sequence bigint PRIMARY KEY, header blob )"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "ledgers" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes" << " (hash blob PRIMARY KEY, sequence bigint)"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "ledger_hashes" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_range" << " (is_latest boolean PRIMARY KEY, sequence bigint)"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "SELECT * FROM " << tablePrefix << "ledger_range" << " LIMIT 1"; if (!executeSimpleStatement(query.str())) @@ -1532,7 +1542,7 @@ CassandraBackend::open() if (!insertObject_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << "INSERT INTO " << tablePrefix << "transactions" << " 
(hash, ledger_sequence, transaction, metadata) VALUES " "(?, ?, " @@ -1540,35 +1550,26 @@ CassandraBackend::open() if (!insertTransaction_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << "INSERT INTO " << tablePrefix << "keys" << " (sequence, key) VALUES (?, ?)"; if (!insertKey_.prepareStatement(query, session_.get())) continue; - query = {}; - query << "INSERT INTO " << tablePrefix << "books" - << " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)"; - if (!insertBook_.prepareStatement(query, session_.get())) - continue; - query = {}; + query.str(""); query << "INSERT INTO " << tablePrefix << "books2" - << " (book, sequence, key) VALUES (?, ?, ?)"; + << " (book, sequence, quality_key) VALUES (?, ?, (?, ?))"; if (!insertBook2_.prepareStatement(query, session_.get())) continue; - query = {}; - query << "INSERT INTO " << tablePrefix << "books" - << " (book, key, deleted_at) VALUES (?, ?, ?)"; - if (!deleteBook_.prepareStatement(query, session_.get())) - continue; + query.str(""); - query = {}; + query.str(""); query << "SELECT key FROM " << tablePrefix << "keys" << " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?"; if (!selectKeys_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << "SELECT object, sequence FROM " << tablePrefix << "objects" << " WHERE key = ? AND sequence <= ? 
ORDER BY sequence DESC " "LIMIT 1"; @@ -1576,28 +1577,28 @@ CassandraBackend::open() if (!selectObject_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << "SELECT transaction, metadata, ledger_sequence FROM " << tablePrefix << "transactions" << " WHERE hash = ?"; if (!selectTransaction_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << "SELECT transaction, metadata, ledger_sequence FROM " << tablePrefix << "transactions" << " WHERE ledger_sequence = ?"; if (!selectAllTransactionsInLedger_.prepareStatement( query, session_.get())) continue; - query = {}; + query.str(""); query << "SELECT hash FROM " << tablePrefix << "transactions" << " WHERE ledger_sequence = ?"; if (!selectAllTransactionHashesInLedger_.prepareStatement( query, session_.get())) continue; - query = {}; + query.str(""); query << "SELECT key FROM " << tablePrefix << "objects " << " WHERE TOKEN(key) >= ? and sequence <= ? " << " PER PARTITION LIMIT 1 LIMIT ?" @@ -1605,7 +1606,7 @@ CassandraBackend::open() if (!selectLedgerPageKeys_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << "SELECT object,key FROM " << tablePrefix << "objects " << " WHERE TOKEN(key) >= ? and sequence <= ? " << " PER PARTITION LIMIT 1 LIMIT ? ALLOW FILTERING"; @@ -1614,7 +1615,7 @@ CassandraBackend::open() continue; /* - query = {}; + query.str(""); query << "SELECT filterempty(key,object) FROM " << tablePrefix << "objects " << " WHERE TOKEN(key) >= ? and sequence <= ?" @@ -1623,80 +1624,72 @@ CassandraBackend::open() if (!upperBound2_.prepareStatement(query, session_.get())) continue; */ - query = {}; + query.str(""); query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects " << " WHERE key = ? LIMIT 1"; if (!getToken_.prepareStatement(query, session_.get())) continue; - query = {}; - query << "SELECT key FROM " << tablePrefix << "books " - << " WHERE book = ? AND sequence <= ? AND deleted_at > ? 
AND" - " key > ? " - " ORDER BY key ASC LIMIT ? ALLOW FILTERING"; - if (!getBook_.prepareStatement(query, session_.get())) - continue; - query = {}; - query << "SELECT key FROM " << tablePrefix << "books2 " - << " WHERE book = ? AND sequence = ? AND " - " key > ? " - " ORDER BY key ASC LIMIT ?"; + query.str(""); + query << "SELECT quality_key FROM " << tablePrefix << "books2 " + << " WHERE book = ? AND sequence = ?" + " ORDER BY quality_key ASC"; if (!selectBook_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " INSERT INTO " << tablePrefix << "account_tx" << " (account, seq_idx, hash) " << " VALUES (?,?,?)"; if (!insertAccountTx_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" << " WHERE account = ? " << " AND seq_idx < ? LIMIT ?"; if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " INSERT INTO " << tablePrefix << "ledgers " << " (sequence, header) VALUES(?,?)"; if (!insertLedgerHeader_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " INSERT INTO " << tablePrefix << "ledger_hashes" << " (hash, sequence) VALUES(?,?)"; if (!insertLedgerHash_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " update " << tablePrefix << "ledger_range" << " set sequence = ? where is_latest = ? 
if sequence in " "(?,null)"; if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " select header from " << tablePrefix << "ledgers where sequence = ?"; if (!selectLedgerBySeq_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " select sequence from " << tablePrefix << "ledger_range where is_latest = true"; if (!selectLatestLedger_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " SELECT sequence FROM " << tablePrefix << "ledger_range WHERE " << " is_latest IN (true, false)"; if (!selectLedgerRange_.prepareStatement(query, session_.get())) continue; - query = {}; + query.str(""); query << " SELECT key,object FROM " << tablePrefix << "objects WHERE sequence = ?"; if (!selectLedgerDiff_.prepareStatement(query, session_.get())) @@ -1714,30 +1707,27 @@ CassandraBackend::open() query << "TRUNCATE TABLE " << tablePrefix << "ledger_range"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "TRUNCATE TABLE " << tablePrefix << "ledgers"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "TRUNCATE TABLE " << tablePrefix << "ledger_hashes"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "TRUNCATE TABLE " << tablePrefix << "objects"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "TRUNCATE TABLE " << tablePrefix << "transactions"; if (!executeSimpleStatement(query.str())) continue; - query = {}; + query.str(""); query << "TRUNCATE TABLE " << tablePrefix << "account_tx"; if (!executeSimpleStatement(query.str())) continue; - query = {}; - query << "TRUNCATE TABLE " << tablePrefix << "books"; - if (!executeSimpleStatement(query.str())) - continue; + query.str(""); } break; } @@ -1746,26 +1736,26 @@ CassandraBackend::open() { maxRequestsOutstanding = 
config_["max_requests_outstanding"].as_int64(); } - if (config_.contains("run_indexer")) - { - if (config_["run_indexer"].as_bool()) - { - if (config_.contains("indexer_shift")) - { - indexerShift_ = config_["indexer_shift"].as_int64(); - } - indexer_ = std::thread{[this]() { - auto seq = getNextToIndex(); - if (seq) - { - BOOST_LOG_TRIVIAL(info) - << "Running indexer. Ledger = " << std::to_string(*seq); - runIndexer(*seq); - BOOST_LOG_TRIVIAL(info) << "Ran indexer"; - } - }}; - } - } + // if (config_.contains("run_indexer")) + // { + // if (config_["run_indexer"].as_bool()) + // { + // if (config_.contains("indexer_shift")) + // { + // indexerShift_ = config_["indexer_shift"].as_int64(); + // } + // indexer_ = std::thread{[this]() { + // auto seq = getNextToIndex(); + // if (seq) + // { + // BOOST_LOG_TRIVIAL(info) + // << "Running indexer. Ledger = " << std::to_string(*seq); + // runIndexer(*seq); + // BOOST_LOG_TRIVIAL(info) << "Ran indexer"; + // } + // }}; + // } + // } work_.emplace(ioContext_); ioThread_ = std::thread{[this]() { ioContext_.run(); }}; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index d3651866..0c3498f4 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -483,6 +483,34 @@ public: return {first, second}; } + std::pair + getBytesTuple() + { + cass_byte_t const* buf; + std::size_t bufSize; + + if (!row_) + throw std::runtime_error( + "CassandraResult::getBytesTuple - no result"); + CassValue const* tuple = cass_row_get_column(row_, curGetIndex_); + CassIterator* tupleIter = cass_iterator_from_tuple(tuple); + if (!cass_iterator_next(tupleIter)) + throw std::runtime_error( + "CassandraResult::getBytesTuple - failed to iterate tuple"); + CassValue const* value = cass_iterator_get_value(tupleIter); + cass_value_get_bytes(value, &buf, &bufSize); + Blob first{buf, buf + bufSize}; + + if (!cass_iterator_next(tupleIter)) + throw std::runtime_error( + "CassandraResult::getBytesTuple - failed to 
iterate tuple"); + value = cass_iterator_get_value(tupleIter); + cass_value_get_bytes(value, &buf, &bufSize); + Blob second{buf, buf + bufSize}; + ++curGetIndex_; + return {first, second}; + } + ~CassandraResult() { if (result_ != nullptr) @@ -631,7 +659,7 @@ private: std::optional work_; std::thread ioThread_; - std::thread indexer_; + // std::thread indexer_; uint32_t indexerShift_ = 16; // maximum number of concurrent in flight requests. New requests will wait @@ -693,8 +721,8 @@ public: std::lock_guard lock(mutex_); work_.reset(); ioThread_.join(); - if (indexer_.joinable()) - indexer_.join(); + // if (indexer_.joinable()) + // indexer_.join(); } open_ = false; } @@ -975,7 +1003,7 @@ public: bool writeKeys( std::unordered_set const& keys, - uint32_t ledgerSequence) const; + uint32_t ledgerSequence) const override; bool writeBooks( std::unordered_map< @@ -1255,20 +1283,21 @@ public: } */ - void - writeBook(WriteCallbackData& data, bool isRetry) const - { - assert(data.isCreated or data.isDeleted); - assert(data.book); - CassandraStatement statement{ - (data.isCreated ? insertBook_ : deleteBook_)}; - statement.bindBytes(*data.book); - statement.bindBytes(data.key); - statement.bindInt(data.sequence); - if (data.isCreated) - statement.bindInt(INT64_MAX); - executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry); - } + // void + // writeBook(WriteCallbackData& data, bool isRetry) const + // { + // assert(data.isCreated or data.isDeleted); + // assert(data.book); + // CassandraStatement statement{ + // (data.isCreated ? 
insertBook_ : deleteBook_)}; + // statement.bindBytes(*data.book); + // statement.bindBytes(data.key); + // statement.bindInt(data.sequence); + // if (data.isCreated) + // statement.bindInt(INT64_MAX); + // executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry); + // } + void doWriteLedgerObject( std::string&& key, @@ -1290,8 +1319,6 @@ public: std::move(book)); write(*data, false); - if (hasBook) - writeBook(*data, false); } void diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h index c4781926..caf6d79a 100644 --- a/reporting/DBHelpers.h +++ b/reporting/DBHelpers.h @@ -73,10 +73,6 @@ getBook(T const& offer) ripple::SerialIter it{offer.data(), offer.size()}; ripple::SLE sle{it, {}}; ripple::uint256 book = sle.getFieldH256(ripple::sfBookDirectory); - for (size_t i = 0; i < 8; ++i) - { - book.data()[book.size() - 1 - i] = 0x00; - } return book; } diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index c98d2705..b61c1281 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -444,7 +444,7 @@ public: BOOST_LOG_TRIVIAL(trace) << "Writing objects"; for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects())) { - std::optional book; + std::optional book = {}; short offer_bytes = (obj.data()[1] << 8) | obj.data()[2]; if (offer_bytes == 0x006f) @@ -452,10 +452,6 @@ public: ripple::SerialIter it{obj.data().data(), obj.data().size()}; ripple::SLE sle{it, {}}; book = sle.getFieldH256(ripple::sfBookDirectory); - for (size_t i = 0; i < 8; ++i) - { - book->data()[book->size() - 1 - i] = 0x00; - } } backend.writeLedgerObject( std::move(*obj.mutable_key()), diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 74c1d802..4085cc44 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -71,7 +71,7 @@ PostgresBackend::doWriteLedgerObject( if (numRowsInObjectsBuffer_ % 1000000 == 0) { writeConnection_.bulkInsert("objects", objectsBuffer_.str()); - objectsBuffer_ = {}; + 
objectsBuffer_.str(""); } if (book) @@ -603,7 +603,7 @@ PostgresBackend::writeKeys( if (numRows == 1000000) { pgQuery.bulkInsert("keys", keysBuffer.str()); - keysBuffer = {}; + keysBuffer.str(""); numRows = 0; } } @@ -635,7 +635,7 @@ PostgresBackend::writeBooks( if (numRows == 1000000) { pgQuery.bulkInsert("books", booksBuffer.str()); - booksBuffer = {}; + booksBuffer.str(""); numRows = 0; } } diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index fff70d7b..85847491 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -282,10 +282,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) { bookDir = ripple::uint256::fromVoid(obj.book_of_deleted_offer().data()); - for (size_t i = 0; i < 8; ++i) - { - bookDir->data()[bookDir->size() - 1 - i] = 0x00; - } } assert(not(isCreated and isDeleted));