Compare commits

...

5 Commits

Author SHA1 Message Date
Vito
e3bfbaef80 edits 2025-09-08 13:20:04 +02:00
Vito
2dc49e30c4 refactors p2p handshake out of PeerImp 2025-09-04 19:43:50 +02:00
Vito
da617007ac removes dependency on http headers 2025-09-04 16:38:06 +02:00
Vito
ad770a443b decouples ssl_stream from peerImp 2025-09-03 18:30:33 +02:00
Vito
0ac24f5f73 removes socket_ and stream_ member attributes 2025-09-02 15:45:50 +02:00
12 changed files with 1533 additions and 689 deletions

View File

@@ -0,0 +1,163 @@
#ifndef RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
#define RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
#include <xrpl/basics/Log.h>
#include <xrpl/basics/base_uint.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/executor.hpp>
#include <boost/asio/write.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/ssl/ssl_stream.hpp>
#include <functional>
#include <optional>
namespace ripple {
// Forward declarations
using socket_type = boost::beast::tcp_stream;
using concrete_stream_type = boost::beast::ssl_stream<socket_type>;
/**
* @brief Minimal interface for stream operations needed by PeerImp
*/
class StreamInterface
{
public:
virtual ~StreamInterface() = default;
// Executor access for ASIO operations
virtual boost::asio::any_io_executor
get_executor() = 0;
// Connection status checking
virtual bool
is_open() const = 0;
// Stream lifecycle operations
virtual void
close() = 0;
virtual void
cancel() = 0;
// Async I/O operations
virtual void
async_read_some(
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_write_some(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_write(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_write(
boost::beast::multi_buffer::const_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_shutdown(std::function<void(boost::beast::error_code)> handler) = 0;
// SSL handshake support
virtual std::optional<base_uint<256>>
makeSharedValue(beast::Journal journal) = 0;
};
/**
* @brief Production implementation wrapping boost::beast::ssl_stream
*/
class ProductionStream : public StreamInterface
{
private:
std::unique_ptr<concrete_stream_type> stream_;
public:
explicit ProductionStream(std::unique_ptr<concrete_stream_type> stream)
: stream_(std::move(stream))
{
}
boost::asio::any_io_executor
get_executor() override
{
return stream_->get_executor();
}
bool
is_open() const override
{
return stream_->next_layer().socket().is_open();
}
void
close() override
{
stream_->lowest_layer().close();
}
void
cancel() override
{
stream_->lowest_layer().cancel();
}
void
async_read_some(
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
stream_->async_read_some(buffers, std::move(handler));
}
void
async_write_some(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
stream_->async_write_some(buffer, std::move(handler));
}
void
async_write(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
boost::asio::async_write(*stream_, buffer, std::move(handler));
}
void
async_write(
boost::beast::multi_buffer::const_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
boost::asio::async_write(*stream_, buffers, std::move(handler));
}
void
async_shutdown(
std::function<void(boost::beast::error_code)> handler) override
{
stream_->async_shutdown(std::move(handler));
}
std::optional<base_uint<256>>
makeSharedValue(beast::Journal journal) override;
};
} // namespace ripple
#endif

View File

@@ -1,289 +1,299 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright 2020 Ripple Labs Inc.
// //------------------------------------------------------------------------------
// /*
// This file is part of rippled: https://github.com/ripple/rippled
// Copyright 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
// */
// //==============================================================================
#include <test/jtx.h>
#include <test/jtx/Env.h>
// #include <test/jtx.h>
// #include <test/jtx/Env.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/peerfinder/detail/SlotImp.h>
// #include <xrpld/overlay/detail/OverlayImpl.h>
// #include <xrpld/overlay/detail/PeerImp.h>
// #include <xrpld/peerfinder/detail/SlotImp.h>
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/unit_test.h>
// #include <xrpl/basics/make_SSLContext.h>
// #include <xrpl/beast/unit_test.h>
namespace ripple {
// namespace ripple {
namespace test {
// namespace test {
class tx_reduce_relay_test : public beast::unit_test::suite
{
public:
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
// class tx_reduce_relay_test : public beast::unit_test::suite
// {
// public:
// using socket_type = boost::asio::ip::tcp::socket;
// using middle_type = boost::beast::tcp_stream;
// using stream_type = boost::beast::ssl_stream<middle_type>;
// using shared_context = std::shared_ptr<boost::asio::ssl::context>;
private:
void
doTest(std::string const& msg, bool log, std::function<void(bool)> f)
{
testcase(msg);
f(log);
}
// private:
// void
// doTest(std::string const& msg, bool log, std::function<void(bool)> f)
// {
// testcase(msg);
// f(log);
// }
void
testConfig(bool log)
{
doTest("Config Test", log, [&](bool log) {
auto test = [&](bool enable,
bool metrics,
std::uint16_t min,
std::uint16_t pct,
bool success = true) {
std::stringstream str("[reduce_relay]");
str << "[reduce_relay]\n"
<< "tx_enable=" << static_cast<int>(enable) << "\n"
<< "tx_metrics=" << static_cast<int>(metrics) << "\n"
<< "tx_min_peers=" << min << "\n"
<< "tx_relay_percentage=" << pct << "\n";
Config c;
try
{
c.loadFromString(str.str());
// void
// testConfig(bool log)
// {
// doTest("Config Test", log, [&](bool log) {
// auto test = [&](bool enable,
// bool metrics,
// std::uint16_t min,
// std::uint16_t pct,
// bool success = true) {
// std::stringstream str("[reduce_relay]");
// str << "[reduce_relay]\n"
// << "tx_enable=" << static_cast<int>(enable) << "\n"
// << "tx_metrics=" << static_cast<int>(metrics) << "\n"
// << "tx_min_peers=" << min << "\n"
// << "tx_relay_percentage=" << pct << "\n";
// Config c;
// try
// {
// c.loadFromString(str.str());
BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
if (success)
pass();
else
fail();
}
catch (...)
{
if (success)
fail();
else
pass();
}
};
// BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
// BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
// BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
// BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
// if (success)
// pass();
// else
// fail();
// }
// catch (...)
// {
// if (success)
// fail();
// else
// pass();
// }
// };
test(true, true, 20, 25);
test(false, false, 20, 25);
test(false, false, 20, 0, false);
test(false, false, 20, 101, false);
test(false, false, 9, 10, false);
test(false, false, 10, 9, false);
});
}
// test(true, true, 20, 25);
// test(false, false, 20, 25);
// test(false, false, 20, 0, false);
// test(false, false, 20, 101, false);
// test(false, false, 9, 10, false);
// test(false, false, 10, 9, false);
// });
// }
class PeerTest : public PeerImp
{
public:
PeerTest(
Application& app,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
OverlayImpl& overlay)
: PeerImp(
app,
sid_,
slot,
std::move(request),
publicKey,
protocol,
consumer,
std::move(stream_ptr),
overlay)
{
sid_++;
}
~PeerTest() = default;
// class PeerTest : public PeerImp
// {
// public:
// PeerTest(
// Application& app,
// std::shared_ptr<PeerFinder::Slot> const& slot,
// http_request_type&& request,
// PublicKey const& publicKey,
// ProtocolVersion protocol,
// Resource::Consumer consumer,
// std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
// OverlayImpl& overlay)
// : PeerImp(
// app,
// sid_,
// slot,
// std::move(request),
// publicKey,
// protocol,
// consumer,
// std::move(stream_ptr),
// overlay)
// {
// sid_++;
// }
// ~PeerTest() = default;
void
run() override
{
}
void
send(std::shared_ptr<Message> const&) override
{
sendTx_++;
}
void
addTxQueue(uint256 const& hash) override
{
queueTx_++;
}
static void
init()
{
queueTx_ = 0;
sendTx_ = 0;
sid_ = 0;
}
inline static std::size_t sid_ = 0;
inline static std::uint16_t queueTx_ = 0;
inline static std::uint16_t sendTx_ = 0;
};
// void
// run() override
// {
// }
// void
// send(std::shared_ptr<Message> const&) override
// {
// sendTx_++;
// }
// void
// addTxQueue(uint256 const& hash) override
// {
// queueTx_++;
// }
// static void
// init()
// {
// queueTx_ = 0;
// sendTx_ = 0;
// sid_ = 0;
// }
// inline static std::size_t sid_ = 0;
// inline static std::uint16_t queueTx_ = 0;
// inline static std::uint16_t sendTx_ = 0;
// };
std::uint16_t lid_{0};
std::uint16_t rid_{1};
shared_context context_;
ProtocolVersion protocolVersion_;
boost::beast::multi_buffer read_buf_;
// std::uint16_t lid_{0};
// std::uint16_t rid_{1};
// shared_context context_;
// ProtocolVersion protocolVersion_;
// boost::beast::multi_buffer read_buf_;
public:
tx_reduce_relay_test()
: context_(make_SSLContext("")), protocolVersion_{1, 7}
{
}
// public:
// tx_reduce_relay_test()
// : context_(make_SSLContext("")), protocolVersion_{1, 7}
// {
// }
private:
void
addPeer(
jtx::Env& env,
std::vector<std::shared_ptr<PeerTest>>& peers,
std::uint16_t& nDisabled)
{
auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
boost::beast::http::request<boost::beast::http::dynamic_body> request;
(nDisabled == 0)
? (void)request.insert(
"X-Protocol-Ctl",
makeFeaturesRequestHeader(false, false, true, false))
: (void)nDisabled--;
auto stream_ptr = std::make_unique<stream_type>(
socket_type(std::forward<boost::asio::io_context&>(
env.app().getIOContext())),
*context_);
beast::IP::Endpoint local(
boost::asio::ip::make_address("172.1.1." + std::to_string(lid_)));
beast::IP::Endpoint remote(
boost::asio::ip::make_address("172.1.1." + std::to_string(rid_)));
PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
auto [slot, _] = overlay.peerFinder().new_inbound_slot(local, remote);
auto const peer = std::make_shared<PeerTest>(
env.app(),
slot,
std::move(request),
key,
protocolVersion_,
consumer,
std::move(stream_ptr),
overlay);
BEAST_EXPECT(
overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
overlay.add_active(peer);
BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
peers.emplace_back(peer); // overlay stores week ptr to PeerImp
lid_ += 2;
rid_ += 2;
assert(lid_ <= 254);
}
// private:
// void
// addPeer(
// jtx::Env& env,
// std::vector<std::shared_ptr<PeerTest>>& peers,
// std::uint16_t& nDisabled)
// {
// auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
// boost::beast::http::request<boost::beast::http::dynamic_body>
// request; (nDisabled == 0)
// ? (void)request.insert(
// "X-Protocol-Ctl",
// makeFeaturesRequestHeader(false, false, true, false))
// : (void)nDisabled--;
// auto stream_ptr = std::make_unique<stream_type>(
// socket_type(std::forward<boost::asio::io_context&>(
// env.app().getIOContext())),
// *context_);
// beast::IP::Endpoint local(
// boost::asio::ip::make_address("172.1.1." +
// std::to_string(lid_)));
// beast::IP::Endpoint remote(
// boost::asio::ip::make_address("172.1.1." +
// std::to_string(rid_)));
// PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
// auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
// auto [slot, _] = overlay.peerFinder().new_inbound_slot(local,
// remote); auto const peer = std::make_shared<PeerTest>(
// env.app(),
// slot,
// std::move(request),
// key,
// protocolVersion_,
// consumer,
// std::move(stream_ptr),
// overlay);
// BEAST_EXPECT(
// overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
// overlay.add_active(peer);
// BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
// peers.emplace_back(peer); // overlay stores week ptr to PeerImp
// lid_ += 2;
// rid_ += 2;
// assert(lid_ <= 254);
// }
void
testRelay(
std::string const& test,
bool txRREnabled,
std::uint16_t nPeers,
std::uint16_t nDisabled,
std::uint16_t minPeers,
std::uint16_t relayPercentage,
std::uint16_t expectRelay,
std::uint16_t expectQueue,
std::set<Peer::id_t> const& toSkip = {})
{
testcase(test);
jtx::Env env(*this);
std::vector<std::shared_ptr<PeerTest>> peers;
env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
PeerTest::init();
lid_ = 0;
rid_ = 0;
for (int i = 0; i < nPeers; i++)
addPeer(env, peers, nDisabled);
// void
// testRelay(
// std::string const& test,
// bool txRREnabled,
// std::uint16_t nPeers,
// std::uint16_t nDisabled,
// std::uint16_t minPeers,
// std::uint16_t relayPercentage,
// std::uint16_t expectRelay,
// std::uint16_t expectQueue,
// std::set<Peer::id_t> const& toSkip = {})
// {
// testcase(test);
// jtx::Env env(*this);
// std::vector<std::shared_ptr<PeerTest>> peers;
// env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
// env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
// env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
// PeerTest::init();
// lid_ = 0;
// rid_ = 0;
// for (int i = 0; i < nPeers; i++)
// addPeer(env, peers, nDisabled);
auto const jtx = env.jt(noop(env.master));
if (BEAST_EXPECT(jtx.stx))
{
protocol::TMTransaction m;
Serializer s;
jtx.stx->add(s);
m.set_rawtransaction(s.data(), s.size());
m.set_deferred(false);
m.set_status(protocol::TransactionStatus::tsNEW);
env.app().overlay().relay(uint256{0}, m, toSkip);
BEAST_EXPECT(
PeerTest::sendTx_ == expectRelay &&
PeerTest::queueTx_ == expectQueue);
}
}
// auto const jtx = env.jt(noop(env.master));
// if (BEAST_EXPECT(jtx.stx))
// {
// protocol::TMTransaction m;
// Serializer s;
// jtx.stx->add(s);
// m.set_rawtransaction(s.data(), s.size());
// m.set_deferred(false);
// m.set_status(protocol::TransactionStatus::tsNEW);
// env.app().overlay().relay(uint256{0}, m, toSkip);
// BEAST_EXPECT(
// PeerTest::sendTx_ == expectRelay &&
// PeerTest::queueTx_ == expectQueue);
// }
// }
void
run() override
{
bool log = false;
std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
testConfig(log);
// relay to all peers, no hash queue
testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
// relay to nPeers - skip (10-5=5)
testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip);
// relay to all peers because min is greater than nPeers
testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
// relay to all peers because min + disabled is greater thant nPeers
testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
// relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
// queue the rest (30)
testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
// relay to minPeers + 25% of (nPeers - nPeers) - skip
// (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed
// (60-25-5=30)
testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed)
// (20+10+0.25*(70-20-10)=40), queue the rest (30)
testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
// relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers
// - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
// towards relayed (70-35-5=30))
testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
// - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
// towards relayed (15-5-10=0)
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
// - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
// towards relayed (20-14=6)
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip);
}
};
// void
// run() override
// {
// bool log = false;
// std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
// testConfig(log);
// // relay to all peers, no hash queue
// testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
// // relay to nPeers - skip (10-5=5)
// testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0,
// skip);
// // relay to all peers because min is greater than nPeers
// testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
// // relay to all peers because min + disabled is greater thant nPeers
// testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
// // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
// // queue the rest (30)
// testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
// // relay to minPeers + 25% of (nPeers - nPeers) - skip
// // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards
// relayed
// // (60-25-5=30)
// testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
// disalbed)
// // (20+10+0.25*(70-20-10)=40), queue the rest (30)
// testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
// // relay to minPeers + disabled-not-in-skip + 25% of (nPeers -
// minPeers
// // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
// // towards relayed (70-35-5=30))
// testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
// disabled)
// // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
// // towards relayed (15-5-10=0)
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
// testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0,
// skip);
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
// disabled)
// // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
// // towards relayed (20-14=6)
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
// testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6,
// skip);
// }
// };
BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
} // namespace test
} // namespace ripple
// BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
// } // namespace test
// } // namespace ripple

View File

@@ -23,6 +23,7 @@
#include <xrpld/overlay/detail/ProtocolVersion.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/server/detail/StreamInterface.h>
namespace ripple {
@@ -410,17 +411,22 @@ ConnectAttempt::processResponse()
if (result != PeerFinder::Result::success)
return fail("Outbound " + std::string(to_string(result)));
// Extract peer attributes from the response before creating PeerImp
auto const attributes =
extractPeerAttributes(response_, app_.config(), false);
auto const peer = std::make_shared<PeerImp>(
app_,
std::move(stream_ptr_),
std::make_unique<ProductionStream>(std::move(stream_ptr_)),
read_buf_.data(),
std::move(slot_),
std::move(response_),
usage_,
publicKey,
*negotiatedProtocol,
id_,
overlay_);
attributes,
overlay_,
app_.cluster().member(publicKey).value_or(""));
overlay_.add_active(peer);
}

View File

@@ -0,0 +1,234 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/InboundHandshake.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <boost/beast/core/ostream.hpp>
namespace ripple {
InboundHandshake::InboundHandshake(
Application& app,
std::uint32_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocolVersion,
Resource::Consumer consumer,
std::unique_ptr<StreamInterface>&& stream_ptr,
PeerAttributes const& attributes,
endpoint_type const& remoteEndpoint,
OverlayImpl& overlay)
: Child(overlay)
, app_(app)
, id_(id)
, sink_(app_.logs()["Peer"], OverlayImpl::makePrefix(id))
, journal_(sink_)
, stream_ptr_(std::move(stream_ptr))
, request_(std::move(request))
, publicKey_(publicKey)
, protocolVersion_(protocolVersion)
, consumer_(consumer)
, attributes_(attributes)
, slot_(slot)
, remoteEndpoint_(remoteEndpoint)
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
{
}
InboundHandshake::~InboundHandshake()
{
if (slot_ != nullptr)
overlay_.peerFinder().on_closed(slot_);
}
void
InboundHandshake::stop()
{
if (!strand_.running_in_this_thread())
return boost::asio::post(
strand_, std::bind(&InboundHandshake::stop, shared_from_this()));
shutdown();
}
void
InboundHandshake::shutdown()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::InboundHandshake::shutdown : strand in this thread");
if (!stream_ptr_->is_open() || shutdown_)
return;
shutdown_ = true;
stream_ptr_->cancel();
tryAsyncShutdown();
}
void
InboundHandshake::tryAsyncShutdown()
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::InboundHandshake::tryAsyncShutdown : strand in this thread");
if (!stream_ptr_->is_open())
return;
if (shutdown_ || shutdownStarted_)
return;
if (ioPending_)
return;
shutdownStarted_ = true;
return stream_ptr_->async_shutdown(boost::asio::bind_executor(
strand_,
std::bind(
&InboundHandshake::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
void
InboundHandshake::onShutdown(error_code ec)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::InboundHandshake::onShutdown : strand in this thread");
if (!stream_ptr_->is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
{
JLOG(journal_.warn()) << "onShutdown: " << ec.message();
}
stream_ptr_->close();
}
void
InboundHandshake::run()
{
if (!strand_.running_in_this_thread())
return boost::asio::post(
strand_, std::bind(&InboundHandshake::run, shared_from_this()));
// TODO: implement fail overload to handle strings
auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
if (!sharedValue)
return fail("makeSharedValue", boost::system::error_code{});
// Create the handshake response
auto const response = makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
overlay_.setup().public_ip,
remoteEndpoint_.address(),
*sharedValue,
overlay_.setup().networkID,
protocolVersion_,
app_);
// Convert response to buffer for async_write
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
boost::beast::ostream(*write_buffer) << response;
ioPending_ = true;
// Write the response asynchronously
stream_ptr_->async_write(
write_buffer->data(),
boost::asio::bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
onHandshake(ec, bytes_transferred);
}));
}
void
InboundHandshake::onHandshake(error_code ec, std::size_t bytes_transferred)
{
ioPending_ = false;
if (!stream_ptr_->is_open())
return;
if (ec == boost::asio::error::operation_aborted || shutdown_)
return tryAsyncShutdown();
if (ec)
return fail("onHandshake", ec);
JLOG(journal_.debug()) << "InboundHandshake completed for "
<< remoteEndpoint_
<< ", bytes transferred: " << bytes_transferred;
// Handshake successful, create the peer
createPeer();
}
void
InboundHandshake::createPeer()
{
auto const peer = std::make_shared<PeerImp>(
app_,
overlay_,
std::move(slot_),
std::move(stream_ptr_),
consumer_,
protocolVersion_,
attributes_,
publicKey_,
id_,
app_.cluster().member(publicKey_).value_or(""));
// Add the peer to the overlay
overlay_.add_active(peer);
JLOG(journal_.debug()) << "Created peer for " << remoteEndpoint_;
}
void
InboundHandshake::fail(std::string const& name, error_code ec)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::InboundHandshake::fail : strand in this thread");
JLOG(journal_.warn()) << name << " from "
<< toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remoteEndpoint_.address().to_string()
<< ": " << ec.message();
shutdown();
}
} // namespace ripple

