Datagram monitor (#400)

Co-authored-by: Denis Angell <dangell@transia.co>
This commit is contained in:
RichardAH
2024-12-11 09:38:16 +10:00
committed by Richard Holland
parent 9d54da3880
commit e9468d8b4a
12 changed files with 1294 additions and 182 deletions

View File

@@ -392,6 +392,7 @@ target_sources (rippled PRIVATE
src/ripple/app/misc/NegativeUNLVote.cpp src/ripple/app/misc/NegativeUNLVote.cpp
src/ripple/app/misc/NetworkOPs.cpp src/ripple/app/misc/NetworkOPs.cpp
src/ripple/app/misc/SHAMapStoreImp.cpp src/ripple/app/misc/SHAMapStoreImp.cpp
src/ripple/app/misc/StateAccounting.cpp
src/ripple/app/misc/detail/impl/WorkSSL.cpp src/ripple/app/misc/detail/impl/WorkSSL.cpp
src/ripple/app/misc/impl/AccountTxPaging.cpp src/ripple/app/misc/impl/AccountTxPaging.cpp
src/ripple/app/misc/impl/AmendmentTable.cpp src/ripple/app/misc/impl/AmendmentTable.cpp

View File

@@ -152,6 +152,9 @@ public:
std::string std::string
getCompleteLedgers(); getCompleteLedgers();
RangeSet<std::uint32_t>
getCompleteLedgersRangeSet();
/** Apply held transactions to the open ledger /** Apply held transactions to the open ledger
This is normally called as we close the ledger. This is normally called as we close the ledger.
The open ledger remains open to handle new transactions The open ledger remains open to handle new transactions

View File

@@ -1714,6 +1714,13 @@ LedgerMaster::getCompleteLedgers()
return to_string(mCompleteLedgers); return to_string(mCompleteLedgers);
} }
RangeSet<std::uint32_t>
LedgerMaster::getCompleteLedgersRangeSet()
{
std::lock_guard sl(mCompleteLock);
return mCompleteLedgers;
}
std::optional<NetClock::time_point> std::optional<NetClock::time_point>
LedgerMaster::getCloseTimeBySeq(LedgerIndex ledgerIndex) LedgerMaster::getCloseTimeBySeq(LedgerIndex ledgerIndex)
{ {

View File

@@ -37,6 +37,7 @@
#include <ripple/app/main/NodeStoreScheduler.h> #include <ripple/app/main/NodeStoreScheduler.h>
#include <ripple/app/main/Tuning.h> #include <ripple/app/main/Tuning.h>
#include <ripple/app/misc/AmendmentTable.h> #include <ripple/app/misc/AmendmentTable.h>
#include <ripple/app/misc/DatagramMonitor.h>
#include <ripple/app/misc/HashRouter.h> #include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/LoadFeeTrack.h> #include <ripple/app/misc/LoadFeeTrack.h>
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
@@ -167,6 +168,8 @@ public:
std::unique_ptr<Logs> logs_; std::unique_ptr<Logs> logs_;
std::unique_ptr<TimeKeeper> timeKeeper_; std::unique_ptr<TimeKeeper> timeKeeper_;
std::unique_ptr<DatagramMonitor> datagram_monitor_;
std::uint64_t const instanceCookie_; std::uint64_t const instanceCookie_;
beast::Journal m_journal; beast::Journal m_journal;
@@ -1523,6 +1526,14 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
if (reportingETL_) if (reportingETL_)
reportingETL_->start(); reportingETL_->start();
// Datagram monitor if applicable
if (!config_->standalone() && config_->DATAGRAM_MONITOR != "")
{
datagram_monitor_ = std::make_unique<DatagramMonitor>(*this);
if (datagram_monitor_)
datagram_monitor_->start();
}
return true; return true;
} }

File diff suppressed because it is too large Load Diff

View File

@@ -33,6 +33,7 @@
#include <ripple/app/misc/HashRouter.h> #include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/LoadFeeTrack.h> #include <ripple/app/misc/LoadFeeTrack.h>
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/StateAccounting.h>
#include <ripple/app/misc/Transaction.h> #include <ripple/app/misc/Transaction.h>
#include <ripple/app/misc/TxQ.h> #include <ripple/app/misc/TxQ.h>
#include <ripple/app/misc/ValidatorKeys.h> #include <ripple/app/misc/ValidatorKeys.h>
@@ -70,7 +71,6 @@
#include <ripple/rpc/impl/UDPInfoSub.h> #include <ripple/rpc/impl/UDPInfoSub.h>
#include <boost/asio/ip/host_name.hpp> #include <boost/asio/ip/host_name.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <ripple/rpc/impl/UDPInfoSub.h>
#include <exception> #include <exception>
#include <mutex> #include <mutex>
#include <set> #include <set>
@@ -117,81 +117,6 @@ class NetworkOPsImp final : public NetworkOPs
running, running,
}; };
static std::array<char const*, 5> const states_;
/**
* State accounting records two attributes for each possible server state:
* 1) Amount of time spent in each state (in microseconds). This value is
* updated upon each state transition.
* 2) Number of transitions to each state.
*
* This data can be polled through server_info and represented by
* monitoring systems similarly to how bandwidth, CPU, and other
* counter-based metrics are managed.
*
* State accounting is more accurate than periodic sampling of server
* state. With periodic sampling, it is very likely that state transitions
* are missed, and accuracy of time spent in each state is very rough.
*/
class StateAccounting
{
struct Counters
{
explicit Counters() = default;
std::uint64_t transitions = 0;
std::chrono::microseconds dur = std::chrono::microseconds(0);
};
OperatingMode mode_ = OperatingMode::DISCONNECTED;
std::array<Counters, 5> counters_;
mutable std::mutex mutex_;
std::chrono::steady_clock::time_point start_ =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point const processStart_ = start_;
std::uint64_t initialSyncUs_{0};
static std::array<Json::StaticString const, 5> const states_;
public:
explicit StateAccounting()
{
counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.transitions = 1;
}
/**
* Record state transition. Update duration spent in previous
* state.
*
* @param om New state.
*/
void
mode(OperatingMode om);
/**
* Output state counters in JSON format.
*
* @obj Json object to which to add state accounting data.
*/
void
json(Json::Value& obj) const;
struct CounterData
{
decltype(counters_) counters;
decltype(mode_) mode;
decltype(start_) start;
decltype(initialSyncUs_) initialSyncUs;
};
CounterData
getCounterData() const
{
std::lock_guard lock(mutex_);
return {counters_, mode_, start_, initialSyncUs_};
}
};
//! Server fees published on `server` subscription //! Server fees published on `server` subscription
struct ServerFeeSummary struct ServerFeeSummary
{ {
@@ -273,6 +198,9 @@ public:
std::string std::string
strOperatingMode(bool const admin = false) const override; strOperatingMode(bool const admin = false) const override;
StateAccounting::CounterData
getStateAccountingData();
// //
// Transaction operations. // Transaction operations.
// //
@@ -777,11 +705,17 @@ private:
DispatchState mDispatchState = DispatchState::none; DispatchState mDispatchState = DispatchState::none;
std::vector<TransactionStatus> mTransactions; std::vector<TransactionStatus> mTransactions;
StateAccounting accounting_{}; StateAccounting accounting_;
std::set<uint256> pendingValidations_; std::set<uint256> pendingValidations_;
std::mutex validationsMutex_; std::mutex validationsMutex_;
RCLConsensus&
getConsensus();
LedgerMaster&
getLedgerMaster();
private: private:
struct Stats struct Stats
{ {
@@ -844,19 +778,6 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
static std::array<char const*, 5> const stateNames{
{"disconnected", "connected", "syncing", "tracking", "full"}};
std::array<char const*, 5> const NetworkOPsImp::states_ = stateNames;
std::array<Json::StaticString const, 5> const
NetworkOPsImp::StateAccounting::states_ = {
{Json::StaticString(stateNames[0]),
Json::StaticString(stateNames[1]),
Json::StaticString(stateNames[2]),
Json::StaticString(stateNames[3]),
Json::StaticString(stateNames[4])}};
static auto const genesisAccountId = calcAccountID( static auto const genesisAccountId = calcAccountID(
generateKeyPair(KeyType::secp256k1, generateSeed("masterpassphrase")) generateKeyPair(KeyType::secp256k1, generateSeed("masterpassphrase"))
.first); .first);
@@ -1131,7 +1052,7 @@ NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
} }
} }
return states_[static_cast<std::size_t>(mode)]; return {StateAccounting::states_[static_cast<std::size_t>(mode)].c_str()};
} }
void void
@@ -2397,6 +2318,19 @@ NetworkOPsImp::getConsensusInfo()
return mConsensus.getJson(true); return mConsensus.getJson(true);
} }
// RHTODO: not threadsafe?
RCLConsensus&
NetworkOPsImp::getConsensus()
{
return mConsensus;
}
LedgerMaster&
NetworkOPsImp::getLedgerMaster()
{
return m_ledgerMaster;
}
Json::Value Json::Value
NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters) NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
{ {
@@ -4194,6 +4128,12 @@ NetworkOPsImp::stateAccounting(Json::Value& obj)
accounting_.json(obj); accounting_.json(obj);
} }
StateAccounting::CounterData
NetworkOPsImp::getStateAccountingData()
{
return accounting_.getCounterData();
}
// <-- bool: true=erased, false=was not there // <-- bool: true=erased, false=was not there
bool bool
NetworkOPsImp::unsubValidations(std::uint64_t uSeq) NetworkOPsImp::unsubValidations(std::uint64_t uSeq)
@@ -4664,50 +4604,6 @@ NetworkOPsImp::collect_metrics()
counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions); counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
} }
void
NetworkOPsImp::StateAccounting::mode(OperatingMode om)
{
auto now = std::chrono::steady_clock::now();
std::lock_guard lock(mutex_);
++counters_[static_cast<std::size_t>(om)].transitions;
if (om == OperatingMode::FULL &&
counters_[static_cast<std::size_t>(om)].transitions == 1)
{
initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
now - processStart_)
.count();
}
counters_[static_cast<std::size_t>(mode_)].dur +=
std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
mode_ = om;
start_ = now;
}
void
NetworkOPsImp::StateAccounting::json(Json::Value& obj) const
{
auto [counters, mode, start, initialSync] = getCounterData();
auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - start);
counters[static_cast<std::size_t>(mode)].dur += current;
obj[jss::state_accounting] = Json::objectValue;
for (std::size_t i = static_cast<std::size_t>(OperatingMode::DISCONNECTED);
i <= static_cast<std::size_t>(OperatingMode::FULL);
++i)
{
obj[jss::state_accounting][states_[i]] = Json::objectValue;
auto& state = obj[jss::state_accounting][states_[i]];
state[jss::transitions] = std::to_string(counters[i].transitions);
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}
obj[jss::server_state_duration_us] = std::to_string(current.count());
if (initialSync)
obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
std::unique_ptr<NetworkOPs> std::unique_ptr<NetworkOPs>

