move state accounting to its own class and add to datagram

This commit is contained in:
Richard Holland
2024-11-30 18:52:58 +11:00
parent 18cd5086ce
commit ba27030bda
6 changed files with 217 additions and 189 deletions

View File

@@ -392,6 +392,7 @@ target_sources (rippled PRIVATE
src/ripple/app/misc/NegativeUNLVote.cpp
src/ripple/app/misc/NetworkOPs.cpp
src/ripple/app/misc/SHAMapStoreImp.cpp
src/ripple/app/misc/StateAccounting.cpp
src/ripple/app/misc/detail/impl/WorkSSL.cpp
src/ripple/app/misc/impl/AccountTxPaging.cpp
src/ripple/app/misc/impl/AmendmentTable.cpp

View File

@@ -19,6 +19,8 @@
#include <sys/sysinfo.h>
#include <thread>
#include <vector>
#include <ripple/app/misc/ValidatorList.h>
#include <ripple/app/misc/LoadFeeTrack.h>
namespace ripple {
@@ -57,33 +59,37 @@ struct LgrRange
};
// Core server metrics in the fixed header
struct ServerInfoHeader
{
struct ServerInfoHeader {
// Fixed header fields come first
uint32_t magic; // Magic number to identify packet type
uint32_t version; // Protocol version number
uint32_t network_id; // Network ID from config
uint16_t warning_flags; // Reduced to 16 bits, plenty for flags
uint16_t padding1; // Added to maintain alignment
uint64_t timestamp; // System time in microseconds
uint64_t uptime; // Server uptime in seconds
uint64_t io_latency_us; // IO latency in microseconds
uint64_t validation_quorum; // Validation quorum count
uint32_t server_state; // Operating mode as enum
uint32_t peer_count; // Number of connected peers
uint32_t node_size; // Size category (0=tiny through 4=huge)
uint32_t server_state; // Operating mode as enum
uint32_t padding2; // Added to maintain 8-byte alignment
uint32_t cpu_cores; // CPU core count
uint32_t ledger_range_count; // Number of range entries
uint16_t warning_flags; // Warning flags (reduced size)
uint16_t padding1; // Added to maintain alignment
// 64-bit metrics
uint64_t timestamp; // System time in microseconds
uint64_t uptime; // Server uptime in seconds
uint64_t io_latency_us; // IO latency in microseconds
uint64_t validation_quorum; // Validation quorum count
uint64_t fetch_pack_size; // Size of fetch pack cache
uint64_t proposer_count; // Number of proposers in last close
uint64_t converge_time_ms; // Last convergence time in ms
uint64_t load_factor; // Load factor (scaled by 1M for fixed point)
uint64_t load_base; // Load base value
uint64_t reserve_base; // Reserve base amount
uint64_t reserve_inc; // Reserve increment amount
uint64_t ledger_seq; // Latest ledger sequence
uint64_t load_factor; // Load factor (scaled by 1M)
uint64_t load_base; // Load base value
uint64_t reserve_base; // Reserve base amount
uint64_t reserve_inc; // Reserve increment amount
uint64_t ledger_seq; // Latest ledger sequence
// Fixed-size byte arrays
uint8_t ledger_hash[32]; // Latest ledger hash
uint8_t node_public_key[33]; // Node's public key (33 bytes)
uint8_t padding3[7]; // Padding to maintain 8-byte alignment
uint32_t ledger_range_count; // Number of range entries that follow
uint8_t node_public_key[33]; // Node's public key
uint8_t padding2[7]; // Padding to maintain 8-byte alignment
// System metrics
uint64_t process_memory_pages; // Process memory usage in pages
@@ -93,16 +99,19 @@ struct ServerInfoHeader
uint64_t system_disk_total; // Total disk space in bytes
uint64_t system_disk_free; // Free disk space in bytes
uint64_t system_disk_used; // Used disk space in bytes
double load_avg_1min; // 1 minute load average
double load_avg_5min; // 5 minute load average
double load_avg_15min; // 15 minute load average
uint64_t io_wait_time; // IO wait time in milliseconds
uint32_t cpu_cores;
uint32_t padding4;
double load_avg_1min; // 1 minute load average
double load_avg_5min; // 5 minute load average
double load_avg_15min; // 15 minute load average
// Network and disk rates
struct
{
// State transition metrics
uint32_t state_transitions[5]; // Count for each operating mode
uint32_t padding3; // Maintain alignment
uint64_t state_durations[5]; // Duration in each mode
uint64_t initial_sync_us; // Initial sync duration
// Network and disk rates remain unchanged
struct {
MetricRates network_in;
MetricRates network_out;
MetricRates disk_read;
@@ -530,6 +539,18 @@ private:
header->node_size = app_.config().NODE_SIZE;
// Get state accounting data
auto const [counters, mode, start, initialSync] =
app_.getOPs().getStateAccountingData();
// Pack state metrics into header
for (size_t i = 0; i < 5; ++i) {
header->state_transitions[i] = counters[i].transitions;
header->state_durations[i] = counters[i].dur.count();
}
header->initial_sync_us = initialSync;
// Pack warning flags
if (ops.isAmendmentBlocked())
header->warning_flags |= WARNING_AMENDMENT_BLOCKED;

View File

@@ -69,6 +69,7 @@
#include <ripple/rpc/impl/RPCHelpers.h>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/steady_timer.hpp>
#include <ripple/app/misc/StateAccounting.h>
#include <exception>
#include <mutex>
@@ -77,6 +78,7 @@
#include <tuple>
#include <unordered_map>
#include <utility>
#include <ripple/app/misc/StateAccounting.h>
namespace ripple {
@@ -116,81 +118,6 @@ class NetworkOPsImp final : public NetworkOPs
running,
};
static std::array<char const*, 5> const states_;
/**
* State accounting records two attributes for each possible server state:
* 1) Amount of time spent in each state (in microseconds). This value is
* updated upon each state transition.
* 2) Number of transitions to each state.
*
* This data can be polled through server_info and represented by
* monitoring systems similarly to how bandwidth, CPU, and other
* counter-based metrics are managed.
*
* State accounting is more accurate than periodic sampling of server
* state. With periodic sampling, it is very likely that state transitions
* are missed, and accuracy of time spent in each state is very rough.
*/
class StateAccounting
{
struct Counters
{
explicit Counters() = default;
std::uint64_t transitions = 0;
std::chrono::microseconds dur = std::chrono::microseconds(0);
};
OperatingMode mode_ = OperatingMode::DISCONNECTED;
std::array<Counters, 5> counters_;
mutable std::mutex mutex_;
std::chrono::steady_clock::time_point start_ =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point const processStart_ = start_;
std::uint64_t initialSyncUs_{0};
static std::array<Json::StaticString const, 5> const states_;
public:
explicit StateAccounting()
{
counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.transitions = 1;
}
/**
* Record state transition. Update duration spent in previous
* state.
*
* @param om New state.
*/
void
mode(OperatingMode om);
/**
* Output state counters in JSON format.
*
* @obj Json object to which to add state accounting data.
*/
void
json(Json::Value& obj) const;
struct CounterData
{
decltype(counters_) counters;
decltype(mode_) mode;
decltype(start_) start;
decltype(initialSyncUs_) initialSyncUs;
};
CounterData
getCounterData() const
{
std::lock_guard lock(mutex_);
return {counters_, mode_, start_, initialSyncUs_};
}
};
//! Server fees published on `server` subscription
struct ServerFeeSummary
{
@@ -272,6 +199,9 @@ public:
std::string
strOperatingMode(bool const admin = false) const override;
StateAccounting::CounterData
getStateAccountingData() override;
//
// Transaction operations.
//
@@ -776,7 +706,7 @@ private:
DispatchState mDispatchState = DispatchState::none;
std::vector<TransactionStatus> mTransactions;
StateAccounting accounting_{};
StateAccounting accounting_;
std::set<uint256> pendingValidations_;
std::mutex validationsMutex_;
@@ -849,19 +779,6 @@ private:
//------------------------------------------------------------------------------
static std::array<char const*, 5> const stateNames{
{"disconnected", "connected", "syncing", "tracking", "full"}};
std::array<char const*, 5> const NetworkOPsImp::states_ = stateNames;
std::array<Json::StaticString const, 5> const
NetworkOPsImp::StateAccounting::states_ = {
{Json::StaticString(stateNames[0]),
Json::StaticString(stateNames[1]),
Json::StaticString(stateNames[2]),
Json::StaticString(stateNames[3]),
Json::StaticString(stateNames[4])}};
static auto const genesisAccountId = calcAccountID(
generateKeyPair(KeyType::secp256k1, generateSeed("masterpassphrase"))
.first);
@@ -1136,7 +1053,7 @@ NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
}
}
return states_[static_cast<std::size_t>(mode)];
return {StateAccounting::states_[static_cast<std::size_t>(mode)].c_str()};
}
void
@@ -4212,6 +4129,13 @@ NetworkOPsImp::stateAccounting(Json::Value& obj)
accounting_.json(obj);
}
/** Return a copy of the current state accounting data.

    Forwards to the StateAccounting member; the returned value is a copy,
    so callers may inspect or modify it freely.
*/
StateAccounting::CounterData
NetworkOPsImp::getStateAccountingData()
{
    StateAccounting::CounterData snapshot = accounting_.getCounterData();
    return snapshot;
}
// <-- bool: true=erased, false=was not there
bool
NetworkOPsImp::unsubValidations(std::uint64_t uSeq)
@@ -4682,50 +4606,6 @@ NetworkOPsImp::collect_metrics()
counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
}
void
NetworkOPsImp::StateAccounting::mode(OperatingMode om)
{
auto now = std::chrono::steady_clock::now();
std::lock_guard lock(mutex_);
++counters_[static_cast<std::size_t>(om)].transitions;
if (om == OperatingMode::FULL &&
counters_[static_cast<std::size_t>(om)].transitions == 1)
{
initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
now - processStart_)
.count();
}
counters_[static_cast<std::size_t>(mode_)].dur +=
std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
mode_ = om;
start_ = now;
}
void
NetworkOPsImp::StateAccounting::json(Json::Value& obj) const
{
auto [counters, mode, start, initialSync] = getCounterData();
auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - start);
counters[static_cast<std::size_t>(mode)].dur += current;
obj[jss::state_accounting] = Json::objectValue;
for (std::size_t i = static_cast<std::size_t>(OperatingMode::DISCONNECTED);
i <= static_cast<std::size_t>(OperatingMode::FULL);
++i)
{
obj[jss::state_accounting][states_[i]] = Json::objectValue;
auto& state = obj[jss::state_accounting][states_[i]];
state[jss::transitions] = std::to_string(counters[i].transitions);
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}
obj[jss::server_state_duration_us] = std::to_string(current.count());
if (initialSync)
obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
}
//------------------------------------------------------------------------------
std::unique_ptr<NetworkOPs>

