mirror of
https://github.com/Xahau/xahaud.git
synced 2026-04-29 15:37:46 +00:00
fix: harden RuntimeConfig validation and add startup diagnostics
- Error on unknown message_types instead of silently widening scope - Make messageCategories optional so per-peer can override global filter to "all categories" (nullopt=inherit, empty set=explicitly all) - Clamp send_drop_pct to 0-100% range - Add STARTDIAG: logging for consensus startup diagnostics - Add 3 test cases (11 total, 58 assertions)
This commit is contained in:
@@ -285,13 +285,109 @@ class RuntimeConfig_test : public beast::unit_test::suite
|
||||
if (!BEAST_EXPECT(cfg.has_value()))
|
||||
return;
|
||||
|
||||
BEAST_EXPECT(cfg->messageCategories.empty());
|
||||
BEAST_EXPECT(!cfg->messageCategories.has_value());
|
||||
BEAST_EXPECT(cfg->appliesTo(TrafficCount::category::proposal));
|
||||
BEAST_EXPECT(cfg->appliesTo(TrafficCount::category::validation));
|
||||
BEAST_EXPECT(cfg->appliesTo(TrafficCount::category::transaction));
|
||||
BEAST_EXPECT(cfg->appliesTo(TrafficCount::category::base));
|
||||
}
|
||||
|
||||
void
|
||||
testInvalidMessageType()
|
||||
{
|
||||
testcase("Invalid message type returns error");
|
||||
using namespace test::jtx;
|
||||
Env env{*this};
|
||||
|
||||
Json::Value params;
|
||||
params["set"] = Json::objectValue;
|
||||
params["set"]["*"] = Json::objectValue;
|
||||
params["set"]["*"]["send_delay_ms"] = 100;
|
||||
params["set"]["*"]["message_types"] = Json::arrayValue;
|
||||
params["set"]["*"]["message_types"].append("proposals"); // typo
|
||||
auto result = runtimeConfig(env, params);
|
||||
|
||||
BEAST_EXPECT(result.isMember("error"));
|
||||
BEAST_EXPECT(result["error"].asString() == "invalidParams");
|
||||
// Config should NOT have been applied
|
||||
BEAST_EXPECT(!env.app().getRuntimeConfig().active());
|
||||
}
|
||||
|
||||
void
|
||||
testDropPctClamping()
|
||||
{
|
||||
testcase("send_drop_pct clamped to 0-100");
|
||||
using namespace test::jtx;
|
||||
Env env{*this};
|
||||
|
||||
// Over 100
|
||||
{
|
||||
Json::Value params;
|
||||
params["set"] = Json::objectValue;
|
||||
params["set"]["*"] = Json::objectValue;
|
||||
params["set"]["*"]["send_drop_pct"] = 200.0;
|
||||
runtimeConfig(env, params);
|
||||
}
|
||||
auto cfg = env.app().getRuntimeConfig().getConfig("*");
|
||||
BEAST_EXPECT(cfg.has_value());
|
||||
BEAST_EXPECT(cfg->sendDropPctX100 == 10000); // clamped to 100%
|
||||
|
||||
// Negative
|
||||
{
|
||||
Json::Value params;
|
||||
params["set"] = Json::objectValue;
|
||||
params["set"]["*"] = Json::objectValue;
|
||||
params["set"]["*"]["send_drop_pct"] = -50.0;
|
||||
runtimeConfig(env, params);
|
||||
}
|
||||
cfg = env.app().getRuntimeConfig().getConfig("*");
|
||||
BEAST_EXPECT(cfg.has_value());
|
||||
BEAST_EXPECT(cfg->sendDropPctX100 == 0); // clamped to 0%
|
||||
}
|
||||
|
||||
void
|
||||
testPerPeerClearInheritedFilter()
|
||||
{
|
||||
testcase("Per-peer can override global filter to all");
|
||||
using namespace test::jtx;
|
||||
Env env{*this};
|
||||
|
||||
// Global: only proposals
|
||||
{
|
||||
Json::Value params;
|
||||
params["set"] = Json::objectValue;
|
||||
params["set"]["*"] = Json::objectValue;
|
||||
params["set"]["*"]["send_delay_ms"] = 100;
|
||||
params["set"]["*"]["message_types"] = Json::arrayValue;
|
||||
params["set"]["*"]["message_types"].append("proposal");
|
||||
runtimeConfig(env, params);
|
||||
}
|
||||
|
||||
// Per-peer: message_types = [] (explicitly all)
|
||||
{
|
||||
Json::Value params;
|
||||
params["set"] = Json::objectValue;
|
||||
params["set"]["10.0.0.2:51235"] = Json::objectValue;
|
||||
params["set"]["10.0.0.2:51235"]["message_types"] = Json::arrayValue;
|
||||
runtimeConfig(env, params);
|
||||
}
|
||||
|
||||
auto& rc = env.app().getRuntimeConfig();
|
||||
|
||||
// Per-peer should apply to all categories (empty set override)
|
||||
auto peerCfg = rc.getConfig("10.0.0.2:51235");
|
||||
BEAST_EXPECT(peerCfg.has_value());
|
||||
BEAST_EXPECT(peerCfg->appliesTo(TrafficCount::category::proposal));
|
||||
BEAST_EXPECT(peerCfg->appliesTo(TrafficCount::category::validation));
|
||||
BEAST_EXPECT(peerCfg->appliesTo(TrafficCount::category::transaction));
|
||||
|
||||
// Other peers still only get proposal filter from global
|
||||
auto otherCfg = rc.getConfig("10.0.0.3:51235");
|
||||
BEAST_EXPECT(otherCfg.has_value());
|
||||
BEAST_EXPECT(otherCfg->appliesTo(TrafficCount::category::proposal));
|
||||
BEAST_EXPECT(!otherCfg->appliesTo(TrafficCount::category::validation));
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
@@ -304,6 +400,9 @@ public:
|
||||
testPerPeerWithoutGlobal();
|
||||
testMessageTypeFilter();
|
||||
testMessageTypeFilterEmpty();
|
||||
testInvalidMessageType();
|
||||
testDropPctClamping();
|
||||
testPerPeerClearInheritedFilter();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -1168,8 +1168,17 @@ RCLConsensus::Adaptor::preStartRound(
|
||||
!nowTrusted.empty())
|
||||
nUnlVote_.newValidators(prevLgr.seq() + 1, nowTrusted);
|
||||
|
||||
bool const proposing = validating_ && synced;
|
||||
|
||||
JLOG(j_.info()) << "STARTDIAG: preStartRound"
|
||||
<< " mode=" << app_.getOPs().strOperatingMode()
|
||||
<< " synced=" << (synced ? "yes" : "no")
|
||||
<< " validating=" << (validating_ ? "yes" : "no")
|
||||
<< " proposing=" << (proposing ? "yes" : "no")
|
||||
<< " seq=" << (prevLgr.seq() + 1);
|
||||
|
||||
// propose only if we're in sync with the network (and validating)
|
||||
return validating_ && synced;
|
||||
return proposing;
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -1208,7 +1217,11 @@ void
|
||||
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
|
||||
{
|
||||
if (!positions && app_.getOPs().isFull())
|
||||
{
|
||||
JLOG(j_.warn()) << "STARTDIAG: updateOperatingMode demoting"
|
||||
<< " FULL->CONNECTED positions=" << positions;
|
||||
app_.getOPs().setMode(OperatingMode::CONNECTED);
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
@@ -39,16 +39,17 @@ struct ConfigVals
|
||||
std::optional<int> sendDelayMs;
|
||||
std::optional<int> sendDelayJitterMs;
|
||||
std::optional<int> sendDropPctX100; // 0-10000 (pct * 100, avoids float)
|
||||
// If set, only apply to these TrafficCount::category values.
|
||||
// Empty set = no filter (apply to all messages).
|
||||
std::set<std::size_t> messageCategories;
|
||||
// If set (non-nullopt), only apply to these TrafficCount::category values.
|
||||
// nullopt = not specified (inherit from global on merge).
|
||||
// Empty set = explicitly "all categories" (overrides global filter).
|
||||
std::optional<std::set<std::size_t>> messageCategories;
|
||||
|
||||
/** Check if this config applies to a given message category. */
|
||||
bool
|
||||
appliesTo(std::size_t category) const
|
||||
{
|
||||
return messageCategories.empty() ||
|
||||
messageCategories.count(category) > 0;
|
||||
return !messageCategories || messageCategories->empty() ||
|
||||
messageCategories->count(category) > 0;
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -70,7 +71,7 @@ struct ConfigVals
|
||||
result.sendDelayJitterMs = other.sendDelayJitterMs;
|
||||
if (other.sendDropPctX100)
|
||||
result.sendDropPctX100 = other.sendDropPctX100;
|
||||
if (!other.messageCategories.empty())
|
||||
if (other.messageCategories)
|
||||
result.messageCategories = other.messageCategories;
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -829,8 +829,12 @@ Consensus<Adaptor>::peerProposalInternal(
|
||||
|
||||
if (newPeerProp.prevLedger() != prevLedgerID_)
|
||||
{
|
||||
JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
|
||||
<< " but we are on " << prevLedgerID_;
|
||||
JLOG(j_.info()) << "STARTDIAG: peerProposal REJECTED prevLedger"
|
||||
<< " peer=" << newPeerProp.nodeID()
|
||||
<< " theirPrev=" << newPeerProp.prevLedger()
|
||||
<< " ourPrev=" << prevLedgerID_
|
||||
<< " phase=" << to_string(phase_)
|
||||
<< " seq=" << newPeerProp.proposeSeq();
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -871,9 +875,18 @@ Consensus<Adaptor>::peerProposalInternal(
|
||||
}
|
||||
|
||||
if (peerPosIt != currPeerPositions_.end())
|
||||
{
|
||||
peerPosIt->second = newPeerPos;
|
||||
}
|
||||
else
|
||||
{
|
||||
currPeerPositions_.emplace(peerID, newPeerPos);
|
||||
JLOG(j_.info()) << "STARTDIAG: peerProposal ACCEPTED"
|
||||
<< " peer=" << peerID
|
||||
<< " peerPositions=" << currPeerPositions_.size()
|
||||
<< " seq=" << newPeerProp.proposeSeq()
|
||||
<< " phase=" << to_string(phase_);
|
||||
}
|
||||
}
|
||||
|
||||
// Harvest RNG data from proposal if adaptor supports it
|
||||
@@ -1698,6 +1711,11 @@ Consensus<Adaptor>::phaseEstablish(
|
||||
}
|
||||
//@@end rng-phase-establish-substates
|
||||
|
||||
JLOG(j_.info()) << "STARTDIAG: converge cutoff"
|
||||
<< " peerPositions=" << currPeerPositions_.size()
|
||||
<< " roundTime=" << result_->roundTime.read().count()
|
||||
<< "ms"
|
||||
<< " mode=" << to_string(mode_.get());
|
||||
JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size()
|
||||
<< " participants)";
|
||||
CLOG(clog) << "Converge cutoff (" << currPeerPositions_.size()
|
||||
@@ -2013,8 +2031,14 @@ Consensus<Adaptor>::haveConsensus(
|
||||
auto currentFinished =
|
||||
adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
|
||||
|
||||
JLOG(j_.debug()) << "Checking for TX consensus: agree=" << agree
|
||||
<< ", disagree=" << disagree;
|
||||
JLOG(j_.info()) << "STARTDIAG: haveConsensus"
|
||||
<< " agree=" << agree << " disagree=" << disagree
|
||||
<< " total=" << (agree + disagree)
|
||||
<< " peerPositions=" << currPeerPositions_.size()
|
||||
<< " prevProposers=" << prevProposers_
|
||||
<< " roundTime=" << result_->roundTime.read().count()
|
||||
<< "ms"
|
||||
<< " mode=" << to_string(mode_.get());
|
||||
|
||||
// Determine if we actually have consensus or not
|
||||
result_->state = checkConsensus(
|
||||
|
||||
@@ -82,14 +82,34 @@ doRuntimeConfig(RPC::JsonContext& context)
|
||||
if (v.isMember("send_delay_jitter_ms"))
|
||||
cfg.sendDelayJitterMs = v["send_delay_jitter_ms"].asInt();
|
||||
if (v.isMember("send_drop_pct"))
|
||||
cfg.sendDropPctX100 =
|
||||
static_cast<int>(v["send_drop_pct"].asDouble() * 100);
|
||||
if (v.isMember("message_types") && v["message_types"].isArray())
|
||||
{
|
||||
for (auto const& mt : v["message_types"])
|
||||
auto pct = v["send_drop_pct"].asDouble();
|
||||
if (pct < 0.0)
|
||||
pct = 0.0;
|
||||
else if (pct > 100.0)
|
||||
pct = 100.0;
|
||||
cfg.sendDropPctX100 = static_cast<int>(pct * 100);
|
||||
}
|
||||
if (v.isMember("message_types"))
|
||||
{
|
||||
auto const& mts = v["message_types"];
|
||||
cfg.messageCategories.emplace(); // set to empty = "all"
|
||||
if (mts.isArray())
|
||||
{
|
||||
if (auto cat = categoryFromName(mt.asString()))
|
||||
cfg.messageCategories.insert(*cat);
|
||||
for (auto const& mt : mts)
|
||||
{
|
||||
auto const name = mt.asString();
|
||||
auto cat = categoryFromName(name);
|
||||
if (!cat)
|
||||
{
|
||||
Json::Value err{Json::objectValue};
|
||||
err["error"] = "invalidParams";
|
||||
err["error_message"] =
|
||||
"Unknown message_type: " + name;
|
||||
return err;
|
||||
}
|
||||
cfg.messageCategories->insert(*cat);
|
||||
}
|
||||
}
|
||||
}
|
||||
rc.setConfig(target, cfg);
|
||||
@@ -123,10 +143,10 @@ doRuntimeConfig(RPC::JsonContext& context)
|
||||
entry["send_delay_jitter_ms"] = *cfg.sendDelayJitterMs;
|
||||
if (cfg.sendDropPctX100)
|
||||
entry["send_drop_pct"] = *cfg.sendDropPctX100 / 100.0;
|
||||
if (!cfg.messageCategories.empty())
|
||||
if (cfg.messageCategories)
|
||||
{
|
||||
Json::Value types{Json::arrayValue};
|
||||
for (auto cat : cfg.messageCategories)
|
||||
for (auto cat : *cfg.messageCategories)
|
||||
types.append(categoryToName(cat));
|
||||
entry["message_types"] = types;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user