Add time measurement profiler (#458)

Rebase
This commit is contained in:
cyan317
2022-12-20 18:57:47 +00:00
committed by GitHub
parent 37c765a072
commit 2f65a26dc7
14 changed files with 379 additions and 226 deletions

View File

@@ -9,7 +9,7 @@ formatter="clang-format -i"
first=$(git diff $sources)
find $sources -type f \( -name '*.cpp' -o -name '*.h' -o -name '*.ipp' \) -print0 | xargs -0 $formatter
second=$(git diff $sources)
changes=$(diff <(echo "$first") <(echo "$second") | wc -l)
changes=$(diff <(echo "$first") <(echo "$second") | wc -l | sed -e 's/^[[:space:]]*//')
if [ "$changes" != "0" ]; then
cat <<\EOF

View File

@@ -114,6 +114,7 @@ if(BUILD_TESTS)
unittests/Backend.cpp
unittests/Logger.cpp
unittests/Config.cpp
unittests/ProfilerTest.cpp
unittests/DOSGuard.cpp)
include(CMake/deps/gtest.cmake)
endif()

View File

@@ -21,6 +21,7 @@
#include <backend/CassandraBackend.h>
#include <backend/DBHelpers.h>
#include <log/Logger.h>
#include <util/Profiler.h>
#include <functional>
#include <unordered_map>
@@ -524,8 +525,7 @@ CassandraBackend::fetchTransactions(
std::vector<TransactionAndMetadata> results{numHashes};
std::vector<std::shared_ptr<ReadCallbackData<result_type>>> cbs;
cbs.reserve(numHashes);
auto start = std::chrono::system_clock::now();
auto timeDiff = util::timed([&]() {
for (std::size_t i = 0; i < hashes.size(); ++i)
{
CassandraStatement statement{selectTransaction_};
@@ -548,8 +548,7 @@ CassandraBackend::fetchTransactions(
// suspend the coroutine until completion handler is called.
result.get();
numReadRequestsOutstanding_ -= hashes.size();
auto end = std::chrono::system_clock::now();
});
for (auto const& cb : cbs)
{
if (cb->errored)
@@ -557,10 +556,7 @@ CassandraBackend::fetchTransactions(
}
log_.debug() << "Fetched " << numHashes
<< " transactions from Cassandra in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
end - start)
.count()
<< " transactions from Cassandra in " << timeDiff
<< " milliseconds";
return results;
}

View File

@@ -31,6 +31,7 @@
#include <etl/ReportingETL.h>
#include <log/Logger.h>
#include <rpc/RPCHelpers.h>
#include <util/Profiler.h>
#include <thread>
@@ -807,18 +808,22 @@ ETLSourceImpl<Derived>::loadInitialLedger(
backend_->cache().setFull();
if (!cacheOnly)
{
auto start = std::chrono::system_clock::now();
auto seconds = util::timed<std::chrono::seconds>([&]() {
for (auto& key : edgeKeys)
{
log_.debug() << "Writing edge key = " << ripple::strHex(key);
log_.debug()
<< "Writing edge key = " << ripple::strHex(key);
auto succ = backend_->cache().getSuccessor(
*ripple::uint256::fromVoidChecked(key), sequence);
if (succ)
backend_->writeSuccessor(
std::move(key), sequence, uint256ToString(succ->key));
std::move(key),
sequence,
uint256ToString(succ->key));
}
ripple::uint256 prev = Backend::firstKey;
while (auto cur = backend_->cache().getSuccessor(prev, sequence))
while (auto cur =
backend_->cache().getSuccessor(prev, sequence))
{
assert(cur);
if (prev == Backend::firstKey)
@@ -854,7 +859,8 @@ ETLSourceImpl<Derived>::loadInitialLedger(
}
prev = std::move(cur->key);
if (numWrites % 100000 == 0 && numWrites != 0)
log_.info() << "Wrote " << numWrites << " book successors";
log_.info()
<< "Wrote " << numWrites << " book successors";
}
backend_->writeSuccessor(
@@ -863,10 +869,7 @@ ETLSourceImpl<Derived>::loadInitialLedger(
uint256ToString(Backend::lastKey));
++numWrites;
auto end = std::chrono::system_clock::now();
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(end - start)
.count();
});
log_.info()
<< "Looping through cache and submitting all writes took "
<< seconds

View File

@@ -24,6 +24,7 @@
#include <etl/ReportingETL.h>
#include <log/Logger.h>
#include <subscriptions/SubscriptionManager.h>
#include <util/Profiler.h>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
@@ -137,8 +138,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
log_.debug() << "Deserialized ledger header. " << detail::toString(lgrInfo);
auto start = std::chrono::system_clock::now();
auto timeDiff = util::timed<std::chrono::duration<double>>([&]() {
backend_->startWrites();
log_.debug() << "Started writes";
@@ -151,10 +151,11 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
insertTransactions(lgrInfo, *ledgerData);
log_.debug() << "Inserted txns";
// download the full account state map. This function downloads full ledger
// data and pushes the downloaded data into the writeQueue. asyncWriter
// consumes from the queue and inserts the data into the Ledger object.
// Once the below call returns, all data has been pushed into the queue
// download the full account state map. This function downloads full
// ledger data and pushes the downloaded data into the writeQueue.
// asyncWriter consumes from the queue and inserts the data into the
// Ledger object. Once the below call returns, all data has been pushed
// into the queue
loadBalancer_->loadInitialLedger(startingSequence);
log_.debug() << "Loaded initial ledger";
@@ -164,13 +165,12 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
backend_->writeAccountTransactions(
std::move(insertTxResult.accountTxData));
backend_->writeNFTs(std::move(insertTxResult.nfTokensData));
backend_->writeNFTTransactions(std::move(insertTxResult.nfTokenTxData));
backend_->writeNFTTransactions(
std::move(insertTxResult.nfTokenTxData));
}
backend_->finishWrites(startingSequence);
auto end = std::chrono::system_clock::now();
log_.debug() << "Time to download and store ledger = "
<< ((end - start).count()) / 1000000000.0;
});
log_.debug() << "Time to download and store ledger = " << timeDiff;
return lgrInfo;
}
@@ -530,10 +530,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
backend_->writeNFTTransactions(std::move(insertTxResult.nfTokenTxData));
log_.debug() << "wrote account_tx";
auto start = std::chrono::system_clock::now();
bool success = backend_->finishWrites(lgrInfo.seq);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
auto [success, duration] = util::timed<std::chrono::duration<double>>(
[&]() { return backend_->finishWrites(lgrInfo.seq); });
log_.debug() << "Finished writes. took " << std::to_string(duration);
log_.debug() << "Finished ledger update. " << detail::toString(lgrInfo);
@@ -620,12 +618,10 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
currentSequence) &&
!writeConflict && !isStopping())
{
auto start = std::chrono::system_clock::now();
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchResponse{fetchLedgerDataAndDiff(currentSequence)};
auto end = std::chrono::system_clock::now();
auto time = ((end - start).count()) / 1000000000.0;
auto [fetchResponse, time] =
util::timed<std::chrono::duration<double>>([&]() {
return fetchLedgerDataAndDiff(currentSequence);
});
totalTime += time;
// if the fetch is unsuccessful, stop. fetchLedger only

View File

@@ -21,6 +21,7 @@
#include <backend/BackendInterface.h>
#include <log/Logger.h>
#include <rpc/RPCHelpers.h>
#include <util/Profiler.h>
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
@@ -798,14 +799,10 @@ traverseOwnedNodes(
.count()
<< " milliseconds";
start = std::chrono::system_clock::now();
auto objects = backend.fetchLedgerObjects(keys, sequence, yield);
end = std::chrono::system_clock::now();
auto [objects, timeDiff] = util::timed(
[&]() { return backend.fetchLedgerObjects(keys, sequence, yield); });
gLog.debug() << "Time loading owned entries: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
end - start)
.count()
gLog.debug() << "Time loading owned entries: " << timeDiff
<< " milliseconds";
for (auto i = 0; i < objects.size(); ++i)
@@ -1652,8 +1649,7 @@ traverseTransactions(
boost::json::array txns;
auto [blobs, retCursor] = transactionFetcher(
context.backend, limit, forward, cursor, context.yield);
auto serializationStart = std::chrono::system_clock::now();
auto timeDiff = util::timed([&, &retCursor = retCursor, &blobs = blobs]() {
if (retCursor)
{
boost::json::object cursorJson;
@@ -1672,7 +1668,8 @@ traverseTransactions(
}
else if (txnPlusMeta.ledgerSequence > maxIndex && !forward)
{
gLog.debug() << "Skipping over transactions from incomplete ledger";
gLog.debug()
<< "Skipping over transactions from incomplete ledger";
continue;
}
@@ -1701,11 +1698,9 @@ traverseTransactions(
response[JS(ledger_index_min)] = minIndex;
response[JS(ledger_index_max)] = maxIndex;
response[JS(transactions)] = txns;
});
gLog.info() << "serialization took " << timeDiff
gLog.info() << "serialization took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - serializationStart)
.count()
<< " milliseconds";
return response;

View File

@@ -19,6 +19,7 @@
#include <log/Logger.h>
#include <rpc/RPCHelpers.h>
#include <util/Profiler.h>
using namespace clio;
@@ -37,8 +38,7 @@ doAccountTx(Context const& context)
return status;
constexpr std::string_view outerFuncName = __func__;
auto const maybeResponse =
traverseTransactions(
auto const maybeResponse = traverseTransactions(
context,
[&accountID, &outerFuncName](
std::shared_ptr<Backend::BackendInterface const> const& backend,
@@ -46,14 +46,11 @@ doAccountTx(Context const& context)
bool const forward,
std::optional<Backend::TransactionsCursor> const& cursorIn,
boost::asio::yield_context& yield) {
auto const start = std::chrono::system_clock::now();
auto const txnsAndCursor = backend->fetchAccountTransactions(
auto [txnsAndCursor, timeDiff] = util::timed([&]() {
return backend->fetchAccountTransactions(
accountID, limit, forward, cursorIn, yield);
gLog.info()
<< outerFuncName << " db fetch took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count()
});
gLog.info() << outerFuncName << " db fetch took " << timeDiff
<< " milliseconds - num blobs = "
<< txnsAndCursor.txns.size();
return txnsAndCursor;

View File

@@ -19,6 +19,7 @@
#include <log/Logger.h>
#include <rpc/RPCHelpers.h>
#include <util/Profiler.h>
using namespace clio;
@@ -38,8 +39,7 @@ doNFTHistory(Context const& context)
auto const tokenID = std::get<ripple::uint256>(maybeTokenID);
constexpr std::string_view outerFuncName = __func__;
auto const maybeResponse =
traverseTransactions(
auto const maybeResponse = traverseTransactions(
context,
[&tokenID, &outerFuncName](
std::shared_ptr<Backend::BackendInterface const> const& backend,
@@ -48,14 +48,12 @@ doNFTHistory(Context const& context)
std::optional<Backend::TransactionsCursor> const& cursorIn,
boost::asio::yield_context& yield)
-> Backend::TransactionsAndCursor {
auto const start = std::chrono::system_clock::now();
auto const txnsAndCursor = backend->fetchNFTTransactions(
auto const [txnsAndCursor, timeDiff] =
util::timed([&, &tokenID = tokenID]() {
return backend->fetchNFTTransactions(
tokenID, limit, forward, cursorIn, yield);
gLog.info()
<< outerFuncName << " db fetch took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count()
});
gLog.info() << outerFuncName << " db fetch took " << timeDiff
<< " milliseconds - num blobs = "
<< txnsAndCursor.txns.size();
return txnsAndCursor;

59
src/util/Profiler.h Normal file
View File

@@ -0,0 +1,59 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <chrono>
#include <type_traits>
#include <utility>
namespace util {
/**
* @brief Profiler function to measure the time consuming
* @param func function object, can be a lamdba or function wrapper
* @return return a pair if function wrapper has return value: result of
* function wrapper and the elapsed time(ms) during executing the given
* function only return the elapsed time if function wrapper does not have
* return value
*/
template <typename U = std::chrono::milliseconds, typename F>
[[nodiscard]] auto
timed(F&& func)
{
auto start = std::chrono::system_clock::now();
if constexpr (std::is_same_v<decltype(func()), void>)
{
func();
return std::chrono::duration_cast<U>(
std::chrono::system_clock::now() - start)
.count();
}
else
{
auto ret = func();
auto elapsed = std::chrono::duration_cast<U>(
std::chrono::system_clock::now() - start)
.count();
return std::make_pair(ret, elapsed);
}
}
} // namespace util

View File

@@ -108,7 +108,6 @@ public:
}
{
auto it = ipConnCount_.find(ip);
assert(it != ipConnCount_.end());
if (it != ipConnCount_.end())
{
connsOk = it->second <= maxConnCount_;

View File

@@ -42,6 +42,7 @@
#include <rpc/Counters.h>
#include <rpc/RPC.h>
#include <rpc/WorkQueue.h>
#include <util/Profiler.h>
#include <util/Taggable.h>
#include <vector>
#include <webserver/DOSGuard.h>
@@ -437,11 +438,10 @@ handle_request(
RPC::makeError(RPC::RippledError::rpcBAD_SYNTAX))));
boost::json::object response;
auto start = std::chrono::system_clock::now();
auto v = RPC::buildResponse(*context);
auto end = std::chrono::system_clock::now();
auto us =
std::chrono::duration_cast<std::chrono::microseconds>(end - start);
auto [v, timeDiff] =
util::timed([&]() { return RPC::buildResponse(*context); });
auto us = std::chrono::duration<int, std::milli>(timeDiff);
RPC::logDuration(*context, us);
if (auto status = std::get_if<RPC::Status>(&v))

View File

@@ -28,6 +28,7 @@
#include <rpc/WorkQueue.h>
#include <subscriptions/Message.h>
#include <subscriptions/SubscriptionManager.h>
#include <util/Profiler.h>
#include <util/Taggable.h>
#include <webserver/DOSGuard.h>
@@ -343,11 +344,10 @@ public:
response = getDefaultWsResponse(id);
auto start = std::chrono::system_clock::now();
auto v = RPC::buildResponse(*context);
auto end = std::chrono::system_clock::now();
auto us = std::chrono::duration_cast<std::chrono::microseconds>(
end - start);
auto [v, timeDiff] =
util::timed([&]() { return RPC::buildResponse(*context); });
auto us = std::chrono::duration<int, std::milli>(timeDiff);
logDuration(*context, us);
if (auto status = std::get_if<RPC::Status>(&v))

108
unittests/ProfilerTest.cpp Normal file
View File

@@ -0,0 +1,108 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <gtest/gtest.h>
#include <thread>
#include <util/Profiler.h>
using namespace util;
TEST(TimedTest, HasReturnValue)
{
auto [ret, time] = timed([]() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return 8;
});
ASSERT_EQ(ret, 8);
ASSERT_NE(time, 0);
}
TEST(TimedTest, ReturnVoid)
{
auto time = timed(
[]() { std::this_thread::sleep_for(std::chrono::milliseconds(5)); });
ASSERT_NE(time, 0);
}
struct FunctorTest
{
void
operator()() const
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
};
TEST(TimedTest, Functor)
{
auto time = timed(FunctorTest());
ASSERT_NE(time, 0);
}
TEST(TimedTest, MovedLambda)
{
auto f = []() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return 8;
};
auto [ret, time] = timed(std::move(f));
ASSERT_EQ(ret, 8);
ASSERT_NE(time, 0);
}
TEST(TimedTest, ChangeToNs)
{
auto f = []() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return 8;
};
auto [ret, time] = timed<std::chrono::nanoseconds>(std::move(f));
ASSERT_EQ(ret, 8);
ASSERT_GE(time, 5 * 1000000);
}
TEST(TimedTest, NestedLambda)
{
double timeNested;
auto f = [&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
timeNested = timed([]() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
});
return 8;
};
auto [ret, time] = timed<std::chrono::nanoseconds>(std::move(f));
ASSERT_EQ(ret, 8);
ASSERT_GE(timeNested, 5);
ASSERT_GE(time, 10 * 1000000);
}
TEST(TimedTest, FloatSec)
{
auto f = []() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return 8;
};
auto [ret, time] = timed<std::chrono::duration<double>>(std::move(f));
ASSERT_EQ(ret, 8);
ASSERT_GE(time, 0);
}

View File

@@ -127,6 +127,7 @@ struct AsyncAsioContextTest : public NoLoggerFixture
{
work.reset();
ctx.stop();
runner.join();
}
protected:
@@ -134,7 +135,7 @@ protected:
private:
std::optional<boost::asio::io_service::work> work;
std::jthread runner{[this] { ctx.run(); }};
std::thread runner{[this] { ctx.run(); }};
};
/**