View File

@@ -32,6 +32,7 @@
#include <deque>
#include <memory>
#include <tuple>
#include <ripple/app/misc/StateAccounting.h>
namespace ripple {
@@ -43,35 +44,6 @@ class LedgerMaster;
class Transaction;
class ValidatorKeys;
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
// should use this interface. The RPC code will primarily be a light wrapper
// over this code.
//
// Eventually, it will check the node's operating mode (synched, unsynched,
// etectera) and defer to the correct means of processing. The current
// code assumes this node is synched (and will continue to do so until
// there's a functional network.
//
/** Specifies the mode under which the server believes it's operating.
This has implications about how the server processes transactions and
how it responds to requests (e.g. account balance request).
@note Other code relies on the numerical values of these constants; do
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
SYNCING = 2, //!< fallen slightly behind
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
/** Provides server functionality for clients.
Clients include backend applications, local commands, and connected
@@ -236,6 +208,9 @@ public:
virtual Json::Value
getLedgerFetchInfo() = 0;
virtual StateAccounting::CounterData
getStateAccountingData() = 0;
/** Accepts the current transaction tree, return the new ledger's sequence
This API is only used via RPC with the server in STANDALONE mode and

View File

@@ -0,0 +1,49 @@
#include <ripple/app/misc/StateAccounting.h>
namespace ripple {
/** Record a transition into a new operating mode.

    Increments the transition count of the mode being entered, adds the time
    elapsed since the previous transition to the mode being left, and makes
    `om` the current mode. The first time FULL is entered, the time elapsed
    since this object was constructed is stored as the initial sync duration.

    @param om The operating mode being entered.
*/
void
StateAccounting::mode(OperatingMode om)
{
    using std::chrono::duration_cast;
    using std::chrono::microseconds;

    // Sample the clock before taking the lock, as the timestamp of the
    // transition.
    auto const timestamp = std::chrono::steady_clock::now();
    auto const entering = static_cast<std::size_t>(om);

    std::lock_guard guard(mutex_);

    auto& entered = counters_[entering];
    ++entered.transitions;

    bool const firstTimeFull =
        (om == OperatingMode::FULL) && (entered.transitions == 1);
    if (firstTimeFull)
    {
        initialSyncUs_ =
            duration_cast<microseconds>(timestamp - processStart_).count();
    }

    // Charge the elapsed interval to the mode we are leaving.
    auto& left = counters_[static_cast<std::size_t>(mode_)];
    left.dur += duration_cast<microseconds>(timestamp - start_);

    mode_ = om;
    start_ = timestamp;
}
/** Write the state accounting counters into a JSON object.

    Works on a copy of the counters obtained from getCounterData(). The time
    spent so far in the current mode is added to that copy before output, so
    the reported duration of the current mode includes the interval still in
    progress.

    @param obj JSON object that receives `state_accounting`,
               `server_state_duration_us` and, when the initial sync
               duration is non-zero, `initial_sync_duration_us`.
*/
void
StateAccounting::json(Json::Value& obj)
{
    using std::chrono::duration_cast;
    using std::chrono::microseconds;

    auto [tallies, activeMode, enteredAt, initialSyncUs] = getCounterData();

    // Time spent in the current mode that has not yet been folded into the
    // counters by a transition.
    auto const elapsed = duration_cast<microseconds>(
        std::chrono::steady_clock::now() - enteredAt);
    tallies[static_cast<std::size_t>(activeMode)].dur += elapsed;

    obj[jss::state_accounting] = Json::objectValue;
    Json::Value& accounting = obj[jss::state_accounting];

    constexpr auto firstMode =
        static_cast<std::size_t>(OperatingMode::DISCONNECTED);
    constexpr auto lastMode = static_cast<std::size_t>(OperatingMode::FULL);

    for (std::size_t idx = firstMode; idx <= lastMode; ++idx)
    {
        Json::Value& entry = accounting[states_[idx]];
        entry = Json::objectValue;
        entry[jss::transitions] = std::to_string(tallies[idx].transitions);
        entry[jss::duration_us] = std::to_string(tallies[idx].dur.count());
    }

    obj[jss::server_state_duration_us] = std::to_string(elapsed.count());

    if (initialSyncUs != 0)
        obj[jss::initial_sync_duration_us] = std::to_string(initialSyncUs);
}
} // ripple

