diff --git a/CMakeLists.txt b/CMakeLists.txt index 46d19e33..635c21b2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -122,7 +122,7 @@ include(Postgres) #target_link_libraries (grpc_protobufs ${_REFLECTION} ${_PROTOBUF_LIBPROTOBUF} ${_GRPC_GRPCPP}) -target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/ReportingBackend.cpp reporting/Pg.cpp reporting/DBHelpers.cpp) +target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/ReportingBackend.cpp reporting/Pg.cpp reporting/DBHelpers.cpp reporting/ReportingETL.cpp handlers/AccountInfo.cpp) message(${Boost_LIBRARIES}) diff --git a/config.json b/config.json index b4c666f6..e833e96f 100644 --- a/config.json +++ b/config.json @@ -3,15 +3,14 @@ { "cassandra": { - "username":"xrplreporting", - "password":"", - "secure_connect_bundle":"/home/cj/secure-connect-xrplreporting.zip", - "keyspace":"xrplreporting", + "contact_points":"34.221.15.227", + "port":9042, + "keyspace":"xrpl_reporting", "table_name":"cj", - "max_requests_outstanding":1000 + "max_requests_outstanding":10000 }, "postgres": { - "connection":"" + "conninfo":"postgres://postgres:coco@127.0.0.1/reporting" } }, "etl_sources": diff --git a/handlers/AccountInfo.cpp b/handlers/AccountInfo.cpp new file mode 100644 index 00000000..0f317a1a --- /dev/null +++ b/handlers/AccountInfo.cpp @@ -0,0 +1,132 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2014 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include + +std::optional +accountFromStringStrict(std::string const& account) +{ + boost::optional result; + + auto const publicKey = ripple::parseBase58( + ripple::TokenType::AccountPublic, account); + + if (publicKey) + result = ripple::calcAccountID(*publicKey); + else + result = ripple::parseBase58(account); + + if (result) + return result.value(); + else + return {}; +} +// { +// account: , +// strict: // optional (default false) +// // if true only allow public keys and addresses. +// ledger_hash : +// ledger_index : +// signer_lists : // optional (default false) +// // if true return SignerList(s). +// queue : // optional (default false) +// // if true return information about transactions +// // in the current TxQ, only if the requested +// // ledger is open. Otherwise if true, returns an +// // error. +// } + +// TODO(tom): what is that "default"? +boost::json::object +doAccountInfo( + boost::json::object const& request, + CassandraFlatMapBackend const& backend) +{ + boost::json::object response; + std::string strIdent; + if (request.contains("account")) + strIdent = request.at("account").as_string().c_str(); + else if (request.contains("ident")) + strIdent = request.at("ident").as_string().c_str(); + else + { + response["error"] = "missing account field"; + return response; + } + size_t ledgerSequence = request.at("ledger_index").as_int64(); + + // bool bStrict = request.contains("strict") && + // params.at("strict").as_bool(); + + // Get info on account. + std::optional accountID = + accountFromStringStrict(strIdent); + + if (!accountID) + { + response["error"] = "couldnt decode account"; + return response; + } + auto key = ripple::keylet::account(accountID.value()); + + std::optional> dbResponse = + backend.fetch(key.key.data(), ledgerSequence); + if (!dbResponse) + { + response["error"] = "no response from db"; + } + auto sle = std::make_shared( + ripple::SerialIter{dbResponse->data(), dbResponse->size()}, key.key); + if (!key.check(*sle)) + { + response["error"] = "error fetching record from db"; + return response; + } + else + { + response["success"] = "fetched successfully!"; + response["object"] = sle->getFullText(); + return response; + } + + // Return SignerList(s) if that is requested. + /* + if (params.isMember(jss::signer_lists) && + params[jss::signer_lists].asBool()) + { + // We put the SignerList in an array because of an anticipated + // future when we support multiple signer lists on one account. + Json::Value jvSignerList = Json::arrayValue; + + // This code will need to be revisited if in the future we + // support multiple SignerLists on one account. + auto const sleSigners = ledger->read(keylet::signers(accountID)); + if (sleSigners) + jvSignerList.append(sleSigners->getJson(JsonOptions::none)); + + result[jss::account_data][jss::signer_lists] = + std::move(jvSignerList); + } + */ + + return response; +} diff --git a/reporting/DBHelpers.cpp b/reporting/DBHelpers.cpp index 18690a67..90147226 100644 --- a/reporting/DBHelpers.cpp +++ b/reporting/DBHelpers.cpp @@ -22,10 +22,7 @@ #include static bool -writeToLedgersDB( - ripple::LedgerInfo const& info, - PgQuery& pgQuery, - beast::Journal& j) +writeToLedgersDB(ripple::LedgerInfo const& info, PgQuery& pgQuery) { BOOST_LOG_TRIVIAL(debug) << __func__; auto cmd = boost::format( @@ -73,7 +70,7 @@ writeToPostgres( // Writing to the ledgers db fails if the ledger already exists in the // db. In this situation, the ETL process has detected there is another // writer, and falls back to only publishing - if (!writeToLedgersDB(info, pg, j)) + if (!writeToLedgersDB(info, pg)) { BOOST_LOG_TRIVIAL(warning) << __func__ << " : " diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h index 2b5957f2..ae2ee1b8 100644 --- a/reporting/DBHelpers.h +++ b/reporting/DBHelpers.h @@ -21,7 +21,7 @@ #define RIPPLE_APP_REPORTING_DBHELPERS_H_INCLUDED #include -#include +#include #include #include diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index a5e5eca6..214affcc 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -31,8 +31,10 @@ ETLSource::ETLSource( boost::json::object const& config, CassandraFlatMapBackend& backend, - NetworkValidatedLedgers& networkValidatedLedgers) - : ws_(std::make_unique< + NetworkValidatedLedgers& networkValidatedLedgers, + boost::asio::io_context& ioContext) + : ioc_(ioContext) + , ws_(std::make_unique< boost::beast::websocket::stream>( boost::asio::make_strand(ioc_))) , resolver_(boost::asio::make_strand(ioc_)) @@ -289,8 +291,12 @@ ETLSource::handleMessage() connected_ = true; try { - boost::json::value raw = boost::json::parse( - static_cast(readBuffer_.data().data())); + std::string msg{ + static_cast(readBuffer_.data().data()), + readBuffer_.size()}; + // BOOST_LOG_TRIVIAL(debug) << __func__ << msg; + boost::json::value raw = boost::json::parse(msg); + // BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed"; boost::json::object response = raw.as_object(); uint32_t ledgerIndex = 0; @@ -299,7 +305,7 @@ ETLSource::handleMessage() boost::json::object result = response["result"].as_object(); if (result.contains("ledger_index")) { - ledgerIndex = result["ledger_index"].as_uint64(); + ledgerIndex = result["ledger_index"].as_int64(); } if (result.contains("validated_ledgers")) { @@ -336,7 +342,7 @@ ETLSource::handleMessage() << toString(); if (response.contains("ledger_index")) { - ledgerIndex = response["ledger_index"].as_uint64(); + ledgerIndex = response["ledger_index"].as_int64(); } if (response.contains("validated_ledgers")) { @@ -557,12 +563,13 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) ETLLoadBalancer::ETLLoadBalancer( boost::json::array const& config, CassandraFlatMapBackend& backend, - NetworkValidatedLedgers& nwvl) + NetworkValidatedLedgers& nwvl, + boost::asio::io_context& ioContext) { for (auto& entry : config) { - std::unique_ptr source = - std::make_unique(entry.as_object(), backend, nwvl); + std::unique_ptr source = std::make_unique( + entry.as_object(), backend, nwvl, ioContext); sources_.push_back(std::move(source)); BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - " << sources_.back()->toString(); @@ -844,7 +851,7 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence) std::this_thread::sleep_for(std::chrono::seconds(2)); } } - return false; + return true; } void diff --git a/reporting/ETLSource.h b/reporting/ETLSource.h index ba14a7e5..6befc0a8 100644 --- a/reporting/ETLSource.h +++ b/reporting/ETLSource.h @@ -46,7 +46,7 @@ class ETLSource std::string grpcPort_; - boost::asio::io_context ioc_; + boost::asio::io_context& ioc_; std::unique_ptr stub_; @@ -114,7 +114,8 @@ public: ETLSource( boost::json::object const& config, CassandraFlatMapBackend& backend, - NetworkValidatedLedgers& networkValidatedLedgers); + NetworkValidatedLedgers& networkValidatedLedgers, + boost::asio::io_context& ioContext); /// @param sequence ledger sequence to check for /// @return true if this source has the desired ledger @@ -285,7 +286,8 @@ public: ETLLoadBalancer( boost::json::array const& config, CassandraFlatMapBackend& backend, - NetworkValidatedLedgers& nwvl); + NetworkValidatedLedgers& nwvl, + boost::asio::io_context& ioContext); /// Load the initial ledger, writing data to the queue /// @param sequence sequence of ledger to download diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 1a4f04a4..862792b5 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -1422,11 +1422,7 @@ initSchema(std::shared_ptr const& pool) // @return LedgerInfo std::optional getLedger( - std::variant< - std::monostate, - ripple::uint256, - uint32_t, - std::pair> const& whichLedger, + std::variant const& whichLedger, std::shared_ptr& pgPool) { ripple::LedgerInfo lgrInfo; diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index a9eb8236..50e24fa6 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -193,7 +193,7 @@ public: } int port = - config_.contains("port") ? config_["port"].as_uint64() : 0; + config_.contains("port") ? config_["port"].as_int64() : 0; if (port) { rc = cass_cluster_set_port(cluster, port); @@ -630,7 +630,7 @@ public: // @param pno object in which to store the result // @return result status of query std::optional> - fetch(void const* key, uint32_t sequence) + fetch(void const* key, uint32_t sequence) const { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; CassStatement* statement = cass_prepared_bind(selectObject_); @@ -999,7 +999,7 @@ public: void store(std::string&& key, uint32_t seq, std::string&& blob) const { - BOOST_LOG_TRIVIAL(trace) << "Writing to cassandra"; + BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; WriteCallbackData* data = new WriteCallbackData(this, std::move(key), seq, std::move(blob)); @@ -1124,7 +1124,7 @@ public: std::string&& transaction, std::string&& metadata) { - BOOST_LOG_TRIVIAL(trace) << "Writing to cassandra"; + BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; WriteTransactionCallbackData* data = new WriteTransactionCallbackData( this, std::move(hash), diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index c9d00f92..d6f5055a 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -17,12 +17,10 @@ */ //============================================================================== -#include -#include +#include +#include #include -#include -#include #include #include #include @@ -32,12 +30,10 @@ #include #include -namespace ripple { - namespace detail { /// Convenience function for printing out basic ledger info std::string -toString(LedgerInfo const& info) +toString(ripple::LedgerInfo const& info) { std::stringstream ss; ss << "LedgerInfo { Sequence : " << info.seq @@ -46,6 +42,29 @@ toString(LedgerInfo const& info) << " ParentHash : " << strHex(info.parentHash) << " }"; return ss.str(); } +ripple::LedgerInfo +deserializeHeader(ripple::Slice data) +{ + ripple::SerialIter sit(data.data(), data.size()); + + ripple::LedgerInfo info; + + info.seq = sit.get32(); + info.drops = sit.get64(); + info.parentHash = sit.get256(); + info.txHash = sit.get256(); + info.accountHash = sit.get256(); + info.parentCloseTime = + ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}}; + info.closeTime = + ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}}; + info.closeTimeResolution = ripple::NetClock::duration{sit.get8()}; + info.closeFlags = sit.get8(); + + info.hash = sit.get256(); + + return info; +} } // namespace detail std::vector @@ -68,31 +87,32 @@ ReportingETL::insertTransactions( ripple::TxMeta txMeta{ sttx.getTransactionID(), ledger.seq, txn.metadata_blob()}; - auto metaSerializer = - std::make_shared(txMeta.getAsObject().getSerializer()); + auto metaSerializer = std::make_shared( + txMeta.getAsObject().getSerializer()); BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << "Inserting transaction = " << sttx.getTransactionID(); ripple::uint256 nodestoreHash = sttx.getTransactionID(); - accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_); + auto journal = ripple::debugLog(); + accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal); std::string keyStr{(const char*)sttx.getTransactionID().data(), 32}; flatMapBackend_.storeTransaction( std::move(keyStr), - ledger->info().seq, + ledger.seq, std::move(*raw), std::move(*txn.mutable_metadata_blob())); } return accountTxData; } -std::shared_ptr +std::optional ReportingETL::loadInitialLedger(uint32_t startingSequence) { // check that database is actually empty - auto ledgers = getLedger(startingSequence); - if (ledgers.size()) + auto ledger = getLedger(startingSequence, pgPool_); + if (ledger) { BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " << "Database is not empty"; @@ -108,8 +128,8 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) if (!ledgerData) return {}; - ripple::LedgerInfo lgrInfo = ripple::deserializeHeader( - ripple::makeSlice(ledgerData->ledger_header()), true); + ripple::LedgerInfo lgrInfo = detail::deserializeHeader( + ripple::makeSlice(ledgerData->ledger_header())); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " @@ -134,19 +154,16 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = " << ((end - start).count()) / 1000000000.0; - return ledger; + return lgrInfo; } -/* void -ReportingETL::publishLedger(std::shared_ptr& ledger) +ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { - app_.getOPs().pubLedger(ledger); + // app_.getOPs().pubLedger(ledger); setLastPublish(); } -*/ -/* bool ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) { @@ -156,11 +173,11 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) size_t numAttempts = 0; while (!stopping_) { - auto ledger = app_.getLedgerMaster().getLedgerBySeq(ledgerSequence); + auto ledger = getLedger(ledgerSequence, pgPool_); if (!ledger) { - BOOST_LOG_TRIVIAL(warn) + BOOST_LOG_TRIVIAL(warning) << __func__ << " : " << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence; @@ -203,6 +220,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) continue; } + /* publishStrand_.post([this, ledger]() { app_.getOPs().pubLedger(ledger); setLastPublish(); @@ -210,11 +228,11 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) << __func__ << " : " << "Published ledger. " << detail::toString(ledger->info()); }); + */ return true; } return false; } -*/ std::optional ReportingETL::fetchLedgerData(uint32_t idx) @@ -250,15 +268,15 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Beginning ledger update"; - LedgerInfo lgrInfo = - deserializeHeader(makeSlice(rawData.ledger_header()), true); + ripple::LedgerInfo lgrInfo = + detail::deserializeHeader(ripple::makeSlice(rawData.ledger_header())); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Deserialized ledger header. " << detail::toString(lgrInfo); std::vector accountTxData{ - insertTransactions(rawData)}; + insertTransactions(lgrInfo, rawData)}; BOOST_LOG_TRIVIAL(debug) << __func__ << " : " @@ -269,7 +287,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) { flatMapBackend_.store( std::move(*obj.mutable_key()), - next->info().seq, + lgrInfo.seq, std::move(*obj.mutable_data())); } flatMapBackend_.sync(); @@ -280,8 +298,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) BOOST_LOG_TRIVIAL(debug) << __func__ << " : " - << "Finished ledger update. " << detail::toString(next->info()); - return {info, std::move(accountTxData)}; + << "Finished ledger update. " << detail::toString(lgrInfo); + return {lgrInfo, std::move(accountTxData)}; } // Database must be populated when this starts @@ -313,11 +331,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence) << "Starting etl pipeline"; writing_ = true; - auto parent = getLedger(startSequence - 1); + auto parent = getLedger(startSequence - 1, pgPool_); if (!parent) { assert(false); - Throw("runETLPipeline: parent ledger is null"); + throw std::runtime_error("runETLPipeline: parent ledger is null"); } std::atomic_bool writeConflict = false; @@ -375,105 +393,48 @@ ReportingETL::runETLPipeline(uint32_t startSequence) transformQueue.push({}); }}; - std::thread transformer{ - [this, &parent, &writeConflict, &loadQueue, &transformQueue]() { - beast::setCurrentThreadName("rippled: ReportingETL transform"); + std::thread transformer{[this, + &writeConflict, + &transformQueue, + &lastPublishedSequence]() { + beast::setCurrentThreadName("rippled: ReportingETL transform"); - while (!writeConflict) - { - std::optional - fetchResponse{transformQueue.pop()}; - // if fetchResponse is an empty optional, the extracter thread - // has stopped and the transformer should stop as well - if (!fetchResponse) - { - break; - } - if (isStopping()) - continue; - - auto start = std::chrono::system_clock::now(); - auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse); - auto end = std::chrono::system_clock::now(); - if (!writeToPostgres(lgrInfo, accountTxData, pgPool_)) - writeConflict = true; - - auto duration = ((end - start).count()) / 1000000000.0; - BOOST_LOG_TRIVIAL(debug) << "transform time = " << duration; - if (!writeConflict) - { - publishLedger(ledger); - lastPublishedSequence = lgrInfo.seq; - } - } - }}; - - std::thread loader{[this, - &lastPublishedSequence, - &loadQueue, - &writeConflict]() { - beast::setCurrentThreadName("rippled: ReportingETL load"); - size_t totalTransactions = 0; - double totalTime = 0; while (!writeConflict) { - std::optional, - std::vector>> - result{loadQueue.pop()}; - // if result is an empty optional, the transformer thread has - // stopped and the loader should stop as well - if (!result) + std::optional fetchResponse{ + transformQueue.pop()}; + // if fetchResponse is an empty optional, the extracter thread + // has stopped and the transformer should stop as well + if (!fetchResponse) + { break; + } if (isStopping()) continue; - auto& ledger = result->first; - auto& accountTxData = result->second; - auto start = std::chrono::system_clock::now(); - // write to the key-value store - // flushLedger(ledger); - - auto mid = std::chrono::system_clock::now(); - // write to RDBMS - // if there is a write conflict, some other process has already - // written this ledger and has taken over as the ETL writer -#ifdef RIPPLED_REPORTING - if (!writeToPostgres( - ledger->info(), accountTxData, app_.getPgPool(), journal_)) - writeConflict = true; -#endif - + auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse); auto end = std::chrono::system_clock::now(); + if (!writeToPostgres(lgrInfo, accountTxData, pgPool_)) + writeConflict = true; - if (!writeConflict) - { - publishLedger(ledger); - lastPublishedSequence = ledger->info().seq; - } - // print some performance numbers - auto kvTime = ((mid - start).count()) / 1000000000.0; - auto relationalTime = ((end - mid).count()) / 1000000000.0; - - size_t numTxns = accountTxData.size(); - totalTime += kvTime; - totalTransactions += numTxns; + auto duration = ((end - start).count()) / 1000000000.0; + auto numTxns = accountTxData.size(); BOOST_LOG_TRIVIAL(info) << "Load phase of etl : " << "Successfully published ledger! Ledger info: " - << detail::toString(ledger->info()) - << ". txn count = " << numTxns - << ". key-value write time = " << kvTime - << ". relational write time = " << relationalTime - << ". key-value tps = " << numTxns / kvTime - << ". relational tps = " << numTxns / relationalTime - << ". total key-value tps = " << totalTransactions / totalTime; + << detail::toString(lgrInfo) << ". txn count = " << numTxns + << ". load time = " << duration << ". load tps " + << numTxns / duration; + if (!writeConflict) + { + publishLedger(lgrInfo); + lastPublishedSequence = lgrInfo.seq; + } } }}; // wait for all of the threads to stop - loader.join(); extracter.join(); transformer.join(); auto end = std::chrono::system_clock::now(); @@ -501,8 +462,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence) void ReportingETL::monitor() { - auto ledgers = getLedger(std::monostate); - if (!ledgers.size()) + auto ledger = getLedger(std::monostate(), pgPool_); + if (!ledger) { BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Database is empty. Will download a ledger " @@ -545,7 +506,7 @@ ReportingETL::monitor() { if (startSequence_) { - Throw( + throw std::runtime_error( "start sequence specified but db is already populated"); } BOOST_LOG_TRIVIAL(info) @@ -563,7 +524,7 @@ ReportingETL::monitor() { // publishLedger(ledger); } - uint32_t nextSequence = ledger->info().seq + 1; + uint32_t nextSequence = ledger->seq + 1; BOOST_LOG_TRIVIAL(debug) << __func__ << " : " @@ -596,7 +557,7 @@ ReportingETL::monitor() bool success = publishLedger(nextSequence, timeoutSeconds); if (!success) { - BOOST_LOG_TRIVIAL(warn) + BOOST_LOG_TRIVIAL(warning) << __func__ << " : " << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL"; @@ -653,91 +614,17 @@ ReportingETL::ReportingETL( boost::asio::io_context& ioc) : publishStrand_(ioc) , ioContext_(ioc) - , loadBalancer_(*this) , flatMapBackend_( - (*config).at("database").as_object().at("cassandra").as_object()) + config.at("database").as_object().at("cassandra").as_object()) + , pgPool_(make_PgPool( + config.at("database").as_object().at("postgres").as_object())) + , loadBalancer_( + config.at("etl_sources").as_array(), + flatMapBackend_, + networkValidatedLedgers_, + ioc) { - // if present, get endpoint from config - if (app_.config().exists("reporting")) - { - flatMapBackend_.open(); - Section section = app_.config().section("reporting"); - - BOOST_LOG_TRIVIAL(debug) << "Parsing config info"; - - auto& vals = section.values(); - for (auto& v : vals) - { - BOOST_LOG_TRIVIAL(debug) << "val is " << v; - Section source = app_.config().section(v); - - std::pair ipPair = source.find("source_ip"); - if (!ipPair.second) - continue; - - std::pair wsPortPair = - source.find("source_ws_port"); - if (!wsPortPair.second) - continue; - - std::pair grpcPortPair = - source.find("source_grpc_port"); - if (!grpcPortPair.second) - { - // add source without grpc port - // used in read-only mode to detect when new ledgers have - // been validated. Used for publishing - if (app_.config().reportingReadOnly()) - loadBalancer_.add(ipPair.first, wsPortPair.first); - continue; - } - - loadBalancer_.add( - ipPair.first, wsPortPair.first, grpcPortPair.first); - } - - // this is true iff --reportingReadOnly was passed via command line - readOnly_ = app_.config().reportingReadOnly(); - - // if --reportingReadOnly was not passed via command line, check config - // file. Command line takes precedence - if (!readOnly_) - { - std::pair ro = section.find("read_only"); - if (ro.second) - { - readOnly_ = (ro.first == "true" || ro.first == "1"); - app_.config().setReportingReadOnly(readOnly_); - } - } - - // handle command line arguments - if (app_.config().START_UP == Config::StartUpType::FRESH && !readOnly_) - { - startSequence_ = std::stol(app_.config().START_LEDGER); - } - // if not passed via command line, check config for start sequence - if (!startSequence_) - { - std::pair start = section.find("start_sequence"); - if (start.second) - { - startSequence_ = std::stoi(start.first); - } - } - - std::pair flushInterval = - section.find("flush_interval"); - if (flushInterval.second) - flushInterval_ = std::stoi(flushInterval.first); - - std::pair numMarkers = section.find("num_markers"); - if (numMarkers.second) - numMarkers_ = std::stoi(numMarkers.first); - ReportingETL::instance_ = this; - } + flatMapBackend_.open(); + initSchema(pgPool_); } -ReportingETL* ReportingETL::instance_ = 0; - -} // namespace ripple diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index dd130ab1..11a4f2e0 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -20,12 +20,14 @@ #ifndef RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED #define RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED +#include #include #include #include #include #include #include +#include #include #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" @@ -36,7 +38,6 @@ #include #include -namespace ripple { struct AccountTransactionsData; @@ -59,9 +60,10 @@ class ReportingETL { private: CassandraFlatMapBackend flatMapBackend_; + std::shared_ptr pgPool_; std::thread worker_; - // boost::asio::io_context& ioContext_; + boost::asio::io_context& ioContext_; /// Strand to ensure that ledgers are published in order. /// If ETL is started far behind the network, ledgers will be written and @@ -75,7 +77,7 @@ private: /// includes reading all of the transactions from the database) is done from /// the application wide asio io_service, and a strand is used to ensure /// ledgers are published in order - // boost::asio::io_context::strand publishStrand_; + boost::asio::io_context::strand publishStrand_; /// Mechanism for communicating with ETL sources. ETLLoadBalancer wraps an /// arbitrary number of ETL sources and load balances ETL requests across @@ -151,7 +153,7 @@ private: /// @param sequence the sequence of the ledger to download /// @return The ledger downloaded, with a full transaction and account state /// map - std::shared_ptr + std::optional loadInitialLedger(uint32_t sequence); /// Run ETL. Extracts ledgers and writes them to the database, until a write @@ -218,12 +220,9 @@ private: /// @param parent the previous ledger /// @param rawData data extracted from an ETL source /// @return the newly built ledger and data to write to Postgres - std::pair, std::vector> - buildNextLedger( - std::shared_ptr& parent, - org::xrpl::rpc::v1::GetLedgerResponse& rawData); + std::pair> + buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData); - /* /// Attempt to read the specified ledger from the database, and then publish /// that ledger to the ledgers stream. /// @param ledgerSequence the sequence of the ledger to publish @@ -236,10 +235,12 @@ private: /// Publish the passed in ledger /// @param ledger the ledger to publish void - publishLedger(std::shared_ptr& ledger); -*/ + publishLedger(ripple::LedgerInfo const& lgrInfo); + public: - ReportingETL(boost::json::object& config); + ReportingETL( + boost::json::object const& config, + boost::asio::io_context& ioc); ~ReportingETL() { @@ -330,5 +331,4 @@ private: doWork(); }; -} // namespace ripple #endif diff --git a/rippled b/rippled index 759ad7f2..2978847d 160000 --- a/rippled +++ b/rippled @@ -1 +1 @@ -Subproject commit 759ad7f274ccf5b25fa401debbd306168647a68b +Subproject commit 2978847d8d96c6ed693c83d49b8ef2cce794c7ff diff --git a/test.py b/test.py new file mode 100755 index 00000000..e46da256 --- /dev/null +++ b/test.py @@ -0,0 +1,46 @@ +#!/usr/bin/python3 + +import websockets +import asyncio +import json +import io +import os +import subprocess +import argparse +import time +import threading + + + +async def ping(ip, port): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"account_info","ledger_index":60392449,"account":"rLC64xxNif3GiY9FQnbaM4kcE6VvDhwRod"})) + res = json.loads(await ws.recv()) + print(res) + except websockets.exceptions.ConnectionClosedError as e: + print(e) + + + + +parser = argparse.ArgumentParser(description='test script for xrpl-reporting') +parser.add_argument('action', choices=["ping"]) +parser.add_argument('--ip', default='127.0.0.1') +parser.add_argument('--port', default='8080') + + + +args = parser.parse_args() + +def run(args): + asyncio.set_event_loop(asyncio.new_event_loop()) + if args.action == "ping": + asyncio.get_event_loop().run_until_complete( + ping(args.ip, args.port)) + else: + print("incorrect arguments") + +run(args) + diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index 5f43ac55..19eb1888 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -18,21 +18,57 @@ #include #include #include + +#include +#include +#include #include #include #include #include #include #include -#include -#include +#include #include #include #include #include //------------------------------------------------------------------------------ +enum RPCCommand { tx, account_tx, ledger, account_info }; +std::unordered_map commandMap{ + {"tx", tx}, + {"account_tx", account_tx}, + {"ledger", ledger}, + {"account_info", account_info}}; +boost::json::object +doAccountInfo( + boost::json::object const& request, + CassandraFlatMapBackend const& backend); +boost::json::object +buildResponse( + boost::json::object const& request, + CassandraFlatMapBackend const& backend) +{ + std::string command = request.at("command").as_string().c_str(); + boost::json::object response; + switch (commandMap[command]) + { + case tx: + break; + case account_tx: + break; + case ledger: + break; + case account_info: + return doAccountInfo(request, backend); + break; + default: + BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command; + } + return response; +} // Report a failure void fail(boost::beast::error_code ec, char const* what) @@ -45,11 +81,14 @@ class session : public std::enable_shared_from_this { boost::beast::websocket::stream ws_; boost::beast::flat_buffer buffer_; + CassandraFlatMapBackend const& backend_; public: // Take ownership of the socket - explicit session(boost::asio::ip::tcp::socket&& socket) - : ws_(std::move(socket)) + explicit session( + boost::asio::ip::tcp::socket&& socket, + CassandraFlatMapBackend const& backend) + : ws_(std::move(socket)), backend_(backend) { } @@ -119,11 +158,19 @@ public: if (ec) fail(ec, "read"); + std::string msg{ + static_cast(buffer_.data().data()), buffer_.size()}; + // BOOST_LOG_TRIVIAL(debug) << __func__ << msg; + boost::json::value raw = boost::json::parse(msg); + // BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed"; + boost::json::object request = raw.as_object(); + auto response = buildResponse(request, backend_); + BOOST_LOG_TRIVIAL(debug) << __func__ << response; // Echo the message ws_.text(ws_.got_text()); ws_.async_write( - buffer_.data(), + boost::asio::buffer(boost::json::serialize(response)), boost::beast::bind_front_handler( &session::on_write, shared_from_this())); } @@ -151,12 +198,14 @@ class listener : public std::enable_shared_from_this { boost::asio::io_context& ioc_; boost::asio::ip::tcp::acceptor acceptor_; + CassandraFlatMapBackend const& backend_; public: listener( boost::asio::io_context& ioc, - boost::asio::ip::tcp::endpoint endpoint) - : ioc_(ioc), acceptor_(ioc) + boost::asio::ip::tcp::endpoint endpoint, + CassandraFlatMapBackend const& backend) + : ioc_(ioc), acceptor_(ioc), backend_(backend) { boost::beast::error_code ec; @@ -221,7 +270,7 @@ private: else { // Create the session and run it - std::make_shared(std::move(socket))->run(); + std::make_shared(std::move(socket), backend_)->run(); } // Accept another connection @@ -252,52 +301,82 @@ parse_config(const char* filename) return {}; } //------------------------------------------------------------------------------ +// +void +initLogLevel(int level) +{ + switch (level) + { + case 0: + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::trace); + break; + case 1: + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::debug); + break; + case 2: + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::info); + break; + case 3: + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::warning); + break; + case 4: + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::error); + break; + case 5: + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::fatal); + break; + default: + boost::log::core::get()->set_filter( + boost::log::trivial::severity >= boost::log::trivial::info); + } +} int main(int argc, char* argv[]) { // Check command line arguments. - if (argc != 5) + if (argc != 5 and argc != 6) { - std::cerr << "Usage: websocket-server-async
" - " \n" - << "Example:\n" - << " websocket-server-async 0.0.0.0 8080 1\n"; + std::cerr + << "Usage: websocket-server-async
" + " \n" + << "Example:\n" + << " websocket-server-async 0.0.0.0 8080 1 config.json 2\n"; return EXIT_FAILURE; } auto const address = boost::asio::ip::make_address(argv[1]); auto const port = static_cast(std::atoi(argv[2])); auto const threads = std::max(1, std::atoi(argv[3])); auto const config = parse_config(argv[4]); + if (argc > 5) + { + initLogLevel(std::atoi(argv[5])); + } + else + { + initLogLevel(2); + } if (!config) { std::cerr << "couldnt parse config. Exiting..." << std::endl; return EXIT_FAILURE; } - auto cassConfig = - (*config).at("database").as_object().at("cassandra").as_object(); - std::cout << cassConfig << std::endl; - - CassandraFlatMapBackend backend{cassConfig}; - backend.open(); - boost::json::array sources = (*config).at("etl_sources").as_array(); - if (!sources.size()) - { - std::cerr << "no etl sources listed in config. exiting..." << std::endl; - return EXIT_FAILURE; - } - NetworkValidatedLedgers nwvl; - ETLSource source{sources[0].as_object(), backend, nwvl}; - - source.start(); - // source.loadInitialLedger(60000000); // The io_context is required for all I/O boost::asio::io_context ioc{threads}; + ReportingETL etl{config.value(), ioc}; // Create and launch a listening port std::make_shared( - ioc, boost::asio::ip::tcp::endpoint{address, port}) + ioc, + boost::asio::ip::tcp::endpoint{address, port}, + etl.getFlatMapBackend()) ->run(); // Run the I/O service on the requested number of threads @@ -305,6 +384,9 @@ main(int argc, char* argv[]) v.reserve(threads - 1); for (auto i = threads - 1; i > 0; --i) v.emplace_back([&ioc] { ioc.run(); }); + std::cout << "created ETL" << std::endl; + etl.run(); + std::cout << "running ETL" << std::endl; ioc.run(); return EXIT_SUCCESS;