View File

@@ -0,0 +1,109 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
#define RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpl/server/Handoff.h>
namespace ripple {
/** Manages an inbound peer handshake. */
class InboundHandshake : public OverlayImpl::Child,
public std::enable_shared_from_this<InboundHandshake>
{
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
private:
Application& app_;
std::uint32_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_;
std::unique_ptr<StreamInterface> stream_ptr_;
http_request_type request_;
PublicKey publicKey_;
ProtocolVersion protocolVersion_;
Resource::Consumer consumer_;
PeerAttributes attributes_;
std::shared_ptr<PeerFinder::Slot> slot_;
endpoint_type remoteEndpoint_;
boost::asio::strand<boost::asio::executor> strand_;
bool shutdown_ = false;
bool ioPending_ = false;
bool shutdownStarted_ = false;
public:
InboundHandshake(
Application& app,
std::uint32_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& public_key,
ProtocolVersion protocol_version,
Resource::Consumer consumer,
std::unique_ptr<StreamInterface>&& stream_ptr,
PeerAttributes const& attributes,
endpoint_type const& remote_endpoint,
OverlayImpl& overlay);
~InboundHandshake();
void
stop() override;
void
run();
private:
void
setTimer();
void
onTimer(error_code ec);
void
cancelTimer();
void
shutdown();
void
tryAsyncShutdown();
void
onShutdown(error_code ec);
void
onHandshake(error_code ec, std::size_t bytes_transferred);
void
createPeer();
void
fail(std::string const& name, error_code ec);
};
} // namespace ripple
#endif