View File

@@ -0,0 +1,102 @@
#ifndef RIPPLE_APP_MAIN_STATEACCOUNTING_H_INCLUDED
#define RIPPLE_APP_MAIN_STATEACCOUNTING_H_INCLUDED
#include <ripple/basics/chrono.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/jss.h>
#include <array>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <tuple>
namespace ripple {
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
// should use this interface. The RPC code will primarily be a light wrapper
// over this code.
//
// Eventually, it will check the node's operating mode (synched, unsynched,
// et cetera) and defer to the correct means of processing. The current
// code assumes this node is synched (and will continue to do so until
// there's a functional network).
//
/** The operating mode the server believes it is currently in.

    The mode affects how the server processes transactions and how it
    answers requests (for example, an account balance request).

    @note Other code depends on the numeric values of these enumerators.
          Do not renumber them without auditing every use and confirming
          the change is not breaking.
*/
enum class OperatingMode : int {
    //! not ready to process requests
    DISCONNECTED = 0,
    //! convinced we are talking to the network
    CONNECTED = 1,
    //! fallen slightly behind
    SYNCING = 2,
    //! convinced we agree with the network
    TRACKING = 3,
    //! we have the ledger and can even validate
    FULL = 4
};
/**
 * State accounting records two attributes for each possible server state:
 * 1) Amount of time spent in each state (in microseconds). This value is
 *    updated upon each state transition.
 * 2) Number of transitions to each state.
 *
 * State accounting is more accurate than periodic sampling of server
 * state. With periodic sampling, it is very likely that state transitions
 * are missed, and accuracy of time spent in each state is very rough.
 *
 * All access to the mutable members goes through `mutex_`.
 */
class StateAccounting
{
public:
    //! JSON key for each operating mode, indexed by the numeric value of
    //! OperatingMode.
    constexpr static std::array<Json::StaticString const, 5> const states_ = {
        {Json::StaticString("disconnected"),
         Json::StaticString("connected"),
         Json::StaticString("syncing"),
         Json::StaticString("tracking"),
         Json::StaticString("full")}};