View File

@@ -20,8 +20,10 @@
#ifndef RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED #ifndef RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
#define RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED #define RIPPLE_APP_MISC_NETWORKOPS_H_INCLUDED
#include <ripple/app/consensus/RCLConsensus.h>
#include <ripple/app/consensus/RCLCxPeerPos.h> #include <ripple/app/consensus/RCLCxPeerPos.h>
#include <ripple/app/ledger/Ledger.h> #include <ripple/app/ledger/Ledger.h>
#include <ripple/app/misc/StateAccounting.h>
#include <ripple/core/JobQueue.h> #include <ripple/core/JobQueue.h>
#include <ripple/ledger/ReadView.h> #include <ripple/ledger/ReadView.h>
#include <ripple/net/InfoSub.h> #include <ripple/net/InfoSub.h>
@@ -42,35 +44,6 @@ class LedgerMaster;
class Transaction; class Transaction;
class ValidatorKeys; class ValidatorKeys;
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
// should use this interface. The RPC code will primarily be a light wrapper
// over this code.
//
// Eventually, it will check the node's operating mode (synched, unsynched,
// etectera) and defer to the correct means of processing. The current
// code assumes this node is synched (and will continue to do so until
// there's a functional network.
//
/** Specifies the mode under which the server believes it's operating.
This has implications about how the server processes transactions and
how it responds to requests (e.g. account balance request).
@note Other code relies on the numerical values of these constants; do
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
SYNCING = 2, //!< fallen slightly behind
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
/** Provides server functionality for clients. /** Provides server functionality for clients.
Clients include backend applications, local commands, and connected Clients include backend applications, local commands, and connected
@@ -221,6 +194,13 @@ public:
virtual Json::Value virtual Json::Value
getConsensusInfo() = 0; getConsensusInfo() = 0;
virtual RCLConsensus&
getConsensus() = 0;
virtual LedgerMaster&
getLedgerMaster() = 0;
virtual Json::Value virtual Json::Value
getServerInfo(bool human, bool admin, bool counters) = 0; getServerInfo(bool human, bool admin, bool counters) = 0;
virtual void virtual void
@@ -228,6 +208,9 @@ public:
virtual Json::Value virtual Json::Value
getLedgerFetchInfo() = 0; getLedgerFetchInfo() = 0;
virtual StateAccounting::CounterData
getStateAccountingData() = 0;
/** Accepts the current transaction tree, return the new ledger's sequence /** Accepts the current transaction tree, return the new ledger's sequence
This API is only used via RPC with the server in STANDALONE mode and This API is only used via RPC with the server in STANDALONE mode and

View File

@@ -0,0 +1,49 @@
#include <ripple/app/misc/StateAccounting.h>
namespace ripple {
void
StateAccounting::mode(OperatingMode om)
{
std::lock_guard lock(mutex_);
auto now = std::chrono::steady_clock::now();
++counters_[static_cast<std::size_t>(om)].transitions;
if (om == OperatingMode::FULL &&
counters_[static_cast<std::size_t>(om)].transitions == 1)
{
initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
now - processStart_)
.count();
}
counters_[static_cast<std::size_t>(mode_)].dur +=
std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
mode_ = om;
start_ = now;
}
void
StateAccounting::json(Json::Value& obj)
{
auto [counters, mode, start, initialSync] = getCounterData();
auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - start);
counters[static_cast<std::size_t>(mode)].dur += current;
obj[jss::state_accounting] = Json::objectValue;
for (std::size_t i = static_cast<std::size_t>(OperatingMode::DISCONNECTED);
i <= static_cast<std::size_t>(OperatingMode::FULL);
++i)
{
obj[jss::state_accounting][states_[i]] = Json::objectValue;
auto& state = obj[jss::state_accounting][states_[i]];
state[jss::transitions] = std::to_string(counters[i].transitions);
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}
obj[jss::server_state_duration_us] = std::to_string(current.count());
if (initialSync)
obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
}
} // namespace ripple

View File

@@ -0,0 +1,99 @@
#ifndef RIPPLE_APP_MAIN_STATEACCOUNTING_H_INCLUDED
#define RIPPLE_APP_MAIN_STATEACCOUNTING_H_INCLUDED
#include <ripple/basics/chrono.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/jss.h>
#include <array>
#include <mutex>
namespace ripple {
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
// should use this interface. The RPC code will primarily be a light wrapper
// over this code.
//
// Eventually, it will check the node's operating mode (synched, unsynched,
// etectera) and defer to the correct means of processing. The current
// code assumes this node is synched (and will continue to do so until
// there's a functional network.
//
/** Specifies the mode under which the server believes it's operating.
This has implications about how the server processes transactions and
how it responds to requests (e.g. account balance request).
@note Other code relies on the numerical values of these constants; do
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
SYNCING = 2, //!< fallen slightly behind
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
class StateAccounting
{
public:
constexpr static std::array<Json::StaticString const, 5> const states_ = {
{Json::StaticString("disconnected"),
Json::StaticString("connected"),
Json::StaticString("syncing"),
Json::StaticString("tracking"),
Json::StaticString("full")}};
struct Counters
{
explicit Counters() = default;
std::uint64_t transitions = 0;
std::chrono::microseconds dur = std::chrono::microseconds(0);
};
private:
OperatingMode mode_ = OperatingMode::DISCONNECTED;
std::array<Counters, 5> counters_;
mutable std::mutex mutex_;
std::chrono::steady_clock::time_point start_ =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point const processStart_ = start_;
std::uint64_t initialSyncUs_{0};
public:
explicit StateAccounting()
{
counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
.transitions = 1;
}
//! Record state transition. Update duration spent in previous state.
void
mode(OperatingMode om);
//! Output state counters in JSON format.
void
json(Json::Value& obj);
using CounterData = std::tuple<
decltype(counters_),
decltype(mode_),
decltype(start_),
decltype(initialSyncUs_)>;
CounterData
getCounterData()
{
return {counters_, mode_, start_, initialSyncUs_};
}
};
} // namespace ripple
#endif

View File

@@ -155,6 +155,8 @@ public:
std::map<std::string, PublicKey> std::map<std::string, PublicKey>
IMPORT_VL_KEYS; // hex string -> class PublicKey (for caching purposes) IMPORT_VL_KEYS; // hex string -> class PublicKey (for caching purposes)
std::string DATAGRAM_MONITOR;
enum StartUpType { enum StartUpType {
FRESH, FRESH,
NORMAL, NORMAL,

View File

@@ -101,6 +101,7 @@ struct ConfigSection
#define SECTION_SWEEP_INTERVAL "sweep_interval" #define SECTION_SWEEP_INTERVAL "sweep_interval"
#define SECTION_NETWORK_ID "network_id" #define SECTION_NETWORK_ID "network_id"
#define SECTION_IMPORT_VL_KEYS "import_vl_keys" #define SECTION_IMPORT_VL_KEYS "import_vl_keys"
#define SECTION_DATAGRAM_MONITOR "datagram_monitor"
} // namespace ripple } // namespace ripple

View File

@@ -281,6 +281,9 @@ Config::setupControl(bool bQuiet, bool bSilent, bool bStandalone)
// RAM and CPU resources. We default to "tiny" for standalone mode. // RAM and CPU resources. We default to "tiny" for standalone mode.
if (!bStandalone) if (!bStandalone)
{ {
NODE_SIZE = 4;
return;
// First, check against 'minimum' RAM requirements per node size: // First, check against 'minimum' RAM requirements per node size:
auto const& threshold = auto const& threshold =
sizedItems[std::underlying_type_t<SizedItem>(SizedItem::ramSizeGB)]; sizedItems[std::underlying_type_t<SizedItem>(SizedItem::ramSizeGB)];
@@ -465,26 +468,24 @@ Config::loadFromString(std::string const& fileContents)
SNTP_SERVERS = *s; SNTP_SERVERS = *s;
// if the user has specified ip:port then replace : with a space. // if the user has specified ip:port then replace : with a space.
{ auto replaceColons = [](std::vector<std::string>& strVec) {
auto replaceColons = [](std::vector<std::string>& strVec) { const static std::regex e(":([0-9]+)$");
const static std::regex e(":([0-9]+)$"); for (auto& line : strVec)
for (auto& line : strVec) {
{ // skip anything that might be an ipv6 address
// skip anything that might be an ipv6 address if (std::count(line.begin(), line.end(), ':') != 1)
if (std::count(line.begin(), line.end(), ':') != 1) continue;
continue;
std::string result = std::regex_replace(line, e, " $1"); std::string result = std::regex_replace(line, e, " $1");
// sanity check the result of the replace, should be same length // sanity check the result of the replace, should be same length
// as input // as input
if (result.size() == line.size()) if (result.size() == line.size())
line = result; line = result;
} }
}; };
replaceColons(IPS_FIXED); replaceColons(IPS_FIXED);
replaceColons(IPS); replaceColons(IPS);
}
{ {
std::string dbPath; std::string dbPath;
@@ -509,6 +510,13 @@ Config::loadFromString(std::string const& fileContents)
NETWORK_ID = beast::lexicalCastThrow<uint32_t>(strTemp); NETWORK_ID = beast::lexicalCastThrow<uint32_t>(strTemp);
} }
if (getSingleSection(secConfig, SECTION_DATAGRAM_MONITOR, strTemp, j_))
{
std::vector<std::string> vecTemp{strTemp};
replaceColons(vecTemp);
DATAGRAM_MONITOR = vecTemp[0];
}
if (getSingleSection(secConfig, SECTION_PEER_PRIVATE, strTemp, j_)) if (getSingleSection(secConfig, SECTION_PEER_PRIVATE, strTemp, j_))
PEER_PRIVATE = beast::lexicalCastThrow<bool>(strTemp); PEER_PRIVATE = beast::lexicalCastThrow<bool>(strTemp);