Fix memory errors (#108)

* frees getUint64Tuple tuple iterator

* explicitly create strand in synchronous()
This commit is contained in:
Nathan Nichols
2022-03-01 17:13:28 -06:00
committed by GitHub
parent 4839e0d58b
commit 3db9db9354
6 changed files with 30 additions and 16 deletions

View File

@@ -252,8 +252,9 @@ BackendInterface::fetchLedgerPage(
std::vector<ripple::uint256> keys; std::vector<ripple::uint256> keys;
while (keys.size() < limit) while (keys.size() < limit)
{ {
ripple::uint256 const& curCursor = ripple::uint256 const& curCursor = keys.size() ? keys.back()
keys.size() ? keys.back() : cursor ? *cursor : firstKey; : cursor ? *cursor
: firstKey;
auto succ = fetchSuccessorKey(curCursor, ledgerSequence, yield); auto succ = fetchSuccessorKey(curCursor, ledgerSequence, yield);
if (!succ) if (!succ)

View File

@@ -37,18 +37,17 @@ retryOnTimeout(F func, size_t waitMs = 500)
} }
} }
// Please note, this function only works w/ non-void return type. Writes are
// synchronous anyways, so
template <class F> template <class F>
void void
synchronous(F&& f) synchronous(F&& f)
{ {
boost::asio::io_context ctx; boost::asio::io_context ctx;
boost::asio::io_context::strand strand(ctx);
std::optional<boost::asio::io_context::work> work; std::optional<boost::asio::io_context::work> work;
work.emplace(ctx); work.emplace(ctx);
boost::asio::spawn(ctx, [&f, &work](boost::asio::yield_context yield) { boost::asio::spawn(strand, [&f, &work](boost::asio::yield_context yield) {
f(yield); f(yield);
work.reset(); work.reset();
@@ -218,7 +217,7 @@ public:
hardFetchLedgerRange() const hardFetchLedgerRange() const
{ {
std::optional<LedgerRange> range = {}; std::optional<LedgerRange> range = {};
synchronous([&](boost::asio::yield_context yield) { synchronous([&](boost::asio::yield_context& yield) {
range = hardFetchLedgerRange(yield); range = hardFetchLedgerRange(yield);
}); });

View File

@@ -423,20 +423,32 @@ public:
if (!row_) if (!row_)
throw std::runtime_error( throw std::runtime_error(
"CassandraResult::getInt64Tuple - no result"); "CassandraResult::getInt64Tuple - no result");
CassValue const* tuple = cass_row_get_column(row_, curGetIndex_); CassValue const* tuple = cass_row_get_column(row_, curGetIndex_);
CassIterator* tupleIter = cass_iterator_from_tuple(tuple); CassIterator* tupleIter = cass_iterator_from_tuple(tuple);
if (!cass_iterator_next(tupleIter)) if (!cass_iterator_next(tupleIter))
{
cass_iterator_free(tupleIter);
throw std::runtime_error( throw std::runtime_error(
"CassandraResult::getInt64Tuple - failed to iterate tuple"); "CassandraResult::getInt64Tuple - failed to iterate tuple");
}
CassValue const* value = cass_iterator_get_value(tupleIter); CassValue const* value = cass_iterator_get_value(tupleIter);
std::int64_t first; std::int64_t first;
cass_value_get_int64(value, &first); cass_value_get_int64(value, &first);
if (!cass_iterator_next(tupleIter)) if (!cass_iterator_next(tupleIter))
{
cass_iterator_free(tupleIter);
throw std::runtime_error( throw std::runtime_error(
"CassandraResult::getInt64Tuple - failed to iterate tuple"); "CassandraResult::getInt64Tuple - failed to iterate tuple");
}
value = cass_iterator_get_value(tupleIter); value = cass_iterator_get_value(tupleIter);
std::int64_t second; std::int64_t second;
cass_value_get_int64(value, &second); cass_value_get_int64(value, &second);
cass_iterator_free(tupleIter);
++curGetIndex_; ++curGetIndex_;
return {first, second}; return {first, second};
} }

View File

@@ -886,7 +886,7 @@ make_PgPool(boost::asio::io_context& ioc, boost::json::object const& config)
auto ret = std::make_shared<PgPool>(ioc, configCopy); auto ret = std::make_shared<PgPool>(ioc, configCopy);
ret->setup(); ret->setup();
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous([&](boost::asio::yield_context& yield) {
PgQuery pgQuery(ret); PgQuery pgQuery(ret);
std::string query = "CREATE DATABASE " + std::string query = "CREATE DATABASE " +
std::string{config.at("database").as_string().c_str()}; std::string{config.at("database").as_string().c_str()};

View File

@@ -141,7 +141,7 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
std::vector<Backend::LedgerObject> diff; std::vector<Backend::LedgerObject> diff;
auto fetchDiffSynchronous = [&]() { auto fetchDiffSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous([&](boost::asio::yield_context& yield) {
diff = backend_->fetchLedgerDiff(lgrInfo.seq, yield); diff = backend_->fetchLedgerDiff(lgrInfo.seq, yield);
}); });
}; };
@@ -156,13 +156,13 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
std::vector<Backend::TransactionAndMetadata> transactions = {}; std::vector<Backend::TransactionAndMetadata> transactions = {};
auto fetchFeesSynchronous = [&]() { auto fetchFeesSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous([&](boost::asio::yield_context& yield) {
fees = backend_->fetchFees(lgrInfo.seq, yield); fees = backend_->fetchFees(lgrInfo.seq, yield);
}); });
}; };
auto fetchTxSynchronous = [&]() { auto fetchTxSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous([&](boost::asio::yield_context& yield) {
transactions = transactions =
backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield); backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
}); });
@@ -224,7 +224,7 @@ ReportingETL::publishLedger(
{ {
std::optional<ripple::LedgerInfo> lgr = {}; std::optional<ripple::LedgerInfo> lgr = {};
auto fetchLedgerSynchronous = [&]() { auto fetchLedgerSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous([&](boost::asio::yield_context& yield) {
lgr = lgr =
backend_->fetchLedgerBySequence(ledgerSequence, yield); backend_->fetchLedgerBySequence(ledgerSequence, yield);
}); });
@@ -733,8 +733,10 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
ioContext_.post([this, &minSequence]() { ioContext_.post([this, &minSequence]() {
BOOST_LOG_TRIVIAL(info) << "Running online delete"; BOOST_LOG_TRIVIAL(info) << "Running online delete";
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous(
backend_->doOnlineDelete(*onlineDeleteInterval_, yield); [&](boost::asio::yield_context& yield) {
backend_->doOnlineDelete(
*onlineDeleteInterval_, yield);
}); });
BOOST_LOG_TRIVIAL(info) << "Finished online delete"; BOOST_LOG_TRIVIAL(info) << "Finished online delete";
@@ -781,7 +783,7 @@ void
ReportingETL::monitor() ReportingETL::monitor()
{ {
std::optional<uint32_t> latestSequence = {}; std::optional<uint32_t> latestSequence = {};
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous([&](boost::asio::yield_context& yield) {
latestSequence = backend_->fetchLatestLedgerSequence(yield); latestSequence = backend_->fetchLatestLedgerSequence(yield);
}); });

View File

@@ -237,7 +237,7 @@ SubscriptionManager::pubTransaction(
{ {
ripple::STAmount ownerFunds; ripple::STAmount ownerFunds;
auto fetchFundsSynchronous = [&]() { auto fetchFundsSynchronous = [&]() {
Backend::synchronous([&](boost::asio::yield_context yield) { Backend::synchronous([&](boost::asio::yield_context& yield) {
ownerFunds = RPC::accountFunds( ownerFunds = RPC::accountFunds(
*backend_, lgrInfo.seq, amount, account, yield); *backend_, lgrInfo.seq, amount, account, yield);
}); });