    //! Per-mode tallies.
    struct Counters
    {
        explicit Counters() = default;

        //! Number of times this mode has been entered.
        std::uint64_t transitions = 0;
        //! Accumulated time spent in this mode.
        std::chrono::microseconds dur = std::chrono::microseconds(0);
    };

private:
    // Every member carries a default initializer. Without them `mode_` and
    // `initialSyncUs_` would be read while indeterminate, and `start_` /
    // `processStart_` would default to the clock's epoch instead of the
    // moment accounting began, corrupting every reported duration.
    OperatingMode mode_ = OperatingMode::DISCONNECTED;
    std::array<Counters, 5> counters_;
    mutable std::mutex mutex_;
    //! Time at which the current mode was entered.
    std::chrono::steady_clock::time_point start_ =
        std::chrono::steady_clock::now();
    //! Time at which this object was constructed; reference point for the
    //! initial sync duration. Declared after `start_` so it can copy it.
    std::chrono::steady_clock::time_point const processStart_ = start_;
    //! Microseconds from construction to the first transition to FULL;
    //! zero until that transition happens.
    std::uint64_t initialSyncUs_{0};

public:
    //! Start accounting in DISCONNECTED, counted as one transition.
    explicit StateAccounting()
    {
        counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
            .transitions = 1;
    }

    /**
     * Record state transition. Update duration spent in previous state.
     *
     * @param om New state.
     */
    void
    mode(OperatingMode om);

    /**
     * Output state counters in JSON format.
     *
     * @param obj Json object to which to add state accounting data.
     */
    void
    json(Json::Value& obj);

    //! Snapshot of the accounting state, in order: per-mode counters,
    //! current mode, time the current mode was entered, initial sync
    //! duration in microseconds.
    using CounterData = std::tuple<
        decltype(counters_),
        decltype(mode_),
        decltype(start_),
        decltype(initialSyncUs_)>;

    /**
     * Take a consistent copy of the accounting state.
     *
     * The copy is made while holding `mutex_`, so it cannot observe a
     * transition that `mode()` has only partially applied.
     *
     * @return Copies of the counters, current mode, mode start time and
     *         initial sync duration.
     */
    CounterData
    getCounterData() const
    {
        std::lock_guard lock(mutex_);
        return {counters_, mode_, start_, initialSyncUs_};
    }
};
} // ripple
#endif