mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
decouples ssl_stream from peerImp
This commit is contained in:
154
include/xrpl/server/detail/StreamInterface.h
Normal file
154
include/xrpl/server/detail/StreamInterface.h
Normal file
@@ -0,0 +1,154 @@
|
||||
#ifndef RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||
#define RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/executor.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/beast/core/error.hpp>
|
||||
#include <boost/beast/core/multi_buffer.hpp>
|
||||
#include <boost/beast/core/tcp_stream.hpp>
|
||||
#include <boost/beast/ssl/ssl_stream.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// Forward declarations
|
||||
using socket_type = boost::beast::tcp_stream;
|
||||
using concrete_stream_type = boost::beast::ssl_stream<socket_type>;
|
||||
|
||||
/**
|
||||
* @brief Minimal interface for stream operations needed by PeerImp
|
||||
*/
|
||||
class StreamInterface
|
||||
{
|
||||
public:
|
||||
virtual ~StreamInterface() = default;
|
||||
|
||||
// Executor access for ASIO operations
|
||||
virtual boost::asio::any_io_executor
|
||||
get_executor() = 0;
|
||||
|
||||
// Connection status checking
|
||||
virtual bool
|
||||
is_open() const = 0;
|
||||
|
||||
// Stream lifecycle operations
|
||||
virtual void
|
||||
close() = 0;
|
||||
|
||||
// Async I/O operations
|
||||
virtual void
|
||||
async_read_some(
|
||||
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write_some(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write(
|
||||
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_shutdown(std::function<void(boost::beast::error_code)> handler) = 0;
|
||||
|
||||
// SSL handshake support
|
||||
virtual std::optional<base_uint<256>>
|
||||
makeSharedValue(beast::Journal journal) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Production implementation wrapping boost::beast::ssl_stream
|
||||
*/
|
||||
class ProductionStream : public StreamInterface
|
||||
{
|
||||
private:
|
||||
std::unique_ptr<concrete_stream_type> stream_;
|
||||
|
||||
public:
|
||||
explicit ProductionStream(std::unique_ptr<concrete_stream_type> stream)
|
||||
: stream_(std::move(stream))
|
||||
{
|
||||
}
|
||||
|
||||
boost::asio::any_io_executor
|
||||
get_executor() override
|
||||
{
|
||||
return stream_->get_executor();
|
||||
}
|
||||
|
||||
bool
|
||||
is_open() const override
|
||||
{
|
||||
return stream_->next_layer().socket().is_open();
|
||||
}
|
||||
|
||||
void
|
||||
close() override
|
||||
{
|
||||
stream_->lowest_layer().close();
|
||||
}
|
||||
|
||||
void
|
||||
async_read_some(
|
||||
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
stream_->async_read_some(buffers, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write_some(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
stream_->async_write_some(buffer, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
boost::asio::async_write(*stream_, buffer, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write(
|
||||
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
boost::asio::async_write(*stream_, buffers, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_shutdown(
|
||||
std::function<void(boost::beast::error_code)> handler) override
|
||||
{
|
||||
stream_->async_shutdown(std::move(handler));
|
||||
}
|
||||
|
||||
std::optional<base_uint<256>>
|
||||
makeSharedValue(beast::Journal journal) override;
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -1,289 +1,299 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright 2020 Ripple Labs Inc.
|
||||
// //------------------------------------------------------------------------------
|
||||
// /*
|
||||
// This file is part of rippled: https://github.com/ripple/rippled
|
||||
// Copyright 2020 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
// Permission to use, copy, modify, and/or distribute this software for any
|
||||
// purpose with or without fee is hereby granted, provided that the above
|
||||
// copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
// ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
// */
|
||||
// //==============================================================================
|
||||
|
||||
#include <test/jtx.h>
|
||||
#include <test/jtx/Env.h>
|
||||
// #include <test/jtx.h>
|
||||
// #include <test/jtx/Env.h>
|
||||
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/peerfinder/detail/SlotImp.h>
|
||||
// #include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
// #include <xrpld/overlay/detail/PeerImp.h>
|
||||
// #include <xrpld/peerfinder/detail/SlotImp.h>
|
||||
|
||||
#include <xrpl/basics/make_SSLContext.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
// #include <xrpl/basics/make_SSLContext.h>
|
||||
// #include <xrpl/beast/unit_test.h>
|
||||
|
||||
namespace ripple {
|
||||
// namespace ripple {
|
||||
|
||||
namespace test {
|
||||
// namespace test {
|
||||
|
||||
class tx_reduce_relay_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
using socket_type = boost::asio::ip::tcp::socket;
|
||||
using middle_type = boost::beast::tcp_stream;
|
||||
using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
||||
// class tx_reduce_relay_test : public beast::unit_test::suite
|
||||
// {
|
||||
// public:
|
||||
// using socket_type = boost::asio::ip::tcp::socket;
|
||||
// using middle_type = boost::beast::tcp_stream;
|
||||
// using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
// using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
||||
|
||||
private:
|
||||
void
|
||||
doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
||||
{
|
||||
testcase(msg);
|
||||
f(log);
|
||||
}
|
||||
// private:
|
||||
// void
|
||||
// doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
||||
// {
|
||||
// testcase(msg);
|
||||
// f(log);
|
||||
// }
|
||||
|
||||
void
|
||||
testConfig(bool log)
|
||||
{
|
||||
doTest("Config Test", log, [&](bool log) {
|
||||
auto test = [&](bool enable,
|
||||
bool metrics,
|
||||
std::uint16_t min,
|
||||
std::uint16_t pct,
|
||||
bool success = true) {
|
||||
std::stringstream str("[reduce_relay]");
|
||||
str << "[reduce_relay]\n"
|
||||
<< "tx_enable=" << static_cast<int>(enable) << "\n"
|
||||
<< "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
||||
<< "tx_min_peers=" << min << "\n"
|
||||
<< "tx_relay_percentage=" << pct << "\n";
|
||||
Config c;
|
||||
try
|
||||
{
|
||||
c.loadFromString(str.str());
|
||||
// void
|
||||
// testConfig(bool log)
|
||||
// {
|
||||
// doTest("Config Test", log, [&](bool log) {
|
||||
// auto test = [&](bool enable,
|
||||
// bool metrics,
|
||||
// std::uint16_t min,
|
||||
// std::uint16_t pct,
|
||||
// bool success = true) {
|
||||
// std::stringstream str("[reduce_relay]");
|
||||
// str << "[reduce_relay]\n"
|
||||
// << "tx_enable=" << static_cast<int>(enable) << "\n"
|
||||
// << "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
||||
// << "tx_min_peers=" << min << "\n"
|
||||
// << "tx_relay_percentage=" << pct << "\n";
|
||||
// Config c;
|
||||
// try
|
||||
// {
|
||||
// c.loadFromString(str.str());
|
||||
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
||||
BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
||||
if (success)
|
||||
pass();
|
||||
else
|
||||
fail();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (success)
|
||||
fail();
|
||||
else
|
||||
pass();
|
||||
}
|
||||
};
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
||||
// BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
||||
// if (success)
|
||||
// pass();
|
||||
// else
|
||||
// fail();
|
||||
// }
|
||||
// catch (...)
|
||||
// {
|
||||
// if (success)
|
||||
// fail();
|
||||
// else
|
||||
// pass();
|
||||
// }
|
||||
// };
|
||||
|
||||
test(true, true, 20, 25);
|
||||
test(false, false, 20, 25);
|
||||
test(false, false, 20, 0, false);
|
||||
test(false, false, 20, 101, false);
|
||||
test(false, false, 9, 10, false);
|
||||
test(false, false, 10, 9, false);
|
||||
});
|
||||
}
|
||||
// test(true, true, 20, 25);
|
||||
// test(false, false, 20, 25);
|
||||
// test(false, false, 20, 0, false);
|
||||
// test(false, false, 20, 101, false);
|
||||
// test(false, false, 9, 10, false);
|
||||
// test(false, false, 10, 9, false);
|
||||
// });
|
||||
// }
|
||||
|
||||
class PeerTest : public PeerImp
|
||||
{
|
||||
public:
|
||||
PeerTest(
|
||||
Application& app,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
||||
OverlayImpl& overlay)
|
||||
: PeerImp(
|
||||
app,
|
||||
sid_,
|
||||
slot,
|
||||
std::move(request),
|
||||
publicKey,
|
||||
protocol,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
overlay)
|
||||
{
|
||||
sid_++;
|
||||
}
|
||||
~PeerTest() = default;
|
||||
// class PeerTest : public PeerImp
|
||||
// {
|
||||
// public:
|
||||
// PeerTest(
|
||||
// Application& app,
|
||||
// std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
// http_request_type&& request,
|
||||
// PublicKey const& publicKey,
|
||||
// ProtocolVersion protocol,
|
||||
// Resource::Consumer consumer,
|
||||
// std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
||||
// OverlayImpl& overlay)
|
||||
// : PeerImp(
|
||||
// app,
|
||||
// sid_,
|
||||
// slot,
|
||||
// std::move(request),
|
||||
// publicKey,
|
||||
// protocol,
|
||||
// consumer,
|
||||
// std::move(stream_ptr),
|
||||
// overlay)
|
||||
// {
|
||||
// sid_++;
|
||||
// }
|
||||
// ~PeerTest() = default;
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
}
|
||||
void
|
||||
send(std::shared_ptr<Message> const&) override
|
||||
{
|
||||
sendTx_++;
|
||||
}
|
||||
void
|
||||
addTxQueue(uint256 const& hash) override
|
||||
{
|
||||
queueTx_++;
|
||||
}
|
||||
static void
|
||||
init()
|
||||
{
|
||||
queueTx_ = 0;
|
||||
sendTx_ = 0;
|
||||
sid_ = 0;
|
||||
}
|
||||
inline static std::size_t sid_ = 0;
|
||||
inline static std::uint16_t queueTx_ = 0;
|
||||
inline static std::uint16_t sendTx_ = 0;
|
||||
};
|
||||
// void
|
||||
// run() override
|
||||
// {
|
||||
// }
|
||||
// void
|
||||
// send(std::shared_ptr<Message> const&) override
|
||||
// {
|
||||
// sendTx_++;
|
||||
// }
|
||||
// void
|
||||
// addTxQueue(uint256 const& hash) override
|
||||
// {
|
||||
// queueTx_++;
|
||||
// }
|
||||
// static void
|
||||
// init()
|
||||
// {
|
||||
// queueTx_ = 0;
|
||||
// sendTx_ = 0;
|
||||
// sid_ = 0;
|
||||
// }
|
||||
// inline static std::size_t sid_ = 0;
|
||||
// inline static std::uint16_t queueTx_ = 0;
|
||||
// inline static std::uint16_t sendTx_ = 0;
|
||||
// };
|
||||
|
||||
std::uint16_t lid_{0};
|
||||
std::uint16_t rid_{1};
|
||||
shared_context context_;
|
||||
ProtocolVersion protocolVersion_;
|
||||
boost::beast::multi_buffer read_buf_;
|
||||
// std::uint16_t lid_{0};
|
||||
// std::uint16_t rid_{1};
|
||||
// shared_context context_;
|
||||
// ProtocolVersion protocolVersion_;
|
||||
// boost::beast::multi_buffer read_buf_;
|
||||
|
||||
public:
|
||||
tx_reduce_relay_test()
|
||||
: context_(make_SSLContext("")), protocolVersion_{1, 7}
|
||||
{
|
||||
}
|
||||
// public:
|
||||
// tx_reduce_relay_test()
|
||||
// : context_(make_SSLContext("")), protocolVersion_{1, 7}
|
||||
// {
|
||||
// }
|
||||
|
||||
private:
|
||||
void
|
||||
addPeer(
|
||||
jtx::Env& env,
|
||||
std::vector<std::shared_ptr<PeerTest>>& peers,
|
||||
std::uint16_t& nDisabled)
|
||||
{
|
||||
auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
||||
boost::beast::http::request<boost::beast::http::dynamic_body> request;
|
||||
(nDisabled == 0)
|
||||
? (void)request.insert(
|
||||
"X-Protocol-Ctl",
|
||||
makeFeaturesRequestHeader(false, false, true, false))
|
||||
: (void)nDisabled--;
|
||||
auto stream_ptr = std::make_unique<stream_type>(
|
||||
socket_type(std::forward<boost::asio::io_context&>(
|
||||
env.app().getIOContext())),
|
||||
*context_);
|
||||
beast::IP::Endpoint local(
|
||||
boost::asio::ip::make_address("172.1.1." + std::to_string(lid_)));
|
||||
beast::IP::Endpoint remote(
|
||||
boost::asio::ip::make_address("172.1.1." + std::to_string(rid_)));
|
||||
PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
||||
auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
||||
auto [slot, _] = overlay.peerFinder().new_inbound_slot(local, remote);
|
||||
auto const peer = std::make_shared<PeerTest>(
|
||||
env.app(),
|
||||
slot,
|
||||
std::move(request),
|
||||
key,
|
||||
protocolVersion_,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
overlay);
|
||||
BEAST_EXPECT(
|
||||
overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
||||
overlay.add_active(peer);
|
||||
BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
||||
peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
||||
lid_ += 2;
|
||||
rid_ += 2;
|
||||
assert(lid_ <= 254);
|
||||
}
|
||||
// private:
|
||||
// void
|
||||
// addPeer(
|
||||
// jtx::Env& env,
|
||||
// std::vector<std::shared_ptr<PeerTest>>& peers,
|
||||
// std::uint16_t& nDisabled)
|
||||
// {
|
||||
// auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
||||
// boost::beast::http::request<boost::beast::http::dynamic_body>
|
||||
// request; (nDisabled == 0)
|
||||
// ? (void)request.insert(
|
||||
// "X-Protocol-Ctl",
|
||||
// makeFeaturesRequestHeader(false, false, true, false))
|
||||
// : (void)nDisabled--;
|
||||
// auto stream_ptr = std::make_unique<stream_type>(
|
||||
// socket_type(std::forward<boost::asio::io_context&>(
|
||||
// env.app().getIOContext())),
|
||||
// *context_);
|
||||
// beast::IP::Endpoint local(
|
||||
// boost::asio::ip::make_address("172.1.1." +
|
||||
// std::to_string(lid_)));
|
||||
// beast::IP::Endpoint remote(
|
||||
// boost::asio::ip::make_address("172.1.1." +
|
||||
// std::to_string(rid_)));
|
||||
// PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
||||
// auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
||||
// auto [slot, _] = overlay.peerFinder().new_inbound_slot(local,
|
||||
// remote); auto const peer = std::make_shared<PeerTest>(
|
||||
// env.app(),
|
||||
// slot,
|
||||
// std::move(request),
|
||||
// key,
|
||||
// protocolVersion_,
|
||||
// consumer,
|
||||
// std::move(stream_ptr),
|
||||
// overlay);
|
||||
// BEAST_EXPECT(
|
||||
// overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
||||
// overlay.add_active(peer);
|
||||
// BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
||||
// peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
||||
// lid_ += 2;
|
||||
// rid_ += 2;
|
||||
// assert(lid_ <= 254);
|
||||
// }
|
||||
|
||||
void
|
||||
testRelay(
|
||||
std::string const& test,
|
||||
bool txRREnabled,
|
||||
std::uint16_t nPeers,
|
||||
std::uint16_t nDisabled,
|
||||
std::uint16_t minPeers,
|
||||
std::uint16_t relayPercentage,
|
||||
std::uint16_t expectRelay,
|
||||
std::uint16_t expectQueue,
|
||||
std::set<Peer::id_t> const& toSkip = {})
|
||||
{
|
||||
testcase(test);
|
||||
jtx::Env env(*this);
|
||||
std::vector<std::shared_ptr<PeerTest>> peers;
|
||||
env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
||||
env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
||||
env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
||||
PeerTest::init();
|
||||
lid_ = 0;
|
||||
rid_ = 0;
|
||||
for (int i = 0; i < nPeers; i++)
|
||||
addPeer(env, peers, nDisabled);
|
||||
// void
|
||||
// testRelay(
|
||||
// std::string const& test,
|
||||
// bool txRREnabled,
|
||||
// std::uint16_t nPeers,
|
||||
// std::uint16_t nDisabled,
|
||||
// std::uint16_t minPeers,
|
||||
// std::uint16_t relayPercentage,
|
||||
// std::uint16_t expectRelay,
|
||||
// std::uint16_t expectQueue,
|
||||
// std::set<Peer::id_t> const& toSkip = {})
|
||||
// {
|
||||
// testcase(test);
|
||||
// jtx::Env env(*this);
|
||||
// std::vector<std::shared_ptr<PeerTest>> peers;
|
||||
// env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
||||
// env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
||||
// env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
||||
// PeerTest::init();
|
||||
// lid_ = 0;
|
||||
// rid_ = 0;
|
||||
// for (int i = 0; i < nPeers; i++)
|
||||
// addPeer(env, peers, nDisabled);
|
||||
|
||||
auto const jtx = env.jt(noop(env.master));
|
||||
if (BEAST_EXPECT(jtx.stx))
|
||||
{
|
||||
protocol::TMTransaction m;
|
||||
Serializer s;
|
||||
jtx.stx->add(s);
|
||||
m.set_rawtransaction(s.data(), s.size());
|
||||
m.set_deferred(false);
|
||||
m.set_status(protocol::TransactionStatus::tsNEW);
|
||||
env.app().overlay().relay(uint256{0}, m, toSkip);
|
||||
BEAST_EXPECT(
|
||||
PeerTest::sendTx_ == expectRelay &&
|
||||
PeerTest::queueTx_ == expectQueue);
|
||||
}
|
||||
}
|
||||
// auto const jtx = env.jt(noop(env.master));
|
||||
// if (BEAST_EXPECT(jtx.stx))
|
||||
// {
|
||||
// protocol::TMTransaction m;
|
||||
// Serializer s;
|
||||
// jtx.stx->add(s);
|
||||
// m.set_rawtransaction(s.data(), s.size());
|
||||
// m.set_deferred(false);
|
||||
// m.set_status(protocol::TransactionStatus::tsNEW);
|
||||
// env.app().overlay().relay(uint256{0}, m, toSkip);
|
||||
// BEAST_EXPECT(
|
||||
// PeerTest::sendTx_ == expectRelay &&
|
||||
// PeerTest::queueTx_ == expectQueue);
|
||||
// }
|
||||
// }
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
bool log = false;
|
||||
std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
||||
testConfig(log);
|
||||
// relay to all peers, no hash queue
|
||||
testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
||||
// relay to nPeers - skip (10-5=5)
|
||||
testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip);
|
||||
// relay to all peers because min is greater than nPeers
|
||||
testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
||||
// relay to all peers because min + disabled is greater thant nPeers
|
||||
testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
||||
// relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
||||
// queue the rest (30)
|
||||
testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
||||
// relay to minPeers + 25% of (nPeers - nPeers) - skip
|
||||
// (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed
|
||||
// (60-25-5=30)
|
||||
testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed)
|
||||
// (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
||||
testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
||||
// relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers
|
||||
// - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
||||
// towards relayed (70-35-5=30))
|
||||
testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
||||
// - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
||||
// towards relayed (15-5-10=0)
|
||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||
testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
||||
// - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
||||
// towards relayed (20-14=6)
|
||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
||||
testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip);
|
||||
}
|
||||
};
|
||||
// void
|
||||
// run() override
|
||||
// {
|
||||
// bool log = false;
|
||||
// std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
||||
// testConfig(log);
|
||||
// // relay to all peers, no hash queue
|
||||
// testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
||||
// // relay to nPeers - skip (10-5=5)
|
||||
// testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0,
|
||||
// skip);
|
||||
// // relay to all peers because min is greater than nPeers
|
||||
// testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
||||
// // relay to all peers because min + disabled is greater thant nPeers
|
||||
// testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
||||
// // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
||||
// // queue the rest (30)
|
||||
// testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
||||
// // relay to minPeers + 25% of (nPeers - nPeers) - skip
|
||||
// // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards
|
||||
// relayed
|
||||
// // (60-25-5=30)
|
||||
// testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disalbed)
|
||||
// // (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
||||
// testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
||||
// // relay to minPeers + disabled-not-in-skip + 25% of (nPeers -
|
||||
// minPeers
|
||||
// // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
||||
// // towards relayed (70-35-5=30))
|
||||
// testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disabled)
|
||||
// // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
||||
// // towards relayed (15-5-10=0)
|
||||
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||
// testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0,
|
||||
// skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disabled)
|
||||
// // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
||||
// // towards relayed (20-14=6)
|
||||
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
||||
// testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6,
|
||||
// skip);
|
||||
// }
|
||||
// };
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
// BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
||||
// } // namespace test
|
||||
// } // namespace ripple
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include <xrpld/overlay/detail/ProtocolVersion.h>
|
||||
|
||||
#include <xrpl/json/json_reader.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -412,7 +413,7 @@ ConnectAttempt::processResponse()
|
||||
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
app_,
|
||||
std::move(stream_ptr_),
|
||||
std::make_unique<ProductionStream>(std::move(stream_ptr_)),
|
||||
read_buf_.data(),
|
||||
std::move(slot_),
|
||||
std::move(response_),
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/server/SimpleWriter.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/asio/executor_work_guard.hpp>
|
||||
@@ -163,7 +164,7 @@ OverlayImpl::OverlayImpl(
|
||||
|
||||
Handoff
|
||||
OverlayImpl::onHandoff(
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<stream_type>&& ssl_stream_ptr,
|
||||
http_request_type&& request,
|
||||
endpoint_type remote_endpoint)
|
||||
{
|
||||
@@ -183,7 +184,7 @@ OverlayImpl::onHandoff(
|
||||
|
||||
error_code ec;
|
||||
auto const local_endpoint(
|
||||
stream_ptr->next_layer().socket().local_endpoint(ec));
|
||||
ssl_stream_ptr->next_layer().socket().local_endpoint(ec));
|
||||
if (ec)
|
||||
{
|
||||
JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
|
||||
@@ -238,7 +239,9 @@ OverlayImpl::onHandoff(
|
||||
return handoff;
|
||||
}
|
||||
|
||||
auto const sharedValue = makeSharedValue(*stream_ptr, journal);
|
||||
auto stream_ptr =
|
||||
std::make_unique<ProductionStream>(std::move(ssl_stream_ptr));
|
||||
auto const sharedValue = stream_ptr->makeSharedValue(journal);
|
||||
if (!sharedValue)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
#include <xrpl/basics/safe_cast.h>
|
||||
#include <xrpl/protocol/TxFlags.h>
|
||||
#include <xrpl/protocol/digest.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/beast/core/ostream.hpp>
|
||||
@@ -72,7 +73,7 @@ PeerImp::PeerImp(
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
OverlayImpl& overlay)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
@@ -285,23 +286,24 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
if (sendq_size != 0)
|
||||
return;
|
||||
|
||||
boost::asio::async_write(
|
||||
*stream_ptr_,
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
stream_ptr_->async_write(
|
||||
boost::asio::buffer(
|
||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onWriteMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
PeerImp::socketOpen() const
|
||||
{
|
||||
return stream_ptr_->next_layer().socket().is_open();
|
||||
return stream_ptr_->is_open();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -588,7 +590,7 @@ PeerImp::close()
|
||||
try
|
||||
{
|
||||
timer_.cancel();
|
||||
stream_ptr_->lowest_layer().close();
|
||||
stream_ptr_->close();
|
||||
}
|
||||
catch (boost::system::system_error const&)
|
||||
{
|
||||
@@ -790,7 +792,7 @@ PeerImp::doAccept()
|
||||
|
||||
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
|
||||
|
||||
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
|
||||
auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
|
||||
|
||||
// This shouldn't fail since we already computed
|
||||
// the shared value successfully in OverlayImpl
|
||||
@@ -829,24 +831,24 @@ PeerImp::doAccept()
|
||||
app_);
|
||||
|
||||
// Write the whole buffer and only start protocol when that's done.
|
||||
boost::asio::async_write(
|
||||
*stream_ptr_,
|
||||
stream_ptr_->async_write(
|
||||
write_buffer->data(),
|
||||
boost::asio::transfer_all(),
|
||||
bind_executor(
|
||||
strand_,
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if (ec)
|
||||
return fail("onWriteResponse", ec);
|
||||
if (write_buffer->size() == bytes_transferred)
|
||||
return doProtocolStart();
|
||||
return fail("Failed to write header");
|
||||
}));
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(
|
||||
strand_, [this, write_buffer, self, ec, bytes_transferred]() {
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if (ec)
|
||||
return fail("onWriteResponse", ec);
|
||||
if (write_buffer->size() == bytes_transferred)
|
||||
return doProtocolStart();
|
||||
return fail("Failed to write header");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
std::string
|
||||
@@ -907,6 +909,10 @@ PeerImp::doProtocolStart()
|
||||
void
|
||||
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::onReadMessage : strand in this thread");
|
||||
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
@@ -956,21 +962,26 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
read_buffer_.consume(bytes_consumed);
|
||||
}
|
||||
|
||||
auto self = shared_from_this();
|
||||
|
||||
// Timeout on writes only
|
||||
stream_ptr_->async_read_some(
|
||||
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onReadMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onReadMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::onWriteMessage : strand in this thread");
|
||||
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
@@ -984,7 +995,6 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
else
|
||||
stream << "onWriteMessage";
|
||||
}
|
||||
|
||||
metrics_.sent.add_message(bytes_transferred);
|
||||
|
||||
XRPL_ASSERT(
|
||||
@@ -994,27 +1004,31 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
if (!send_queue_.empty())
|
||||
{
|
||||
// Timeout on writes only
|
||||
return boost::asio::async_write(
|
||||
*stream_ptr_,
|
||||
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
return stream_ptr_->async_write(
|
||||
boost::asio::buffer(
|
||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onWriteMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (gracefulClose_)
|
||||
{
|
||||
return stream_ptr_->async_shutdown(bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onShutdown,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
return stream_ptr_->async_shutdown([self](boost::beast::error_code ec) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(
|
||||
self->strand_, [self, ec]() { self->onShutdown(ec); });
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1302,8 +1316,8 @@ PeerImp::handleTransaction(
|
||||
auto stx = std::make_shared<STTx const>(sit);
|
||||
uint256 txID = stx->getTransactionID();
|
||||
|
||||
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
|
||||
// LCOV_EXCL_START
|
||||
// Charge strongly for attempting to relay a txn with
|
||||
// tfInnerBatchTxn LCOV_EXCL_START
|
||||
if (stx->isFlag(tfInnerBatchTxn) &&
|
||||
getCurrentTransactionRules()->enabled(featureBatch))
|
||||
{
|
||||
@@ -1326,8 +1340,9 @@ PeerImp::handleTransaction(
|
||||
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
|
||||
}
|
||||
|
||||
// Erase only if the server has seen this tx. If the server has not
|
||||
// seen this tx then the tx could not has been queued for this peer.
|
||||
// Erase only if the server has seen this tx. If the server has
|
||||
// not seen this tx then the tx could not has been queued for
|
||||
// this peer.
|
||||
else if (eraseTxQueue && txReduceRelayEnabled())
|
||||
removeTxQueue(txID);
|
||||
|
||||
@@ -1714,14 +1729,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive lookup
|
||||
// every time a spam packet is received
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||
|
||||
// If the operator has specified that untrusted proposals be dropped then
|
||||
// this happens here I.e. before further wasting CPU verifying the signature
|
||||
// of an untrusted key
|
||||
// If the operator has specified that untrusted proposals be dropped
|
||||
// then this happens here I.e. before further wasting CPU verifying the
|
||||
// signature of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// report untrusted proposal messages
|
||||
@@ -1750,8 +1765,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||
!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
|
||||
// receives within IDLED seconds since the message has been relayed.
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||
@@ -1832,8 +1848,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||
{
|
||||
bool outOfSync{false};
|
||||
{
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
||||
// guarded by recentLock_.
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||
// be guarded by recentLock_.
|
||||
std::lock_guard sl(recentLock_);
|
||||
if (!closedLedgerHash_.isZero())
|
||||
{
|
||||
@@ -1855,8 +1871,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||
m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
|
||||
|
||||
{
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
||||
// guarded by recentLock_.
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||
// be guarded by recentLock_.
|
||||
std::lock_guard sl(recentLock_);
|
||||
if (peerChangedLedgers)
|
||||
{
|
||||
@@ -2057,7 +2073,8 @@ PeerImp::onValidatorListMessage(
|
||||
std::vector<ValidatorBlobInfo> const& blobs)
|
||||
{
|
||||
// If there are no blobs, the message is malformed (possibly because of
|
||||
// ValidatorList class rules), so charge accordingly and skip processing.
|
||||
// ValidatorList class rules), so charge accordingly and skip
|
||||
// processing.
|
||||
if (blobs.empty())
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
|
||||
@@ -2114,7 +2131,8 @@ PeerImp::onValidatorListMessage(
|
||||
|
||||
XRPL_ASSERT(
|
||||
applyResult.publisherKey,
|
||||
"ripple::PeerImp::onValidatorListMessage : publisher key is "
|
||||
"ripple::PeerImp::onValidatorListMessage : publisher key "
|
||||
"is "
|
||||
"set");
|
||||
auto const& pubKey = *applyResult.publisherKey;
|
||||
#ifndef NDEBUG
|
||||
@@ -2123,7 +2141,8 @@ PeerImp::onValidatorListMessage(
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
iter->second < applyResult.sequence,
|
||||
"ripple::PeerImp::onValidatorListMessage : lower sequence");
|
||||
"ripple::PeerImp::onValidatorListMessage : lower "
|
||||
"sequence");
|
||||
}
|
||||
#endif
|
||||
publisherListSequences_[pubKey] = applyResult.sequence;
|
||||
@@ -2136,12 +2155,14 @@ PeerImp::onValidatorListMessage(
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
XRPL_ASSERT(
|
||||
applyResult.sequence && applyResult.publisherKey,
|
||||
"ripple::PeerImp::onValidatorListMessage : nonzero sequence "
|
||||
"ripple::PeerImp::onValidatorListMessage : nonzero "
|
||||
"sequence "
|
||||
"and set publisher key");
|
||||
XRPL_ASSERT(
|
||||
publisherListSequences_[*applyResult.publisherKey] <=
|
||||
applyResult.sequence,
|
||||
"ripple::PeerImp::onValidatorListMessage : maximum sequence");
|
||||
"ripple::PeerImp::onValidatorListMessage : maximum "
|
||||
"sequence");
|
||||
}
|
||||
#endif // !NDEBUG
|
||||
|
||||
@@ -2153,7 +2174,8 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid best list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid best "
|
||||
"list "
|
||||
"disposition");
|
||||
}
|
||||
|
||||
@@ -2197,7 +2219,8 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid worst list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid worst "
|
||||
"list "
|
||||
"disposition");
|
||||
}
|
||||
|
||||
@@ -2257,7 +2280,8 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid "
|
||||
"list "
|
||||
"disposition");
|
||||
}
|
||||
}
|
||||
@@ -2301,7 +2325,8 @@ PeerImp::onMessage(
|
||||
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "ValidatorListCollection: received validator list from peer "
|
||||
<< "ValidatorListCollection: received validator list from "
|
||||
"peer "
|
||||
<< "using protocol version " << to_string(protocol_)
|
||||
<< " which shouldn't support this feature.";
|
||||
fee_.update(Resource::feeUselessData, "unsupported peer");
|
||||
@@ -2310,7 +2335,8 @@ PeerImp::onMessage(
|
||||
else if (m->version() < 2)
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "ValidatorListCollection: received invalid validator list "
|
||||
<< "ValidatorListCollection: received invalid validator "
|
||||
"list "
|
||||
"version "
|
||||
<< m->version() << " from peer using protocol version "
|
||||
<< to_string(protocol_);
|
||||
@@ -2370,9 +2396,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
return;
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
// RH TODO: when isTrusted = false we should probably also cache a
|
||||
// key suppression for 30 seconds to avoid doing a relatively
|
||||
// expensive lookup every time a spam packet is received
|
||||
auto const isTrusted =
|
||||
app_.validators().trusted(val->getSignerPublic());
|
||||
|
||||
@@ -2397,9 +2423,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
|
||||
if (!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
// Count unique messages (Slots has it's own 'HashRouter'),
|
||||
// which a peer receives within IDLED seconds since the message
|
||||
// has been relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
|
||||
@@ -2892,8 +2918,8 @@ PeerImp::checkTransaction(
|
||||
|
||||
if (isPseudoTx(*stx))
|
||||
{
|
||||
// Don't do anything with pseudo transactions except put them in the
|
||||
// TransactionMaster cache
|
||||
// Don't do anything with pseudo transactions except put them in
|
||||
// the TransactionMaster cache
|
||||
std::string reason;
|
||||
auto tx = std::make_shared<Transaction>(stx, reason, app_);
|
||||
XRPL_ASSERT(
|
||||
@@ -3049,16 +3075,17 @@ PeerImp::checkValidation(
|
||||
return;
|
||||
}
|
||||
|
||||
// FIXME it should be safe to remove this try/catch. Investigate codepaths.
|
||||
// FIXME it should be safe to remove this try/catch. Investigate
|
||||
// codepaths.
|
||||
try
|
||||
{
|
||||
if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
|
||||
cluster())
|
||||
{
|
||||
// haveMessage contains peers, which are suppressed; i.e. the peers
|
||||
// are the source of the message, consequently the message should
|
||||
// not be relayed to these peers. But the message must be counted
|
||||
// as part of the squelch logic.
|
||||
// haveMessage contains peers, which are suppressed; i.e. the
|
||||
// peers are the source of the message, consequently the message
|
||||
// should not be relayed to these peers. But the message must be
|
||||
// counted as part of the squelch logic.
|
||||
auto haveMessage =
|
||||
overlay_.relay(*packet, key, val->getSignerPublic());
|
||||
if (!haveMessage.empty())
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/circular_buffer.hpp>
|
||||
#include <boost/endian/conversion.hpp>
|
||||
@@ -74,7 +75,7 @@ private:
|
||||
beast::WrappedSink p_sink_;
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
std::unique_ptr<stream_type> stream_ptr_;
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
waitable_timer timer_;
|
||||
|
||||
@@ -240,7 +241,7 @@ public:
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
OverlayImpl& overlay);
|
||||
|
||||
/** Create outgoing, handshaked peer. */
|
||||
@@ -248,7 +249,7 @@ public:
|
||||
template <class Buffers>
|
||||
PeerImp(
|
||||
Application& app,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Buffers const& buffers,
|
||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||
http_response_type&& response,
|
||||
@@ -650,7 +651,7 @@ private:
|
||||
template <class Buffers>
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Buffers const& buffers,
|
||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||
http_response_type&& response,
|
||||
|
||||
14
src/xrpld/server/detail/StreamInterface.cpp
Normal file
14
src/xrpld/server/detail/StreamInterface.cpp
Normal file
@@ -0,0 +1,14 @@
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
std::optional<base_uint<256>>
|
||||
ProductionStream::makeSharedValue(beast::Journal journal)
|
||||
{
|
||||
// Delegate to the existing Handshake module
|
||||
return ripple::makeSharedValue(*stream_, journal);
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
Reference in New Issue
Block a user