mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-05 09:46:53 +00:00
Compare commits
1 Commits
dangell7/p
...
dangell7/d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2add5b3fb8 |
@@ -953,21 +953,6 @@
|
||||
#
|
||||
# Optional keys for NuDB and RocksDB:
|
||||
#
|
||||
# cache_size Size of cache for database records. Default is 16384.
|
||||
# Setting this value to 0 will use the default value.
|
||||
#
|
||||
# cache_age Length of time in minutes to keep database records
|
||||
# cached. Default is 5 minutes. Setting this value to
|
||||
# 0 will use the default value.
|
||||
#
|
||||
# Note: if cache_size or cache_age is not specified,
|
||||
# default values will be used for the unspecified
|
||||
# parameter.
|
||||
#
|
||||
# Note: the cache will not be created if online_delete
|
||||
# is specified, because the rotating NodeStore does
|
||||
# not use this cache).
|
||||
#
|
||||
# fast_load Boolean. If set, load the last persisted ledger
|
||||
# from disk upon process start before syncing to
|
||||
# the network. This is likely to improve performance
|
||||
|
||||
@@ -131,10 +131,6 @@ public:
|
||||
std::uint32_t ledgerSeq,
|
||||
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback);
|
||||
|
||||
/** Remove expired entries from the positive and negative caches. */
|
||||
virtual void
|
||||
sweep() = 0;
|
||||
|
||||
/** Gather statistics pertaining to read and write activities.
|
||||
*
|
||||
* @param obj Json object reference into which to place counters.
|
||||
|
||||
@@ -22,32 +22,6 @@ public:
|
||||
beast::Journal j)
|
||||
: Database(scheduler, readThreads, config, j), backend_(std::move(backend))
|
||||
{
|
||||
std::optional<int> cacheSize, cacheAge;
|
||||
|
||||
if (config.exists("cache_size"))
|
||||
{
|
||||
cacheSize = get<int>(config, "cache_size");
|
||||
if (cacheSize.value() < 0)
|
||||
Throw<std::runtime_error>("Specified negative value for cache_size");
|
||||
}
|
||||
|
||||
if (config.exists("cache_age"))
|
||||
{
|
||||
cacheAge = get<int>(config, "cache_age");
|
||||
if (cacheAge.value() < 0)
|
||||
Throw<std::runtime_error>("Specified negative value for cache_age");
|
||||
}
|
||||
|
||||
if (cacheSize.has_value() || cacheAge.has_value())
|
||||
{
|
||||
cache_ = std::make_shared<TaggedCache<uint256, NodeObject>>(
|
||||
"DatabaseNodeImp",
|
||||
cacheSize.value_or(0),
|
||||
std::chrono::minutes(cacheAge.value_or(0)),
|
||||
stopwatch(),
|
||||
j);
|
||||
}
|
||||
|
||||
XRPL_ASSERT(
|
||||
backend_,
|
||||
"xrpl::NodeStore::DatabaseNodeImp::DatabaseNodeImp : non-null "
|
||||
@@ -99,13 +73,7 @@ public:
|
||||
std::uint32_t ledgerSeq,
|
||||
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback) override;
|
||||
|
||||
void
|
||||
sweep() override;
|
||||
|
||||
private:
|
||||
// Cache for database objects. This cache is not always initialized. Check
|
||||
// for null before using.
|
||||
std::shared_ptr<TaggedCache<uint256, NodeObject>> cache_;
|
||||
// Persistent key/value storage
|
||||
std::shared_ptr<Backend> backend_;
|
||||
|
||||
|
||||
@@ -55,9 +55,6 @@ public:
|
||||
void
|
||||
sync() override;
|
||||
|
||||
void
|
||||
sweep() override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<Backend> writableBackend_;
|
||||
std::shared_ptr<Backend> archiveBackend_;
|
||||
|
||||
@@ -10,7 +10,9 @@
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
@@ -72,6 +74,18 @@ class NetworkOPs : public InfoSub::Source
|
||||
public:
|
||||
using clock_type = beast::AbstractClock<std::chrono::steady_clock>;
|
||||
|
||||
// Snapshot of per-operating-mode accounting, exposed for the datagram monitor.
|
||||
struct AccountingCounter
|
||||
{
|
||||
std::uint64_t transitions{0};
|
||||
std::chrono::microseconds dur{std::chrono::microseconds(0)};
|
||||
};
|
||||
using StateAccountingData = std::tuple<
|
||||
std::array<AccountingCounter, 5>,
|
||||
OperatingMode,
|
||||
std::chrono::steady_clock::time_point,
|
||||
std::uint64_t>;
|
||||
|
||||
enum class FailHard : unsigned char { No, Yes };
|
||||
static FailHard
|
||||
doFailHard(bool noMeansDont)
|
||||
@@ -92,6 +106,8 @@ public:
|
||||
|
||||
[[nodiscard]] virtual OperatingMode
|
||||
getOperatingMode() const = 0;
|
||||
[[nodiscard]] virtual StateAccountingData
|
||||
getStateAccountingData() = 0;
|
||||
[[nodiscard]] virtual std::string
|
||||
strOperatingMode(OperatingMode const mode, bool const admin = false) const = 0;
|
||||
[[nodiscard]] virtual std::string
|
||||
|
||||
@@ -24,13 +24,6 @@ DatabaseNodeImp::store(NodeObjectType type, Blob&& data, uint256 const& hash, st
|
||||
|
||||
auto obj = NodeObject::createObject(type, std::move(data), hash);
|
||||
backend_->store(obj);
|
||||
if (cache_)
|
||||
{
|
||||
// After the store, replace a negative cache entry if there is one
|
||||
cache_->canonicalize(hash, obj, [](std::shared_ptr<NodeObject> const& n) {
|
||||
return n->getType() == NodeObjectType::Dummy;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -39,25 +32,9 @@ DatabaseNodeImp::asyncFetch(
|
||||
std::uint32_t ledgerSeq,
|
||||
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
|
||||
{
|
||||
if (cache_)
|
||||
{
|
||||
std::shared_ptr<NodeObject> const obj = cache_->fetch(hash);
|
||||
if (obj)
|
||||
{
|
||||
callback(obj->getType() == NodeObjectType::Dummy ? nullptr : obj);
|
||||
return;
|
||||
}
|
||||
}
|
||||
Database::asyncFetch(hash, ledgerSeq, std::move(callback));
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseNodeImp::sweep()
|
||||
{
|
||||
if (cache_)
|
||||
cache_->sweep();
|
||||
}
|
||||
|
||||
std::shared_ptr<NodeObject>
|
||||
DatabaseNodeImp::fetchNodeObject(
|
||||
uint256 const& hash,
|
||||
@@ -65,58 +42,32 @@ DatabaseNodeImp::fetchNodeObject(
|
||||
FetchReport& fetchReport,
|
||||
bool duplicate)
|
||||
{
|
||||
std::shared_ptr<NodeObject> nodeObject = cache_ ? cache_->fetch(hash) : nullptr;
|
||||
if (!nodeObject)
|
||||
std::shared_ptr<NodeObject> nodeObject = nullptr;
|
||||
Status status = Status::Ok;
|
||||
|
||||
try
|
||||
{
|
||||
JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record not "
|
||||
<< (cache_ ? "cached" : "found");
|
||||
|
||||
Status status = Status::Ok;
|
||||
try
|
||||
{
|
||||
status = backend_->fetch(hash, &nodeObject);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(j_.fatal()) << "fetchNodeObject " << hash
|
||||
<< ": Exception fetching from backend: " << e.what();
|
||||
rethrow();
|
||||
}
|
||||
|
||||
switch (status)
|
||||
{
|
||||
case Status::Ok:
|
||||
if (cache_)
|
||||
{
|
||||
if (nodeObject)
|
||||
{
|
||||
cache_->canonicalizeReplaceClient(hash, nodeObject);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto notFound = NodeObject::createObject(NodeObjectType::Dummy, {}, hash);
|
||||
cache_->canonicalizeReplaceClient(hash, notFound);
|
||||
if (notFound->getType() != NodeObjectType::Dummy)
|
||||
nodeObject = notFound;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case Status::NotFound:
|
||||
break;
|
||||
case Status::DataCorrupt:
|
||||
JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": nodestore data is corrupted";
|
||||
break;
|
||||
default:
|
||||
JLOG(j_.warn()) << "fetchNodeObject " << hash << ": backend returns unknown result "
|
||||
<< static_cast<int>(status);
|
||||
break;
|
||||
}
|
||||
status = backend_->fetch(hash, &nodeObject);
|
||||
}
|
||||
else
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record found in cache";
|
||||
if (nodeObject->getType() == NodeObjectType::Dummy)
|
||||
nodeObject.reset();
|
||||
JLOG(j_.fatal()) << "fetchNodeObject " << hash
|
||||
<< ": Exception fetching from backend: " << e.what();
|
||||
rethrow();
|
||||
}
|
||||
|
||||
switch (status)
|
||||
{
|
||||
case Status::Ok:
|
||||
case Status::NotFound:
|
||||
break;
|
||||
case Status::DataCorrupt:
|
||||
JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": nodestore data is corrupted";
|
||||
break;
|
||||
default:
|
||||
JLOG(j_.warn()) << "fetchNodeObject " << hash << ": backend returns unknown result "
|
||||
<< static_cast<int>(status);
|
||||
break;
|
||||
}
|
||||
|
||||
if (nodeObject)
|
||||
|
||||
@@ -113,12 +113,6 @@ DatabaseRotatingImp::store(NodeObjectType type, Blob&& data, uint256 const& hash
|
||||
storeStats(1, nObj->getData().size());
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::sweep()
|
||||
{
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
std::shared_ptr<NodeObject>
|
||||
DatabaseRotatingImp::fetchNodeObject(
|
||||
uint256 const& hash,
|
||||
|
||||
@@ -520,25 +520,6 @@ public:
|
||||
/////////////////////////////////////////////////////////////
|
||||
// Create NodeStore with two backends to allow online deletion of data.
|
||||
// Normally, SHAMapStoreImp handles all these details.
|
||||
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
|
||||
|
||||
// Provide default values.
|
||||
if (!nscfg.exists("cache_size"))
|
||||
{
|
||||
nscfg.set(
|
||||
"cache_size",
|
||||
std::to_string(
|
||||
env.app().config().getValueFor(SizedItem::TreeCacheSize, std::nullopt)));
|
||||
}
|
||||
|
||||
if (!nscfg.exists("cache_age"))
|
||||
{
|
||||
nscfg.set(
|
||||
"cache_age",
|
||||
std::to_string(
|
||||
env.app().config().getValueFor(SizedItem::TreeCacheAge, std::nullopt)));
|
||||
}
|
||||
|
||||
NodeStoreScheduler scheduler(env.app().getJobQueue());
|
||||
|
||||
std::string const writableDb = "write";
|
||||
@@ -547,6 +528,7 @@ public:
|
||||
auto archiveBackend = makeBackendRotating(env, scheduler, archiveDb);
|
||||
|
||||
static constexpr int kReadThreads = 4;
|
||||
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
|
||||
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
|
||||
scheduler,
|
||||
kReadThreads,
|
||||
|
||||
@@ -784,7 +784,7 @@ RCLConsensus::Adaptor::buildLCL(
|
||||
|
||||
// Update fee computations based on accepted txs
|
||||
using namespace std::chrono_literals;
|
||||
app_.getTxQ().processClosedLedger(app_, *built, roundTime > 2500ms);
|
||||
app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s);
|
||||
|
||||
// And stash the ledger in the ledger master
|
||||
if (ledgerMaster_.storeLedger(built))
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include <xrpld/app/main/LoadManager.h>
|
||||
#include <xrpld/app/main/NodeIdentity.h>
|
||||
#include <xrpld/app/main/NodeStoreScheduler.h>
|
||||
#include <xrpld/app/misc/DatagramMonitor.h>
|
||||
#include <xrpld/app/misc/SHAMapStore.h>
|
||||
#include <xrpld/app/misc/TxQ.h>
|
||||
#include <xrpld/app/misc/ValidatorKeys.h>
|
||||
@@ -220,6 +221,7 @@ public:
|
||||
std::unique_ptr<JobQueue> jobQueue_;
|
||||
NodeStoreScheduler nodeStoreScheduler_;
|
||||
std::unique_ptr<SHAMapStore> shaMapStore_;
|
||||
std::unique_ptr<DatagramMonitor> datagramMonitor_;
|
||||
PendingSaves pendingSaves_;
|
||||
std::optional<OpenLedger> openLedger_;
|
||||
|
||||
@@ -998,10 +1000,6 @@ public:
|
||||
JLOG(journal_.debug()) << "MasterTransaction sweep. Size before: " << oldMasterTxSize
|
||||
<< "; size after: " << masterTxCache.size();
|
||||
}
|
||||
{
|
||||
// Sweep NodeStore database cache(s), if enabled.
|
||||
getNodeStore().sweep();
|
||||
}
|
||||
{
|
||||
std::size_t const oldLedgerMasterCacheSize = getLedgerMaster().getFetchPackCacheSize();
|
||||
|
||||
@@ -1507,6 +1505,14 @@ ApplicationImp::start(bool withTimers)
|
||||
|
||||
ledgerCleaner_->start();
|
||||
perfLog_->start();
|
||||
|
||||
// Datagram monitor: UDP node-stats exporter (XDGM). Off in standalone or
|
||||
// when [datagram_monitor] has no endpoints.
|
||||
if (!config_->standalone() && !config_->DATAGRAM_MONITOR.empty())
|
||||
{
|
||||
datagramMonitor_ = std::make_unique<DatagramMonitor>(*this);
|
||||
datagramMonitor_->start();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
919
src/xrpld/app/misc/DatagramMonitor.h
Normal file
919
src/xrpld/app/misc/DatagramMonitor.h
Normal file
@@ -0,0 +1,919 @@
|
||||
//
|
||||
#ifndef RIPPLE_APP_MAIN_DATAGRAMMONITOR_H_INCLUDED
|
||||
#define RIPPLE_APP_MAIN_DATAGRAMMONITOR_H_INCLUDED
|
||||
|
||||
#include <xrpld/app/ledger/AcceptedLedger.h>
|
||||
#include <xrpld/app/ledger/InboundLedgers.h>
|
||||
#include <xrpld/app/ledger/LedgerMaster.h>
|
||||
#include <xrpld/app/main/Application.h>
|
||||
#include <xrpl/server/LoadFeeTrack.h>
|
||||
#include <xrpl/server/NetworkOPs.h>
|
||||
#include <xrpld/app/misc/ValidatorList.h>
|
||||
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
|
||||
#include <xrpl/ledger/CachedSLEs.h>
|
||||
#include <xrpl/nodestore/Database.h>
|
||||
#include <xrpld/overlay/Overlay.h>
|
||||
#include <xrpl/basics/UptimeClock.h>
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/basics/mulDiv.h>
|
||||
#include <xrpl/protocol/BuildInfo.h>
|
||||
#include <xrpl/protocol/ErrorCodes.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <array>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#include <fstream>
|
||||
#include <netdb.h>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/socket.h>
|
||||
#if defined(__linux__)
|
||||
#include <sys/statvfs.h>
|
||||
#include <sys/sysinfo.h>
|
||||
#elif defined(__APPLE__)
|
||||
#include <ifaddrs.h>
|
||||
#include <mach/host_info.h>
|
||||
#include <mach/mach.h>
|
||||
#include <net/if.h>
|
||||
#include <net/if_dl.h>
|
||||
#include <sys/mount.h>
|
||||
#include <sys/sysctl.h>
|
||||
#include <sys/types.h>
|
||||
#endif
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
// Magic number for server info packets: 'XDGM' (le) Xahau DataGram Monitor
|
||||
constexpr uint32_t SERVER_INFO_MAGIC = 0x4D474458;
|
||||
constexpr uint32_t SERVER_INFO_VERSION = 1;
|
||||
|
||||
// Warning flag bits
|
||||
constexpr uint32_t WARNING_AMENDMENT_BLOCKED = 1 << 0;
|
||||
constexpr uint32_t WARNING_UNL_BLOCKED = 1 << 1;
|
||||
constexpr uint32_t WARNING_AMENDMENT_WARNED = 1 << 2;
|
||||
constexpr uint32_t WARNING_NOT_SYNCED = 1 << 3;
|
||||
|
||||
// Time window statistics for rates
|
||||
struct [[gnu::packed]] MetricRates
|
||||
{
|
||||
double rate_1m; // Average rate over last minute
|
||||
double rate_5m; // Average rate over last 5 minutes
|
||||
double rate_1h; // Average rate over last hour
|
||||
double rate_24h; // Average rate over last 24 hours
|
||||
};
|
||||
|
||||
struct AllRates
|
||||
{
|
||||
MetricRates network_in;
|
||||
MetricRates network_out;
|
||||
MetricRates disk_read;
|
||||
MetricRates disk_write;
|
||||
};
|
||||
|
||||
// Structure to represent a ledger sequence range
|
||||
struct [[gnu::packed]] LgrRange
|
||||
{
|
||||
uint32_t start;
|
||||
uint32_t end;
|
||||
};
|
||||
|
||||
// Map is returned separately since variable-length data
|
||||
// shouldn't be included in network structures
|
||||
using ObjectCountMap = std::vector<std::pair<std::basic_string<char>, int>>;
|
||||
|
||||
struct [[gnu::packed]] DebugCounters
|
||||
{
|
||||
// Database metrics
|
||||
std::uint64_t dbKBTotal{0};
|
||||
std::uint64_t dbKBLedger{0};
|
||||
std::uint64_t dbKBTransaction{0};
|
||||
std::uint64_t localTxCount{0};
|
||||
|
||||
// Basic metrics
|
||||
std::uint32_t writeLoad{0};
|
||||
std::int32_t historicalPerMinute{0};
|
||||
|
||||
// Cache metrics
|
||||
std::uint32_t sleHitRate{0}; // Stored as fixed point, multiplied by 1000
|
||||
std::uint32_t ledgerHitRate{
|
||||
0}; // Stored as fixed point, multiplied by 1000
|
||||
std::uint32_t alSize{0};
|
||||
std::uint32_t alHitRate{0}; // Stored as fixed point, multiplied by 1000
|
||||
std::int32_t fullbelowSize{0};
|
||||
std::uint32_t treenodeCacheSize{0};
|
||||
std::uint32_t treenodeTrackSize{0};
|
||||
|
||||
// Node store metrics
|
||||
std::uint64_t nodeWriteCount{0};
|
||||
std::uint64_t nodeWriteSize{0};
|
||||
std::uint64_t nodeFetchCount{0};
|
||||
std::uint64_t nodeFetchHitCount{0};
|
||||
std::uint64_t nodeFetchSize{0};
|
||||
};
|
||||
|
||||
// Core server metrics in the fixed header
|
||||
struct [[gnu::packed]] ServerInfoHeader
|
||||
{
|
||||
// Fixed header fields come first
|
||||
uint32_t magic; // Magic number to identify packet type
|
||||
uint32_t version; // Protocol version number
|
||||
uint32_t network_id; // Network ID from config
|
||||
uint32_t server_state; // Operating mode as enum
|
||||
uint32_t peer_count; // Number of connected peers
|
||||
uint32_t node_size; // Size category (0=tiny through 4=huge)
|
||||
uint32_t cpu_cores; // CPU core count
|
||||
uint32_t ledger_range_count; // Number of range entries
|
||||
uint32_t warning_flags; // Warning flags (reduced size)
|
||||
|
||||
uint32_t padding_1; // padding for alignment
|
||||
|
||||
// 64-bit metrics
|
||||
uint64_t timestamp; // System time in microseconds
|
||||
uint64_t uptime; // Server uptime in seconds
|
||||
uint64_t io_latency_us; // IO latency in microseconds
|
||||
uint64_t validation_quorum; // Validation quorum count
|
||||
uint64_t fetch_pack_size; // Size of fetch pack cache
|
||||
uint64_t proposer_count; // Number of proposers in last close
|
||||
uint64_t converge_time_ms; // Last convergence time in ms
|
||||
uint64_t load_factor; // Load factor (scaled by 1M)
|
||||
uint64_t load_base; // Load base value
|
||||
uint64_t reserve_base; // Reserve base amount
|
||||
uint64_t reserve_inc; // Reserve increment amount
|
||||
uint64_t ledger_seq; // Latest ledger sequence
|
||||
|
||||
// Fixed-size byte arrays
|
||||
uint8_t ledger_hash[32]; // Latest ledger hash
|
||||
uint8_t node_public_key[33]; // Node's public key
|
||||
uint8_t padding2[7]; // Padding to maintain 8-byte alignment
|
||||
uint8_t version_string[32];
|
||||
|
||||
// System metrics
|
||||
uint64_t process_memory_pages; // Process memory usage in bytes
|
||||
uint64_t system_memory_total; // Total system memory in bytes
|
||||
uint64_t system_memory_free; // Free system memory in bytes
|
||||
uint64_t system_memory_used; // Used system memory in bytes
|
||||
uint64_t system_disk_total; // Total disk space in bytes
|
||||
uint64_t system_disk_free; // Free disk space in bytes
|
||||
uint64_t system_disk_used; // Used disk space in bytes
|
||||
uint64_t io_wait_time; // IO wait time in milliseconds
|
||||
double load_avg_1min; // 1 minute load average
|
||||
double load_avg_5min; // 5 minute load average
|
||||
double load_avg_15min; // 15 minute load average
|
||||
|
||||
// State transition metrics
|
||||
uint64_t state_transitions[5]; // Count for each operating mode
|
||||
uint64_t state_durations[5]; // Duration in each mode
|
||||
uint64_t initial_sync_us; // Initial sync duration
|
||||
|
||||
// Network and disk rates remain unchanged
|
||||
struct
|
||||
{
|
||||
MetricRates network_in;
|
||||
MetricRates network_out;
|
||||
MetricRates disk_read;
|
||||
MetricRates disk_write;
|
||||
} rates;
|
||||
|
||||
DebugCounters dbg_counters;
|
||||
};
|
||||
|
||||
// System metrics collected for rate calculations
|
||||
struct SystemMetrics
|
||||
{
|
||||
uint64_t timestamp; // When metrics were collected
|
||||
uint64_t network_bytes_in; // Current total bytes in
|
||||
uint64_t network_bytes_out; // Current total bytes out
|
||||
uint64_t disk_bytes_read; // Current total bytes read
|
||||
uint64_t disk_bytes_written; // Current total bytes written
|
||||
};
|
||||
|
||||
class MetricsTracker
|
||||
{
|
||||
private:
|
||||
static constexpr size_t SAMPLES_1M = 60; // 1 sample/second for 1 minute
|
||||
static constexpr size_t SAMPLES_5M = 300; // 1 sample/second for 5 minutes
|
||||
static constexpr size_t SAMPLES_1H = 3600; // 1 sample/second for 1 hour
|
||||
static constexpr size_t SAMPLES_24H = 1440; // 1 sample/minute for 24 hours
|
||||
|
||||
std::vector<SystemMetrics> samples_1m{SAMPLES_1M};
|
||||
std::vector<SystemMetrics> samples_5m{SAMPLES_5M};
|
||||
std::vector<SystemMetrics> samples_1h{SAMPLES_1H};
|
||||
std::vector<SystemMetrics> samples_24h{SAMPLES_24H};
|
||||
|
||||
size_t index_1m{0}, index_5m{0}, index_1h{0}, index_24h{0};
|
||||
std::chrono::system_clock::time_point last_24h_sample{};
|
||||
|
||||
double
|
||||
calculateRate(
|
||||
const SystemMetrics& current,
|
||||
const std::vector<SystemMetrics>& samples,
|
||||
size_t current_index,
|
||||
size_t max_samples,
|
||||
bool is_24h_window,
|
||||
std::function<uint64_t(const SystemMetrics&)> metric_getter)
|
||||
{
|
||||
// If we don't have at least 2 samples, the rate is 0
|
||||
if (current_index < 2)
|
||||
{
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
// Calculate time window based on the window type
|
||||
uint64_t expected_window_micros;
|
||||
if (is_24h_window)
|
||||
{
|
||||
expected_window_micros =
|
||||
24ULL * 60ULL * 60ULL * 1000000ULL; // 24 hours in microseconds
|
||||
}
|
||||
else
|
||||
{
|
||||
expected_window_micros = max_samples *
|
||||
1000000ULL; // window in seconds * 1,000,000 for microseconds
|
||||
}
|
||||
|
||||
// For any window where we don't have full data, we should scale the
|
||||
// rate based on the actual time we have data for
|
||||
uint64_t actual_window_micros =
|
||||
current.timestamp - samples[0].timestamp;
|
||||
double window_scale = std::min(
|
||||
1.0,
|
||||
static_cast<double>(actual_window_micros) / expected_window_micros);
|
||||
|
||||
// Get the oldest valid sample
|
||||
size_t oldest_index = (current_index >= max_samples)
|
||||
? ((current_index + 1) % max_samples)
|
||||
: 0;
|
||||
const auto& oldest = samples[oldest_index];
|
||||
|
||||
double elapsed = actual_window_micros /
|
||||
1000000.0; // Convert microseconds to seconds
|
||||
|
||||
// Ensure we have a meaningful time difference
|
||||
if (elapsed < 0.001)
|
||||
{ // Less than 1ms difference
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
uint64_t current_value = metric_getter(current);
|
||||
uint64_t oldest_value = metric_getter(oldest);
|
||||
|
||||
// Handle counter wraparound
|
||||
uint64_t diff = (current_value >= oldest_value)
|
||||
? (current_value - oldest_value)
|
||||
: (std::numeric_limits<uint64_t>::max() - oldest_value +
|
||||
current_value + 1);
|
||||
|
||||
// Calculate the rate and scale it based on our window coverage
|
||||
return (static_cast<double>(diff) / elapsed) * window_scale;
|
||||
}
|
||||
|
||||
MetricRates
|
||||
calculateMetricRates(
|
||||
const SystemMetrics& current,
|
||||
std::function<uint64_t(const SystemMetrics&)> metric_getter)
|
||||
{
|
||||
MetricRates rates;
|
||||
rates.rate_1m = calculateRate(
|
||||
current, samples_1m, index_1m, SAMPLES_1M, false, metric_getter);
|
||||
rates.rate_5m = calculateRate(
|
||||
current, samples_5m, index_5m, SAMPLES_5M, false, metric_getter);
|
||||
rates.rate_1h = calculateRate(
|
||||
current, samples_1h, index_1h, SAMPLES_1H, false, metric_getter);
|
||||
rates.rate_24h = calculateRate(
|
||||
current, samples_24h, index_24h, SAMPLES_24H, true, metric_getter);
|
||||
return rates;
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
addSample(const SystemMetrics& metrics)
|
||||
{
|
||||
auto now = std::chrono::system_clock::now();
|
||||
|
||||
// Update 1-minute window (every second)
|
||||
samples_1m[index_1m++ % SAMPLES_1M] = metrics;
|
||||
|
||||
// Update 5-minute window (every second)
|
||||
samples_5m[index_5m++ % SAMPLES_5M] = metrics;
|
||||
|
||||
// Update 1-hour window (every second)
|
||||
samples_1h[index_1h++ % SAMPLES_1H] = metrics;
|
||||
|
||||
// Update 24-hour window (every minute)
|
||||
if (last_24h_sample + std::chrono::minutes(1) <= now)
|
||||
{
|
||||
samples_24h[index_24h++ % SAMPLES_24H] = metrics;
|
||||
last_24h_sample = now;
|
||||
}
|
||||
}
|
||||
|
||||
AllRates
|
||||
getRates(const SystemMetrics& current)
|
||||
{
|
||||
AllRates rates;
|
||||
rates.network_in = calculateMetricRates(
|
||||
current, [](const SystemMetrics& m) { return m.network_bytes_in; });
|
||||
rates.network_out = calculateMetricRates(
|
||||
current,
|
||||
[](const SystemMetrics& m) { return m.network_bytes_out; });
|
||||
rates.disk_read = calculateMetricRates(
|
||||
current, [](const SystemMetrics& m) { return m.disk_bytes_read; });
|
||||
rates.disk_write = calculateMetricRates(
|
||||
current,
|
||||
[](const SystemMetrics& m) { return m.disk_bytes_written; });
|
||||
return rates;
|
||||
}
|
||||
};
|
||||
|
||||
class DatagramMonitor
|
||||
{
|
||||
private:
|
||||
Application& app_;
|
||||
beast::Journal j_;
|
||||
std::atomic<bool> running_{false};
|
||||
std::thread monitor_thread_;
|
||||
MetricsTracker metrics_tracker_;
|
||||
|
||||
struct EndpointInfo
|
||||
{
|
||||
std::string ip;
|
||||
uint16_t port;
|
||||
bool is_ipv6;
|
||||
};
|
||||
EndpointInfo
|
||||
parseEndpoint(std::string const& endpoint)
|
||||
{
|
||||
auto space_pos = endpoint.find(' ');
|
||||
if (space_pos == std::string::npos)
|
||||
throw std::runtime_error("Invalid endpoint format");
|
||||
|
||||
EndpointInfo info;
|
||||
info.ip = endpoint.substr(0, space_pos);
|
||||
info.port = std::stoi(endpoint.substr(space_pos + 1));
|
||||
info.is_ipv6 = info.ip.find(':') != std::string::npos;
|
||||
return info;
|
||||
}
|
||||
|
||||
int
|
||||
createSocket(EndpointInfo const& endpoint)
|
||||
{
|
||||
int sock = socket(endpoint.is_ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
|
||||
if (sock < 0)
|
||||
throw std::runtime_error("Failed to create socket");
|
||||
return sock;
|
||||
}
|
||||
|
||||
void
|
||||
sendPacket(
|
||||
int sock,
|
||||
EndpointInfo const& endpoint,
|
||||
std::vector<uint8_t> const& buffer)
|
||||
{
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t addr_len;
|
||||
|
||||
if (endpoint.is_ipv6)
|
||||
{
|
||||
struct sockaddr_in6* addr6 =
|
||||
reinterpret_cast<struct sockaddr_in6*>(&addr);
|
||||
addr6->sin6_family = AF_INET6;
|
||||
addr6->sin6_port = htons(endpoint.port);
|
||||
inet_pton(AF_INET6, endpoint.ip.c_str(), &addr6->sin6_addr);
|
||||
addr_len = sizeof(struct sockaddr_in6);
|
||||
}
|
||||
else
|
||||
{
|
||||
struct sockaddr_in* addr4 =
|
||||
reinterpret_cast<struct sockaddr_in*>(&addr);
|
||||
addr4->sin_family = AF_INET;
|
||||
addr4->sin_port = htons(endpoint.port);
|
||||
inet_pton(AF_INET, endpoint.ip.c_str(), &addr4->sin_addr);
|
||||
addr_len = sizeof(struct sockaddr_in);
|
||||
}
|
||||
|
||||
sendto(
|
||||
sock,
|
||||
buffer.data(),
|
||||
buffer.size(),
|
||||
0,
|
||||
reinterpret_cast<struct sockaddr*>(&addr),
|
||||
addr_len);
|
||||
}
|
||||
|
||||
// Returns both the counters and object count map separately
|
||||
std::pair<DebugCounters, ObjectCountMap>
|
||||
getDebugCounters()
|
||||
{
|
||||
DebugCounters counters;
|
||||
ObjectCountMap objectCounts =
|
||||
CountedObjects::getInstance().getCounts(1);
|
||||
|
||||
// Database metrics if applicable
|
||||
if (app_.config().useTxTables())
|
||||
{
|
||||
auto const db =
|
||||
dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase());
|
||||
if (!db)
|
||||
Throw<std::runtime_error>("Failed to get relational database");
|
||||
|
||||
if (auto dbKB = db->getKBUsedAll())
|
||||
counters.dbKBTotal = dbKB;
|
||||
if (auto dbKB = db->getKBUsedLedger())
|
||||
counters.dbKBLedger = dbKB;
|
||||
if (auto dbKB = db->getKBUsedTransaction())
|
||||
counters.dbKBTransaction = dbKB;
|
||||
if (auto count = app_.getOPs().getLocalTxCount())
|
||||
counters.localTxCount = count;
|
||||
}
|
||||
|
||||
// Basic metrics
|
||||
counters.writeLoad = app_.getNodeStore().getWriteLoad();
|
||||
counters.historicalPerMinute =
|
||||
static_cast<std::int32_t>(app_.getInboundLedgers().fetchRate());
|
||||
|
||||
// Cache metrics - convert floating point rates to fixed point
|
||||
counters.sleHitRate = 0; // TODO: SLE cache hit-rate accessor absent on this fork
|
||||
counters.ledgerHitRate = static_cast<std::uint32_t>(
|
||||
app_.getLedgerMaster().getCacheHitRate() * 1000);
|
||||
counters.alSize = app_.getAcceptedLedgerCache().size();
|
||||
counters.alHitRate = static_cast<std::uint32_t>(
|
||||
app_.getAcceptedLedgerCache().getHitRate() * 1000);
|
||||
counters.fullbelowSize = static_cast<std::int32_t>(
|
||||
app_.getNodeFamily().getFullBelowCache()->size());
|
||||
counters.treenodeCacheSize =
|
||||
app_.getNodeFamily().getTreeNodeCache()->getCacheSize();
|
||||
counters.treenodeTrackSize =
|
||||
app_.getNodeFamily().getTreeNodeCache()->getTrackSize();
|
||||
|
||||
// Get regular node store metrics
|
||||
counters.nodeWriteCount = app_.getNodeStore().getStoreCount();
|
||||
counters.nodeWriteSize = app_.getNodeStore().getStoreSize();
|
||||
counters.nodeFetchCount = app_.getNodeStore().getFetchTotalCount();
|
||||
counters.nodeFetchHitCount = app_.getNodeStore().getFetchHitCount();
|
||||
counters.nodeFetchSize = app_.getNodeStore().getFetchSize();
|
||||
|
||||
return {counters, objectCounts};
|
||||
}
|
||||
|
||||
uint32_t
|
||||
getPhysicalCPUCount()
|
||||
{
|
||||
static uint32_t count = 0;
|
||||
if (count > 0)
|
||||
return count;
|
||||
|
||||
#if defined(__linux__)
|
||||
try
|
||||
{
|
||||
std::ifstream cpuinfo("/proc/cpuinfo");
|
||||
if (!cpuinfo)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "Unable to open file: /proc/cpuinfo";
|
||||
return count;
|
||||
}
|
||||
std::string line;
|
||||
std::set<std::string> physical_ids;
|
||||
std::string current_physical_id;
|
||||
|
||||
while (std::getline(cpuinfo, line))
|
||||
{
|
||||
if (line.find("core id") != std::string::npos)
|
||||
{
|
||||
current_physical_id = line.substr(line.find(":") + 1);
|
||||
// Trim whitespace
|
||||
current_physical_id.erase(
|
||||
0, current_physical_id.find_first_not_of(" \t"));
|
||||
current_physical_id.erase(
|
||||
current_physical_id.find_last_not_of(" \t") + 1);
|
||||
physical_ids.insert(current_physical_id);
|
||||
}
|
||||
}
|
||||
|
||||
count = physical_ids.size();
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "Error getting CPU count: " << e.what();
|
||||
}
|
||||
|
||||
// Return at least 1 if we couldn't determine the count
|
||||
return count > 0 ? count : (count = 1);
|
||||
#elif defined(__APPLE__)
|
||||
int value = 0;
|
||||
size_t size = sizeof(value);
|
||||
if (sysctlbyname("hw.physicalcpu", &value, &size, NULL, 0) == 0)
|
||||
count = value;
|
||||
return count > 0 ? count : (count = 1);
|
||||
#endif
|
||||
}
|
||||
|
||||
SystemMetrics
|
||||
collectSystemMetrics()
|
||||
{
|
||||
SystemMetrics metrics{};
|
||||
metrics.timestamp =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
|
||||
#if defined(__linux__)
|
||||
// Network stats collection
|
||||
try
|
||||
{
|
||||
std::ifstream net_file("/proc/net/dev");
|
||||
if (!net_file)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "Unable to open file /proc/net/dev";
|
||||
return metrics;
|
||||
}
|
||||
|
||||
std::string line;
|
||||
uint64_t total_bytes_in = 0, total_bytes_out = 0;
|
||||
|
||||
// Skip header lines
|
||||
std::getline(net_file, line); // Inter-| Receive...
|
||||
std::getline(net_file, line); // face |bytes...
|
||||
|
||||
while (std::getline(net_file, line))
|
||||
{
|
||||
if (line.find(':') != std::string::npos)
|
||||
{
|
||||
std::string interface = line.substr(0, line.find(':'));
|
||||
interface =
|
||||
interface.substr(interface.find_first_not_of(" \t"));
|
||||
interface = interface.substr(
|
||||
0, interface.find_last_not_of(" \t") + 1);
|
||||
|
||||
// Skip loopback interface
|
||||
if (interface == "lo")
|
||||
continue;
|
||||
|
||||
uint64_t bytes_in, bytes_out;
|
||||
std::istringstream iss(line.substr(line.find(':') + 1));
|
||||
iss >> bytes_in; // First field after : is bytes_in
|
||||
for (int i = 0; i < 8; ++i)
|
||||
iss >> std::ws; // Skip 8 fields
|
||||
iss >> bytes_out; // 9th field is bytes_out
|
||||
|
||||
total_bytes_in += bytes_in;
|
||||
total_bytes_out += bytes_out;
|
||||
}
|
||||
}
|
||||
metrics.network_bytes_in = total_bytes_in;
|
||||
metrics.network_bytes_out = total_bytes_out;
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "Error collecting network stats: " << e.what();
|
||||
}
|
||||
|
||||
// Disk stats collection
|
||||
try
|
||||
{
|
||||
std::ifstream disk_file("/proc/diskstats");
|
||||
if (!disk_file)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "Unable to open file: /proc/diskstats";
|
||||
return metrics;
|
||||
}
|
||||
std::string line;
|
||||
uint64_t total_bytes_read = 0, total_bytes_written = 0;
|
||||
|
||||
while (std::getline(disk_file, line))
|
||||
{
|
||||
unsigned int major, minor;
|
||||
char dev_name[32];
|
||||
uint64_t reads, read_sectors, writes, write_sectors;
|
||||
|
||||
if (sscanf(
|
||||
line.c_str(),
|
||||
"%u %u %31s %lu %*u %lu %*u %lu %*u %lu",
|
||||
&major,
|
||||
&minor,
|
||||
dev_name,
|
||||
&reads,
|
||||
&read_sectors,
|
||||
&writes,
|
||||
&write_sectors) == 7)
|
||||
{
|
||||
// Only process physical devices
|
||||
std::string device_name(dev_name);
|
||||
if (device_name.substr(0, 3) == "dm-" ||
|
||||
device_name.substr(0, 4) == "loop" ||
|
||||
device_name.substr(0, 3) == "ram")
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip partitions (usually have a number at the end)
|
||||
if (std::isdigit(device_name.back()))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
uint64_t bytes_read = read_sectors * 512;
|
||||
uint64_t bytes_written = write_sectors * 512;
|
||||
|
||||
total_bytes_read += bytes_read;
|
||||
total_bytes_written += bytes_written;
|
||||
}
|
||||
}
|
||||
metrics.disk_bytes_read = total_bytes_read;
|
||||
metrics.disk_bytes_written = total_bytes_written;
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "Error collecting disk stats: " << e.what();
|
||||
}
|
||||
#elif defined(__APPLE__)
|
||||
// Network stats collection
|
||||
try
|
||||
{
|
||||
struct ifaddrs* ifap;
|
||||
if (getifaddrs(&ifap) == 0)
|
||||
{
|
||||
uint64_t total_bytes_in = 0, total_bytes_out = 0;
|
||||
for (struct ifaddrs* ifa = ifap; ifa; ifa = ifa->ifa_next)
|
||||
{
|
||||
if (ifa->ifa_addr != NULL &&
|
||||
ifa->ifa_addr->sa_family == AF_LINK)
|
||||
{
|
||||
struct if_data* ifd = (struct if_data*)ifa->ifa_data;
|
||||
if (ifd != NULL)
|
||||
{
|
||||
// Skip loopback interface
|
||||
if (strcmp(ifa->ifa_name, "lo0") == 0)
|
||||
continue;
|
||||
|
||||
total_bytes_in += ifd->ifi_ibytes;
|
||||
total_bytes_out += ifd->ifi_obytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
freeifaddrs(ifap);
|
||||
|
||||
metrics.network_bytes_in = total_bytes_in;
|
||||
metrics.network_bytes_out = total_bytes_out;
|
||||
}
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "Error collecting network stats: " << e.what();
|
||||
}
|
||||
|
||||
// Disk stats collection
|
||||
// Disk IO stats are not easily accessible in macOS.
|
||||
// We'll set these values to zero for now.
|
||||
metrics.disk_bytes_read = 0;
|
||||
metrics.disk_bytes_written = 0;
|
||||
#endif
|
||||
return metrics;
|
||||
}
|
||||
|
||||
std::vector<uint8_t>
|
||||
generateServerInfo()
|
||||
{
|
||||
auto& ops = app_.getOPs();
|
||||
auto& ledgerMaster = app_.getLedgerMaster();
|
||||
|
||||
auto currentMetrics = collectSystemMetrics();
|
||||
metrics_tracker_.addSample(currentMetrics);
|
||||
|
||||
// Slimmed for this fork (3.2.0-b0): ledger ranges, DB debug-counters and
|
||||
// the object-count map are omitted (divergent accessors). The packet is
|
||||
// just the fixed header with core node + OS metrics.
|
||||
std::vector<uint8_t> buffer(sizeof(ServerInfoHeader));
|
||||
auto* header = reinterpret_cast<ServerInfoHeader*>(buffer.data());
|
||||
memset(header, 0, sizeof(ServerInfoHeader));
|
||||
|
||||
header->magic = SERVER_INFO_MAGIC;
|
||||
header->version = SERVER_INFO_VERSION;
|
||||
header->network_id = app_.config().networkId;
|
||||
header->timestamp =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
header->uptime = UptimeClock::now().time_since_epoch().count();
|
||||
header->io_latency_us = app_.getIOLatency().count();
|
||||
header->validation_quorum = 0; // TODO: fork validator-list accessor
|
||||
header->peer_count = app_.getOverlay().size();
|
||||
header->node_size = app_.config().nodeSize;
|
||||
|
||||
auto const [counters, mode, start, initialSync] =
|
||||
ops.getStateAccountingData();
|
||||
for (size_t i = 0; i < 5; ++i)
|
||||
{
|
||||
header->state_transitions[i] = counters[i].transitions;
|
||||
header->state_durations[i] = counters[i].dur.count();
|
||||
}
|
||||
header->initial_sync_us = initialSync;
|
||||
|
||||
if (ops.isAmendmentBlocked())
|
||||
header->warning_flags |= WARNING_AMENDMENT_BLOCKED;
|
||||
if (ops.isUNLBlocked())
|
||||
header->warning_flags |= WARNING_UNL_BLOCKED;
|
||||
if (ops.isAmendmentWarned())
|
||||
header->warning_flags |= WARNING_AMENDMENT_WARNED;
|
||||
if (ops.getOperatingMode() != OperatingMode::FULL)
|
||||
header->warning_flags |= WARNING_NOT_SYNCED;
|
||||
|
||||
// Consensus timing is private on this fork's NetworkOPs; zeroed.
|
||||
header->proposer_count = 0;
|
||||
header->converge_time_ms = 0;
|
||||
|
||||
auto const fp = ledgerMaster.getFetchPackCacheSize();
|
||||
if (fp != 0)
|
||||
header->fetch_pack_size = fp;
|
||||
|
||||
// Load factor (server only; fee-escalation term omitted on this fork).
|
||||
header->load_factor =
|
||||
static_cast<std::uint64_t>(app_.getFeeTrack().getLoadFactor());
|
||||
header->load_base = app_.getFeeTrack().getLoadBase();
|
||||
|
||||
#if defined(__linux__)
|
||||
// Get system info using sysinfo
|
||||
struct sysinfo si;
|
||||
if (sysinfo(&si) == 0)
|
||||
{
|
||||
header->system_memory_total = si.totalram * si.mem_unit;
|
||||
header->system_memory_free = si.freeram * si.mem_unit;
|
||||
header->system_memory_used =
|
||||
header->system_memory_total - header->system_memory_free;
|
||||
header->load_avg_1min = si.loads[0] / (float)(1 << SI_LOAD_SHIFT);
|
||||
header->load_avg_5min = si.loads[1] / (float)(1 << SI_LOAD_SHIFT);
|
||||
header->load_avg_15min = si.loads[2] / (float)(1 << SI_LOAD_SHIFT);
|
||||
}
|
||||
#elif defined(__APPLE__)
|
||||
// Get total physical memory
|
||||
int64_t physical_memory;
|
||||
size_t length = sizeof(physical_memory);
|
||||
if (sysctlbyname("hw.memsize", &physical_memory, &length, NULL, 0) == 0)
|
||||
{
|
||||
header->system_memory_total = physical_memory;
|
||||
}
|
||||
|
||||
// Get free and used memory
|
||||
vm_statistics_data_t vm_stats;
|
||||
mach_msg_type_number_t count = HOST_VM_INFO_COUNT;
|
||||
if (host_statistics(
|
||||
mach_host_self(),
|
||||
HOST_VM_INFO,
|
||||
(host_info_t)&vm_stats,
|
||||
&count) == KERN_SUCCESS)
|
||||
{
|
||||
uint64_t page_size;
|
||||
length = sizeof(page_size);
|
||||
sysctlbyname("hw.pagesize", &page_size, &length, NULL, 0);
|
||||
|
||||
header->system_memory_free =
|
||||
(uint64_t)vm_stats.free_count * page_size;
|
||||
header->system_memory_used =
|
||||
header->system_memory_total - header->system_memory_free;
|
||||
}
|
||||
|
||||
// Get load averages
|
||||
double loadavg[3];
|
||||
if (getloadavg(loadavg, 3) == 3)
|
||||
{
|
||||
header->load_avg_1min = loadavg[0];
|
||||
header->load_avg_5min = loadavg[1];
|
||||
header->load_avg_15min = loadavg[2];
|
||||
}
|
||||
#endif
|
||||
|
||||
// Get process memory usage
|
||||
struct rusage usage;
|
||||
getrusage(RUSAGE_SELF, &usage);
|
||||
header->process_memory_pages = usage.ru_maxrss;
|
||||
|
||||
// Get disk usage
|
||||
#if defined(__linux__)
|
||||
struct statvfs fs;
|
||||
if (statvfs("/", &fs) == 0)
|
||||
{
|
||||
header->system_disk_total = fs.f_blocks * fs.f_frsize;
|
||||
header->system_disk_free = fs.f_bfree * fs.f_frsize;
|
||||
header->system_disk_used =
|
||||
header->system_disk_total - header->system_disk_free;
|
||||
}
|
||||
#elif defined(__APPLE__)
|
||||
struct statfs fs;
|
||||
if (statfs("/", &fs) == 0)
|
||||
{
|
||||
header->system_disk_total = fs.f_blocks * fs.f_bsize;
|
||||
header->system_disk_free = fs.f_bfree * fs.f_bsize;
|
||||
header->system_disk_used =
|
||||
header->system_disk_total - header->system_disk_free;
|
||||
}
|
||||
#endif
|
||||
|
||||
// Get CPU core count
|
||||
header->cpu_cores = getPhysicalCPUCount();
|
||||
|
||||
// Get rate statistics
|
||||
auto rates = metrics_tracker_.getRates(currentMetrics);
|
||||
header->rates.network_in = rates.network_in;
|
||||
header->rates.network_out = rates.network_out;
|
||||
header->rates.disk_read = rates.disk_read;
|
||||
header->rates.disk_write = rates.disk_write;
|
||||
|
||||
// Ledger height via a stable accessor (this fork's Ledger lacks info()).
|
||||
header->ledger_seq = ledgerMaster.getValidLedgerIndex();
|
||||
header->reserve_base = app_.config().fees.accountReserve.drops();
|
||||
header->reserve_inc = app_.config().fees.ownerReserve.drops();
|
||||
|
||||
// Node public key + version string.
|
||||
auto const& nodeKey = app_.nodeIdentity().first;
|
||||
std::memcpy(header->node_public_key, nodeKey.data(), 33);
|
||||
memset(&header->version_string, 0, 32);
|
||||
memcpy(
|
||||
&header->version_string,
|
||||
BuildInfo::getVersionString().c_str(),
|
||||
BuildInfo::getVersionString().size() > 32
|
||||
? 32
|
||||
: BuildInfo::getVersionString().size());
|
||||
|
||||
header->ledger_range_count = 0;
|
||||
return buffer;
|
||||
}
|
||||
void
|
||||
monitorThread()
|
||||
{
|
||||
std::vector<std::pair<EndpointInfo, int>> endpoints;
|
||||
|
||||
for (auto const& epStr : app_.config().DATAGRAM_MONITOR)
|
||||
{
|
||||
auto endpoint = parseEndpoint(epStr);
|
||||
endpoints.push_back(
|
||||
std::make_pair(endpoint, createSocket(endpoint)));
|
||||
}
|
||||
|
||||
while (running_)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto info = generateServerInfo();
|
||||
for (auto const& ep : endpoints)
|
||||
{
|
||||
sendPacket(ep.second, ep.first, info);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
// Log error but continue monitoring
|
||||
JLOG(j_.error())
|
||||
<< "Server info monitor error: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
for (auto const& ep : endpoints)
|
||||
{
|
||||
close(ep.second);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
DatagramMonitor(Application& app) : app_(app), j_(beast::Journal::getNullSink())
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
start()
|
||||
{
|
||||
if (!running_.exchange(true))
|
||||
{
|
||||
monitor_thread_ =
|
||||
std::thread(&DatagramMonitor::monitorThread, this);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
stop()
|
||||
{
|
||||
if (running_.exchange(false))
|
||||
{
|
||||
if (monitor_thread_.joinable())
|
||||
monitor_thread_.join();
|
||||
}
|
||||
}
|
||||
|
||||
~DatagramMonitor()
|
||||
{
|
||||
stop();
|
||||
}
|
||||
};
|
||||
} // namespace xrpl
|
||||
#endif
|
||||
@@ -346,6 +346,9 @@ public:
|
||||
OperatingMode
|
||||
getOperatingMode() const override;
|
||||
|
||||
StateAccountingData
|
||||
getStateAccountingData() override;
|
||||
|
||||
std::string
|
||||
strOperatingMode(OperatingMode const mode, bool const admin) const override;
|
||||
|
||||
@@ -915,6 +918,16 @@ NetworkOPsImp::getOperatingMode() const
|
||||
return mode_;
|
||||
}
|
||||
|
||||
NetworkOPs::StateAccountingData
|
||||
NetworkOPsImp::getStateAccountingData()
|
||||
{
|
||||
auto const data = accounting_.getCounterData();
|
||||
std::array<NetworkOPs::AccountingCounter, 5> out;
|
||||
for (std::size_t i = 0; i < out.size(); ++i)
|
||||
out[i] = {data.counters[i].transitions, data.counters[i].dur};
|
||||
return {out, data.mode, data.start, data.initialSyncUs};
|
||||
}
|
||||
|
||||
inline std::string
|
||||
NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
|
||||
{
|
||||
|
||||
@@ -165,22 +165,6 @@ std::unique_ptr<NodeStore::Database>
|
||||
SHAMapStoreImp::makeNodeStore(int readThreads)
|
||||
{
|
||||
auto nscfg = app_.config().section(ConfigSection::nodeDatabase());
|
||||
|
||||
// Provide default values.
|
||||
if (!nscfg.exists("cache_size"))
|
||||
{
|
||||
nscfg.set(
|
||||
"cache_size",
|
||||
std::to_string(app_.config().getValueFor(SizedItem::TreeCacheSize, std::nullopt)));
|
||||
}
|
||||
|
||||
if (!nscfg.exists("cache_age"))
|
||||
{
|
||||
nscfg.set(
|
||||
"cache_age",
|
||||
std::to_string(app_.config().getValueFor(SizedItem::TreeCacheAge, std::nullopt)));
|
||||
}
|
||||
|
||||
std::unique_ptr<NodeStore::Database> db;
|
||||
|
||||
if (deleteInterval_ != 0u)
|
||||
@@ -270,8 +254,6 @@ SHAMapStoreImp::run()
|
||||
LedgerIndex lastRotated = stateDb_.getState().lastRotated;
|
||||
netOPs_ = &app_.getOPs();
|
||||
ledgerMaster_ = &app_.getLedgerMaster();
|
||||
fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache());
|
||||
treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache());
|
||||
|
||||
if (advisoryDelete_)
|
||||
canDelete_ = stateDb_.getCanDelete();
|
||||
@@ -560,16 +542,16 @@ SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
|
||||
// Also clear the FullBelowCache so its generation counter is bumped.
|
||||
// This prevents stale "full below" markers from persisting across
|
||||
// backend rotation/online deletion and interfering with SHAMap sync.
|
||||
fullBelowCache_->clear();
|
||||
app_.getNodeFamily().getFullBelowCache()->clear();
|
||||
}
|
||||
|
||||
void
|
||||
SHAMapStoreImp::freshenCaches()
|
||||
{
|
||||
if (freshenCache(*treeNodeCache_))
|
||||
return;
|
||||
if (freshenCache(app_.getMasterTransaction().getCache()))
|
||||
if (freshenCache(*app_.getNodeFamily().getTreeNodeCache()))
|
||||
return;
|
||||
|
||||
freshenCache(app_.getMasterTransaction().getCache());
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -7,8 +7,6 @@
|
||||
#include <xrpl/nodestore/Scheduler.h>
|
||||
#include <xrpl/rdb/DatabaseCon.h>
|
||||
#include <xrpl/server/State.h>
|
||||
#include <xrpl/shamap/FullBelowCache.h>
|
||||
#include <xrpl/shamap/TreeNodeCache.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
@@ -95,8 +93,6 @@ private:
|
||||
// as of run() or before
|
||||
NetworkOPs* netOPs_ = nullptr;
|
||||
LedgerMaster* ledgerMaster_ = nullptr;
|
||||
FullBelowCache* fullBelowCache_ = nullptr;
|
||||
TreeNodeCache* treeNodeCache_ = nullptr;
|
||||
|
||||
static constexpr auto kNodeStoreName = "NodeStore";
|
||||
|
||||
|
||||
@@ -59,10 +59,10 @@ struct ConsensusParms
|
||||
std::size_t const minConsensusPct = 80;
|
||||
|
||||
//! The duration a ledger may remain idle before closing
|
||||
std::chrono::milliseconds const ledgerIdleInterval = std::chrono::seconds{5};
|
||||
std::chrono::milliseconds const ledgerIdleInterval = std::chrono::seconds{15};
|
||||
|
||||
//! The number of seconds we wait minimum to ensure participation
|
||||
std::chrono::milliseconds const ledgerMinConsensus = std::chrono::milliseconds{1000};
|
||||
std::chrono::milliseconds const ledgerMinConsensus = std::chrono::milliseconds{1950};
|
||||
|
||||
/** The maximum amount of time to spend pausing for laggards.
|
||||
*
|
||||
@@ -73,7 +73,7 @@ struct ConsensusParms
|
||||
std::chrono::milliseconds const ledgerMaxConsensus = std::chrono::seconds{15};
|
||||
|
||||
//! Minimum number of seconds to wait to ensure others have computed the LCL
|
||||
std::chrono::milliseconds const ledgerMinClose = std::chrono::seconds{1};
|
||||
std::chrono::milliseconds const ledgerMinClose = std::chrono::seconds{2};
|
||||
|
||||
//! How often we check state or change positions
|
||||
std::chrono::milliseconds const ledgerGRANULARITY = std::chrono::seconds{1};
|
||||
@@ -99,7 +99,7 @@ struct ConsensusParms
|
||||
twice the interval between proposals (0.7s) divided by
|
||||
the interval between mid and late consensus ([85-50]/100).
|
||||
*/
|
||||
std::chrono::milliseconds const avMinConsensusTime = std::chrono::seconds{2};
|
||||
std::chrono::milliseconds const avMinConsensusTime = std::chrono::seconds{5};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Avalanche tuning
|
||||
|
||||
@@ -134,6 +134,10 @@ public:
|
||||
// Entries from [ips_fixed] config stanza
|
||||
std::vector<std::string> ipsFixed;
|
||||
|
||||
// Entries from [datagram_monitor]: "<IP> <port>" UDP targets the
|
||||
// DatagramMonitor sends node-stats packets to (XDGM, every 1s).
|
||||
std::vector<std::string> DATAGRAM_MONITOR;
|
||||
|
||||
StartUpType startUp = StartUpType::Normal;
|
||||
|
||||
bool startValid = false;
|
||||
@@ -209,7 +213,7 @@ public:
|
||||
|
||||
// Work queue limits
|
||||
int maxTransactions = 250;
|
||||
static constexpr int kMaxJobQueueTx = 100000;
|
||||
static constexpr int kMaxJobQueueTx = 1000;
|
||||
static constexpr int kMinJobQueueTx = 100;
|
||||
|
||||
// Amendment majority time
|
||||
|
||||
@@ -27,6 +27,7 @@ struct ConfigSection
|
||||
#define SECTION_BETA_RPC_API "beta_rpc_api"
|
||||
#define SECTION_CLUSTER_NODES "cluster_nodes"
|
||||
#define SECTION_COMPRESSION "compression"
|
||||
#define SECTION_DATAGRAM_MONITOR "datagram_monitor"
|
||||
#define SECTION_DEBUG_LOGFILE "debug_logfile"
|
||||
#define SECTION_ELB_SUPPORT "elb_support"
|
||||
#define SECTION_FEE_DEFAULT "fee_default"
|
||||
|
||||
@@ -483,6 +483,9 @@ Config::loadFromString(std::string const& fileContents)
|
||||
if (auto s = getIniFileSection(secConfig, SECTION_IPS_FIXED))
|
||||
ipsFixed = *s;
|
||||
|
||||
if (auto s = getIniFileSection(secConfig, SECTION_DATAGRAM_MONITOR))
|
||||
DATAGRAM_MONITOR = *s;
|
||||
|
||||
// if the user has specified ip:port then replace : with a space.
|
||||
{
|
||||
auto replaceColons = [](std::vector<std::string>& strVec) {
|
||||
|
||||
Reference in New Issue
Block a user