mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-11 12:46:44 +00:00
Compare commits
6 Commits
ximinez/nu
...
bthomee/di
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9c21a8d08 | ||
|
|
e57d5ac775 | ||
|
|
7e9c12b360 | ||
|
|
d0761744b4 | ||
|
|
85f6358a79 | ||
|
|
2a73e11f51 |
@@ -58,7 +58,6 @@
|
||||
#include <xrpl/resource/Disposition.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
#include <xrpl/resource/Gossip.h>
|
||||
#include <xrpl/server/Handoff.h>
|
||||
#include <xrpl/server/LoadFeeTrack.h>
|
||||
#include <xrpl/server/NetworkOPs.h>
|
||||
#include <xrpl/shamap/SHAMapNodeID.h>
|
||||
@@ -68,8 +67,8 @@
|
||||
#include <boost/asio/bind_executor.hpp>
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/completion_condition.hpp>
|
||||
#include <boost/asio/dispatch.hpp>
|
||||
#include <boost/asio/error.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/beast/core/multi_buffer.hpp>
|
||||
@@ -196,78 +195,70 @@ stringIsUInt256Sized(std::string const& pBuffStr)
|
||||
void
|
||||
PeerImp::run()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::run, shared_from_this()));
|
||||
return;
|
||||
}
|
||||
dispatch(strand_, [self = shared_from_this()]() {
|
||||
auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
|
||||
auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
if (auto const s = base64Decode(value); s.size() == uint256::size())
|
||||
return uint256::fromRaw(s);
|
||||
|
||||
if (auto const s = base64Decode(value); s.size() == uint256::size())
|
||||
return uint256::fromRaw(s);
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
return std::nullopt;
|
||||
};
|
||||
std::optional<uint256> closed;
|
||||
std::optional<uint256> previous;
|
||||
|
||||
std::optional<uint256> closed;
|
||||
std::optional<uint256> previous;
|
||||
if (auto const iter = self->headers_.find("Closed-Ledger"); iter != self->headers_.end())
|
||||
{
|
||||
closed = parseLedgerHash(iter->value());
|
||||
|
||||
if (auto const iter = headers_.find("Closed-Ledger"); iter != headers_.end())
|
||||
{
|
||||
closed = parseLedgerHash(iter->value());
|
||||
if (!closed)
|
||||
self->fail("Malformed handshake data (1)");
|
||||
}
|
||||
|
||||
if (!closed)
|
||||
fail("Malformed handshake data (1)");
|
||||
}
|
||||
if (auto const iter = self->headers_.find("Previous-Ledger"); iter != self->headers_.end())
|
||||
{
|
||||
previous = parseLedgerHash(iter->value());
|
||||
|
||||
if (auto const iter = headers_.find("Previous-Ledger"); iter != headers_.end())
|
||||
{
|
||||
previous = parseLedgerHash(iter->value());
|
||||
if (!previous)
|
||||
self->fail("Malformed handshake data (2)");
|
||||
}
|
||||
|
||||
if (!previous)
|
||||
fail("Malformed handshake data (2)");
|
||||
}
|
||||
if (previous && !closed)
|
||||
self->fail("Malformed handshake data (3)");
|
||||
|
||||
if (previous && !closed)
|
||||
fail("Malformed handshake data (3)");
|
||||
{
|
||||
std::scoped_lock const sl(self->recentLock_);
|
||||
if (closed)
|
||||
self->closedLedgerHash_ = *closed;
|
||||
if (previous)
|
||||
self->previousLedgerHash_ = *previous;
|
||||
}
|
||||
|
||||
{
|
||||
std::scoped_lock const sl(recentLock_);
|
||||
if (closed)
|
||||
closedLedgerHash_ = *closed;
|
||||
if (previous)
|
||||
previousLedgerHash_ = *previous;
|
||||
}
|
||||
if (self->inbound_)
|
||||
{
|
||||
self->doAccept();
|
||||
}
|
||||
else
|
||||
{
|
||||
self->doProtocolStart();
|
||||
}
|
||||
|
||||
if (inbound_)
|
||||
{
|
||||
doAccept();
|
||||
}
|
||||
else
|
||||
{
|
||||
doProtocolStart();
|
||||
}
|
||||
|
||||
// Anything else that needs to be done with the connection should be
|
||||
// done in doProtocolStart
|
||||
// Anything else that needs to be done with the connection should be
|
||||
// done in doProtocolStart
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::stop()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
|
||||
return;
|
||||
}
|
||||
dispatch(strand_, [self = shared_from_this()]() {
|
||||
if (!self->socket_.is_open())
|
||||
return;
|
||||
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
close();
|
||||
self->close();
|
||||
});
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -275,130 +266,117 @@ PeerImp::stop()
|
||||
void
|
||||
PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
|
||||
return;
|
||||
}
|
||||
if (gracefulClose_)
|
||||
return;
|
||||
if (detaching_)
|
||||
return;
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
dispatch(strand_, [self = shared_from_this(), m]() {
|
||||
if (self->gracefulClose_)
|
||||
return;
|
||||
if (self->detaching_)
|
||||
return;
|
||||
if (!self->socket_.is_open())
|
||||
return;
|
||||
|
||||
auto validator = m->getValidatorKey();
|
||||
if (validator && !squelch_.expireSquelch(*validator))
|
||||
{
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::Category::SquelchSuppressed,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
return;
|
||||
}
|
||||
auto validator = m->getValidatorKey();
|
||||
if (validator && !self->squelch_.expireSquelch(*validator))
|
||||
{
|
||||
self->overlay_.reportOutboundTraffic(
|
||||
TrafficCount::Category::SquelchSuppressed,
|
||||
static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
|
||||
return;
|
||||
}
|
||||
|
||||
// report categorized outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
safeCast<TrafficCount::Category>(m->getCategory()),
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
// report categorized outgoing traffic
|
||||
self->overlay_.reportOutboundTraffic(
|
||||
safeCast<TrafficCount::Category>(m->getCategory()),
|
||||
static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
|
||||
|
||||
// report total outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::Category::Total, static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
// report total outgoing traffic
|
||||
self->overlay_.reportOutboundTraffic(
|
||||
TrafficCount::Category::Total,
|
||||
static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
|
||||
|
||||
auto sendqSize = sendQueue_.size();
|
||||
auto sendqSize = self->sendQueue_.size();
|
||||
|
||||
if (sendqSize < Tuning::kTargetSendQueue)
|
||||
{
|
||||
// To detect a peer that does not read from their
|
||||
// side of the connection, we expect a peer to have
|
||||
// a small senq periodically
|
||||
largeSendq_ = 0;
|
||||
}
|
||||
else if (auto sink = journal_.debug(); sink && (sendqSize % Tuning::kSendQueueLogFreq) == 0)
|
||||
{
|
||||
std::string const n = name();
|
||||
sink << n << " sendq: " << sendqSize;
|
||||
}
|
||||
if (sendqSize < Tuning::kTargetSendQueue)
|
||||
{
|
||||
// To detect a peer that does not read from their
|
||||
// side of the connection, we expect a peer to have
|
||||
// a small sendq periodically
|
||||
self->largeSendq_ = 0;
|
||||
}
|
||||
else if (
|
||||
auto sink = self->journal_.debug();
|
||||
sink && (sendqSize % Tuning::kSendQueueLogFreq) == 0)
|
||||
{
|
||||
std::string const n = self->name();
|
||||
sink << n << " sendq: " << sendqSize;
|
||||
}
|
||||
|
||||
sendQueue_.push(m);
|
||||
self->sendQueue_.push(m);
|
||||
|
||||
if (sendqSize != 0)
|
||||
return;
|
||||
if (sendqSize != 0)
|
||||
return;
|
||||
|
||||
boost::asio::async_write(
|
||||
stream_,
|
||||
boost::asio::buffer(sendQueue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
boost::asio::async_write(
|
||||
self->stream_,
|
||||
boost::asio::buffer(self->sendQueue_.front()->getBuffer(self->compressionEnabled_)),
|
||||
bind_executor(
|
||||
self->strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage, self, std::placeholders::_1, std::placeholders::_2)));
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::sendTxQueue()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!txQueue_.empty())
|
||||
{
|
||||
protocol::TMHaveTransactions ht;
|
||||
std::ranges::for_each(
|
||||
txQueue_, [&](auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
|
||||
JLOG(pJournal_.trace()) << "sendTxQueue " << txQueue_.size();
|
||||
txQueue_.clear();
|
||||
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
|
||||
}
|
||||
dispatch(strand_, [self = shared_from_this()]() {
|
||||
if (!self->txQueue_.empty())
|
||||
{
|
||||
protocol::TMHaveTransactions ht;
|
||||
std::ranges::for_each(
|
||||
self->txQueue_, [&](auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
|
||||
JLOG(self->pJournal_.trace()) << "sendTxQueue " << self->txQueue_.size();
|
||||
self->txQueue_.clear();
|
||||
self->send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::addTxQueue(uint256 const& hash)
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash));
|
||||
return;
|
||||
}
|
||||
dispatch(strand_, [self = shared_from_this(), hash]() {
|
||||
if (self->txQueue_.size() == reduce_relay::kMaxTxQueueSize)
|
||||
{
|
||||
JLOG(self->pJournal_.warn()) << "addTxQueue exceeds the cap";
|
||||
self->sendTxQueue();
|
||||
}
|
||||
|
||||
if (txQueue_.size() == reduce_relay::kMaxTxQueueSize)
|
||||
{
|
||||
JLOG(pJournal_.warn()) << "addTxQueue exceeds the cap";
|
||||
sendTxQueue();
|
||||
}
|
||||
|
||||
txQueue_.insert(hash);
|
||||
JLOG(pJournal_.trace()) << "addTxQueue " << txQueue_.size();
|
||||
self->txQueue_.insert(hash);
|
||||
JLOG(self->pJournal_.trace()) << "addTxQueue " << self->txQueue_.size();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::removeTxQueue(uint256 const& hash)
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));
|
||||
return;
|
||||
}
|
||||
|
||||
auto removed = txQueue_.erase(hash);
|
||||
JLOG(pJournal_.trace()) << "removeTxQueue " << removed;
|
||||
dispatch(strand_, [self = shared_from_this(), hash]() {
|
||||
auto removed = self->txQueue_.erase(hash);
|
||||
JLOG(self->pJournal_.trace()) << "removeTxQueue " << removed;
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::charge(Resource::Charge const& fee, std::string const& context)
|
||||
{
|
||||
if ((usage_.charge(fee, context) == Resource::Disposition::Drop) &&
|
||||
usage_.disconnect(pJournal_) && strand_.running_in_this_thread())
|
||||
{
|
||||
// Sever the connection
|
||||
overlay_.incPeerDisconnectCharges();
|
||||
fail("charge: Resources");
|
||||
}
|
||||
dispatch(strand_, [self = shared_from_this(), fee, context]() {
|
||||
if ((self->usage_.charge(fee, context) == Resource::Disposition::Drop) &&
|
||||
self->usage_.disconnect(self->pJournal_))
|
||||
{
|
||||
// Sever the connection
|
||||
self->overlay_.incPeerDisconnectCharges();
|
||||
self->fail("charge: Resources");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -626,20 +604,14 @@ PeerImp::close()
|
||||
void
|
||||
PeerImp::fail(std::string const& reason)
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(
|
||||
strand_,
|
||||
std::bind(
|
||||
(void (Peer::*)(std::string const&))&PeerImp::fail, shared_from_this(), reason));
|
||||
return;
|
||||
}
|
||||
if (journal_.active(beast::Severity::Warning) && socket_.is_open())
|
||||
{
|
||||
std::string const n = name();
|
||||
JLOG(journal_.warn()) << n << " failed: " << reason;
|
||||
}
|
||||
close();
|
||||
dispatch(strand_, [self = shared_from_this(), reason]() {
|
||||
if (self->journal_.active(beast::Severity::Warning) && self->socket_.is_open())
|
||||
{
|
||||
std::string const n = self->name();
|
||||
JLOG(self->journal_.warn()) << n << " failed: " << reason;
|
||||
}
|
||||
self->close();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
@@ -2675,45 +2647,42 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
||||
{
|
||||
using on_message_fn = void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
|
||||
if (!strand_.running_in_this_thread())
|
||||
{
|
||||
post(strand_, std::bind((on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
|
||||
return;
|
||||
}
|
||||
dispatch(strand_, [self = shared_from_this(), m]() {
|
||||
if (!m->has_validatorpubkey())
|
||||
{
|
||||
self->fee_.update(Resource::kFeeInvalidData, "squelch no pubkey");
|
||||
return;
|
||||
}
|
||||
auto validator = m->validatorpubkey();
|
||||
auto const slice{makeSlice(validator)};
|
||||
if (!publicKeyType(slice))
|
||||
{
|
||||
self->fee_.update(Resource::kFeeInvalidData, "squelch bad pubkey");
|
||||
return;
|
||||
}
|
||||
PublicKey const key(slice);
|
||||
|
||||
if (!m->has_validatorpubkey())
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch no pubkey");
|
||||
return;
|
||||
}
|
||||
auto validator = m->validatorpubkey();
|
||||
auto const slice{makeSlice(validator)};
|
||||
if (!publicKeyType(slice))
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch bad pubkey");
|
||||
return;
|
||||
}
|
||||
PublicKey const key(slice);
|
||||
// Ignore the squelch for validator's own messages.
|
||||
if (key == self->app_.getValidationPublicKey())
|
||||
{
|
||||
JLOG(self->pJournal_.debug())
|
||||
<< "onMessage: TMSquelch discarding validator's squelch " << slice;
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore the squelch for validator's own messages.
|
||||
if (key == app_.getValidationPublicKey())
|
||||
{
|
||||
JLOG(pJournal_.debug()) << "onMessage: TMSquelch discarding validator's squelch " << slice;
|
||||
return;
|
||||
}
|
||||
std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
|
||||
if (!m->squelch())
|
||||
{
|
||||
self->squelch_.removeSquelch(key);
|
||||
}
|
||||
else if (!self->squelch_.addSquelch(key, std::chrono::seconds{duration}))
|
||||
{
|
||||
self->fee_.update(Resource::kFeeInvalidData, "squelch duration");
|
||||
}
|
||||
|
||||
std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
|
||||
if (!m->squelch())
|
||||
{
|
||||
squelch_.removeSquelch(key);
|
||||
}
|
||||
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
|
||||
{
|
||||
fee_.update(Resource::kFeeInvalidData, "squelch duration");
|
||||
}
|
||||
|
||||
JLOG(pJournal_.debug()) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
|
||||
JLOG(self->pJournal_.debug())
|
||||
<< "onMessage: TMSquelch " << slice << " " << self->id() << " " << duration;
|
||||
});
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user