mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-10 04:06:59 +00:00
Compare commits
6 Commits
bthomee/di
...
pratik/ran
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fb75b4f70c | ||
|
|
809b617bb7 | ||
|
|
0480d951e6 | ||
|
|
14fef306dd | ||
|
|
d3d2cf0c9a | ||
|
|
87eb3fcf3b |
@@ -534,7 +534,62 @@ public:
|
||||
std::pair<T, int>
|
||||
normalizeToRange() const;
|
||||
|
||||
/** Normalize raw (mantissa, exponent) integers directly to a target range.
|
||||
*
|
||||
* This is the construction-time counterpart of the member overload above.
|
||||
* Callers that hold raw integers (e.g. IOUAmount) and want them in a
|
||||
* narrow range would otherwise build a Number (one normalize pass to the
|
||||
* default kRange) and then call the member normalizeToRange (a second pass
|
||||
* down to the narrow range). This overload does a single pass: it converts
|
||||
* the signed mantissa to its internal magnitude and normalizes straight to
|
||||
* [MinMantissa, MaxMantissa], building no intermediate Number.
|
||||
*
|
||||
* Data flow (single pass), contrasted with the old two-pass path:
|
||||
*
|
||||
* two-pass: (m,e) --build Number--> [kRange/Large] --member--> [Min,Max]
|
||||
* one-pass: (m,e) -------------- normalize --------------> [Min,Max]
|
||||
*
|
||||
* @tparam MinMantissa Lower bound of the target mantissa range; must be a
|
||||
* positive power of ten.
|
||||
* @tparam MaxMantissa Upper bound; must equal MinMantissa * 10 - 1.
|
||||
* @tparam T Result mantissa type, int64_t or uint64_t. Defaults
|
||||
* to the type of MinMantissa.
|
||||
* @param mantissa Raw signed mantissa (sign is extracted internally).
|
||||
* @param exponent Raw exponent.
|
||||
* @return The normalized (mantissa, exponent) pair in the target range.
|
||||
* A zero mantissa is returned as {mantissa=0, exponent=0, negative=false}.
|
||||
* @note The result is bit-identical to the two-pass path: an intermediate
|
||||
* pass to a strictly wider range cannot change the final
|
||||
* narrower-range result.
|
||||
* @note Thread-safety: reads the thread-local rounding mode only; holds no
|
||||
* shared state of its own. Safe to call concurrently.
|
||||
*
|
||||
* Example (IOU range, 10^15 .. 10^16-1):
|
||||
* @code
|
||||
* auto [m, e] = Number::normalizeToRange<1'000'000'000'000'000,
|
||||
* 9'999'999'999'999'999>(1, 0);
|
||||
* // m == 1'000'000'000'000'000, e == -15
|
||||
* @endcode
|
||||
*/
|
||||
template <
|
||||
auto MinMantissa,
|
||||
auto MaxMantissa,
|
||||
Integral64 T = std::decay_t<decltype(MinMantissa)>>
|
||||
[[nodiscard]]
|
||||
static std::pair<T, int>
|
||||
normalizeToRange(rep mantissa, int exponent);
|
||||
|
||||
private:
|
||||
// Shared implementation for both normalizeToRange overloads. Takes the sign
|
||||
// and internal (uint64) magnitude already separated, normalizes in place to
|
||||
// [MinMantissa, MaxMantissa], and returns the signed (mantissa, exponent).
|
||||
template <
|
||||
auto MinMantissa,
|
||||
auto MaxMantissa,
|
||||
Integral64 T = std::decay_t<decltype(MinMantissa)>>
|
||||
static std::pair<T, int>
|
||||
normalizeToRangeImpl(bool negative, internalrep mantissa, int exponent);
|
||||
|
||||
static thread_local RoundingMode mode;
|
||||
// The available ranges for mantissa
|
||||
|
||||
@@ -779,7 +834,7 @@ Number::isnormal() const noexcept
|
||||
|
||||
template <auto MinMantissa, auto MaxMantissa, Integral64 T>
|
||||
std::pair<T, int>
|
||||
Number::normalizeToRange() const
|
||||
Number::normalizeToRangeImpl(bool negative, internalrep mantissa, int exponent)
|
||||
{
|
||||
static_assert(std::is_same_v<T, std::uint64_t> || std::is_same_v<T, std::int64_t>);
|
||||
static_assert(std::is_same_v<T, std::decay_t<decltype(MinMantissa)>>);
|
||||
@@ -792,10 +847,6 @@ Number::normalizeToRange() const
|
||||
static_assert(kMAX % 10 == 9);
|
||||
static_assert((kMAX + 1) / 10 == kMIN);
|
||||
|
||||
bool negative = negative_;
|
||||
internalrep mantissa = mantissa_;
|
||||
int exponent = exponent_;
|
||||
|
||||
if constexpr (std::is_unsigned_v<T>)
|
||||
{
|
||||
XRPL_ASSERT_PARTS(
|
||||
@@ -812,6 +863,26 @@ Number::normalizeToRange() const
|
||||
return std::make_pair(static_cast<T>(sign * mantissa), exponent);
|
||||
}
|
||||
|
||||
template <auto MinMantissa, auto MaxMantissa, Integral64 T>
|
||||
std::pair<T, int>
|
||||
Number::normalizeToRange() const
|
||||
{
|
||||
// Forward this Number's already-separated internal components to the shared
|
||||
// implementation. Passing mantissa_ (which may exceed kMaxRep in the Large
|
||||
// range) through unchanged keeps the result byte-identical to before.
|
||||
return normalizeToRangeImpl<MinMantissa, MaxMantissa, T>(negative_, mantissa_, exponent_);
|
||||
}
|
||||
|
||||
template <auto MinMantissa, auto MaxMantissa, Integral64 T>
|
||||
std::pair<T, int>
|
||||
Number::normalizeToRange(rep mantissa, int exponent)
|
||||
{
|
||||
// Separate sign and magnitude from the raw signed mantissa, then normalize
|
||||
// straight to the target range in a single pass (no intermediate Number).
|
||||
return normalizeToRangeImpl<MinMantissa, MaxMantissa, T>(
|
||||
mantissa < 0, externalToInternal(mantissa), exponent);
|
||||
}
|
||||
|
||||
constexpr Number
|
||||
abs(Number x) noexcept
|
||||
{
|
||||
|
||||
@@ -77,8 +77,12 @@ IOUAmount::normalize()
|
||||
|
||||
if (getSTNumberSwitchover())
|
||||
{
|
||||
Number const v{mantissa_, exponent_};
|
||||
*this = fromNumber(v);
|
||||
// Normalize the raw mantissa/exponent straight to the IOU range in a
|
||||
// single pass. Previously this built a Number (one pass to the default
|
||||
// range) and then re-normalized to the IOU range via fromNumber (a
|
||||
// second pass); the static primitive collapses both into one.
|
||||
std::tie(mantissa_, exponent_) =
|
||||
Number::normalizeToRange<kMinMantissa, kMaxMantissa>(mantissa_, exponent_);
|
||||
if (exponent_ > kMaxExponent)
|
||||
Throw<std::overflow_error>("value overflow");
|
||||
if (exponent_ < kMinExponent)
|
||||
|
||||
198
src/tests/libxrpl/basics/Number.cpp
Normal file
198
src/tests/libxrpl/basics/Number.cpp
Normal file
@@ -0,0 +1,198 @@
|
||||
#include <xrpl/basics/Number.h>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include <utility>
|
||||
|
||||
using namespace xrpl;
|
||||
|
||||
namespace {
|
||||
|
||||
// The IOUAmount mantissa range: [10^15, 10^16 - 1]. Kept here as signed
|
||||
// constants so the default template parameter T resolves to std::int64_t,
|
||||
// matching IOUAmount's own use of Number::normalizeToRange.
|
||||
constexpr std::int64_t kMin = 1'000'000'000'000'000;
|
||||
constexpr std::int64_t kMax = (kMin * 10) - 1;
|
||||
|
||||
// The two-pass path that the static primitive replaces: build a Number (one
|
||||
// normalize pass to the default range) and then re-normalize to the narrow IOU
|
||||
// range via the const member overload (a second pass).
|
||||
std::pair<std::int64_t, int>
|
||||
twoPass(std::int64_t mantissa, int exponent)
|
||||
{
|
||||
Number const v{mantissa, exponent};
|
||||
return v.normalizeToRange<kMin, kMax>();
|
||||
}
|
||||
|
||||
// The single-pass static primitive under test.
|
||||
std::pair<std::int64_t, int>
|
||||
onePass(std::int64_t mantissa, int exponent)
|
||||
{
|
||||
return Number::normalizeToRange<kMin, kMax>(mantissa, exponent);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
// The static primitive must produce bit-identical (mantissa, exponent) to the
|
||||
// old two-pass path across a broad sweep of inputs: values needing scale-up,
|
||||
// scale-down, rounding cusps, negatives, and exponent extremes.
|
||||
TEST(Number, normalizeToRangeEquivalence)
|
||||
{
|
||||
// A spread of mantissa magnitudes: tiny (heavy scale-up), mid, at the IOU
|
||||
// floor/ceiling, beyond it (scale-down), and int64 extremes.
|
||||
std::int64_t const mantissas[] = {
|
||||
1,
|
||||
2,
|
||||
7,
|
||||
9,
|
||||
99,
|
||||
100,
|
||||
12345,
|
||||
999'999'999'999'999,
|
||||
kMin,
|
||||
kMin + 1,
|
||||
kMax,
|
||||
kMax + 1,
|
||||
1'234'567'890'123'456,
|
||||
12'345'678'901'234'567,
|
||||
std::numeric_limits<std::int64_t>::max(),
|
||||
std::numeric_limits<std::int64_t>::max() - 1,
|
||||
};
|
||||
|
||||
for (std::int64_t const absM : mantissas)
|
||||
{
|
||||
for (std::int64_t const m : {absM, -absM})
|
||||
{
|
||||
for (int const e : {-90, -32, -1, 0, 1, 5, 32, 70})
|
||||
{
|
||||
auto const expected = twoPass(m, e);
|
||||
auto const actual = onePass(m, e);
|
||||
EXPECT_EQ(actual.first, expected.first)
|
||||
<< "mantissa mismatch for m=" << m << " e=" << e;
|
||||
EXPECT_EQ(actual.second, expected.second)
|
||||
<< "exponent mismatch for m=" << m << " e=" << e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// int64::min cannot be negated naively; externalToInternal handles it. Make
|
||||
// sure the static path agrees with the two-pass path on it too.
|
||||
{
|
||||
std::int64_t const m = std::numeric_limits<std::int64_t>::min();
|
||||
auto const expected = twoPass(m, 0);
|
||||
auto const actual = onePass(m, 0);
|
||||
EXPECT_EQ(actual.first, expected.first);
|
||||
EXPECT_EQ(actual.second, expected.second);
|
||||
}
|
||||
}
|
||||
|
||||
// Exact, hand-computed results (state + cause), not just "equals the old path".
|
||||
TEST(Number, normalizeToRangeExactValues)
|
||||
{
|
||||
// A single digit scales up by 15 powers of ten to reach the floor 10^15,
|
||||
// with the exponent dropping by the same 15.
|
||||
{
|
||||
auto const [m, e] = onePass(1, 0);
|
||||
EXPECT_EQ(m, kMin); // 1'000'000'000'000'000
|
||||
EXPECT_EQ(e, -15);
|
||||
}
|
||||
// Already exactly at the floor: unchanged.
|
||||
{
|
||||
auto const [m, e] = onePass(kMin, 4);
|
||||
EXPECT_EQ(m, kMin);
|
||||
EXPECT_EQ(e, 4);
|
||||
}
|
||||
// Already exactly at the ceiling: unchanged.
|
||||
{
|
||||
auto const [m, e] = onePass(kMax, -7);
|
||||
EXPECT_EQ(m, kMax); // 9'999'999'999'999'999
|
||||
EXPECT_EQ(e, -7);
|
||||
}
|
||||
// One past the ceiling scales down by one power of ten; the dropped ones
|
||||
// digit (0) truncates cleanly and the exponent rises by one.
|
||||
{
|
||||
auto const [m, e] = onePass(kMax + 1, 0); // 10'000'000'000'000'000
|
||||
EXPECT_EQ(m, kMin); // 1'000'000'000'000'000
|
||||
EXPECT_EQ(e, 1);
|
||||
}
|
||||
// Negative values keep their sign through normalization.
|
||||
{
|
||||
auto const [m, e] = onePass(-5, 0);
|
||||
EXPECT_EQ(m, -5 * kMin); // -5'000'000'000'000'000
|
||||
EXPECT_EQ(e, -15);
|
||||
}
|
||||
// Zero mantissa: the workhorse leaves it as zero (callers special-case it).
|
||||
{
|
||||
auto const [m, e] = onePass(0, 0);
|
||||
EXPECT_EQ(m, 0);
|
||||
}
|
||||
}
|
||||
|
||||
// Equivalence must hold under every rounding mode, not just the default
|
||||
// ToNearest. This is the subtlest risk: the single-pass impl hardcodes
|
||||
// CuspRoundingFix::Disabled, whereas the old two-pass path ran an intermediate
|
||||
// normalize to the wider range first. Sweep all four modes, including inputs
|
||||
// that round at a tie (a trailing digit of exactly 5 when scaling down).
|
||||
TEST(Number, normalizeToRangeAllRoundingModes)
|
||||
{
|
||||
// Inputs chosen so scale-down drops a non-zero (and tie) trailing digit.
|
||||
std::int64_t const mantissas[] = {
|
||||
15,
|
||||
25,
|
||||
12'345'678'901'234'565, // 17 digits, trailing 5 -> tie on the drop
|
||||
99'999'999'999'999'995,
|
||||
kMax + 5,
|
||||
std::numeric_limits<std::int64_t>::max(),
|
||||
};
|
||||
|
||||
for (auto mode :
|
||||
{Number::RoundingMode::ToNearest,
|
||||
Number::RoundingMode::TowardsZero,
|
||||
Number::RoundingMode::Downward,
|
||||
Number::RoundingMode::Upward})
|
||||
{
|
||||
for (std::int64_t const absM : mantissas)
|
||||
{
|
||||
for (std::int64_t const m : {absM, -absM})
|
||||
{
|
||||
for (int const e : {-20, 0, 13})
|
||||
{
|
||||
NumberRoundModeGuard const g(mode);
|
||||
auto const expected = twoPass(m, e);
|
||||
auto const actual = onePass(m, e);
|
||||
EXPECT_EQ(actual.first, expected.first)
|
||||
<< "mantissa mismatch: mode=" << static_cast<int>(mode) << " m=" << m
|
||||
<< " e=" << e;
|
||||
EXPECT_EQ(actual.second, expected.second)
|
||||
<< "exponent mismatch: mode=" << static_cast<int>(mode) << " m=" << m
|
||||
<< " e=" << e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The refactored const member overload must forward to the static primitive
|
||||
// and yield identical results for the same Number.
|
||||
TEST(Number, normalizeToRangeMemberStaticConsistency)
|
||||
{
|
||||
std::int64_t const mantissas[] = {3, 42, kMin, kMin + 7, kMax, kMax + 1, 1'234'567'890'123'456};
|
||||
|
||||
for (std::int64_t const absM : mantissas)
|
||||
{
|
||||
for (std::int64_t const m : {absM, -absM})
|
||||
{
|
||||
for (int const e : {-50, -3, 0, 11, 60})
|
||||
{
|
||||
Number const v{m, e};
|
||||
auto const viaMember = v.normalizeToRange<kMin, kMax>();
|
||||
// Feed the static the raw inputs that built the Number.
|
||||
auto const viaStatic = Number::normalizeToRange<kMin, kMax>(m, e);
|
||||
EXPECT_EQ(viaMember.first, viaStatic.first) << "m=" << m << " e=" << e;
|
||||
EXPECT_EQ(viaMember.second, viaStatic.second) << "m=" << m << " e=" << e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -58,6 +58,7 @@
|
||||
#include <xrpl/resource/Disposition.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
#include <xrpl/resource/Gossip.h>
|
||||
#include <xrpl/server/Handoff.h>
|
||||
#include <xrpl/server/LoadFeeTrack.h>
|
||||
#include <xrpl/server/NetworkOPs.h>
|
||||
#include <xrpl/shamap/SHAMapNodeID.h>
|
||||
@@ -67,8 +68,8 @@
|
||||
#include <boost/asio/bind_executor.hpp>
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/completion_condition.hpp>
|
||||
#include <boost/asio/dispatch.hpp>
|
||||
#include <boost/asio/error.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/beast/core/multi_buffer.hpp>
|
||||
@@ -195,70 +196,78 @@ stringIsUInt256Sized(std::string const& pBuffStr)
|
||||
void
|
||||
PeerImp::run()
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this()]() {
|
||||
auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::run, shared_from_this()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (auto const s = base64Decode(value); s.size() == uint256::size())
|
||||
return uint256::fromRaw(s);
|
||||
auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
|
||||
return std::nullopt;
|
||||
};
|
||||
if (auto const s = base64Decode(value); s.size() == uint256::size())
|
||||
return uint256::fromRaw(s);
|
||||
|
||||
std::optional<uint256> closed;
|
||||
std::optional<uint256> previous;
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
if (auto const iter = headers_.find("Closed-Ledger"); iter != headers_.end())
|
||||
{
|
||||
closed = parseLedgerHash(iter->value());
|
||||
std::optional<uint256> closed;
|
||||
std::optional<uint256> previous;
|
||||
|
||||
if (!closed)
|
||||
fail("Malformed handshake data (1)");
|
||||
}
|
||||
if (auto const iter = headers_.find("Closed-Ledger"); iter != headers_.end())
|
||||
{
|
||||
closed = parseLedgerHash(iter->value());
|
||||
|
||||
if (auto const iter = headers_.find("Previous-Ledger"); iter != headers_.end())
|
||||
{
|
||||
previous = parseLedgerHash(iter->value());
|
||||
if (!closed)
|
||||
fail("Malformed handshake data (1)");
|
||||
}
|
||||
|
||||
if (!previous)
|
||||
fail("Malformed handshake data (2)");
|
||||
}
|
||||
if (auto const iter = headers_.find("Previous-Ledger"); iter != headers_.end())
|
||||
{
|
||||
previous = parseLedgerHash(iter->value());
|
||||
|
||||
if (previous && !closed)
|
||||
fail("Malformed handshake data (3)");
|
||||
if (!previous)
|
||||
fail("Malformed handshake data (2)");
|
||||
}
|
||||
|
||||
{
|
||||
std::scoped_lock const sl(recentLock_);
|
||||
if (closed)
|
||||
closedLedgerHash_ = *closed;
|
||||
if (previous)
|
||||
previousLedgerHash_ = *previous;
|
||||
}
|
||||
if (previous && !closed)
|
||||
fail("Malformed handshake data (3)");
|
||||
|
||||
if (inbound_)
|
||||
{
|
||||
doAccept();
|
||||
}
|
||||
else
|
||||
{
|
||||
doProtocolStart();
|
||||
}
|
||||
{
|
||||
std::scoped_lock const sl(recentLock_);
|
||||
if (closed)
|
||||
closedLedgerHash_ = *closed;
|
||||
if (previous)
|
||||
previousLedgerHash_ = *previous;
|
||||
}
|
||||
|
||||
// Anything else that needs to be done with the connection should be
|
||||
// done in doProtocolStart
|
||||
});
|
||||
if (inbound_)
|
||||
{
|
||||
doAccept();
|
||||
}
|
||||
else
|
||||
{
|
||||
doProtocolStart();
|
||||
}
|
||||
|
||||
// Anything else that needs to be done with the connection should be
|
||||
// done in doProtocolStart
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::stop()
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this()]() {
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
|
||||
return;
|
||||
}
|
||||
|
||||
close();
|
||||
});
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
close();
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -266,115 +275,130 @@ PeerImp::stop()
|
||||
void
|
||||
PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this(), m]() {
|
||||
if (gracefulClose_)
|
||||
return;
|
||||
if (detaching_)
|
||||
return;
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
|
||||
return;
|
||||
}
|
||||
if (gracefulClose_)
|
||||
return;
|
||||
if (detaching_)
|
||||
return;
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
auto validator = m->getValidatorKey();
|
||||
if (validator && !squelch_.expireSquelch(*validator))
|
||||
{
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::Category::SquelchSuppressed,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
return;
|
||||
}
|
||||
|
||||
// report categorized outgoing traffic
|
||||
auto validator = m->getValidatorKey();
|
||||
if (validator && !squelch_.expireSquelch(*validator))
|
||||
{
|
||||
overlay_.reportOutboundTraffic(
|
||||
safeCast<TrafficCount::Category>(m->getCategory()),
|
||||
TrafficCount::Category::SquelchSuppressed,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
return;
|
||||
}
|
||||
|
||||
// report total outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::Category::Total,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
// report categorized outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
safeCast<TrafficCount::Category>(m->getCategory()),
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
|
||||
auto sendqSize = sendQueue_.size();
|
||||
// report total outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::Category::Total, static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
|
||||
if (sendqSize < Tuning::kTargetSendQueue)
|
||||
{
|
||||
// To detect a peer that does not read from their
|
||||
// side of the connection, we expect a peer to have
|
||||
// a small sendq periodically
|
||||
largeSendq_ = 0;
|
||||
}
|
||||
else if (auto sink = journal_.debug(); sink && (sendqSize % Tuning::kSendQueueLogFreq) == 0)
|
||||
{
|
||||
std::string const n = name();
|
||||
sink << n << " sendq: " << sendqSize;
|
||||
}
|
||||
auto sendqSize = sendQueue_.size();
|
||||
|
||||
sendQueue_.push(m);
|
||||
if (sendqSize < Tuning::kTargetSendQueue)
|
||||
{
|
||||
// To detect a peer that does not read from their
|
||||
// side of the connection, we expect a peer to have
|
||||
// a small senq periodically
|
||||
largeSendq_ = 0;
|
||||
}
|
||||
else if (auto sink = journal_.debug(); sink && (sendqSize % Tuning::kSendQueueLogFreq) == 0)
|
||||
{
|
||||
std::string const n = name();
|
||||
sink << n << " sendq: " << sendqSize;
|
||||
}
|
||||
|
||||
if (sendqSize != 0)
|
||||
return;
|
||||
sendQueue_.push(m);
|
||||
|
||||
boost::asio::async_write(
|
||||
stream_,
|
||||
boost::asio::buffer(sendQueue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage, self, std::placeholders::_1, std::placeholders::_2)));
|
||||
});
|
||||
if (sendqSize != 0)
|
||||
return;
|
||||
|
||||
boost::asio::async_write(
|
||||
stream_,
|
||||
boost::asio::buffer(sendQueue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::sendTxQueue()
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this()]() {
|
||||
if (!txQueue_.empty())
|
||||
{
|
||||
protocol::TMHaveTransactions ht;
|
||||
std::ranges::for_each(
|
||||
txQueue_, [&](auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
|
||||
JLOG(pJournal_.trace()) << "sendTxQueue " << txQueue_.size();
|
||||
txQueue_.clear();
|
||||
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
|
||||
}
|
||||
});
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!txQueue_.empty())
|
||||
{
|
||||
protocol::TMHaveTransactions ht;
|
||||
std::ranges::for_each(
|
||||
txQueue_, [&](auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
|
||||
JLOG(pJournal_.trace()) << "sendTxQueue " << txQueue_.size();
|
||||
txQueue_.clear();
|
||||
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::addTxQueue(uint256 const& hash)
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this(), hash]() {
|
||||
if (txQueue_.size() == reduce_relay::kMaxTxQueueSize)
|
||||
{
|
||||
JLOG(pJournal_.warn()) << "addTxQueue exceeds the cap";
|
||||
sendTxQueue();
|
||||
}
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash));
|
||||
return;
|
||||
}
|
||||
|
||||
txQueue_.insert(hash);
|
||||
JLOG(pJournal_.trace()) << "addTxQueue " << txQueue_.size();
|
||||
});
|
||||
if (txQueue_.size() == reduce_relay::kMaxTxQueueSize)
|
||||
{
|
||||
JLOG(pJournal_.warn()) << "addTxQueue exceeds the cap";
|
||||
sendTxQueue();
|
||||
}
|
||||
|
||||
txQueue_.insert(hash);
|
||||
JLOG(pJournal_.trace()) << "addTxQueue " << txQueue_.size();
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::removeTxQueue(uint256 const& hash)
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this(), hash]() {
|
||||
auto removed = txQueue_.erase(hash);
|
||||
JLOG(pJournal_.trace()) << "removeTxQueue " << removed;
|
||||
});
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));
|
||||
return;
|
||||
}
|
||||
|
||||
auto removed = txQueue_.erase(hash);
|
||||
JLOG(pJournal_.trace()) << "removeTxQueue " << removed;
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::charge(Resource::Charge const& fee, std::string const& context)
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this(), fee, context]() {
|
||||
if ((usage_.charge(fee, context) == Resource::Disposition::Drop) &&
|
||||
usage_.disconnect(pJournal_))
|
||||
{
|
||||
// Sever the connection
|
||||
overlay_.incPeerDisconnectCharges();
|
||||
fail("charge: Resources");
|
||||
}
|
||||
});
|
||||
if ((usage_.charge(fee, context) == Resource::Disposition::Drop) &&
|
||||
usage_.disconnect(pJournal_) && strand_.running_in_this_thread())
|
||||
{
|
||||
// Sever the connection
|
||||
overlay_.incPeerDisconnectCharges();
|
||||
fail("charge: Resources");
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -602,14 +626,20 @@ PeerImp::close()
|
||||
void
|
||||
PeerImp::fail(std::string const& reason)
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this(), reason]() {
|
||||
if (journal_.active(beast::Severity::Warning) && socket_.is_open())
|
||||
{
|
||||
std::string const n = name();
|
||||
JLOG(journal_.warn()) << n << " failed: " << reason;
|
||||
}
|
||||
close();
|
||||
});
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(
|
||||
strand_,
|
||||
std::bind(
|
||||
(void (Peer::*)(std::string const&))&PeerImp::fail, shared_from_this(), reason));
|
||||
return;
|
||||
}
|
||||
if (journal_.active(beast::Severity::Warning) && socket_.is_open())
|
||||
{
|
||||
std::string const n = name();
|
||||
JLOG(journal_.warn()) << n << " failed: " << reason;
|
||||
}
|
||||
close();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -2645,42 +2675,45 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
||||
{
|
||||
dispatch(strand_, [this, self = shared_from_this(), m]() {
|
||||
if (!m->has_validatorpubkey())
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch no pubkey");
|
||||
return;
|
||||
}
|
||||
auto validator = m->validatorpubkey();
|
||||
auto const slice{makeSlice(validator)};
|
||||
if (!publicKeyType(slice))
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch bad pubkey");
|
||||
return;
|
||||
}
|
||||
PublicKey const key(slice);
|
||||
using on_message_fn = void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind((on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore the squelch for validator's own messages.
|
||||
if (key == app_.getValidationPublicKey())
|
||||
{
|
||||
JLOG(pJournal_.debug())
|
||||
<< "onMessage: TMSquelch discarding validator's squelch " << slice;
|
||||
return;
|
||||
}
|
||||
if (!m->has_validatorpubkey())
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch no pubkey");
|
||||
return;
|
||||
}
|
||||
auto validator = m->validatorpubkey();
|
||||
auto const slice{makeSlice(validator)};
|
||||
if (!publicKeyType(slice))
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch bad pubkey");
|
||||
return;
|
||||
}
|
||||
PublicKey const key(slice);
|
||||
|
||||
std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
|
||||
if (!m->squelch())
|
||||
{
|
||||
squelch_.removeSquelch(key);
|
||||
}
|
||||
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch duration");
|
||||
}
|
||||
// Ignore the squelch for validator's own messages.
|
||||
if (key == app_.getValidationPublicKey())
|
||||
{
|
||||
JLOG(pJournal_.debug()) << "onMessage: TMSquelch discarding validator's squelch " << slice;
|
||||
return;
|
||||
}
|
||||
|
||||
JLOG(pJournal_.debug()) << "onMessage: TMSquelch " << slice << " " << id() << " "
|
||||
<< duration;
|
||||
});
|
||||
std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
|
||||
if (!m->squelch())
|
||||
{
|
||||
squelch_.removeSquelch(key);
|
||||
}
|
||||
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch duration");
|
||||
}
|
||||
|
||||
JLOG(pJournal_.debug()) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user