View File

@@ -25,6 +25,7 @@
#include <xrpld/app/rdb/Wallet.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/ConnectAttempt.h>
#include <xrpld/overlay/detail/InboundHandshake.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/TrafficCount.h>
#include <xrpld/overlay/detail/Tuning.h>
@@ -39,6 +40,7 @@
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/server/SimpleWriter.h>
#include <xrpl/server/detail/StreamInterface.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/executor_work_guard.hpp>
@@ -163,7 +165,7 @@ OverlayImpl::OverlayImpl(
Handoff
OverlayImpl::onHandoff(
std::unique_ptr<stream_type>&& stream_ptr,
std::unique_ptr<stream_type>&& ssl_stream_ptr,
http_request_type&& request,
endpoint_type remote_endpoint)
{
@@ -172,9 +174,7 @@ OverlayImpl::onHandoff(
beast::Journal journal(sink);
Handoff handoff;
if (processRequest(request, handoff))
return handoff;
if (!isPeerUpgrade(request))
if (processRequest(request, handoff) || !isPeerUpgrade(request))
return handoff;
handoff.moved = true;
@@ -183,7 +183,7 @@ OverlayImpl::onHandoff(
error_code ec;
auto const local_endpoint(
stream_ptr->next_layer().socket().local_endpoint(ec));
ssl_stream_ptr->next_layer().socket().local_endpoint(ec));
if (ec)
{
JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
@@ -195,16 +195,27 @@ OverlayImpl::onHandoff(
if (consumer.disconnect(journal))
return handoff;
auto const [slot, result] = m_peerFinder->new_inbound_slot(
beast::IPAddressConversion::from_asio(local_endpoint),
beast::IPAddressConversion::from_asio(remote_endpoint));
if (slot == nullptr)
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
if (!negotiatedVersion)
{
// connection refused either IP limit exceeded or self-connect
handoff.moved = false;
JLOG(journal.debug())
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
handoff.response = makeErrorResponse(
request,
remote_endpoint.address(),
"Unable to agree on a protocol version");
handoff.keep_alive = false;
return handoff;
}
auto stream_ptr =
std::make_unique<ProductionStream>(std::move(ssl_stream_ptr));
auto const sharedValue = stream_ptr->makeSharedValue(journal);
if (!sharedValue)
{
handoff.moved = false;
handoff.response = makeErrorResponse(
request, remote_endpoint.address(), "Incorrect security cookie");
handoff.keep_alive = false;
return handoff;
}
@@ -217,38 +228,23 @@ OverlayImpl::onHandoff(
}) == types.end())
{
handoff.moved = false;
handoff.response =
makeRedirectResponse(slot, request, remote_endpoint.address());
handoff.response = makeErrorResponse(
request, remote_endpoint.address(), "Invalid Peer Type");
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
return handoff;
}
}
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
if (!negotiatedVersion)
{
m_peerFinder->on_closed(slot);
handoff.moved = false;
handoff.response = makeErrorResponse(
slot,
request,
remote_endpoint.address(),
"Unable to agree on a protocol version");
handoff.keep_alive = false;
return handoff;
}
auto const [slot, result] = m_peerFinder->new_inbound_slot(
beast::IPAddressConversion::from_asio(local_endpoint),
beast::IPAddressConversion::from_asio(remote_endpoint));
auto const sharedValue = makeSharedValue(*stream_ptr, journal);
if (!sharedValue)
if (slot == nullptr)
{
m_peerFinder->on_closed(slot);
// connection refused either IP limit exceeded or self-connect
handoff.moved = false;
handoff.response = makeErrorResponse(
slot,
request,
remote_endpoint.address(),
"Incorrect security cookie");
handoff.keep_alive = false;
JLOG(journal.debug())
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
return handoff;
}
@@ -266,10 +262,12 @@ OverlayImpl::onHandoff(
// The node gets a reserved slot if it is in our cluster
// or if it has a reservation.
bool const reserved =
static_cast<bool>(app_.cluster().member(publicKey)) ||
app_.cluster().member(publicKey).has_value() ||
app_.peerReservations().contains(publicKey);
auto const result =
m_peerFinder->activate(slot, publicKey, reserved);
if (result != PeerFinder::Result::success)
{
m_peerFinder->on_closed(slot);
@@ -283,7 +281,11 @@ OverlayImpl::onHandoff(
}
}
auto const peer = std::make_shared<PeerImp>(
// Extract peer attributes from the request before creating PeerImp
auto const attributes =
extractPeerAttributes(request, app_.config(), true);
auto const p = std::make_shared<InboundHandshake>(
app_,
id,
slot,
@@ -292,23 +294,14 @@ OverlayImpl::onHandoff(
*negotiatedVersion,
consumer,
std::move(stream_ptr),
attributes,
remote_endpoint,
*this);
{
// As we are not on the strand, run() must be called
// while holding the lock, otherwise new I/O can be
// queued after a call to stop().
std::lock_guard<decltype(mutex_)> lock(mutex_);
{
auto const result = m_peers.emplace(peer->slot(), peer);
XRPL_ASSERT(
result.second,
"ripple::OverlayImpl::onHandoff : peer is inserted");
(void)result.second;
}
list_.emplace(peer.get(), peer);
peer->run();
}
std::lock_guard lock(mutex_);
list_.emplace(p.get(), p);
p->run();
handoff.moved = true;
return handoff;
}
@@ -319,8 +312,8 @@ OverlayImpl::onHandoff(
m_peerFinder->on_closed(slot);
handoff.moved = false;
handoff.response = makeErrorResponse(
slot, request, remote_endpoint.address(), e.what());
handoff.response =
makeErrorResponse(request, remote_endpoint.address(), e.what());
handoff.keep_alive = false;
return handoff;
}
@@ -374,7 +367,6 @@ OverlayImpl::makeRedirectResponse(
std::shared_ptr<Writer>
OverlayImpl::makeErrorResponse(
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type const& request,
address_type remote_address,
std::string text)

View File

@@ -465,7 +465,6 @@ private:
std::shared_ptr<Writer>
makeErrorResponse(
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type const& request,
address_type remote_address,
std::string msg);

View File

@@ -29,6 +29,7 @@
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/perflog/PerfLog.h>
@@ -39,6 +40,7 @@
#include <xrpl/basics/safe_cast.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/digest.h>
#include <xrpl/server/detail/StreamInterface.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/core/ostream.hpp>
@@ -48,6 +50,7 @@
#include <mutex>
#include <numeric>
#include <sstream>
#include <string>
using namespace std::chrono_literals;
@@ -61,19 +64,102 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
std::chrono::seconds constexpr peerTimerInterval{60};
} // namespace
PeerAttributes
extractPeerAttributes(
boost::beast::http::fields const& headers,
Config const& config,
bool inbound)
{
PeerAttributes attributes;
// Extract feature flags
attributes.compressionEnabled =
peerFeatureEnabled(headers, FEATURE_COMPR, "lz4", config.COMPRESSION);
attributes.txReduceRelayEnabled = peerFeatureEnabled(
headers, FEATURE_TXRR, config.TX_REDUCE_RELAY_ENABLE);
attributes.ledgerReplayEnabled = peerFeatureEnabled(
headers, FEATURE_LEDGER_REPLAY, config.LEDGER_REPLAY);
attributes.vpReduceRelayEnabled = peerFeatureEnabled(
headers, FEATURE_VPRR, config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
// Extract connection information
if (auto const iter = headers.find("Crawl"); iter != headers.end())
attributes.crawlEnabled = boost::iequals(iter->value(), "public");
if (inbound)
{
if (auto const iter = headers.find("User-Agent"); iter != headers.end())
attributes.userAgent = std::string{iter->value()};
}
else
{
if (auto const iter = headers.find("Server"); iter != headers.end())
attributes.serverInfo = std::string{iter->value()};
}
if (auto const iter = headers.find("Network-ID"); iter != headers.end())
attributes.networkId = std::string{iter->value()};
if (auto const iter = headers.find("Server-Domain"); iter != headers.end())
attributes.serverDomain = std::string{iter->value()};
// Extract ledger information
auto parseLedgerHash =
[](std::string_view value) -> std::optional<uint256> {
if (uint256 ret; ret.parseHex(value))
return ret;
if (auto const s = base64_decode(value); s.size() == uint256::size())
return uint256{s};
return std::nullopt;
};
bool hasClosedLedger = false;
bool hasPreviousLedger = false;
attributes.hasValidLedgerHashes = true;
if (auto const iter = headers.find("Closed-Ledger"); iter != headers.end())
{
hasClosedLedger = true;
attributes.closedLedgerHash = parseLedgerHash(iter->value());
if (!attributes.closedLedgerHash)
attributes.hasValidLedgerHashes = false;
}
if (auto const iter = headers.find("Previous-Ledger");
iter != headers.end())
{
hasPreviousLedger = true;
attributes.previousLedgerHash = parseLedgerHash(iter->value());
if (!attributes.previousLedgerHash)
attributes.hasValidLedgerHashes = false;
}
// Validate ledger hash consistency
if (hasPreviousLedger && !hasClosedLedger)
attributes.hasValidLedgerHashes = false;
return attributes;
}
// TODO: Remove this exclusion once unit tests are added after the hotfix
// release.
PeerImp::PeerImp(
Application& app,
id_t id,
OverlayImpl& overlay,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
std::unique_ptr<StreamInterface>&& stream_ptr,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay)
ProtocolVersion protocol,
PeerAttributes const& attributes,
PublicKey const& publicKey,
id_t id,
std::string const& name)
: Child(overlay)
, app_(app)
, id_(id)
@@ -82,10 +168,8 @@ PeerImp::PeerImp(
, journal_(sink_)
, p_journal_(p_sink_)
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
, strand_(boost::asio::make_strand(socket_.get_executor()))
, timer_(waitable_timer{socket_.get_executor()})
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
, timer_(waitable_timer{stream_ptr_->get_executor()})
, remote_address_(slot->remote_endpoint())
, overlay_(overlay)
, inbound_(true)
@@ -93,41 +177,23 @@ PeerImp::PeerImp(
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, name_(name)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(consumer)
, fee_{Resource::feeTrivialPeer, ""}
, slot_(slot)
, request_(std::move(request))
, headers_(request_)
, compressionEnabled_(
peerFeatureEnabled(
headers_,
FEATURE_COMPR,
"lz4",
app_.config().COMPRESSION)
? Compressed::On
: Compressed::Off)
, txReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, attributes_(attributes)
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
JLOG(journal_.info()) << "compression enabled "
<< attributes_.compressionEnabled
<< " vp reduce-relay base squelch enabled "
<< attributes_.vpReduceRelayEnabled
<< " tx reduce-relay enabled "
<< attributes_.txReduceRelayEnabled << " on "
<< remote_address_ << " " << id_;
}
PeerImp::~PeerImp()
@@ -158,47 +224,16 @@ PeerImp::run()
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
auto parseLedgerHash =
[](std::string_view value) -> std::optional<uint256> {
if (uint256 ret; ret.parseHex(value))
return ret;
if (auto const s = base64_decode(value); s.size() == uint256::size())
return uint256{s};
return std::nullopt;
};
std::optional<uint256> closed;
std::optional<uint256> previous;
if (auto const iter = headers_.find("Closed-Ledger");
iter != headers_.end())
{
closed = parseLedgerHash(iter->value());
if (!closed)
fail("Malformed handshake data (1)");
}
if (auto const iter = headers_.find("Previous-Ledger");
iter != headers_.end())
{
previous = parseLedgerHash(iter->value());
if (!previous)
fail("Malformed handshake data (2)");
}
if (previous && !closed)
fail("Malformed handshake data (3)");
// Validate ledger hash consistency
if (!attributes_.hasValidLedgerHashes)
fail("Malformed handshake data");
{
std::lock_guard<std::mutex> sl(recentLock_);
if (closed)
closedLedgerHash_ = *closed;
if (previous)
previousLedgerHash_ = *previous;
if (attributes_.closedLedgerHash)
closedLedgerHash_ = *attributes_.closedLedgerHash;
if (attributes_.previousLedgerHash)
previousLedgerHash_ = *attributes_.previousLedgerHash;
}
if (inbound_)
@@ -215,7 +250,7 @@ PeerImp::stop()
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
if (socket_.is_open())
if (socketOpen())
{
// The rationale for using different severity levels is that
// outbound connections are under our control and may be logged
@@ -251,19 +286,28 @@ PeerImp::send(std::shared_ptr<Message> const& m)
{
overlay_.reportOutboundTraffic(
TrafficCount::category::squelch_suppressed,
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
static_cast<int>(
m->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)
.size()));
return;
}
// report categorized outgoing traffic
overlay_.reportOutboundTraffic(
safe_cast<TrafficCount::category>(m->getCategory()),
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
static_cast<int>(
m->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)
.size()));
// report total outgoing traffic
overlay_.reportOutboundTraffic(
TrafficCount::category::total,
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
static_cast<int>(
m->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)
.size()));
auto sendq_size = send_queue_.size();
@@ -287,17 +331,24 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (sendq_size != 0)
return;
boost::asio::async_write(
stream_,
boost::asio::buffer(
send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor(
strand_,
std::bind(
&PeerImp::onWriteMessage,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
// Capture shared_ptr to ensure object lifetime
auto self = shared_from_this();
stream_ptr_->async_write(
boost::asio::buffer(send_queue_.front()->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)),
[self](boost::beast::error_code ec, std::size_t bytes) {
// Post completion to the strand to ensure thread safety
boost::asio::post(self->strand_, [self, ec, bytes]() {
self->onWriteMessage(ec, bytes);
});
});
}
bool
PeerImp::socketOpen() const
{
return stream_ptr_->is_open();
}
void
@@ -365,24 +416,21 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
bool
PeerImp::crawl() const
{
auto const iter = headers_.find("Crawl");
if (iter == headers_.end())
return false;
return boost::iequals(iter->value(), "public");
return attributes_.crawlEnabled;
}
bool
PeerImp::cluster() const
{
return static_cast<bool>(app_.cluster().member(publicKey_));
return app_.cluster().member(publicKey_).has_value();
}
std::string
PeerImp::getVersion() const
{
if (inbound_)
return headers_["User-Agent"];
return headers_["Server"];
return attributes_.userAgent.value_or("");
return attributes_.serverInfo.value_or("");
}
Json::Value
@@ -408,8 +456,8 @@ PeerImp::json()
if (auto const d = domain(); !d.empty())
ret[jss::server_domain] = std::string{d};
if (auto const nid = headers_["Network-ID"]; !nid.empty())
ret[jss::network_id] = std::string{nid};
if (attributes_.networkId.has_value())
ret[jss::network_id] = *attributes_.networkId;
ret[jss::load] = usage_.balance();
@@ -513,7 +561,7 @@ PeerImp::supportsFeature(ProtocolFeature f) const
case ProtocolFeature::ValidatorList2Propagation:
return protocol_ >= make_protocol(2, 2);
case ProtocolFeature::LedgerReplay:
return ledgerReplayEnabled_;
return attributes_.ledgerReplayEnabled;
}
return false;
}
@@ -578,13 +626,13 @@ PeerImp::close()
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::close : strand in this thread");
if (socket_.is_open())
if (socketOpen())
{
detaching_ = true; // DEPRECATED
try
{
timer_.cancel();
socket_.close();
stream_ptr_->close();
}
catch (boost::system::system_error const&)
{
@@ -613,7 +661,7 @@ PeerImp::fail(std::string const& reason)
(void(Peer::*)(std::string const&)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
if (journal_.active(beast::severities::kWarning) && socketOpen())
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
@@ -628,7 +676,7 @@ PeerImp::fail(std::string const& name, error_code ec)
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::fail : strand in this thread");
if (socket_.is_open())
if (socketOpen())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
@@ -644,7 +692,7 @@ PeerImp::gracefulClose()
strand_.running_in_this_thread(),
"ripple::PeerImp::gracefulClose : strand in this thread");
XRPL_ASSERT(
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
socketOpen(), "ripple::PeerImp::gracefulClose : socket is open");
XRPL_ASSERT(
!gracefulClose_,
"ripple::PeerImp::gracefulClose : socket is not closing");
@@ -652,7 +700,7 @@ PeerImp::gracefulClose()
if (send_queue_.size() > 0)
return;
setTimer();
stream_.async_shutdown(bind_executor(
stream_ptr_->async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
@@ -703,7 +751,7 @@ PeerImp::makePrefix(id_t id)
void
PeerImp::onTimer(error_code const& ec)
{
if (!socket_.is_open())
if (!socketOpen())
return;
if (ec == boost::asio::error::operation_aborted)
@@ -784,78 +832,28 @@ PeerImp::doAccept()
read_buffer_.size() == 0,
"ripple::PeerImp::doAccept : empty read buffer");
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);
if (auto member = app_.cluster().member(publicKey_))
{
{
std::unique_lock lock{nameMutex_};
name_ = *member;
}
JLOG(journal_.info()) << "Cluster name: " << *member;
JLOG(journal_.info()) << "Cluster name: " << name_;
}
overlay_.activate(shared_from_this());
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection
// type.)
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
boost::beast::ostream(*write_buffer) << makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
overlay_.setup().public_ip,
remote_address_.address(),
*sharedValue,
overlay_.setup().networkID,
protocol_,
app_);
// Write the whole buffer and only start protocol when that's done.
boost::asio::async_write(
stream_,
write_buffer->data(),
boost::asio::transfer_all(),
bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return doProtocolStart();
return fail("Failed to write header");
}));
doProtocolStart();
}
std::string
PeerImp::name() const
{
std::shared_lock read_lock{nameMutex_};
return name_;
}
}
std::string
PeerImp::domain() const
{
return headers_["Server-Domain"];
return attributes_.serverDomain.value_or("");
}
//------------------------------------------------------------------------------
@@ -903,7 +901,11 @@ PeerImp::doProtocolStart()
void
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onReadMessage : strand in this thread");
if (!socketOpen())
return;
if (ec == boost::asio::error::operation_aborted)
return;
@@ -943,7 +945,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
if (ec)
return fail("onReadMessage", ec);
if (!socket_.is_open())
if (!socketOpen())
return;
if (gracefulClose_)
return;
@@ -952,22 +954,27 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
read_buffer_.consume(bytes_consumed);
}
auto self = shared_from_this();
// Timeout on writes only
stream_.async_read_some(
stream_ptr_->async_read_some(
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
bind_executor(
strand_,
std::bind(
&PeerImp::onReadMessage,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
[self](boost::beast::error_code ec, std::size_t bytes) {
// Post completion to the strand to ensure thread safety
boost::asio::post(self->strand_, [self, ec, bytes]() {
self->onReadMessage(ec, bytes);
});
});
}
void
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onWriteMessage : strand in this thread");
if (!socketOpen())
return;
if (ec == boost::asio::error::operation_aborted)
return;
@@ -980,7 +987,6 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
else
stream << "onWriteMessage";
}
metrics_.sent.add_message(bytes_transferred);
XRPL_ASSERT(
@@ -990,27 +996,31 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
if (!send_queue_.empty())
{
// Timeout on writes only
return boost::asio::async_write(
stream_,
boost::asio::buffer(
send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor(
strand_,
std::bind(
&PeerImp::onWriteMessage,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
// Capture shared_ptr to ensure object lifetime
auto self = shared_from_this();
return stream_ptr_->async_write(
boost::asio::buffer(send_queue_.front()->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)),
[self](boost::beast::error_code ec, std::size_t bytes) {
// Post completion to the strand to ensure thread safety
boost::asio::post(self->strand_, [self, ec, bytes]() {
self->onWriteMessage(ec, bytes);
});
});
}
if (gracefulClose_)
{
return stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown,
shared_from_this(),
std::placeholders::_1)));
// Capture shared_ptr to ensure object lifetime
auto self = shared_from_this();
return stream_ptr_->async_shutdown([self](boost::beast::error_code ec) {
// Post completion to the strand to ensure thread safety
boost::asio::post(
self->strand_, [self, ec]() { self->onShutdown(ec); });
});
}
}
@@ -1298,8 +1308,8 @@ PeerImp::handleTransaction(
auto stx = std::make_shared<STTx const>(sit);
uint256 txID = stx->getTransactionID();
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
// Charge strongly for attempting to relay a txn with
// tfInnerBatchTxn LCOV_EXCL_START
if (stx->isFlag(tfInnerBatchTxn) &&
getCurrentTransactionRules()->enabled(featureBatch))
{
@@ -1322,8 +1332,9 @@ PeerImp::handleTransaction(
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}
// Erase only if the server has seen this tx. If the server has not
// seen this tx then the tx could not has been queued for this peer.
// Erase only if the server has seen this tx. If the server has
// not seen this tx then the tx could not has been queued for
// this peer.
else if (eraseTxQueue && txReduceRelayEnabled())
removeTxQueue(txID);
@@ -1485,7 +1496,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
{
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!ledgerReplayEnabled_)
if (!attributes_.ledgerReplayEnabled)
{
fee_.update(
Resource::feeMalformedRequest, "proof_path_request disabled");
@@ -1523,7 +1534,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{
if (!ledgerReplayEnabled_)
if (!attributes_.ledgerReplayEnabled)
{
fee_.update(
Resource::feeMalformedRequest, "proof_path_response disabled");
@@ -1540,7 +1551,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
{
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!ledgerReplayEnabled_)
if (!attributes_.ledgerReplayEnabled)
{
fee_.update(
Resource::feeMalformedRequest, "replay_delta_request disabled");
@@ -1578,7 +1589,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{
if (!ledgerReplayEnabled_)
if (!attributes_.ledgerReplayEnabled)
{
fee_.update(
Resource::feeMalformedRequest, "replay_delta_response disabled");
@@ -1710,14 +1721,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
}
// RH TODO: when isTrusted = false we should probably also cache a key
// suppression for 30 seconds to avoid doing a relatively expensive lookup
// every time a spam packet is received
// suppression for 30 seconds to avoid doing a relatively expensive
// lookup every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped then
// this happens here I.e. before further wasting CPU verifying the signature
// of an untrusted key
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
@@ -1746,8 +1757,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
// receives within IDLED seconds since the message has been relayed.
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
@@ -1828,8 +1840,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
{
bool outOfSync{false};
{
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
// Operations on closedLedgerHash_ and previousLedgerHash_ must
// be guarded by recentLock_.
std::lock_guard sl(recentLock_);
if (!closedLedgerHash_.isZero())
{
@@ -1851,8 +1863,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
{
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
// Operations on closedLedgerHash_ and previousLedgerHash_ must
// be guarded by recentLock_.
std::lock_guard sl(recentLock_);
if (peerChangedLedgers)
{
@@ -2053,7 +2065,8 @@ PeerImp::onValidatorListMessage(
std::vector<ValidatorBlobInfo> const& blobs)
{
// If there are no blobs, the message is malformed (possibly because of
// ValidatorList class rules), so charge accordingly and skip processing.
// ValidatorList class rules), so charge accordingly and skip
// processing.
if (blobs.empty())
{
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
@@ -2110,7 +2123,8 @@ PeerImp::onValidatorListMessage(
XRPL_ASSERT(
applyResult.publisherKey,
"ripple::PeerImp::onValidatorListMessage : publisher key is "
"ripple::PeerImp::onValidatorListMessage : publisher key "
"is "
"set");
auto const& pubKey = *applyResult.publisherKey;
#ifndef NDEBUG
@@ -2119,7 +2133,8 @@ PeerImp::onValidatorListMessage(
{
XRPL_ASSERT(
iter->second < applyResult.sequence,
"ripple::PeerImp::onValidatorListMessage : lower sequence");
"ripple::PeerImp::onValidatorListMessage : lower "
"sequence");
}
#endif
publisherListSequences_[pubKey] = applyResult.sequence;
@@ -2132,12 +2147,14 @@ PeerImp::onValidatorListMessage(
std::lock_guard<std::mutex> sl(recentLock_);
XRPL_ASSERT(
applyResult.sequence && applyResult.publisherKey,
"ripple::PeerImp::onValidatorListMessage : nonzero sequence "
"ripple::PeerImp::onValidatorListMessage : nonzero "
"sequence "
"and set publisher key");
XRPL_ASSERT(
publisherListSequences_[*applyResult.publisherKey] <=
applyResult.sequence,
"ripple::PeerImp::onValidatorListMessage : maximum sequence");
"ripple::PeerImp::onValidatorListMessage : maximum "
"sequence");
}
#endif // !NDEBUG
@@ -2149,7 +2166,8 @@ PeerImp::onValidatorListMessage(
break;
default:
UNREACHABLE(
"ripple::PeerImp::onValidatorListMessage : invalid best list "
"ripple::PeerImp::onValidatorListMessage : invalid best "
"list "
"disposition");
}
@@ -2193,7 +2211,8 @@ PeerImp::onValidatorListMessage(
break;
default:
UNREACHABLE(
"ripple::PeerImp::onValidatorListMessage : invalid worst list "
"ripple::PeerImp::onValidatorListMessage : invalid worst "
"list "
"disposition");
}
@@ -2253,7 +2272,8 @@ PeerImp::onValidatorListMessage(
break;
default:
UNREACHABLE(
"ripple::PeerImp::onValidatorListMessage : invalid list "
"ripple::PeerImp::onValidatorListMessage : invalid "
"list "
"disposition");
}
}
@@ -2297,7 +2317,8 @@ PeerImp::onMessage(
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
{
JLOG(p_journal_.debug())
<< "ValidatorListCollection: received validator list from peer "
<< "ValidatorListCollection: received validator list from "
"peer "
<< "using protocol version " << to_string(protocol_)
<< " which shouldn't support this feature.";
fee_.update(Resource::feeUselessData, "unsupported peer");
@@ -2306,7 +2327,8 @@ PeerImp::onMessage(
else if (m->version() < 2)
{
JLOG(p_journal_.debug())
<< "ValidatorListCollection: received invalid validator list "
<< "ValidatorListCollection: received invalid validator "
"list "
"version "
<< m->version() << " from peer using protocol version "
<< to_string(protocol_);
@@ -2366,9 +2388,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
// RH TODO: when isTrusted = false we should probably also cache a key
// suppression for 30 seconds to avoid doing a relatively expensive
// lookup every time a spam packet is received
// RH TODO: when isTrusted = false we should probably also cache a
// key suppression for 30 seconds to avoid doing a relatively
// expensive lookup every time a spam packet is received
auto const isTrusted =
app_.validators().trusted(val->getSignerPublic());
@@ -2393,9 +2415,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
if (!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
// Count unique messages (Slots has it's own 'HashRouter'),
// which a peer receives within IDLED seconds since the message
// has been relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
@@ -2888,8 +2910,8 @@ PeerImp::checkTransaction(
if (isPseudoTx(*stx))
{
// Don't do anything with pseudo transactions except put them in the
// TransactionMaster cache
// Don't do anything with pseudo transactions except put them in
// the TransactionMaster cache
std::string reason;
auto tx = std::make_shared<Transaction>(stx, reason, app_);
XRPL_ASSERT(
@@ -3045,16 +3067,17 @@ PeerImp::checkValidation(
return;
}
// FIXME it should be safe to remove this try/catch. Investigate codepaths.
// FIXME it should be safe to remove this try/catch. Investigate
// codepaths.
try
{
if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
cluster())
{
// haveMessage contains peers, which are suppressed; i.e. the peers
// are the source of the message, consequently the message should
// not be relayed to these peers. But the message must be counted
// as part of the squelch logic.
// haveMessage contains peers, which are suppressed; i.e. the
// peers are the source of the message, consequently the message
// should not be relayed to these peers. But the message must be
// counted as part of the squelch logic.
auto haveMessage =
overlay_.relay(*packet, key, val->getSignerPublic());
if (!haveMessage.empty())

View File

@@ -35,6 +35,7 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/server/detail/StreamInterface.h>
#include <boost/circular_buffer.hpp>
#include <boost/endian/conversion.hpp>
@@ -49,6 +50,37 @@ namespace ripple {
struct ValidatorBlobInfo;
class SHAMap;
/** Attributes extracted from peer HTTP headers */
struct PeerAttributes
{
// Feature flags
bool compressionEnabled = false;
bool txReduceRelayEnabled = false;
bool ledgerReplayEnabled = false;
bool vpReduceRelayEnabled = false;
// Connection information
bool crawlEnabled = false;
std::optional<std::string> userAgent;
std::optional<std::string> serverInfo;
std::optional<std::string> networkId;
std::optional<std::string> serverDomain;
// Ledger information
std::optional<uint256> closedLedgerHash;
std::optional<uint256> previousLedgerHash;
// Validation state
bool hasValidLedgerHashes = false;
};
/** Extract peer attributes from HTTP headers and application configuration */
PeerAttributes
extractPeerAttributes(
boost::beast::http::fields const& headers,
Config const& config,
bool inbound);
class PeerImp : public Peer,
public std::enable_shared_from_this<PeerImp>,
public OverlayImpl::Child
@@ -60,7 +92,6 @@ public:
private:
using clock_type = std::chrono::steady_clock;
using error_code = boost::system::error_code;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using address_type = boost::asio::ip::address;
@@ -69,55 +100,6 @@ private:
boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
using Compressed = compression::Compressed;
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::WrappedSink p_sink_;
beast::Journal const journal_;
beast::Journal const p_journal_;
std::unique_ptr<stream_type> stream_ptr_;
socket_type& socket_;
stream_type& stream_;
boost::asio::strand<boost::asio::executor> strand_;
waitable_timer timer_;
// Updated at each stage of the connection process to reflect
// the current conditions as closely as possible.
beast::IP::Endpoint const remote_address_;
// These are up here to prevent warnings about order of initializations
//
OverlayImpl& overlay_;
bool const inbound_;
// Protocol version to use for this link
ProtocolVersion protocol_;
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
std::shared_mutex mutable nameMutex_;
// The indices of the smallest and largest ledgers this peer has available
//
LedgerIndex minLedger_ = 0;
LedgerIndex maxLedger_ = 0;
uint256 closedLedgerHash_;
uint256 previousLedgerHash_;
boost::circular_buffer<uint256> recentLedgers_{128};
boost::circular_buffer<uint256> recentTxSets_{128};
std::optional<std::chrono::milliseconds> latency_;
std::optional<std::uint32_t> lastPingSeq_;
clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_;
reduce_relay::Squelch<UptimeClock> squelch_;
// Notes on thread locking:
//
// During an audit it was noted that some member variables that looked
@@ -165,37 +147,6 @@ private:
}
};
std::mutex mutable recentLock_;
protocol::TMStatusChange last_status_;
Resource::Consumer usage_;
ChargeWithContext fee_;
std::shared_ptr<PeerFinder::Slot> const slot_;
boost::beast::multi_buffer read_buffer_;
http_request_type request_;
http_response_type response_;
boost::beast::http::fields const& headers_;
std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
// been sent to or received from this peer.
hash_map<PublicKey, std::size_t> publisherListSequences_;
Compressed compressionEnabled_ = Compressed::Off;
// Queue of transactions' hashes that have not been
// relayed. The hashes are sent once a second to a peer
// and the peer requests missing transactions from the node.
hash_set<uint256> txQueue_;
// true if tx reduce-relay feature is enabled on the peer.
bool txReduceRelayEnabled_ = false;
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
friend class OverlayImpl;
class Metrics
{
public:
@@ -229,6 +180,78 @@ private:
Metrics recv;
} metrics_;
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::WrappedSink p_sink_;
beast::Journal const journal_;
beast::Journal const p_journal_;
std::unique_ptr<StreamInterface> stream_ptr_;
boost::asio::strand<boost::asio::executor> strand_;
waitable_timer timer_;
// Updated at each stage of the connection process to reflect
// the current conditions as closely as possible.
beast::IP::Endpoint const remote_address_;
// These are up here to prevent warnings about order of initializations
//
OverlayImpl& overlay_;
bool const inbound_;
// Protocol version to use for this link
ProtocolVersion protocol_;
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
// The indices of the smallest and largest ledgers this peer has available
//
LedgerIndex minLedger_ = 0;
LedgerIndex maxLedger_ = 0;
uint256 closedLedgerHash_;
uint256 previousLedgerHash_;
boost::circular_buffer<uint256> recentLedgers_{128};
boost::circular_buffer<uint256> recentTxSets_{128};
std::optional<std::chrono::milliseconds> latency_;
std::optional<std::uint32_t> lastPingSeq_;
clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_;
reduce_relay::Squelch<UptimeClock> squelch_;
std::mutex mutable recentLock_;
protocol::TMStatusChange last_status_;
Resource::Consumer usage_;
ChargeWithContext fee_;
std::shared_ptr<PeerFinder::Slot> const slot_;
boost::beast::multi_buffer read_buffer_;
PeerAttributes const attributes_;
std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
// been sent to or received from this peer.
hash_map<PublicKey, std::size_t> publisherListSequences_;
// Queue of transactions' hashes that have not been
// relayed. The hashes are sent once a second to a peer
// and the peer requests missing transactions from the node.
hash_set<uint256> txQueue_;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
friend class OverlayImpl;
public:
PeerImp(PeerImp const&) = delete;
PeerImp&
@@ -237,29 +260,30 @@ public:
/** Create an active incoming peer from an established ssl connection. */
PeerImp(
Application& app,
id_t id,
OverlayImpl& overlay,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
std::unique_ptr<StreamInterface>&& stream_ptr,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay);
ProtocolVersion protocol,
PeerAttributes const& attributes,
PublicKey const& publicKey,
id_t id,
std::string const& name);
/** Create outgoing, handshaked peer. */
// VFALCO legacyPublicKey should be implied by the Slot
template <class Buffers>
PeerImp(
Application& app,
std::unique_ptr<stream_type>&& stream_ptr,
std::unique_ptr<StreamInterface>&& stream_ptr,
Buffers const& buffers,
std::shared_ptr<PeerFinder::Slot>&& slot,
http_response_type&& response,
Resource::Consumer usage,
Resource::Consumer consumer,
PublicKey const& publicKey,
ProtocolVersion protocol,
id_t id,
OverlayImpl& overlay);
PeerAttributes const& attributes,
OverlayImpl& overlay,
std::string const& name);
virtual ~PeerImp();
@@ -431,13 +455,13 @@ public:
bool
compressionEnabled() const override
{
return compressionEnabled_ == Compressed::On;
return attributes_.compressionEnabled;
}
bool
txReduceRelayEnabled() const override
{
return txReduceRelayEnabled_;
return attributes_.txReduceRelayEnabled;
}
private:
@@ -519,6 +543,9 @@ private:
handleHaveTransactions(
std::shared_ptr<protocol::TMHaveTransactions> const& m);
bool
socketOpen() const;
public:
//--------------------------------------------------------------------------
//
@@ -650,15 +677,16 @@ private:
template <class Buffers>
PeerImp::PeerImp(
Application& app,
std::unique_ptr<stream_type>&& stream_ptr,
std::unique_ptr<StreamInterface>&& stream_ptr,
Buffers const& buffers,
std::shared_ptr<PeerFinder::Slot>&& slot,
http_response_type&& response,
Resource::Consumer usage,
PublicKey const& publicKey,
ProtocolVersion protocol,
id_t id,
OverlayImpl& overlay)
PeerAttributes const& attributes,
OverlayImpl& overlay,
std::string const& name)
: Child(overlay)
, app_(app)
, id_(id)
@@ -667,10 +695,8 @@ PeerImp::PeerImp(
, journal_(sink_)
, p_journal_(p_sink_)
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
, strand_(boost::asio::make_strand(socket_.get_executor()))
, timer_(waitable_timer{socket_.get_executor()})
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
, timer_(waitable_timer{stream_ptr_->get_executor()})
, remote_address_(slot->remote_endpoint())
, overlay_(overlay)
, inbound_(false)
@@ -678,43 +704,25 @@ PeerImp::PeerImp(
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, name_(name)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(usage)
, fee_{Resource::feeTrivialPeer}
, slot_(std::move(slot))
, response_(std::move(response))
, headers_(response_)
, compressionEnabled_(
peerFeatureEnabled(
headers_,
FEATURE_COMPR,
"lz4",
app_.config().COMPRESSION)
? Compressed::On
: Compressed::Off)
, txReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, attributes_(attributes)
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
read_buffer_.commit(boost::asio::buffer_copy(
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
JLOG(journal_.info()) << "compression enabled "
<< attributes_.compressionEnabled
<< " vp reduce-relay base squelch enabled "
<< attributes_.vpReduceRelayEnabled
<< " tx reduce-relay enabled "
<< attributes_.txReduceRelayEnabled << " on "
<< remote_address_ << " " << id_;
}
template <class FwdIt, class>

View File

@@ -0,0 +1,144 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/detail/handlers/ProtocolMessageHandler.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/messages.h>
namespace ripple {
// Helper function to check for valid uint256 values in protobuf buffers
static bool
stringIsUint256Sized(std::string const& pBuffStr)
{
return pBuffStr.size() == uint256::size();
}
void
ProtocolMessageHandler::onMessage(
std::shared_ptr<protocol::TMProposeSet> const& m)
{
protocol::TMProposeSet& set = *m;
auto const sig = makeSlice(set.signature());
// Preliminary check for the validity of the signature: A DER encoded
// signature can't be longer than 72 bytes.
if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
(publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_.update(
Resource::feeInvalidSignature,
" signature can't be longer than 72 bytes");
return;
}
if (!stringIsUint256Sized(set.currenttxhash()) ||
!stringIsUint256Sized(set.previousledger()))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_.update(Resource::feeMalformedRequest, "bad hashes");
return;
}
// RH TODO: when isTrusted = false we should probably also cache a key
// suppression for 30 seconds to avoid doing a relatively expensive
// lookup every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
}
uint256 const proposeHash{set.currenttxhash()};
uint256 const prevLedger{set.previousledger()};
NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
uint256 const suppression = proposalUniqueId(
proposeHash,
prevLedger,
set.proposeseq(),
closeTime,
publicKey.slice(),
sig);
if (auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
// report duplicate proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_duplicate,
Message::messageSize(*m));
JLOG(p_journal_.trace()) << "Proposal: duplicate";
return;
}
if (!isTrusted)
{
if (tracking_.load() == Tracking::diverged)
{
JLOG(p_journal_.debug())
<< "Proposal: Dropping untrusted (peer divergence)";
return;
}
if (!cluster() && app_.getFeeTrack().isLoadedLocal())
{
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
return;
}
}
JLOG(p_journal_.trace())
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
auto proposal = RCLCxPeerPos(
publicKey,
sig,
suppression,
RCLCxPeerPos::Proposal{
prevLedger,
set.proposeseq(),
proposeHash,
closeTime,
app_.timeKeeper().closeTime(),
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"recvPropose->checkPropose",
[weak, isTrusted, m, proposal]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
});
}
} // namespace ripple

View File

@@ -0,0 +1,142 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/messages.h>
namespace ripple {
class ProtocolMessageHandler
{
private:
beast::Journal const journal_;
beast::Journal const p_journal_;
OverlayImpl& overlay_;
Application& app_;
public:
void
onMessageUnknown(std::uint16_t type);
void
onMessageBegin(
std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m,
std::size_t size,
std::size_t uncompressed_size,
bool isCompressed);
void
onMessageEnd(
std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m);
void
onMessage(std::shared_ptr<protocol::TMManifests> const& m);
void
onMessage(std::shared_ptr<protocol::TMPing> const& m);
void
onMessage(std::shared_ptr<protocol::TMCluster> const& m);
void
onMessage(std::shared_ptr<protocol::TMEndpoints> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransaction> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetLedger> const& m);
void
onMessage(std::shared_ptr<protocol::TMLedgerData> const& m);
void
onMessage(std::shared_ptr<protocol::TMProposeSet> const& m);
void
onMessage(std::shared_ptr<protocol::TMStatusChange> const& m);
void
onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidatorList> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidatorListCollection> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
void
onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
private:
//--------------------------------------------------------------------------
// lockedRecentLock is passed as a reminder to callers that recentLock_
// must be locked.
void
addLedger(
uint256 const& hash,
std::lock_guard<std::mutex> const& lockedRecentLock);
void
doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
void
onValidatorListMessage(
std::string const& messageType,
std::string const& manifest,
std::uint32_t version,
std::vector<protocol::ValidatorBlobInfo> const& blobs);
/** Process peer's request to send missing transactions. The request is
sent in response to TMHaveTransactions.
@param packet protocol message containing missing transactions' hashes.
*/
void
doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
void
checkTransaction(
HashRouterFlags flags,
bool checkSignature,
std::shared_ptr<STTx const> const& stx,
bool batch);
void
checkPropose(
bool isTrusted,
std::shared_ptr<protocol::TMProposeSet> const& packet,
RCLCxPeerPos peerPos);
void
checkValidation(
std::shared_ptr<STValidation> const& val,
uint256 const& key,
std::shared_ptr<protocol::TMValidation> const& packet);
void
sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
};
} // namespace ripple

View File

@@ -0,0 +1,14 @@
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpl/server/detail/StreamInterface.h>
namespace ripple {
std::optional<base_uint<256>>
ProductionStream::makeSharedValue(beast::Journal journal)
{
// Delegate to the existing Handshake module
return ripple::makeSharedValue(*stream_, journal);
}
} // namespace ripple