From 3db9db9354f40d5343966cb679f9aba7b8e955a0 Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Tue, 1 Mar 2022 17:13:28 -0600 Subject: [PATCH] Fix memory errors (#108) * frees getUint64Tuple tuple iterator * explicitly create strand in synchronous() --- src/backend/BackendInterface.cpp | 5 +++-- src/backend/BackendInterface.h | 7 +++---- src/backend/CassandraBackend.h | 12 ++++++++++++ src/backend/Pg.cpp | 2 +- src/etl/ReportingETL.cpp | 18 ++++++++++-------- src/subscriptions/SubscriptionManager.cpp | 2 +- 6 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 99ac1574..716fb98e 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -252,8 +252,9 @@ BackendInterface::fetchLedgerPage( std::vector keys; while (keys.size() < limit) { - ripple::uint256 const& curCursor = - keys.size() ? keys.back() : cursor ? *cursor : firstKey; + ripple::uint256 const& curCursor = keys.size() ? keys.back() + : cursor ? *cursor + : firstKey; auto succ = fetchSuccessorKey(curCursor, ledgerSequence, yield); if (!succ) diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h index df5d3810..a750f66d 100644 --- a/src/backend/BackendInterface.h +++ b/src/backend/BackendInterface.h @@ -37,18 +37,17 @@ retryOnTimeout(F func, size_t waitMs = 500) } } -// Please note, this function only works w/ non-void return type. Writes are -// synchronous anyways, so template void synchronous(F&& f) { boost::asio::io_context ctx; + boost::asio::io_context::strand strand(ctx); std::optional work; work.emplace(ctx); - boost::asio::spawn(ctx, [&f, &work](boost::asio::yield_context yield) { + boost::asio::spawn(strand, [&f, &work](boost::asio::yield_context yield) { f(yield); work.reset(); @@ -218,7 +217,7 @@ public: hardFetchLedgerRange() const { std::optional range = {}; - synchronous([&](boost::asio::yield_context yield) { + synchronous([&](boost::asio::yield_context& yield) { range = hardFetchLedgerRange(yield); }); diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index ba3843a7..cbbd6bae 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -423,20 +423,32 @@ public: if (!row_) throw std::runtime_error( "CassandraResult::getInt64Tuple - no result"); + CassValue const* tuple = cass_row_get_column(row_, curGetIndex_); CassIterator* tupleIter = cass_iterator_from_tuple(tuple); + if (!cass_iterator_next(tupleIter)) + { + cass_iterator_free(tupleIter); throw std::runtime_error( "CassandraResult::getInt64Tuple - failed to iterate tuple"); + } + CassValue const* value = cass_iterator_get_value(tupleIter); std::int64_t first; cass_value_get_int64(value, &first); if (!cass_iterator_next(tupleIter)) + { + cass_iterator_free(tupleIter); throw std::runtime_error( "CassandraResult::getInt64Tuple - failed to iterate tuple"); + } + value = cass_iterator_get_value(tupleIter); std::int64_t second; cass_value_get_int64(value, &second); + cass_iterator_free(tupleIter); + ++curGetIndex_; return {first, second}; } diff --git a/src/backend/Pg.cpp b/src/backend/Pg.cpp index 7d6d2ff9..187999de 100644 --- a/src/backend/Pg.cpp +++ b/src/backend/Pg.cpp @@ -886,7 +886,7 @@ make_PgPool(boost::asio::io_context& ioc, boost::json::object const& config) auto ret = std::make_shared(ioc, configCopy); ret->setup(); - Backend::synchronous([&](boost::asio::yield_context yield) { + Backend::synchronous([&](boost::asio::yield_context& yield) { PgQuery pgQuery(ret); std::string query = "CREATE DATABASE " + std::string{config.at("database").as_string().c_str()}; diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index f1539086..7f82e2db 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -141,7 +141,7 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) std::vector diff; auto fetchDiffSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context yield) { + Backend::synchronous([&](boost::asio::yield_context& yield) { diff = backend_->fetchLedgerDiff(lgrInfo.seq, yield); }); }; @@ -156,13 +156,13 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) std::vector transactions = {}; auto fetchFeesSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context yield) { + Backend::synchronous([&](boost::asio::yield_context& yield) { fees = backend_->fetchFees(lgrInfo.seq, yield); }); }; auto fetchTxSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context yield) { + Backend::synchronous([&](boost::asio::yield_context& yield) { transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield); }); @@ -224,7 +224,7 @@ ReportingETL::publishLedger( { std::optional lgr = {}; auto fetchLedgerSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context yield) { + Backend::synchronous([&](boost::asio::yield_context& yield) { lgr = backend_->fetchLedgerBySequence(ledgerSequence, yield); }); @@ -733,9 +733,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) ioContext_.post([this, &minSequence]() { BOOST_LOG_TRIVIAL(info) << "Running online delete"; - Backend::synchronous([&](boost::asio::yield_context yield) { - backend_->doOnlineDelete(*onlineDeleteInterval_, yield); - }); + Backend::synchronous( + [&](boost::asio::yield_context& yield) { + backend_->doOnlineDelete( + *onlineDeleteInterval_, yield); + }); BOOST_LOG_TRIVIAL(info) << "Finished online delete"; auto rng = backend_->fetchLedgerRange(); @@ -781,7 +783,7 @@ void ReportingETL::monitor() { std::optional latestSequence = {}; - Backend::synchronous([&](boost::asio::yield_context yield) { + Backend::synchronous([&](boost::asio::yield_context& yield) { latestSequence = backend_->fetchLatestLedgerSequence(yield); }); diff --git a/src/subscriptions/SubscriptionManager.cpp b/src/subscriptions/SubscriptionManager.cpp index d42f8d68..e55e1714 100644 --- a/src/subscriptions/SubscriptionManager.cpp +++ b/src/subscriptions/SubscriptionManager.cpp @@ -237,7 +237,7 @@ SubscriptionManager::pubTransaction( { ripple::STAmount ownerFunds; auto fetchFundsSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context yield) { + Backend::synchronous([&](boost::asio::yield_context& yield) { ownerFunds = RPC::accountFunds( *backend_, lgrInfo.seq, amount, account, yield); });