From 5f265c5850efe124756bb171e45706049927c380 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Mon, 7 Jun 2021 20:19:47 -0400 Subject: [PATCH] checkpoint --- CMakeLists.txt | 5 +-- handlers/RPCHelpers.cpp | 74 +++++++++++++--------------------- handlers/RPCHelpers.h | 10 ++--- reporting/BackendFactory.h | 17 +++----- reporting/CassandraBackend.cpp | 40 ------------------ reporting/ETLSource.h | 49 ---------------------- reporting/ReportingETL.cpp | 53 +++++------------------- reporting/ReportingETL.h | 13 +----- 8 files changed, 51 insertions(+), 210 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f35768db..2811b00e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,7 +87,6 @@ target_sources(reporting PRIVATE handlers/LedgerRange.cpp handlers/Ledger.cpp handlers/LedgerEntry.cpp -<<<<<<< HEAD handlers/AccountChannels.cpp handlers/AccountLines.cpp handlers/AccountCurrencies.cpp @@ -95,10 +94,8 @@ target_sources(reporting PRIVATE handlers/AccountObjects.cpp handlers/ChannelAuthorize.cpp handlers/ChannelVerify.cpp - handlers/Subscribe.cpp) -======= + handlers/Subscribe.cpp handlers/ServerInfo.cpp) ->>>>>>> dev message(${Boost_LIBRARIES}) diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index 107697af..2e1a5648 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -9,10 +9,10 @@ accountFromStringStrict(std::string const& account) boost::optional publicKey = {}; if (blob && ripple::publicKeyType(ripple::makeSlice(*blob))) { - publicKey = ripple::PublicKey( - ripple::Slice{blob->data(), blob->size()}); + publicKey = + ripple::PublicKey(ripple::Slice{blob->data(), blob->size()}); } - else + else { publicKey = ripple::parseBase58( ripple::TokenType::AccountPublic, account); @@ -51,19 +51,17 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs) return result; } - std::pair< std::shared_ptr, std::shared_ptr> -deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq) +deserializeTxPlusMeta( + Backend::TransactionAndMetadata const& blobs, + std::uint32_t seq) { auto [tx, meta] = deserializeTxPlusMeta(blobs); - std::shared_ptr m = - std::make_shared( - tx->getTransactionID(), - seq, - *meta); + std::shared_ptr m = + std::make_shared(tx->getTransactionID(), seq, *meta); return {tx, m}; } @@ -82,8 +80,7 @@ toJson(ripple::STBase const& obj) } boost::json::object -<<<<<<< HEAD -getJson(ripple::TxMeta const& meta) +toJson(ripple::TxMeta const& meta) { auto start = std::chrono::system_clock::now(); boost::json::value value = boost::json::parse( @@ -95,20 +92,8 @@ getJson(ripple::TxMeta const& meta) return value.as_object(); } -boost::json::value -getJson(Json::Value const& value) -{ - boost::json::value boostValue = - boost::json::parse(value.toStyledString()); - - return boostValue; -} - boost::json::object -getJson(ripple::SLE const& sle) -======= toJson(ripple::SLE const& sle) ->>>>>>> dev { auto start = std::chrono::system_clock::now(); boost::json::value value = boost::json::parse( @@ -154,7 +139,6 @@ ledgerSequenceFromRequest( return request.at("ledger_index").as_int64(); } } -<<<<<<< HEAD std::optional traverseOwnedNodes( @@ -173,9 +157,8 @@ traverseOwnedNodes( auto start = std::chrono::system_clock::now(); for (;;) { - auto ownedNode = - backend.fetchLedgerObject(currentIndex.key, sequence); - + auto ownedNode = backend.fetchLedgerObject(currentIndex.key, sequence); + if (!ownedNode) { throw std::runtime_error("Could not find owned node"); @@ -199,15 +182,14 @@ traverseOwnedNodes( auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) << "Time loading owned directories: " - << ((end - start).count() / 1000000000.0); - + << ((end - start).count() / 1000000000.0); start = std::chrono::system_clock::now(); auto objects = backend.fetchLedgerObjects(keys, sequence); end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) << "Time loading owned entries: " - << ((end - start).count() / 1000000000.0); + << ((end - start).count() / 1000000000.0); for (auto i = 0; i < objects.size(); ++i) { @@ -215,7 +197,7 @@ traverseOwnedNodes( ripple::SLE sle(it, keys[i]); if (!atOwnedNode(sle)) { - nextCursor = keys[i+1]; + nextCursor = keys[i + 1]; break; } } @@ -231,9 +213,9 @@ parseRippleLibSeed(boost::json::value const& value) // try to detect such keys to avoid user confusion. if (!value.is_string()) return {}; - - auto const result = - ripple::decodeBase58Token(value.as_string().c_str(), ripple::TokenType::None); + + auto const result = ripple::decodeBase58Token( + value.as_string().c_str(), ripple::TokenType::None); if (result.size() == 18 && static_cast(result[0]) == std::uint8_t(0xE1) && @@ -251,10 +233,7 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error) // All of the secret types we allow, but only one at a time. // The array should be constexpr, but that makes Visual Studio unhappy. static std::string const secretTypes[]{ - "passphrase", - "secret", - "seed", - "seed_hex"}; + "passphrase", "secret", "seed", "seed_hex"}; // Identify which secret type is in use. std::string secretType = ""; @@ -276,8 +255,9 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error) if (count > 1) { - error = "Exactly one of the following must be specified: " - " passphrase, secret, seed, or seed_hex"; + error = + "Exactly one of the following must be specified: " + " passphrase, secret, seed, or seed_hex"; return {}; } @@ -319,7 +299,8 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error) { // If the user passed in an Ed25519 seed but *explicitly* // requested another key type, return an error. - if (keyType.value_or(ripple::KeyType::ed25519) != ripple::KeyType::ed25519) + if (keyType.value_or(ripple::KeyType::ed25519) != + ripple::KeyType::ed25519) { error = "Specified seed is for an Ed25519 wallet."; return {}; @@ -374,8 +355,8 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error) return {}; } - if (keyType != ripple::KeyType::secp256k1 - && keyType != ripple::KeyType::ed25519) + if (keyType != ripple::KeyType::secp256k1 && + keyType != ripple::KeyType::ed25519) { error = "keypairForSignature: invalid key type"; return {}; @@ -404,9 +385,9 @@ getAccountsFromTransaction(boost::json::object const& transaction) } } } - + return accounts; -======= +} std::vector ledgerInfoToBlob(ripple::LedgerInfo const& info) { @@ -422,5 +403,4 @@ ledgerInfoToBlob(ripple::LedgerInfo const& info) s.add8(info.closeFlags); s.addBitString(info.hash); return s.peekData(); ->>>>>>> dev } diff --git a/handlers/RPCHelpers.h b/handlers/RPCHelpers.h index 93918c68..c5dba05e 100644 --- a/handlers/RPCHelpers.h +++ b/handlers/RPCHelpers.h @@ -18,7 +18,9 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs); std::pair< std::shared_ptr, std::shared_ptr> -deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq); +deserializeTxPlusMeta( + Backend::TransactionAndMetadata const& blobs, + std::uint32_t seq); boost::json::object toJson(ripple::STBase const& obj); @@ -40,7 +42,6 @@ ledgerSequenceFromRequest( boost::json::object const& request, BackendInterface const& backend); -<<<<<<< HEAD std::optional traverseOwnedNodes( BackendInterface const& backend, @@ -53,12 +54,11 @@ std::pair keypairFromRequst( boost::json::object const& request, boost::json::value& error); - + std::vector getAccountsFromTransaction(boost::json::object const& transaction); -======= + std::vector ledgerInfoToBlob(ripple::LedgerInfo const& info); ->>>>>>> dev #endif diff --git a/reporting/BackendFactory.h b/reporting/BackendFactory.h index c2717189..fb912284 100644 --- a/reporting/BackendFactory.h +++ b/reporting/BackendFactory.h @@ -10,13 +10,9 @@ namespace Backend { std::unique_ptr make_Backend(boost::json::object const& config) { -<<<<<<< HEAD BOOST_LOG_TRIVIAL(info) << __func__ << ": Constructing BackendInterface"; - boost::json::object const& dbConfig = config.at("database").as_object(); -======= boost::json::object dbConfig = config.at("database").as_object(); ->>>>>>> dev bool readOnly = false; if (config.contains("read_only")) @@ -28,14 +24,10 @@ make_Backend(boost::json::object const& config) if (boost::iequals(type, "cassandra")) { -<<<<<<< HEAD - backend = -======= if (config.contains("online_delete")) dbConfig.at(type).as_object()["ttl"] = config.at("online_delete").as_int64() * 4; - auto backend = ->>>>>>> dev + backend = std::make_unique(dbConfig.at(type).as_object()); } else if (boost::iequals(type, "postgres")) @@ -48,12 +40,13 @@ make_Backend(boost::json::object const& config) throw std::runtime_error("Invalid database type"); backend->open(readOnly); + backend->checkFlagLedgers(); - BOOST_LOG_TRIVIAL(info) << __func__ - << ": Constructed BackendInterface Successfully"; + BOOST_LOG_TRIVIAL(info) + << __func__ << ": Constructed BackendInterface Successfully"; return backend; } } // namespace Backend -#endif //RIPPLE_REPORTING_BACKEND_FACTORY +#endif // RIPPLE_REPORTING_BACKEND_FACTORY diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 8efe5174..11ccc867 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -1364,29 +1364,11 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); -<<<<<<< HEAD - query << "CREATE INDEX ON " << tablePrefix << "objects(sequence)"; - if (!executeSimpleStatement(query.str())) - continue; - - query.str(""); - query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1" - << " LIMIT 1"; - if (!executeSimpleStatement(query.str())) - continue; - - query.str(""); - query - << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" - << " ( hash blob PRIMARY KEY, ledger_sequence bigint, transaction " - "blob, metadata blob)"; -======= query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" << " ( hash blob PRIMARY KEY, ledger_sequence bigint, " "transaction " "blob, metadata blob)" << " WITH default_time_to_live = " << std::to_string(ttl); ->>>>>>> dev if (!executeSimpleStatement(query.str())) continue; @@ -1507,15 +1489,6 @@ CassandraBackend::open(bool readOnly) continue; query.str(""); -<<<<<<< HEAD - query << "INSERT INTO " << tablePrefix << "books" - << " (book, sequence, quality_key) VALUES (?, ?, (?, ?))"; - if (!insertBook2_.prepareStatement(query, session_.get())) - continue; - - query.str(""); -======= ->>>>>>> dev query << "SELECT key FROM " << tablePrefix << "keys" << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?"; if (!selectKeys_.prepareStatement(query, session_.get())) @@ -1639,19 +1612,6 @@ CassandraBackend::open(bool readOnly) setupPreparedStatements = true; } -<<<<<<< HEAD - if (config_.contains("max_requests_outstanding")) - { - maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64(); - } - if (config_.contains("indexer_max_requests_outstanding")) - { - indexerMaxRequestsOutstanding = - config_["indexer_max_requests_outstanding"].as_int64(); - } - -======= ->>>>>>> dev work_.emplace(ioContext_); ioThread_ = std::thread{[this]() { ioContext_.run(); }}; open_ = true; diff --git a/reporting/ETLSource.h b/reporting/ETLSource.h index 920293a9..17f2512f 100644 --- a/reporting/ETLSource.h +++ b/reporting/ETLSource.h @@ -253,12 +253,6 @@ public: ", grpc port : " + grpcPort_ + " }"; } -<<<<<<< HEAD - boost::json::value - toJson() const - { - return boost::json::string(toString()); -======= boost::json::object toJson() const { @@ -275,7 +269,6 @@ public: std::chrono::system_clock::now() - getLastMsgTime()) .count()); return res; ->>>>>>> dev } /// Download a ledger in full @@ -396,7 +389,6 @@ public: /// to clients). /// @param in ETLSource in question /// @return true if messages should be forwarded -<<<<<<< HEAD bool shouldPropagateTxnStream(ETLSource* in) const { @@ -418,35 +410,11 @@ public: } boost::json::value -======= - // bool - // shouldPropagateTxnStream(ETLSource* in) const - // { - // for (auto& src : sources_) - // { - // assert(src); - // // We pick the first ETLSource encountered that is connected - // if (src->isConnected()) - // { - // if (src.get() == in) - // return true; - // else - // return false; - // } - // } - // - // // If no sources connected, then this stream has not been - // forwarded. return true; - // } - - boost::json::array ->>>>>>> dev toJson() const { boost::json::array ret; for (auto& src : sources_) { -<<<<<<< HEAD ret.push_back(src->toJson()); } return ret; @@ -462,23 +430,6 @@ public: /// @return response received from p2p node boost::json::object forwardToP2p(boost::json::object const& request) const; -======= - ret.emplace_back(src->toJson()); - } - return ret; - } - // - // /// Randomly select a p2p node to forward a gRPC request to - // /// @return gRPC stub to forward requests to p2p node - // std::unique_ptr - // getP2pForwardingStub() const; - // - // /// Forward a JSON RPC request to a randomly selected p2p node - // /// @param context context of the request - // /// @return response received from p2p node - // Json::Value - // forwardToP2p(RPC::JsonContext& context) const; ->>>>>>> dev private: /// f is a function that takes an ETLSource as an argument and returns a diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 43af0fd5..5761ac03 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -148,7 +148,7 @@ ReportingETL::getFees(std::uint32_t seq) if (!bytes) { BOOST_LOG_TRIVIAL(error) << __func__ << " - could not find fees"; - return {}; + return {}; } ripple::SerialIter it(bytes->data(), bytes->size()); @@ -174,17 +174,17 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { auto ledgerRange = backend_->fetchLedgerRange(); auto fees = getFees(lgrInfo.seq); - auto transactions = - backend_->fetchAllTransactionsInLedger(lgrInfo.seq); + auto transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq); if (!fees || !ledgerRange) { - BOOST_LOG_TRIVIAL(error) << __func__ - << " - could not fetch from database"; + BOOST_LOG_TRIVIAL(error) + << __func__ << " - could not fetch from database"; return; } - std::string range = std::to_string(ledgerRange->minSequence) + "-" + std::to_string(ledgerRange->maxSequence); + std::string range = std::to_string(ledgerRange->minSequence) + "-" + + std::to_string(ledgerRange->maxSequence); subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size()); @@ -259,7 +259,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) // << __func__ << " : " // << "Published ledger. " << ledger->seq; // }); - + publishLedger(ledger); return true; @@ -309,14 +309,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) << "Deserialized ledger header. " << detail::toString(lgrInfo); backend_->startWrites(); -<<<<<<< HEAD - backend_->writeLedger( -======= BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "started writes"; - flatMapBackend_->writeLedger( ->>>>>>> dev - lgrInfo, std::move(*rawData.mutable_ledger_header())); + backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header())); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "wrote ledger header"; @@ -351,9 +346,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) isDeleted, std::move(bookDir)); } -<<<<<<< HEAD - backend_->writeAccountTransactions(std::move(accountTxData)); -======= BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "wrote objects. num objects = " @@ -365,10 +357,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) << __func__ << " : " << "Inserted all transactions. Number of transactions = " << rawData.transactions_list().transactions_size(); - flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); + backend_->writeAccountTransactions(std::move(accountTxData)); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "wrote account_tx"; ->>>>>>> dev accumTxns_ += rawData.transactions_list().transactions_size(); bool success = true; if (accumTxns_ >= txnThreshold_) @@ -404,7 +395,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) { if (finishSequence_ && startSequence > *finishSequence_) return {}; - + /* * Behold, mortals! This function spawns three separate threads, which talk * to each other via 2 different thread safe queues and 1 atomic variable. @@ -441,10 +432,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populating caches"; -<<<<<<< HEAD - backend_->getIndexer().populateCachesAsync(*backend_); -======= ->>>>>>> dev BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populated caches"; @@ -579,22 +566,13 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) lastPublishedSequence = lgrInfo.seq; } writeConflict = !success; -<<<<<<< HEAD - auto range = backend_->fetchLedgerRangeNoThrow(); -======= ->>>>>>> dev if (onlineDeleteInterval_ && !deleting_ && lgrInfo.seq - minSequence > *onlineDeleteInterval_) { deleting_ = true; ioContext_.post([this, &minSequence]() { BOOST_LOG_TRIVIAL(info) << "Running online delete"; -<<<<<<< HEAD - backend_->doOnlineDelete( - range->maxSequence - *onlineDeleteInterval_); -======= - flatMapBackend_->doOnlineDelete(*onlineDeleteInterval_); ->>>>>>> dev + backend_->doOnlineDelete(*onlineDeleteInterval_); BOOST_LOG_TRIVIAL(info) << "Finished online delete"; auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); minSequence = rng->minSequence; @@ -618,10 +596,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) << "Extracted and wrote " << *lastPublishedSequence - startSequence << " in " << ((end - begin).count()) / 1000000000.0; writing_ = false; -<<<<<<< HEAD - backend_->getIndexer().clearCaches(); -======= ->>>>>>> dev BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Stopping etl pipeline"; @@ -830,10 +804,5 @@ ReportingETL::ReportingETL( extractorThreads_ = config.at("extractor_threads").as_int64(); if (config.contains("txn_threshold")) txnThreshold_ = config.at("txn_threshold").as_int64(); -<<<<<<< HEAD -======= - flatMapBackend_->open(readOnly_); - flatMapBackend_->checkFlagLedgers(); ->>>>>>> dev } diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index aeaff870..6f5afc01 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -27,8 +27,8 @@ #include #include #include -#include #include +#include #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include @@ -271,8 +271,6 @@ private: return numMarkers_; } -<<<<<<< HEAD -======= boost::json::object getInfo() { @@ -292,7 +290,6 @@ private: } /// start all of the necessary components and begin ETL ->>>>>>> dev void run() { @@ -306,7 +303,6 @@ private: doWork(); public: - ReportingETL( boost::json::object const& config, boost::asio::io_context& ioc, @@ -325,12 +321,7 @@ public: std::shared_ptr ledgers) { auto etl = std::make_shared( - config, - ioc, - backend, - subscriptions, - balancer, - ledgers); + config, ioc, backend, subscriptions, balancer, ledgers); etl->run();