diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 6721ef6c..716ee39a 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -44,10 +44,7 @@ BackendInterface::writeLedgerObject( assert(key.size() == sizeof(rippled::uint256)); ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); indexer_.addKey(std::move(key256)); - doWriteLedgerObject( - std::move(key), - seq, - std::move(blob)); + doWriteLedgerObject(std::move(key), seq, std::move(blob)); } std::optional BackendInterface::hardFetchLedgerRangeNoThrow() const @@ -196,6 +193,7 @@ BackendInterface::fetchLedgerPage( { assert(limit != 0); bool incomplete = !isLedgerIndexed(ledgerSequence); + BOOST_LOG_TRIVIAL(debug) << __func__ << " incomplete = " << incomplete; // really low limits almost always miss uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4)); LedgerPage page; @@ -208,10 +206,14 @@ BackendInterface::fetchLedgerPage( auto partial = doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit); auto end = std::chrono::system_clock::now(); + std::string pageCursorStr = + page.cursor ? ripple::strHex(*page.cursor) : ""; + std::string partialCursorStr = + partial.cursor ? ripple::strHex(*partial.cursor) : ""; BOOST_LOG_TRIVIAL(debug) << __func__ << " " << std::to_string(ledgerSequence) << " " - << std::to_string(adjustedLimit) << " " - << ripple::strHex(*page.cursor) << " - time = " + << std::to_string(adjustedLimit) << " " << pageCursorStr << " - " + << partialCursorStr << " - time = " << std::to_string( std::chrono::duration_cast( end - start) @@ -262,11 +264,6 @@ BackendInterface::fetchLedgerPage( }); page.warning = "Data may be incomplete"; } - if (page.objects.size() >= limit) - { - page.objects.resize(limit); - page.cursor = page.objects.back().key; - } return page; } diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index af365a9b..312e91b4 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -158,9 +158,9 @@ CassandraBackend::doWriteLedgerObject( auto& [key, sequence, blob] = params.data; CassandraStatement statement{insertObject_}; - statement.bindBytes(key); - statement.bindInt(sequence); - statement.bindBytes(blob); + statement.bindNextBytes(key); + statement.bindNextInt(sequence); + statement.bindNextBytes(blob); return statement; }); } @@ -176,8 +176,8 @@ CassandraBackend::writeLedger( [this](auto& params) { auto& [sequence, header] = params.data; CassandraStatement statement{insertLedgerHeader_}; - statement.bindInt(sequence); - statement.bindBytes(header); + statement.bindNextInt(sequence); + statement.bindNextBytes(header); return statement; }); makeAndExecuteAsyncWrite( @@ -186,8 +186,8 @@ CassandraBackend::writeLedger( [this](auto& params) { auto& [hash, sequence] = params.data; CassandraStatement statement{insertLedgerHash_}; - statement.bindBytes(hash); - statement.bindInt(sequence); + statement.bindNextBytes(hash); + statement.bindNextInt(sequence); return statement; }); ledgerSequence_ = ledgerInfo.seq; @@ -211,12 +211,12 @@ CassandraBackend::writeAccountTransactions( [this](auto& params) { CassandraStatement statement(insertAccountTx_); auto& [account, lgrSeq, txnIdx, hash] = params.data; - statement.bindBytes(account); + statement.bindNextBytes(account); uint32_t index = lgrSeq >> 20 << 20; - statement.bindUInt(index); + statement.bindNextUInt(index); - statement.bindIntTuple(lgrSeq, txnIdx); - statement.bindBytes(hash); + 
statement.bindNextIntTuple(lgrSeq, txnIdx); + statement.bindNextBytes(hash); return statement; }); } @@ -235,8 +235,8 @@ CassandraBackend::writeTransaction( makeAndExecuteAsyncWrite( this, std::move(std::make_pair(seq, hash)), [this](auto& params) { CassandraStatement statement{insertLedgerTransaction_}; - statement.bindInt(params.data.first); - statement.bindBytes(params.data.second); + statement.bindNextInt(params.data.first); + statement.bindNextBytes(params.data.second); return statement; }); makeAndExecuteAsyncWrite( @@ -246,10 +246,10 @@ CassandraBackend::writeTransaction( [this](auto& params) { CassandraStatement statement{insertTransaction_}; auto& [hash, sequence, transaction, metadata] = params.data; - statement.bindBytes(hash); - statement.bindInt(sequence); - statement.bindBytes(transaction); - statement.bindBytes(metadata); + statement.bindNextBytes(hash); + statement.bindNextInt(sequence); + statement.bindNextBytes(transaction); + statement.bindNextBytes(metadata); return statement; }); } @@ -290,7 +290,7 @@ struct ReadCallbackData std::atomic_int& numOutstanding; std::mutex& mtx; std::condition_variable& cv; - bool errored = false; + std::atomic_bool errored = false; ReadCallbackData( std::atomic_int& numOutstanding, std::mutex& m, @@ -321,7 +321,7 @@ struct ReadCallbackData void processAsyncRead(CassFuture* fut, void* cbData) { - ReadCallbackData cb = *static_cast(cbData); + ReadCallbackData& cb = *static_cast(cbData); cb.finish(fut); } std::vector @@ -339,7 +339,7 @@ CassandraBackend::fetchTransactions( for (std::size_t i = 0; i < hashes.size(); ++i) { CassandraStatement statement{selectTransaction_}; - statement.bindBytes(hashes[i]); + statement.bindNextBytes(hashes[i]); cbs.push_back(std::make_shared( numOutstanding, mtx, cv, [i, &results](auto& result) { if (result.hasResult()) @@ -373,7 +373,7 @@ CassandraBackend::fetchAllTransactionHashesInLedger( uint32_t ledgerSequence) const { CassandraStatement statement{selectAllTransactionHashesInLedger_}; - statement.bindInt(ledgerSequence); + statement.bindNextInt(ledgerSequence); auto start = std::chrono::system_clock::now(); CassandraResult result = executeSyncRead(statement); auto end = std::chrono::system_clock::now(); @@ -424,11 +424,11 @@ CassandraBackend::fetchAccountTransactions( else return CassandraStatement{selectAccountTx_}; }(); - statement.bindBytes(account); + statement.bindNextBytes(account); if (cursor) { - statement.bindUInt(cursor->ledgerSequence >> 20 << 20); - statement.bindIntTuple( + statement.bindNextUInt(cursor->ledgerSequence >> 20 << 20); + statement.bindNextIntTuple( cursor->ledgerSequence, cursor->transactionIndex); BOOST_LOG_TRIVIAL(debug) << " account = " << ripple::strHex(account) @@ -439,16 +439,16 @@ CassandraBackend::fetchAccountTransactions( else { int seq = forward ? rng->minSequence : rng->maxSequence; - statement.bindUInt(seq >> 20 << 20); + statement.bindNextUInt(seq >> 20 << 20); int placeHolder = forward ? 0 : INT32_MAX; - statement.bindIntTuple(placeHolder, placeHolder); + statement.bindNextIntTuple(placeHolder, placeHolder); BOOST_LOG_TRIVIAL(debug) << " account = " << ripple::strHex(account) << " idx = " << seq << " tuple = " << placeHolder; } uint32_t adjustedLimit = limit - hashes.size(); - statement.bindUInt(adjustedLimit); + statement.bindNextUInt(adjustedLimit); CassandraResult result = executeSyncRead(statement); if (!result.hasResult()) { @@ -487,8 +487,8 @@ CassandraBackend::fetchAccountTransactions( cursor->transactionIndex = forward ? 
0 : INT32_MAX; BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back"; CassandraStatement statement{selectObject_}; - statement.bindBytes(keylet.key); - statement.bindInt(seq); + statement.bindNextBytes(keylet.key); + statement.bindNextInt(seq); CassandraResult result = executeSyncRead(statement); if (!result) { @@ -529,15 +529,15 @@ CassandraBackend::doFetchLedgerPage( BOOST_LOG_TRIVIAL(debug) << __func__ << " - Cursor = " << ripple::strHex(*cursor); CassandraStatement statement{selectKeys_}; - statement.bindInt(index->keyIndex); + statement.bindNextInt(index->keyIndex); if (!cursor) { ripple::uint256 zero; cursor = zero; } - statement.bindBytes(cursor->data(), 1); - statement.bindBytes(*cursor); - statement.bindUInt(limit + 1); + statement.bindNextBytes(cursor->data(), 1); + statement.bindNextBytes(*cursor); + statement.bindNextUInt(limit + 1); CassandraResult result = executeSyncRead(statement); if (!!result) { @@ -613,8 +613,8 @@ CassandraBackend::fetchLedgerObjects( results[i] = result.getBytes(); })); CassandraStatement statement{selectObject_}; - statement.bindBytes(keys[i]); - statement.bindInt(sequence); + statement.bindNextBytes(keys[i]); + statement.bindNextInt(sequence); executeAsyncRead(statement, processAsyncRead, *cbs[i]); } assert(results.size() == cbs.size()); @@ -641,9 +641,9 @@ CassandraBackend::writeKeys( auto bind = [this](auto& params) { auto& [lgrSeq, key] = params.data; CassandraStatement statement{insertKey_}; - statement.bindInt(lgrSeq); - statement.bindBytes(key.data(), 1); - statement.bindBytes(key); + statement.bindNextInt(lgrSeq); + statement.bindNextBytes(key.data(), 1); + statement.bindNextBytes(key); return statement; }; std::atomic_int numOutstanding = 0; @@ -706,9 +706,9 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const auto bind = [this](auto& params) { auto& [key, seq, obj] = params.data; CassandraStatement statement{insertObject_}; - statement.bindBytes(key); - statement.bindInt(seq); - statement.bindBytes(obj); + statement.bindNextBytes(key); + statement.bindNextInt(seq); + statement.bindNextBytes(obj); return statement; }; std::condition_variable cv; @@ -770,7 +770,7 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const std::unique_lock lck(mtx); cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); CassandraStatement statement{deleteLedgerRange_}; - statement.bindInt(minLedger); + statement.bindNextInt(minLedger); executeSyncWrite(statement); // update ledger_range return true; diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 19e1e660..429a3170 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -124,11 +124,11 @@ public: } void - bindBoolean(bool val) + bindNextBoolean(bool val) { if (!statement_) throw std::runtime_error( - "CassandraStatement::bindBoolean - statement_ is null"); + "CassandraStatement::bindNextBoolean - statement_ is null"); CassError rc = cass_statement_bind_bool( statement_, 1, static_cast(val)); if (rc != CASS_OK) @@ -143,45 +143,45 @@ public: } void - bindBytes(const char* data, uint32_t size) + bindNextBytes(const char* data, uint32_t size) { - bindBytes((unsigned char*)data, size); + bindNextBytes((unsigned char*)data, size); } void - bindBytes(ripple::uint256 const& data) + bindNextBytes(ripple::uint256 const& data) { - bindBytes(data.data(), data.size()); + bindNextBytes(data.data(), data.size()); } void - bindBytes(std::vector const& data) + bindNextBytes(std::vector const& data) { - bindBytes(data.data(), 
data.size()); + bindNextBytes(data.data(), data.size()); } void - bindBytes(ripple::AccountID const& data) + bindNextBytes(ripple::AccountID const& data) { - bindBytes(data.data(), data.size()); + bindNextBytes(data.data(), data.size()); } void - bindBytes(std::string const& data) + bindNextBytes(std::string const& data) { - bindBytes(data.data(), data.size()); + bindNextBytes(data.data(), data.size()); } void - bindBytes(void const* key, uint32_t size) + bindNextBytes(void const* key, uint32_t size) { - bindBytes(static_cast(key), size); + bindNextBytes(static_cast(key), size); } void - bindBytes(const unsigned char* data, uint32_t size) + bindNextBytes(const unsigned char* data, uint32_t size) { if (!statement_) throw std::runtime_error( - "CassandraStatement::bindBytes - statement_ is null"); + "CassandraStatement::bindNextBytes - statement_ is null"); CassError rc = cass_statement_bind_bytes( statement_, curBindingIndex_, @@ -199,11 +199,11 @@ public: } void - bindUInt(uint32_t value) + bindNextUInt(uint32_t value) { if (!statement_) throw std::runtime_error( - "CassandraStatement::bindUInt - statement_ is null"); + "CassandraStatement::bindNextUInt - statement_ is null"); BOOST_LOG_TRIVIAL(trace) << std::to_string(curBindingIndex_) << " " << std::to_string(value); CassError rc = @@ -220,17 +220,17 @@ public: } void - bindInt(uint32_t value) + bindNextInt(uint32_t value) { - bindInt((int64_t)value); + bindNextInt((int64_t)value); } void - bindInt(int64_t value) + bindNextInt(int64_t value) { if (!statement_) throw std::runtime_error( - "CassandraStatement::bindInt - statement_ is null"); + "CassandraStatement::bindNextInt - statement_ is null"); CassError rc = cass_statement_bind_int64(statement_, curBindingIndex_, value); if (rc != CASS_OK) @@ -245,7 +245,7 @@ public: } void - bindIntTuple(uint32_t first, uint32_t second) + bindNextIntTuple(uint32_t first, uint32_t second) { CassTuple* tuple = cass_tuple_new(2); CassError rc = cass_tuple_set_int64(tuple, 0, first); @@ -667,15 +667,15 @@ public: if (isFirstLedger_) { CassandraStatement statement{updateLedgerRange_}; - statement.bindInt(ledgerSequence_); - statement.bindBoolean(false); - statement.bindInt(ledgerSequence_); + statement.bindNextInt(ledgerSequence_); + statement.bindNextBoolean(false); + statement.bindNextInt(ledgerSequence_); executeSyncWrite(statement); } CassandraStatement statement{updateLedgerRange_}; - statement.bindInt(ledgerSequence_); - statement.bindBoolean(true); - statement.bindInt(ledgerSequence_ - 1); + statement.bindNextInt(ledgerSequence_); + statement.bindNextBoolean(true); + statement.bindNextInt(ledgerSequence_ - 1); if (!executeSyncUpdate(statement)) { BOOST_LOG_TRIVIAL(warning) @@ -713,7 +713,7 @@ public: { BOOST_LOG_TRIVIAL(trace) << __func__; CassandraStatement statement{selectLedgerBySeq_}; - statement.bindInt(sequence); + statement.bindNextInt(sequence); CassandraResult result = executeSyncRead(statement); if (!result) @@ -730,7 +730,7 @@ public: { CassandraStatement statement{selectLedgerByHash_}; - statement.bindBytes(hash); + statement.bindNextBytes(hash); CassandraResult result = executeSyncRead(statement); if (!result.hasResult()) @@ -761,8 +761,8 @@ public: { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; CassandraStatement statement{selectObject_}; - statement.bindBytes(key); - statement.bindInt(sequence); + statement.bindNextBytes(key); + statement.bindNextInt(sequence); CassandraResult result = executeSyncRead(statement); if (!result) { @@ -780,7 +780,7 @@ public: { 
BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; CassandraStatement statement{getToken_}; - statement.bindBytes(key, 32); + statement.bindNextBytes(key, 32); CassandraResult result = executeSyncRead(statement); if (!result) { @@ -799,7 +799,7 @@ public: { BOOST_LOG_TRIVIAL(trace) << __func__; CassandraStatement statement{selectTransaction_}; - statement.bindBytes(hash); + statement.bindNextBytes(hash); CassandraResult result = executeSyncRead(statement); if (!result) { @@ -830,10 +830,8 @@ public: uint32_t sequence) const override; void - doWriteLedgerObject( - std::string&& key, - uint32_t seq, - std::string&& blob) const override; + doWriteLedgerObject(std::string&& key, uint32_t seq, std::string&& blob) + const override; void writeAccountTransactions( diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 833f9e67..9ab98b84 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -145,12 +145,14 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) void ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - Publishing ledger " << std::to_string(lgrInfo.seq); backend_->updateRange(lgrInfo.seq); auto ledgerRange = backend_->fetchLedgerRange(); std::optional fees; std::vector transactions; - for (;;) + while (true) { try { @@ -180,6 +182,8 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) subscriptions_->pubTransaction(txAndMeta, lgrInfo.seq); setLastPublish(); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - Published ledger " << std::to_string(lgrInfo.seq); } bool @@ -301,11 +305,10 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) } BOOST_LOG_TRIVIAL(debug) << __func__ << " : " - << "wrote objects. num objects = " - << std::to_string(rawData.ledger_objects().objects_size()); + << "Inserted/modified/deleted all objects. Number of objects = " + << rawData.ledger_objects().objects_size(); std::vector accountTxData{ insertTransactions(lgrInfo, rawData)}; - BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Inserted all transactions. Number of transactions = " @@ -313,28 +316,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) backend_->writeAccountTransactions(std::move(accountTxData)); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "wrote account_tx"; - accumTxns_ += rawData.transactions_list().transactions_size(); - bool success = true; - if (accumTxns_ >= txnThreshold_) - { - auto start = std::chrono::system_clock::now(); - success = backend_->finishWrites(lgrInfo.seq); - auto end = std::chrono::system_clock::now(); + auto start = std::chrono::system_clock::now(); + bool success = backend_->finishWrites(lgrInfo.seq); + auto end = std::chrono::system_clock::now(); - auto duration = ((end - start).count()) / 1000000000.0; - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Accumulated " << std::to_string(accumTxns_) - << " transactions. Wrote in " << std::to_string(duration) - << " transactions per second = " - << std::to_string(accumTxns_ / duration); - accumTxns_ = 0; - } - else - BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping commit"; + auto duration = ((end - start).count()) / 1000000000.0; BOOST_LOG_TRIVIAL(debug) - << __func__ << " : " - << "Inserted/modified/deleted all objects. Number of objects = " - << rawData.ledger_objects().objects_size(); + << __func__ << " finished writes. 
took " << std::to_string(duration); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " @@ -477,29 +465,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) beast::setCurrentThreadName("rippled: ReportingETL transform"); uint32_t currentSequence = startSequence; - int counter = 0; - std::atomic_int per = 100; - auto startTimer = [this, &per]() { - auto innerFunc = [this, &per](auto& f) -> void { - std::shared_ptr timer = - std::make_shared( - ioContext_, - std::chrono::steady_clock::now() + - std::chrono::minutes(5)); - timer->async_wait( - [timer, f, &per](const boost::system::error_code& error) { - ++per; - BOOST_LOG_TRIVIAL(info) - << "Incremented per to " << std::to_string(per); - if (per > 100) - per = 100; - f(f); - }); - }; - innerFunc(innerFunc); - }; - // startTimer(); - auto begin = std::chrono::system_clock::now(); while (!writeConflict) @@ -527,7 +492,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) if (success) BOOST_LOG_TRIVIAL(info) << "Load phase of etl : " - << "Successfully published ledger! Ledger info: " + << "Successfully wrote ledger! Ledger info: " << detail::toString(lgrInfo) << ". txn count = " << numTxns << ". object count = " << numObjects << ". load time = " << duration @@ -558,19 +523,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) deleting_ = false; }); } - /* - if (++counter >= per) - { - std::chrono::milliseconds sleep = - std::chrono::duration_cast( - std::chrono::seconds(4) - (end - begin)); - BOOST_LOG_TRIVIAL(info) << "Sleeping for " << sleep.count() - << " . per = " << std::to_string(per); - std::this_thread::sleep_for(sleep); - counter = 0; - begin = std::chrono::system_clock::now(); - } - */ } }}; diff --git a/src/rpc/RPCHelpers.cpp b/src/rpc/RPCHelpers.cpp index 82830da1..bec0b03b 100644 --- a/src/rpc/RPCHelpers.cpp +++ b/src/rpc/RPCHelpers.cpp @@ -84,21 +84,44 @@ std::pair< std::shared_ptr> deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs) { - std::pair< - std::shared_ptr, - std::shared_ptr> - result; + try { - ripple::SerialIter s{ - blobs.transaction.data(), blobs.transaction.size()}; - result.first = std::make_shared(s); + std::pair< + std::shared_ptr, + std::shared_ptr> + result; + { + ripple::SerialIter s{ + blobs.transaction.data(), blobs.transaction.size()}; + result.first = std::make_shared(s); + } + { + ripple::SerialIter s{blobs.metadata.data(), blobs.metadata.size()}; + result.second = + std::make_shared(s, ripple::sfMetadata); + } + return result; } + catch (std::exception const& e) { - ripple::SerialIter s{blobs.metadata.data(), blobs.metadata.size()}; - result.second = - std::make_shared(s, ripple::sfMetadata); + std::stringstream txn; + std::stringstream meta; + std::copy( + blobs.transaction.begin(), + blobs.transaction.end(), + std::ostream_iterator(txn)); + std::copy( + blobs.metadata.begin(), + blobs.metadata.end(), + std::ostream_iterator(meta)); + BOOST_LOG_TRIVIAL(error) + << __func__ + << " Failed to deserialize transaction. 
txn = " << txn.str() + << " - meta = " << meta.str() + << " txn length = " << std::to_string(blobs.transaction.size()) + << " meta length = " << std::to_string(blobs.metadata.size()); + throw e; } - return result; } std::pair< diff --git a/src/rpc/handlers/LedgerData.cpp b/src/rpc/handlers/LedgerData.cpp index 2663e2bf..21b5aa6b 100644 --- a/src/rpc/handlers/LedgerData.cpp +++ b/src/rpc/handlers/LedgerData.cpp @@ -21,8 +21,8 @@ #include #include -#include #include +#include // Get state nodes from a ledger // Inputs: // limit: integer, maximum number of entries @@ -37,8 +37,7 @@ // // -namespace RPC -{ +namespace RPC { Result doLedgerData(Context const& context) @@ -47,34 +46,34 @@ doLedgerData(Context const& context) boost::json::object response = {}; bool binary = false; - if(request.contains("binary")) + if (request.contains("binary")) { - if(!request.at("binary").is_bool()) + if (!request.at("binary").is_bool()) return Status{Error::rpcINVALID_PARAMS, "binaryFlagNotBool"}; - + binary = request.at("binary").as_bool(); } std::size_t limit = binary ? 2048 : 256; - if(request.contains("limit")) + if (request.contains("limit")) { - if(!request.at("limit").is_int64()) + if (!request.at("limit").is_int64()) return Status{Error::rpcINVALID_PARAMS, "limitNotInteger"}; - + limit = value_to(request.at("limit")); } std::optional cursor; - if(request.contains("cursor")) + if (request.contains("marker")) { - if(!request.at("cursor").is_string()) - return Status{Error::rpcINVALID_PARAMS, "cursorNotString"}; - - BOOST_LOG_TRIVIAL(debug) << __func__ << " : parsing cursor"; + if (!request.at("marker").is_string()) + return Status{Error::rpcINVALID_PARAMS, "markerNotString"}; + + BOOST_LOG_TRIVIAL(debug) << __func__ << " : parsing marker"; cursor = ripple::uint256{}; - if(!cursor->parseHex(request.at("cursor").as_string().c_str())) - return Status{Error::rpcINVALID_PARAMS, "cursorMalformed"}; + if (!cursor->parseHex(request.at("marker").as_string().c_str())) + return Status{Error::rpcINVALID_PARAMS, "markerMalformed"}; } auto v = ledgerInfoFromRequest(context); @@ -94,7 +93,7 @@ doLedgerData(Context const& context) .count(); boost::json::object header; - if(!cursor) + if (!cursor) { if (binary) { @@ -106,8 +105,10 @@ doLedgerData(Context const& context) header["account_hash"] = ripple::strHex(lgrInfo.accountHash); header["close_flags"] = lgrInfo.closeFlags; header["close_time"] = lgrInfo.closeTime.time_since_epoch().count(); - header["close_time_human"] = ripple::to_string(lgrInfo.closeTime);; - header["close_time_resolution"] = lgrInfo.closeTimeResolution.count(); + header["close_time_human"] = ripple::to_string(lgrInfo.closeTime); + ; + header["close_time_resolution"] = + lgrInfo.closeTimeResolution.count(); header["closed"] = true; header["hash"] = ripple::strHex(lgrInfo.hash); header["ledger_hash"] = ripple::strHex(lgrInfo.hash); @@ -123,7 +124,7 @@ doLedgerData(Context const& context) response["ledger"] = header; } } - + response["ledger_hash"] = ripple::strHex(lgrInfo.hash); response["ledger_index"] = lgrInfo.seq; @@ -131,8 +132,12 @@ doLedgerData(Context const& context) std::vector& results = page.objects; std::optional const& returnedCursor = page.cursor; - if(returnedCursor) + if (returnedCursor) + { response["marker"] = ripple::strHex(*returnedCursor); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " cursor = " << ripple::strHex(*returnedCursor); + } BOOST_LOG_TRIVIAL(debug) << __func__ << " number of results = " << results.size(); @@ -152,7 +157,6 @@ doLedgerData(Context const& context) } 
response["state"] = objects; - if (cursor && page.warning) { response["warning"] = @@ -167,4 +171,4 @@ doLedgerData(Context const& context) return response; } -} // namespace RPC \ No newline at end of file +} // namespace RPC diff --git a/test.py b/test.py index 8dd32d31..81e55e28 100755 --- a/test.py +++ b/test.py @@ -810,7 +810,7 @@ async def perf(ip, port): parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info"]) +parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info", "gaps"]) parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') @@ -864,6 +864,15 @@ def run(args): elif args.action == "perf": asyncio.get_event_loop().run_until_complete( perf(args.ip,args.port)) + elif args.action == "gaps": + missing = [] + for x in range(rng[0],rng[1]): + res = asyncio.get_event_loop().run_until_complete( + ledger(args.ip, args.port, x, True, False, False)) + if "error" in res: + print("missing " + str(x)) + missing.append(x) + print(missing) elif args.action == "account_info": res1 = asyncio.get_event_loop().run_until_complete( account_info(args.ip, args.port, args.account, args.ledger, args.binary))
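
A note on the CassandraStatement changes above: the bind* to bindNextBytes/bindNextInt/bindNextUInt/bindNextIntTuple rename makes the calling convention explicit. Each call fills the next positional placeholder, tracked by curBindingIndex_, so arguments have to be bound in the same order as the placeholders in the prepared CQL. The sketch below is a driver-free illustration of that convention, not the real class; SequentialStatement and its members are invented for the example, while the real code forwards to cass_statement_bind_*:

    // Driver-free sketch of the "bind the next placeholder" convention behind
    // CassandraStatement::bindNext*(): an internal index advances on every
    // call, so values must be bound in the same order as the '?' markers.
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    class SequentialStatement
    {
        std::size_t const numPlaceholders_;
        std::size_t curBindingIndex_ = 0;  // next '?' to fill

        void
        advance()
        {
            if (curBindingIndex_ >= numPlaceholders_)
                throw std::runtime_error("more binds than placeholders");
            ++curBindingIndex_;
        }

    public:
        explicit SequentialStatement(std::size_t numPlaceholders)
            : numPlaceholders_(numPlaceholders)
        {
        }

        void
        bindNextInt(std::int64_t)
        {
            advance();
        }

        void
        bindNextBytes(std::string const&)
        {
            advance();
        }

        bool
        fullyBound() const
        {
            return curBindingIndex_ == numPlaceholders_;
        }
    };

    int
    main()
    {
        // INSERT INTO objects (key, sequence, object) VALUES (?, ?, ?)
        SequentialStatement statement{3};
        statement.bindNextBytes("key bytes");
        statement.bindNextInt(62500000);
        statement.bindNextBytes("object blob");
        std::cout << std::boolalpha << statement.fullyBound() << "\n";
    }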
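
A note on the account_tx bucketing: both writeAccountTransactions and fetchAccountTransactions compute lgrSeq >> 20 << 20, i.e. the ledger sequence with its low 20 bits cleared, which appears to act as a coarse bucket bound ahead of the (ledger sequence, transaction index) tuple; when no cursor is supplied, paging starts from the tuple (0, 0) going forward and (INT32_MAX, INT32_MAX) going backward. A small self-contained check of the bucket arithmetic, with accountTxBucket being a name invented here:

    // Each bucket spans 2^20 = 1,048,576 consecutive ledger sequences.
    #include <cstdint>
    #include <iostream>

    static std::uint32_t
    accountTxBucket(std::uint32_t ledgerSequence)
    {
        return ledgerSequence >> 20 << 20;
    }

    int
    main()
    {
        // Ledgers 62,914,560 through 63,963,135 share one bucket.
        std::cout << accountTxBucket(62914560) << "\n";  // 62914560 (60 * 2^20)
        std::cout << accountTxBucket(63500000) << "\n";  // 62914560
        std::cout << accountTxBucket(63963136) << "\n";  // 63963136, next bucket
    }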
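
A note on the asynchronous read path: processAsyncRead now works on the ReadCallbackData through a reference rather than a copy, so the errored flag set inside finish() is seen by the thread that is waiting on the reads, and errored itself becomes std::atomic_bool because it is written from a driver callback thread and read from the caller's thread. The sketch below shows the general shape of that pattern, a C-style completion callback driving a shared atomic counter and a condition variable, with plain std::thread standing in for the Cassandra driver; all names here are illustrative:

    // Minimal sketch (not the backend code) of the completion pattern: each
    // outstanding read owns a CallbackData whose shared counter, mutex and
    // condition variable members are references; the C-style callback must
    // act on that object by reference (a copy would set 'errored' on a
    // temporary); 'errored' is atomic because writer and reader are
    // different threads.
    #include <atomic>
    #include <condition_variable>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <vector>

    struct CallbackData
    {
        std::atomic_int& numOutstanding;
        std::mutex& mtx;
        std::condition_variable& cv;
        std::atomic_bool errored = false;

        CallbackData(
            std::atomic_int& n,
            std::mutex& m,
            std::condition_variable& c)
            : numOutstanding(n), mtx(m), cv(c)
        {
        }

        void
        finish(bool ok)
        {
            if (!ok)
                errored = true;
            if (--numOutstanding == 0)
            {
                // Notify under the lock so the waiter cannot miss the wakeup.
                std::lock_guard lck(mtx);
                cv.notify_one();
            }
        }
    };

    static void
    processCompletion(bool ok, void* cbData)
    {
        // By reference: mutate the caller's object, not a copy of it.
        CallbackData& cb = *static_cast<CallbackData*>(cbData);
        cb.finish(ok);
    }

    int
    main()
    {
        constexpr int numReads = 4;
        std::atomic_int numOutstanding = numReads;
        std::mutex mtx;
        std::condition_variable cv;

        std::vector<std::shared_ptr<CallbackData>> cbs;
        for (int i = 0; i < numReads; ++i)
            cbs.push_back(
                std::make_shared<CallbackData>(numOutstanding, mtx, cv));

        std::vector<std::thread> workers;
        for (int i = 0; i < numReads; ++i)
            workers.emplace_back(
                processCompletion, i != 2, static_cast<void*>(cbs[i].get()));

        std::unique_lock lck(mtx);
        cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
        lck.unlock();

        for (auto& t : workers)
            t.join();

        for (auto const& cb : cbs)
            if (cb->errored)
                std::cout << "one read errored\n";
    }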
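
A note on the timing added around finishWrites in buildNextLedger: the commit is no longer batched behind accumTxns_/txnThreshold_, it now runs for every ledger and is timed, and the elapsed time is computed as (end - start).count() / 1000000000.0. That division yields seconds only when the clock's tick happens to be one nanosecond, which is implementation defined for std::chrono::system_clock. A period-independent way to get fractional seconds is to convert through std::chrono::duration<double>; the helper name timeInSeconds below is invented for the example:

    // Measure an operation in fractional seconds without assuming the tick
    // period of the clock. duration<double> converts from any duration type.
    #include <chrono>
    #include <iostream>
    #include <thread>
    #include <utility>

    template <class F>
    double
    timeInSeconds(F&& f)
    {
        auto const start = std::chrono::system_clock::now();
        std::forward<F>(f)();
        auto const end = std::chrono::system_clock::now();
        return std::chrono::duration<double>(end - start).count();
    }

    int
    main()
    {
        double const seconds = timeInSeconds([]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        });
        std::cout << "finished writes. took " << seconds << "\n";
    }

(std::chrono::steady_clock is the more conventional choice for measuring intervals, though the surrounding code uses system_clock throughout.)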
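
A note on the new catch block in deserializeTxPlusMeta: after logging the offending transaction and metadata blobs it ends with "throw e;". Worth keeping in mind that "throw e;" throws a copy whose static type is the one named in the catch clause (std::exception here), so any more derived exception type is sliced away, whereas a bare "throw;" rethrows the original object unchanged. A minimal demonstration:

    // "throw;" preserves the original exception type; "throw e;" slices it
    // to the type named in the catch clause.
    #include <iostream>
    #include <stdexcept>

    static void
    rethrowByCopy()
    {
        try
        {
            throw std::runtime_error("bad blob");
        }
        catch (std::exception const& e)
        {
            throw e;  // copies and slices to std::exception
        }
    }

    static void
    rethrowInPlace()
    {
        try
        {
            throw std::runtime_error("bad blob");
        }
        catch (std::exception const&)
        {
            throw;  // rethrows the original std::runtime_error
        }
    }

    int
    main()
    {
        try
        {
            rethrowByCopy();
        }
        catch (std::runtime_error const&)
        {
            std::cout << "runtime_error survived\n";  // not reached
        }
        catch (std::exception const&)
        {
            std::cout << "only std::exception left\n";
        }

        try
        {
            rethrowInPlace();
        }
        catch (std::runtime_error const&)
        {
            std::cout << "runtime_error survived\n";  // reached
        }
    }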
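
A note on the LedgerData handler: the paging token is now read from a "marker" field, matching the field name rippled uses for ledger_data, instead of the previous "cursor" field, and the next page's token continues to be returned as response["marker"]. A client-side sketch of paging with Boost.JSON follows; makeLedgerDataRequest and the literal values are illustrative only, not part of this patch:

    // Illustrative construction of paged ledger_data requests using the
    // renamed "marker" field. Boost.JSON is header-only if
    // <boost/json/src.hpp> is included in exactly one translation unit.
    #include <boost/json.hpp>
    #include <boost/json/src.hpp>
    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <string>

    static boost::json::object
    makeLedgerDataRequest(
        std::uint32_t ledgerIndex,
        std::uint32_t limit,
        bool binary,
        std::optional<std::string> const& marker)
    {
        boost::json::object request;
        request["command"] = "ledger_data";
        request["ledger_index"] = ledgerIndex;
        request["limit"] = limit;
        request["binary"] = binary;
        if (marker)
            request["marker"] = *marker;  // hex key from the previous page
        return request;
    }

    int
    main()
    {
        auto const firstPage =
            makeLedgerDataRequest(62500000, 256, false, std::nullopt);
        std::cout << boost::json::serialize(firstPage) << "\n";

        // Subsequent pages feed back the marker from the previous response.
        auto const nextPage = makeLedgerDataRequest(
            62500000, 256, false, std::string(64, '0'));
        std::cout << boost::json::serialize(nextPage) << "\n";
    }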