Mirror of https://github.com/XRPLF/rippled.git, synced 2025-11-04 19:25:51 +00:00
Improve handling of peers that aren't synced:
When evaluating the fitness and usefulness of an outbound peer, the code would incorrectly calculate the amount of time that the peer spent in a non-useful state. This commit, if merged, corrects the calculation and makes the timeout values configurable by server operators. Two new options are introduced in the 'overlay' stanza of the config file. The default values, in seconds, are:

[overlay]
max_unknown_time = 600
max_diverged_time = 300
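The heart of the fix is how that non-useful time is measured: the timestamp recorded when the peer entered its current tracking state is subtracted from the current time, and the elapsed duration is compared against the operator-configurable limit. A minimal, self-contained sketch of that pattern follows; the type and member names here are illustrative only, not the actual rippled code (the real logic is in PeerImp::check() in the diff below).

#include <chrono>

// Sketch of the corrected elapsed-time check (hypothetical names).
// trackingTime is set when the peer enters the "unknown" or "diverged"
// state; each periodic check compares the time elapsed since then
// against the configured maximum for that state.
struct TrackingTimeoutSketch
{
    using clock_type = std::chrono::steady_clock;

    clock_type::time_point trackingTime = clock_type::now();
    std::chrono::seconds maxUnknownTime{600};   // [overlay] max_unknown_time
    std::chrono::seconds maxDivergedTime{300};  // [overlay] max_diverged_time

    // True if the peer has stayed "unknown" longer than allowed.
    bool unknownTooLong() const
    {
        return (clock_type::now() - trackingTime) > maxUnknownTime;
    }

    // True if the peer has stayed "diverged" longer than allowed.
    bool divergedTooLong() const
    {
        return (clock_type::now() - trackingTime) > maxDivergedTime;
    }
};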
@@ -489,6 +489,23 @@
# single host from consuming all inbound slots. If the value is not
# present the server will autoconfigure an appropriate limit.
#
# max_unknown_time = <number>
#
# The maximum amount of time, in seconds, that an outbound connection
# is allowed to stay in the "unknown" tracking state. This option can
# take any value between 300 and 1800 seconds, inclusive. If the option
# is not present the server will autoconfigure an appropriate limit.
#
# The current default (which is subject to change) is 600 seconds.
#
# max_diverged_time = <number>
#
# The maximum amount of time, in seconds, that an outbound connection
# is allowed to stay in the "diverged" tracking state. The option can
# take any value between 60 and 900 seconds, inclusive. If the option
# is not present the server will autoconfigure an appropriate limit.
#
# The current default (which is subject to change) is 300 seconds.
#
#
# [transaction_queue] EXPERIMENTAL

@@ -590,13 +607,13 @@
#
#-------------------------------------------------------------------------------
#
# 3. Ripple Protocol
# 3. Protocol
#
#-------------------
#
# These settings affect the behavior of the server instance with respect
# to Ripple payment protocol level activities such as validating and
# closing ledgers or adjusting fees in response to server overloads.
# to protocol level activities such as validating and closing ledgers
# adjusting fees in response to server overloads.
#
#
#

@@ -617,12 +634,6 @@
# Legal values are: "trusted" and "all". The default is "all".
#
#
# [node_size]
#
# Tunes the servers based on the expected load and available memory. Legal
# sizes are "tiny", "small", "medium", "large", and "huge". We recommend
# you start at the default and raise the setting if you have extra memory.
# The default is "tiny".
#
#
#

@@ -1191,6 +1202,14 @@
#
#-----------------
#
# [node_size]
#
# Tunes the servers based on the expected load and available memory. Legal
# sizes are "tiny", "small", "medium", "large", and "huge". We recommend
# you start at the default and raise the setting if you have extra memory.
# If no value is specified, the code assumes the proper size is "tiny". The
# default configuration file explicitly specifies "medium" as the size.
#
# [signing_support]
#
# Specifies whether the server will accept "sign" and "sign_for" commands
@@ -755,7 +755,7 @@ Ledger::walkLedger(beast::Journal j) const
}

bool
Ledger::assertSane(beast::Journal ledgerJ) const
Ledger::assertSensible(beast::Journal ledgerJ) const
{
if (info_.hash.isNonZero() && info_.accountHash.isNonZero() && stateMap_ &&
txMap_ && (info_.accountHash == stateMap_->getHash().as_uint256()) &&

@@ -769,7 +769,7 @@ Ledger::assertSane(beast::Journal ledgerJ) const
j[jss::accountTreeHash] = to_string(info_.accountHash);
j[jss::transTreeHash] = to_string(info_.txHash);

JLOG(ledgerJ.fatal()) << "ledger is not sane" << j;
JLOG(ledgerJ.fatal()) << "ledger is not sensible" << j;

assert(false);

@@ -322,7 +322,7 @@ public:
walkLedger(beast::Journal j) const;

bool
assertSane(beast::Journal ledgerJ) const;
assertSensible(beast::Journal ledgerJ) const;

void
invariants() const;
@@ -404,8 +404,8 @@ LedgerMaster::addHeldTransaction(
}

// Validate a ledger's close time and sequence number if we're considering
// jumping to that ledger. This helps defend agains some rare hostile or
// insane majority scenarios.
// jumping to that ledger. This helps defend against some rare hostile or
// diverged majority scenarios.
bool
LedgerMaster::canBeCurrent(std::shared_ptr<Ledger const> const& ledger)
{

@@ -964,9 +964,9 @@ LedgerMaster::checkAccept(uint256 const& hash, std::uint32_t seq)
{
if ((seq != 0) && (getValidLedgerIndex() == 0))
{
// Set peers sane early if we can
// Set peers converged early if we can
if (valCount >= app_.validators().quorum())
app_.overlay().checkSanity(seq);
app_.overlay().checkTracking(seq);
}

// FIXME: We may not want to fetch a ledger with just one
@@ -2043,9 +2043,9 @@ ApplicationImp::loadOldLedger(
return false;
}

if (!loadLedger->assertSane(journal("Ledger")))
if (!loadLedger->assertSensible(journal("Ledger")))
{
JLOG(m_journal.fatal()) << "Ledger is not sane.";
JLOG(m_journal.fatal()) << "Ledger is not sensible.";
assert(false);
return false;
}
@@ -205,6 +205,12 @@ public:

std::string SERVER_DOMAIN;

// How long can a peer remain in the "unknown" state
std::chrono::seconds MAX_UNKNOWN_TIME{600};

// How long can a peer remain in the "diverged" state
std::chrono::seconds MAX_DIVERGED_TIME{300};

public:
Config() : j_{beast::Journal::getNullSink()}
{

@@ -66,6 +66,7 @@ struct ConfigSection
#define SECTION_NETWORK_QUORUM "network_quorum"
#define SECTION_NODE_SEED "node_seed"
#define SECTION_NODE_SIZE "node_size"
#define SECTION_OVERLAY "overlay"
#define SECTION_PATH_SEARCH_OLD "path_search_old"
#define SECTION_PATH_SEARCH "path_search"
#define SECTION_PATH_SEARCH_FAST "path_search_fast"
@@ -465,8 +465,7 @@ Config::loadFromString(std::string const& fileContents)

if (exists(SECTION_VALIDATION_SEED) && exists(SECTION_VALIDATOR_TOKEN))
Throw<std::runtime_error>("Cannot have both [" SECTION_VALIDATION_SEED
"] "
"and [" SECTION_VALIDATOR_TOKEN
"] and [" SECTION_VALIDATOR_TOKEN
"] config sections");

if (getSingleSection(secConfig, SECTION_NETWORK_QUORUM, strTemp, j_))

@@ -550,6 +549,51 @@ Config::loadFromString(std::string const& fileContents)
SERVER_DOMAIN = strTemp;
}

if (exists(SECTION_OVERLAY))
{
auto const sec = section(SECTION_OVERLAY);

using namespace std::chrono;

try
{
if (auto val = sec.get<std::string>("max_unknown_time"))
MAX_UNKNOWN_TIME =
seconds{beast::lexicalCastThrow<std::uint32_t>(*val)};
}
catch (...)
{
Throw<std::runtime_error>(
"Invalid value 'max_unknown_time' in " SECTION_OVERLAY
": must be of the form '<number>' representing seconds.");
}

if (MAX_UNKNOWN_TIME < seconds{300} || MAX_UNKNOWN_TIME > seconds{1800})
Throw<std::runtime_error>(
"Invalid value 'max_unknown_time' in " SECTION_OVERLAY
": the time must be between 300 and 1800 seconds, inclusive.");

try
{
if (auto val = sec.get<std::string>("max_diverged_time"))
MAX_DIVERGED_TIME =
seconds{beast::lexicalCastThrow<std::uint32_t>(*val)};
}
catch (...)
{
Throw<std::runtime_error>(
"Invalid value 'max_diverged_time' in " SECTION_OVERLAY
": must be of the form '<number>' representing seconds.");
}

if (MAX_DIVERGED_TIME < seconds{60} || MAX_DIVERGED_TIME > seconds{900})
{
Throw<std::runtime_error>(
"Invalid value 'max_diverged_time' in " SECTION_OVERLAY
": the time must be between 60 and 900 seconds, inclusive.");
}
}

if (getSingleSection(
secConfig, SECTION_AMENDMENT_MAJORITY_TIME, strTemp, j_))
{
@@ -120,11 +120,11 @@ public:
virtual PeerSequence
getActivePeers() const = 0;

/** Calls the checkSanity function on each peer
@param index the value to pass to the peer's checkSanity function
/** Calls the checkTracking function on each peer
@param index the value to pass to the peer's checkTracking function
*/
virtual void
checkSanity(std::uint32_t index) = 0;
checkTracking(std::uint32_t index) = 0;

/** Calls the check function on each peer
*/

@@ -1181,10 +1181,10 @@ OverlayImpl::getActivePeers() const
}

void
OverlayImpl::checkSanity(std::uint32_t index)
OverlayImpl::checkTracking(std::uint32_t index)
{
for_each(
[index](std::shared_ptr<PeerImp>&& sp) { sp->checkSanity(index); });
[index](std::shared_ptr<PeerImp>&& sp) { sp->checkTracking(index); });
}

void

@@ -195,7 +195,7 @@ public:
void
check() override;

void checkSanity(std::uint32_t) override;
void checkTracking(std::uint32_t) override;

std::shared_ptr<Peer>
findPeerByShortID(Peer::id_t const& id) const override;
@@ -81,9 +81,8 @@ PeerImp::PeerImp(
, overlay_(overlay)
, m_inbound(true)
, protocol_(protocol)
, state_(State::active)
, sanity_(Sanity::unknown)
, insaneTime_(clock_type::now())
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, creationTime_(clock_type::now())
, usage_(consumer)

@@ -102,8 +101,7 @@ PeerImp::~PeerImp()
const bool inCluster{cluster()};

overlay_.deletePeer(id_);
if (state_ == State::active)
overlay_.onPeerDeactivate(id_);
overlay_.onPeerDeactivate(id_);
overlay_.peerFinder().on_closed(slot_);
overlay_.remove(slot_);

@@ -172,17 +170,9 @@ PeerImp::run()
}

if (m_inbound)
{
doAccept();
}
else
{
assert(state_ == State::active);
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection
// type.)
doProtocolStart();
}

// Request shard info from peer
protocol::TMGetPeerShardInfo tmGPS;
@@ -359,17 +349,17 @@ PeerImp::json()
ret[jss::complete_ledgers] =
std::to_string(minSeq) + " - " + std::to_string(maxSeq);

switch (sanity_.load())
switch (tracking_.load())
{
case Sanity::insane:
ret[jss::sanity] = "insane";
case Tracking::diverged:
ret[jss::track] = "diverged";
break;

case Sanity::unknown:
ret[jss::sanity] = "unknown";
case Tracking::unknown:
ret[jss::track] = "unknown";
break;

case Sanity::sane:
case Tracking::converged:
// Nothing to do here
break;
}

@@ -447,7 +437,7 @@ PeerImp::hasLedger(uint256 const& hash, std::uint32_t seq) const
{
std::lock_guard sl(recentLock_);
if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
(sanity_.load() == Sanity::sane))
(tracking_.load() == Tracking::converged))
return true;
if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
recentLedgers_.end())

@@ -499,7 +489,7 @@ bool
PeerImp::hasRange(std::uint32_t uMin, std::uint32_t uMax)
{
std::lock_guard sl(recentLock_);
return (sanity_ != Sanity::insane) && (uMin >= minLedger_) &&
return (tracking_ != Tracking::diverged) && (uMin >= minLedger_) &&
(uMax <= maxLedger_);
}
@@ -1378,9 +1368,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfo> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
{
// Don't allow endpoints from peers that are not known sane or are
// Don't allow endpoints from peers that are not known tracking or are
// not using a version of the message that we support:
if (sanity_.load() != Sanity::sane || m->version() != 2)
if (tracking_.load() != Tracking::converged || m->version() != 2)
return;

std::vector<PeerFinder::Endpoint> endpoints;

@@ -1415,7 +1405,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
{
if (sanity_.load() == Sanity::insane)
if (tracking_.load() == Tracking::diverged)
return;

if (app_.getOPs().isNeedNetworkLedger())

@@ -1630,21 +1620,22 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)

if (!isTrusted)
{
if (sanity_.load() == Sanity::insane)
if (tracking_.load() == Tracking::diverged)
{
JLOG(p_journal_.debug()) << "Proposal: Dropping UNTRUSTED (insane)";
JLOG(p_journal_.debug())
<< "Proposal: Dropping untrusted (peer divergence)";
return;
}

if (!cluster() && app_.getFeeTrack().isLoadedLocal())
{
JLOG(p_journal_.debug()) << "Proposal: Dropping UNTRUSTED (load)";
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
return;
}
}

JLOG(p_journal_.trace())
<< "Proposal: " << (isTrusted ? "trusted" : "UNTRUSTED");
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");

auto proposal = RCLCxPeerPos(
publicKey,
@@ -1765,7 +1756,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
if (m->has_ledgerseq() &&
app_.getLedgerMaster().getValidatedLedgerAge() < 2min)
{
checkSanity(
checkTracking(
m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
}

@@ -1844,11 +1835,11 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
}

void
PeerImp::checkSanity(std::uint32_t validationSeq)
PeerImp::checkTracking(std::uint32_t validationSeq)
{
std::uint32_t serverSeq;
{
// Extract the seqeuence number of the highest
// Extract the sequence number of the highest
// ledger this peer has
std::lock_guard sl(recentLock_);

@@ -1858,29 +1849,29 @@ PeerImp::checkSanity(std::uint32_t validationSeq)
{
// Compare the peer's ledger sequence to the
// sequence of a recently-validated ledger
checkSanity(serverSeq, validationSeq);
checkTracking(serverSeq, validationSeq);
}
}

void
PeerImp::checkSanity(std::uint32_t seq1, std::uint32_t seq2)
PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
{
int diff = std::max(seq1, seq2) - std::min(seq1, seq2);

if (diff < Tuning::saneLedgerLimit)
if (diff < Tuning::convergedLedgerLimit)
{
// The peer's ledger sequence is close to the validation's
sanity_ = Sanity::sane;
tracking_ = Tracking::converged;
}

if ((diff > Tuning::insaneLedgerLimit) &&
(sanity_.load() != Sanity::insane))
if ((diff > Tuning::divergedLedgerLimit) &&
(tracking_.load() != Tracking::diverged))
{
// The peer's ledger sequence is way off the validation's
std::lock_guard sl(recentLock_);

sanity_ = Sanity::insane;
insaneTime_ = clock_type::now();
tracking_ = Tracking::diverged;
trackingTime_ = clock_type::now();
}
}
@@ -1889,25 +1880,28 @@ PeerImp::checkSanity(std::uint32_t seq1, std::uint32_t seq2)
void
PeerImp::check()
{
if (m_inbound || (sanity_.load() == Sanity::sane))
if (m_inbound)
return;

clock_type::time_point insaneTime;
auto const sanity = tracking_.load();

if (sanity == Tracking::converged)
return;

clock_type::duration duration;

{
std::lock_guard sl(recentLock_);

insaneTime = insaneTime_;
duration = clock_type::now() - trackingTime_;
}

bool reject = false;

if (sanity_.load() == Sanity::insane)
reject = (insaneTime - clock_type::now()) >
std::chrono::seconds(Tuning::maxInsaneTime);
if (sanity == Tracking::diverged)
reject = (duration > app_.config().MAX_DIVERGED_TIME);

if (sanity_.load() == Sanity::unknown)
reject = (insaneTime - clock_type::now()) >
std::chrono::seconds(Tuning::maxUnknownTime);
if (sanity == Tracking::unknown)
reject = (duration > app_.config().MAX_UNKNOWN_TIME);

if (reject)
{
@@ -2141,10 +2135,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
auto const isTrusted =
app_.validators().trusted(val->getSignerPublic());

if (!isTrusted && (sanity_.load() == Sanity::insane))
if (!isTrusted && (tracking_.load() == Tracking::diverged))
{
JLOG(p_journal_.debug())
<< "Validation: dropping untrusted from insane peer";
<< "Validation: dropping untrusted from diverged peer";
}
if (isTrusted || cluster() || !app_.getFeeTrack().isLoadedLocal())
{
@@ -48,30 +48,8 @@ class PeerImp : public Peer,
public OverlayImpl::Child
{
public:
/** Type of connection.
This affects how messages are routed.
*/
enum class Type { legacy, leaf, peer };

/** Current state */
enum class State {
/** A connection is being established (outbound) */
connecting

/** Connection has been successfully established */
,
connected

/** Handshake has been received from this peer */
,
handshaked

/** Running the Ripple protocol actively */
,
active
};

enum class Sanity { insane, unknown, sane };
/** Whether the peer's view of the ledger converges or diverges from ours */
enum class Tracking { diverged, unknown, converged };

struct ShardInfo
{
@@ -105,8 +83,6 @@ private:
boost::asio::strand<boost::asio::executor> strand_;
waitable_timer timer_;

// Type type_ = Type::legacy;

// Updated at each stage of the connection process to reflect
// the current conditions as closely as possible.
beast::IP::Endpoint const remote_address_;

@@ -119,9 +95,8 @@ private:
// Protocol version to use for this link
ProtocolVersion protocol_;

State state_; // Current state
std::atomic<Sanity> sanity_;
clock_type::time_point insaneTime_;
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
// Node public key of peer.
PublicKey const publicKey_;

@@ -162,7 +137,7 @@ private:
// o maxLedger_
// o recentLedgers_
// o recentTxSets_
// o insaneTime_
// o trackingTime_
// o latency_
//
// The following variables are being protected preemptively:

@@ -330,17 +305,14 @@ public:
bool
cluster() const override;

void
check();

/** Check if the peer is sane
/** Check if the peer is tracking
@param validationSeq The ledger sequence of a recently-validated ledger
*/
void
checkSanity(std::uint32_t validationSeq);
checkTracking(std::uint32_t validationSeq);

void
checkSanity(std::uint32_t seq1, std::uint32_t seq2);
checkTracking(std::uint32_t seq1, std::uint32_t seq2);

PublicKey const&
getNodePublic() const override

@@ -561,18 +533,6 @@ public:
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);

private:
State
state() const
{
return state_;
}

void
state(State new_state)
{
state_ = new_state;
}

//--------------------------------------------------------------------------
// lockedRecentLock is passed as a reminder to callers that recentLock_
// must be locked.
@@ -635,9 +595,8 @@ PeerImp::PeerImp(
, overlay_(overlay)
, m_inbound(false)
, protocol_(protocol)
, state_(State::active)
, sanity_(Sanity::unknown)
, insaneTime_(clock_type::now())
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, creationTime_(clock_type::now())
, usage_(usage)
@@ -30,21 +30,13 @@ enum {
/** Size of buffer used to read from the socket. */
readBufferBytes = 4096,

/** How long a server can remain insane before we
disconnected it (if outbound) */
maxInsaneTime = 60,

/** How long a server can remain unknown before we
disconnect it (if outbound) */
maxUnknownTime = 300,

/** How many ledgers off a server can be and we will
still consider it sane */
saneLedgerLimit = 24,
still consider it converged */
convergedLedgerLimit = 24,

/** How many ledgers off a server has to be before we
consider it insane */
insaneLedgerLimit = 128,
consider it diverged */
divergedLedgerLimit = 128,

/** The maximum number of ledger entries in a single
reply */
@@ -459,7 +459,6 @@ JSS(role); // out: Ping.cpp
JSS(rpc);
JSS(rt_accounts); // in: Subscribe, Unsubscribe
JSS(running_duration_us);
JSS(sanity); // out: PeerImp
JSS(search_depth); // in: RipplePathFind
JSS(searched_all); // out: Tx
JSS(secret); // in: TransactionSign,

@@ -520,6 +519,7 @@ JSS(ticket_count); // out: AccountInfo
JSS(ticket_seq); // in: LedgerEntry
JSS(time);
JSS(timeouts); // out: InboundLedger
JSS(track); // out: PeerImp
JSS(traffic); // out: Overlay
JSS(total); // out: counters
JSS(totalCoins); // out: LedgerToJson
@@ -32,17 +32,33 @@ doPeers(RPC::JsonContext& context)
{
Json::Value jvResult(Json::objectValue);

jvResult[jss::peers] = context.app.overlay().json();

// Legacy support
if (context.apiVersion == 1)
{
jvResult[jss::peers] = context.app.overlay().json();
for (auto& p : jvResult[jss::peers])
{
if (p.isMember(jss::track))
{
auto const s = p[jss::track].asString();

auto const now = context.app.timeKeeper().now();
auto const self = context.app.nodeIdentity().first;
if (s == "diverged")
p["sanity"] = "insane";
else if (s == "unknown")
p["sanity"] = "unknown";
}
}
}

Json::Value& cluster = (jvResult[jss::cluster] = Json::objectValue);
std::uint32_t ref = context.app.getFeeTrack().getLoadBase();
auto const now = context.app.timeKeeper().now();
auto const self = context.app.nodeIdentity().first;

context.app.cluster().for_each([&cluster, now, ref, &self](
ClusterNode const& node) {
Json::Value& cluster = (jvResult[jss::cluster] = Json::objectValue);
std::uint32_t ref = context.app.getFeeTrack().getLoadBase();

context.app.cluster().for_each(
[&cluster, now, ref, &self](ClusterNode const& node) {
if (node.identity() == self)
return;

@@ -60,7 +76,6 @@ doPeers(RPC::JsonContext& context)
? 0
: (now - node.getReportTime()).count();
});
}

return jvResult;
}
@@ -1065,6 +1065,80 @@ r.ripple.com 51235
}
}

void
testOverlay()
{
testcase("overlay: unknown time");

auto testUnknown =
[](std::string value) -> std::optional<std::chrono::seconds> {
try
{
Config c;
c.loadFromString("[overlay]\nmax_unknown_time=" + value);
return c.MAX_UNKNOWN_TIME;
}
catch (std::runtime_error&)
{
return {};
}
};

// Failures
BEAST_EXPECT(!testUnknown("none"));
BEAST_EXPECT(!testUnknown("0.5"));
BEAST_EXPECT(!testUnknown("180 seconds"));
BEAST_EXPECT(!testUnknown("9 minutes"));

// Below lower bound
BEAST_EXPECT(!testUnknown("299"));

// In bounds
BEAST_EXPECT(testUnknown("300") == std::chrono::seconds{300});
BEAST_EXPECT(testUnknown("301") == std::chrono::seconds{301});
BEAST_EXPECT(testUnknown("1799") == std::chrono::seconds{1799});
BEAST_EXPECT(testUnknown("1800") == std::chrono::seconds{1800});

// Above upper bound
BEAST_EXPECT(!testUnknown("1801"));

testcase("overlay: diverged time");

// In bounds:
auto testDiverged =
[](std::string value) -> std::optional<std::chrono::seconds> {
try
{
Config c;
c.loadFromString("[overlay]\nmax_diverged_time=" + value);
return c.MAX_DIVERGED_TIME;
}
catch (std::runtime_error&)
{
return {};
}
};

// Failures
BEAST_EXPECT(!testDiverged("none"));
BEAST_EXPECT(!testDiverged("0.5"));
BEAST_EXPECT(!testDiverged("180 seconds"));
BEAST_EXPECT(!testDiverged("9 minutes"));

// Below lower bound
BEAST_EXPECT(!testDiverged("0"));
BEAST_EXPECT(!testDiverged("59"));

// In bounds
BEAST_EXPECT(testDiverged("60") == std::chrono::seconds{60});
BEAST_EXPECT(testDiverged("61") == std::chrono::seconds{61});
BEAST_EXPECT(testDiverged("899") == std::chrono::seconds{899});
BEAST_EXPECT(testDiverged("900") == std::chrono::seconds{900});

// Above upper bound
BEAST_EXPECT(!testDiverged("901"));
}

void
run() override
{

@@ -1079,6 +1153,7 @@ r.ripple.com 51235
testComments();
testGetters();
testAmendment();
testOverlay();
}
};
@@ -707,7 +707,7 @@ struct Peer

// Basic Sequence number router
// A message that will be flooded across the network is tagged with a
// seqeuence number by the origin node in a BroadcastMesg. Receivers will
// sequence number by the origin node in a BroadcastMesg. Receivers will
// ignore a message as stale if they've already processed a newer sequence
// number, or will process and potentially relay the message along.
//