mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-04 11:15:56 +00:00
Compare commits
5 Commits
415a412d42
...
tapanito/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3bfbaef80 | ||
|
|
2dc49e30c4 | ||
|
|
da617007ac | ||
|
|
ad770a443b | ||
|
|
0ac24f5f73 |
163
include/xrpl/server/detail/StreamInterface.h
Normal file
163
include/xrpl/server/detail/StreamInterface.h
Normal file
@@ -0,0 +1,163 @@
|
||||
#ifndef RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||
#define RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/executor.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/beast/core/error.hpp>
|
||||
#include <boost/beast/core/multi_buffer.hpp>
|
||||
#include <boost/beast/core/tcp_stream.hpp>
|
||||
#include <boost/beast/ssl/ssl_stream.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// Forward declarations
|
||||
using socket_type = boost::beast::tcp_stream;
|
||||
using concrete_stream_type = boost::beast::ssl_stream<socket_type>;
|
||||
|
||||
/**
|
||||
* @brief Minimal interface for stream operations needed by PeerImp
|
||||
*/
|
||||
class StreamInterface
|
||||
{
|
||||
public:
|
||||
virtual ~StreamInterface() = default;
|
||||
|
||||
// Executor access for ASIO operations
|
||||
virtual boost::asio::any_io_executor
|
||||
get_executor() = 0;
|
||||
|
||||
// Connection status checking
|
||||
virtual bool
|
||||
is_open() const = 0;
|
||||
|
||||
// Stream lifecycle operations
|
||||
virtual void
|
||||
close() = 0;
|
||||
|
||||
virtual void
|
||||
cancel() = 0;
|
||||
|
||||
// Async I/O operations
|
||||
virtual void
|
||||
async_read_some(
|
||||
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write_some(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write(
|
||||
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_shutdown(std::function<void(boost::beast::error_code)> handler) = 0;
|
||||
|
||||
// SSL handshake support
|
||||
virtual std::optional<base_uint<256>>
|
||||
makeSharedValue(beast::Journal journal) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Production implementation wrapping boost::beast::ssl_stream
|
||||
*/
|
||||
class ProductionStream : public StreamInterface
|
||||
{
|
||||
private:
|
||||
std::unique_ptr<concrete_stream_type> stream_;
|
||||
|
||||
public:
|
||||
explicit ProductionStream(std::unique_ptr<concrete_stream_type> stream)
|
||||
: stream_(std::move(stream))
|
||||
{
|
||||
}
|
||||
|
||||
boost::asio::any_io_executor
|
||||
get_executor() override
|
||||
{
|
||||
return stream_->get_executor();
|
||||
}
|
||||
|
||||
bool
|
||||
is_open() const override
|
||||
{
|
||||
return stream_->next_layer().socket().is_open();
|
||||
}
|
||||
|
||||
void
|
||||
close() override
|
||||
{
|
||||
stream_->lowest_layer().close();
|
||||
}
|
||||
|
||||
void
|
||||
cancel() override
|
||||
{
|
||||
stream_->lowest_layer().cancel();
|
||||
}
|
||||
|
||||
void
|
||||
async_read_some(
|
||||
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
stream_->async_read_some(buffers, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write_some(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
stream_->async_write_some(buffer, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
boost::asio::async_write(*stream_, buffer, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write(
|
||||
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
boost::asio::async_write(*stream_, buffers, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_shutdown(
|
||||
std::function<void(boost::beast::error_code)> handler) override
|
||||
{
|
||||
stream_->async_shutdown(std::move(handler));
|
||||
}
|
||||
|
||||
std::optional<base_uint<256>>
|
||||
makeSharedValue(beast::Journal journal) override;
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -1,289 +1,299 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright 2020 Ripple Labs Inc.
|
||||
// //------------------------------------------------------------------------------
|
||||
// /*
|
||||
// This file is part of rippled: https://github.com/ripple/rippled
|
||||
// Copyright 2020 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
// Permission to use, copy, modify, and/or distribute this software for any
|
||||
// purpose with or without fee is hereby granted, provided that the above
|
||||
// copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
// ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
// */
|
||||
// //==============================================================================
|
||||
|
||||
#include <test/jtx.h>
|
||||
#include <test/jtx/Env.h>
|
||||
// #include <test/jtx.h>
|
||||
// #include <test/jtx/Env.h>
|
||||
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/peerfinder/detail/SlotImp.h>
|
||||
// #include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
// #include <xrpld/overlay/detail/PeerImp.h>
|
||||
// #include <xrpld/peerfinder/detail/SlotImp.h>
|
||||
|
||||
#include <xrpl/basics/make_SSLContext.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
// #include <xrpl/basics/make_SSLContext.h>
|
||||
// #include <xrpl/beast/unit_test.h>
|
||||
|
||||
namespace ripple {
|
||||
// namespace ripple {
|
||||
|
||||
namespace test {
|
||||
// namespace test {
|
||||
|
||||
class tx_reduce_relay_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
using socket_type = boost::asio::ip::tcp::socket;
|
||||
using middle_type = boost::beast::tcp_stream;
|
||||
using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
||||
// class tx_reduce_relay_test : public beast::unit_test::suite
|
||||
// {
|
||||
// public:
|
||||
// using socket_type = boost::asio::ip::tcp::socket;
|
||||
// using middle_type = boost::beast::tcp_stream;
|
||||
// using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
// using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
||||
|
||||
private:
|
||||
void
|
||||
doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
||||
{
|
||||
testcase(msg);
|
||||
f(log);
|
||||
}
|
||||
// private:
|
||||
// void
|
||||
// doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
||||
// {
|
||||
// testcase(msg);
|
||||
// f(log);
|
||||
// }
|
||||
|
||||
void
|
||||
testConfig(bool log)
|
||||
{
|
||||
doTest("Config Test", log, [&](bool log) {
|
||||
auto test = [&](bool enable,
|
||||
bool metrics,
|
||||
std::uint16_t min,
|
||||
std::uint16_t pct,
|
||||
bool success = true) {
|
||||
std::stringstream str("[reduce_relay]");
|
||||
str << "[reduce_relay]\n"
|
||||
<< "tx_enable=" << static_cast<int>(enable) << "\n"
|
||||
<< "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
||||
<< "tx_min_peers=" << min << "\n"
|
||||
<< "tx_relay_percentage=" << pct << "\n";
|
||||
Config c;
|
||||
try
|
||||
{
|
||||
c.loadFromString(str.str());
|
||||
// void
|
||||
// testConfig(bool log)
|
||||
// {
|
||||
// doTest("Config Test", log, [&](bool log) {
|
||||
// auto test = [&](bool enable,
|
||||
// bool metrics,
|
||||
// std::uint16_t min,
|
||||
// std::uint16_t pct,
|
||||
// bool success = true) {
|
||||
// std::stringstream str("[reduce_relay]");
|
||||
// str << "[reduce_relay]\n"
|
||||
// << "tx_enable=" << static_cast<int>(enable) << "\n"
|
||||
// << "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
||||
// << "tx_min_peers=" << min << "\n"
|
||||
// << "tx_relay_percentage=" << pct << "\n";
|
||||
// Config c;
|
||||
// try
|
||||
// {
|
||||
// c.loadFromString(str.str());
|
||||
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
||||
BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
||||
if (success)
|
||||
pass();
|
||||
else
|
||||
fail();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (success)
|
||||
fail();
|
||||
else
|
||||
pass();
|
||||
}
|
||||
};
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
||||
// BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
||||
// if (success)
|
||||
// pass();
|
||||
// else
|
||||
// fail();
|
||||
// }
|
||||
// catch (...)
|
||||
// {
|
||||
// if (success)
|
||||
// fail();
|
||||
// else
|
||||
// pass();
|
||||
// }
|
||||
// };
|
||||
|
||||
test(true, true, 20, 25);
|
||||
test(false, false, 20, 25);
|
||||
test(false, false, 20, 0, false);
|
||||
test(false, false, 20, 101, false);
|
||||
test(false, false, 9, 10, false);
|
||||
test(false, false, 10, 9, false);
|
||||
});
|
||||
}
|
||||
// test(true, true, 20, 25);
|
||||
// test(false, false, 20, 25);
|
||||
// test(false, false, 20, 0, false);
|
||||
// test(false, false, 20, 101, false);
|
||||
// test(false, false, 9, 10, false);
|
||||
// test(false, false, 10, 9, false);
|
||||
// });
|
||||
// }
|
||||
|
||||
class PeerTest : public PeerImp
|
||||
{
|
||||
public:
|
||||
PeerTest(
|
||||
Application& app,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
||||
OverlayImpl& overlay)
|
||||
: PeerImp(
|
||||
app,
|
||||
sid_,
|
||||
slot,
|
||||
std::move(request),
|
||||
publicKey,
|
||||
protocol,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
overlay)
|
||||
{
|
||||
sid_++;
|
||||
}
|
||||
~PeerTest() = default;
|
||||
// class PeerTest : public PeerImp
|
||||
// {
|
||||
// public:
|
||||
// PeerTest(
|
||||
// Application& app,
|
||||
// std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
// http_request_type&& request,
|
||||
// PublicKey const& publicKey,
|
||||
// ProtocolVersion protocol,
|
||||
// Resource::Consumer consumer,
|
||||
// std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
||||
// OverlayImpl& overlay)
|
||||
// : PeerImp(
|
||||
// app,
|
||||
// sid_,
|
||||
// slot,
|
||||
// std::move(request),
|
||||
// publicKey,
|
||||
// protocol,
|
||||
// consumer,
|
||||
// std::move(stream_ptr),
|
||||
// overlay)
|
||||
// {
|
||||
// sid_++;
|
||||
// }
|
||||
// ~PeerTest() = default;
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
}
|
||||
void
|
||||
send(std::shared_ptr<Message> const&) override
|
||||
{
|
||||
sendTx_++;
|
||||
}
|
||||
void
|
||||
addTxQueue(uint256 const& hash) override
|
||||
{
|
||||
queueTx_++;
|
||||
}
|
||||
static void
|
||||
init()
|
||||
{
|
||||
queueTx_ = 0;
|
||||
sendTx_ = 0;
|
||||
sid_ = 0;
|
||||
}
|
||||
inline static std::size_t sid_ = 0;
|
||||
inline static std::uint16_t queueTx_ = 0;
|
||||
inline static std::uint16_t sendTx_ = 0;
|
||||
};
|
||||
// void
|
||||
// run() override
|
||||
// {
|
||||
// }
|
||||
// void
|
||||
// send(std::shared_ptr<Message> const&) override
|
||||
// {
|
||||
// sendTx_++;
|
||||
// }
|
||||
// void
|
||||
// addTxQueue(uint256 const& hash) override
|
||||
// {
|
||||
// queueTx_++;
|
||||
// }
|
||||
// static void
|
||||
// init()
|
||||
// {
|
||||
// queueTx_ = 0;
|
||||
// sendTx_ = 0;
|
||||
// sid_ = 0;
|
||||
// }
|
||||
// inline static std::size_t sid_ = 0;
|
||||
// inline static std::uint16_t queueTx_ = 0;
|
||||
// inline static std::uint16_t sendTx_ = 0;
|
||||
// };
|
||||
|
||||
std::uint16_t lid_{0};
|
||||
std::uint16_t rid_{1};
|
||||
shared_context context_;
|
||||
ProtocolVersion protocolVersion_;
|
||||
boost::beast::multi_buffer read_buf_;
|
||||
// std::uint16_t lid_{0};
|
||||
// std::uint16_t rid_{1};
|
||||
// shared_context context_;
|
||||
// ProtocolVersion protocolVersion_;
|
||||
// boost::beast::multi_buffer read_buf_;
|
||||
|
||||
public:
|
||||
tx_reduce_relay_test()
|
||||
: context_(make_SSLContext("")), protocolVersion_{1, 7}
|
||||
{
|
||||
}
|
||||
// public:
|
||||
// tx_reduce_relay_test()
|
||||
// : context_(make_SSLContext("")), protocolVersion_{1, 7}
|
||||
// {
|
||||
// }
|
||||
|
||||
private:
|
||||
void
|
||||
addPeer(
|
||||
jtx::Env& env,
|
||||
std::vector<std::shared_ptr<PeerTest>>& peers,
|
||||
std::uint16_t& nDisabled)
|
||||
{
|
||||
auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
||||
boost::beast::http::request<boost::beast::http::dynamic_body> request;
|
||||
(nDisabled == 0)
|
||||
? (void)request.insert(
|
||||
"X-Protocol-Ctl",
|
||||
makeFeaturesRequestHeader(false, false, true, false))
|
||||
: (void)nDisabled--;
|
||||
auto stream_ptr = std::make_unique<stream_type>(
|
||||
socket_type(std::forward<boost::asio::io_context&>(
|
||||
env.app().getIOContext())),
|
||||
*context_);
|
||||
beast::IP::Endpoint local(
|
||||
boost::asio::ip::make_address("172.1.1." + std::to_string(lid_)));
|
||||
beast::IP::Endpoint remote(
|
||||
boost::asio::ip::make_address("172.1.1." + std::to_string(rid_)));
|
||||
PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
||||
auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
||||
auto [slot, _] = overlay.peerFinder().new_inbound_slot(local, remote);
|
||||
auto const peer = std::make_shared<PeerTest>(
|
||||
env.app(),
|
||||
slot,
|
||||
std::move(request),
|
||||
key,
|
||||
protocolVersion_,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
overlay);
|
||||
BEAST_EXPECT(
|
||||
overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
||||
overlay.add_active(peer);
|
||||
BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
||||
peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
||||
lid_ += 2;
|
||||
rid_ += 2;
|
||||
assert(lid_ <= 254);
|
||||
}
|
||||
// private:
|
||||
// void
|
||||
// addPeer(
|
||||
// jtx::Env& env,
|
||||
// std::vector<std::shared_ptr<PeerTest>>& peers,
|
||||
// std::uint16_t& nDisabled)
|
||||
// {
|
||||
// auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
||||
// boost::beast::http::request<boost::beast::http::dynamic_body>
|
||||
// request; (nDisabled == 0)
|
||||
// ? (void)request.insert(
|
||||
// "X-Protocol-Ctl",
|
||||
// makeFeaturesRequestHeader(false, false, true, false))
|
||||
// : (void)nDisabled--;
|
||||
// auto stream_ptr = std::make_unique<stream_type>(
|
||||
// socket_type(std::forward<boost::asio::io_context&>(
|
||||
// env.app().getIOContext())),
|
||||
// *context_);
|
||||
// beast::IP::Endpoint local(
|
||||
// boost::asio::ip::make_address("172.1.1." +
|
||||
// std::to_string(lid_)));
|
||||
// beast::IP::Endpoint remote(
|
||||
// boost::asio::ip::make_address("172.1.1." +
|
||||
// std::to_string(rid_)));
|
||||
// PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
||||
// auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
||||
// auto [slot, _] = overlay.peerFinder().new_inbound_slot(local,
|
||||
// remote); auto const peer = std::make_shared<PeerTest>(
|
||||
// env.app(),
|
||||
// slot,
|
||||
// std::move(request),
|
||||
// key,
|
||||
// protocolVersion_,
|
||||
// consumer,
|
||||
// std::move(stream_ptr),
|
||||
// overlay);
|
||||
// BEAST_EXPECT(
|
||||
// overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
||||
// overlay.add_active(peer);
|
||||
// BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
||||
// peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
||||
// lid_ += 2;
|
||||
// rid_ += 2;
|
||||
// assert(lid_ <= 254);
|
||||
// }
|
||||
|
||||
void
|
||||
testRelay(
|
||||
std::string const& test,
|
||||
bool txRREnabled,
|
||||
std::uint16_t nPeers,
|
||||
std::uint16_t nDisabled,
|
||||
std::uint16_t minPeers,
|
||||
std::uint16_t relayPercentage,
|
||||
std::uint16_t expectRelay,
|
||||
std::uint16_t expectQueue,
|
||||
std::set<Peer::id_t> const& toSkip = {})
|
||||
{
|
||||
testcase(test);
|
||||
jtx::Env env(*this);
|
||||
std::vector<std::shared_ptr<PeerTest>> peers;
|
||||
env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
||||
env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
||||
env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
||||
PeerTest::init();
|
||||
lid_ = 0;
|
||||
rid_ = 0;
|
||||
for (int i = 0; i < nPeers; i++)
|
||||
addPeer(env, peers, nDisabled);
|
||||
// void
|
||||
// testRelay(
|
||||
// std::string const& test,
|
||||
// bool txRREnabled,
|
||||
// std::uint16_t nPeers,
|
||||
// std::uint16_t nDisabled,
|
||||
// std::uint16_t minPeers,
|
||||
// std::uint16_t relayPercentage,
|
||||
// std::uint16_t expectRelay,
|
||||
// std::uint16_t expectQueue,
|
||||
// std::set<Peer::id_t> const& toSkip = {})
|
||||
// {
|
||||
// testcase(test);
|
||||
// jtx::Env env(*this);
|
||||
// std::vector<std::shared_ptr<PeerTest>> peers;
|
||||
// env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
||||
// env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
||||
// env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
||||
// PeerTest::init();
|
||||
// lid_ = 0;
|
||||
// rid_ = 0;
|
||||
// for (int i = 0; i < nPeers; i++)
|
||||
// addPeer(env, peers, nDisabled);
|
||||
|
||||
auto const jtx = env.jt(noop(env.master));
|
||||
if (BEAST_EXPECT(jtx.stx))
|
||||
{
|
||||
protocol::TMTransaction m;
|
||||
Serializer s;
|
||||
jtx.stx->add(s);
|
||||
m.set_rawtransaction(s.data(), s.size());
|
||||
m.set_deferred(false);
|
||||
m.set_status(protocol::TransactionStatus::tsNEW);
|
||||
env.app().overlay().relay(uint256{0}, m, toSkip);
|
||||
BEAST_EXPECT(
|
||||
PeerTest::sendTx_ == expectRelay &&
|
||||
PeerTest::queueTx_ == expectQueue);
|
||||
}
|
||||
}
|
||||
// auto const jtx = env.jt(noop(env.master));
|
||||
// if (BEAST_EXPECT(jtx.stx))
|
||||
// {
|
||||
// protocol::TMTransaction m;
|
||||
// Serializer s;
|
||||
// jtx.stx->add(s);
|
||||
// m.set_rawtransaction(s.data(), s.size());
|
||||
// m.set_deferred(false);
|
||||
// m.set_status(protocol::TransactionStatus::tsNEW);
|
||||
// env.app().overlay().relay(uint256{0}, m, toSkip);
|
||||
// BEAST_EXPECT(
|
||||
// PeerTest::sendTx_ == expectRelay &&
|
||||
// PeerTest::queueTx_ == expectQueue);
|
||||
// }
|
||||
// }
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
bool log = false;
|
||||
std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
||||
testConfig(log);
|
||||
// relay to all peers, no hash queue
|
||||
testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
||||
// relay to nPeers - skip (10-5=5)
|
||||
testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip);
|
||||
// relay to all peers because min is greater than nPeers
|
||||
testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
||||
// relay to all peers because min + disabled is greater thant nPeers
|
||||
testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
||||
// relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
||||
// queue the rest (30)
|
||||
testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
||||
// relay to minPeers + 25% of (nPeers - nPeers) - skip
|
||||
// (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed
|
||||
// (60-25-5=30)
|
||||
testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed)
|
||||
// (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
||||
testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
||||
// relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers
|
||||
// - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
||||
// towards relayed (70-35-5=30))
|
||||
testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
||||
// - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
||||
// towards relayed (15-5-10=0)
|
||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||
testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
||||
// - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
||||
// towards relayed (20-14=6)
|
||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
||||
testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip);
|
||||
}
|
||||
};
|
||||
// void
|
||||
// run() override
|
||||
// {
|
||||
// bool log = false;
|
||||
// std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
||||
// testConfig(log);
|
||||
// // relay to all peers, no hash queue
|
||||
// testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
||||
// // relay to nPeers - skip (10-5=5)
|
||||
// testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0,
|
||||
// skip);
|
||||
// // relay to all peers because min is greater than nPeers
|
||||
// testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
||||
// // relay to all peers because min + disabled is greater thant nPeers
|
||||
// testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
||||
// // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
||||
// // queue the rest (30)
|
||||
// testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
||||
// // relay to minPeers + 25% of (nPeers - nPeers) - skip
|
||||
// // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards
|
||||
// relayed
|
||||
// // (60-25-5=30)
|
||||
// testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disalbed)
|
||||
// // (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
||||
// testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
||||
// // relay to minPeers + disabled-not-in-skip + 25% of (nPeers -
|
||||
// minPeers
|
||||
// // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
||||
// // towards relayed (70-35-5=30))
|
||||
// testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disabled)
|
||||
// // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
||||
// // towards relayed (15-5-10=0)
|
||||
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||
// testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0,
|
||||
// skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disabled)
|
||||
// // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
||||
// // towards relayed (20-14=6)
|
||||
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
||||
// testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6,
|
||||
// skip);
|
||||
// }
|
||||
// };
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
// BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
||||
// } // namespace test
|
||||
// } // namespace ripple
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include <xrpld/overlay/detail/ProtocolVersion.h>
|
||||
|
||||
#include <xrpl/json/json_reader.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -410,17 +411,22 @@ ConnectAttempt::processResponse()
|
||||
if (result != PeerFinder::Result::success)
|
||||
return fail("Outbound " + std::string(to_string(result)));
|
||||
|
||||
// Extract peer attributes from the response before creating PeerImp
|
||||
auto const attributes =
|
||||
extractPeerAttributes(response_, app_.config(), false);
|
||||
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
app_,
|
||||
std::move(stream_ptr_),
|
||||
std::make_unique<ProductionStream>(std::move(stream_ptr_)),
|
||||
read_buf_.data(),
|
||||
std::move(slot_),
|
||||
std::move(response_),
|
||||
usage_,
|
||||
publicKey,
|
||||
*negotiatedProtocol,
|
||||
id_,
|
||||
overlay_);
|
||||
attributes,
|
||||
overlay_,
|
||||
app_.cluster().member(publicKey).value_or(""));
|
||||
|
||||
overlay_.add_active(peer);
|
||||
}
|
||||
|
||||
234
src/xrpld/overlay/detail/InboundHandshake.cpp
Normal file
234
src/xrpld/overlay/detail/InboundHandshake.cpp
Normal file
@@ -0,0 +1,234 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/overlay/Cluster.h>
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
|
||||
#include <boost/beast/core/ostream.hpp>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
InboundHandshake::InboundHandshake(
|
||||
Application& app,
|
||||
std::uint32_t id,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocolVersion,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
PeerAttributes const& attributes,
|
||||
endpoint_type const& remoteEndpoint,
|
||||
OverlayImpl& overlay)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
, sink_(app_.logs()["Peer"], OverlayImpl::makePrefix(id))
|
||||
, journal_(sink_)
|
||||
, stream_ptr_(std::move(stream_ptr))
|
||||
, request_(std::move(request))
|
||||
, publicKey_(publicKey)
|
||||
, protocolVersion_(protocolVersion)
|
||||
, consumer_(consumer)
|
||||
, attributes_(attributes)
|
||||
, slot_(slot)
|
||||
, remoteEndpoint_(remoteEndpoint)
|
||||
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||
{
|
||||
}
|
||||
|
||||
InboundHandshake::~InboundHandshake()
|
||||
{
|
||||
if (slot_ != nullptr)
|
||||
overlay_.peerFinder().on_closed(slot_);
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::stop()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return boost::asio::post(
|
||||
strand_, std::bind(&InboundHandshake::stop, shared_from_this()));
|
||||
|
||||
shutdown();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::shutdown()
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::shutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open() || shutdown_)
|
||||
return;
|
||||
|
||||
shutdown_ = true;
|
||||
|
||||
stream_ptr_->cancel();
|
||||
|
||||
tryAsyncShutdown();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::tryAsyncShutdown()
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::tryAsyncShutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (shutdown_ || shutdownStarted_)
|
||||
return;
|
||||
|
||||
if (ioPending_)
|
||||
return;
|
||||
|
||||
shutdownStarted_ = true;
|
||||
|
||||
return stream_ptr_->async_shutdown(boost::asio::bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&InboundHandshake::onShutdown,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::onShutdown(error_code ec)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::onShutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
|
||||
if (ec)
|
||||
{
|
||||
JLOG(journal_.warn()) << "onShutdown: " << ec.message();
|
||||
}
|
||||
|
||||
stream_ptr_->close();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::run()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return boost::asio::post(
|
||||
strand_, std::bind(&InboundHandshake::run, shared_from_this()));
|
||||
|
||||
// TODO: implement fail overload to handle strings
|
||||
auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
|
||||
if (!sharedValue)
|
||||
return fail("makeSharedValue", boost::system::error_code{});
|
||||
|
||||
// Create the handshake response
|
||||
auto const response = makeResponse(
|
||||
!overlay_.peerFinder().config().peerPrivate,
|
||||
request_,
|
||||
overlay_.setup().public_ip,
|
||||
remoteEndpoint_.address(),
|
||||
*sharedValue,
|
||||
overlay_.setup().networkID,
|
||||
protocolVersion_,
|
||||
app_);
|
||||
|
||||
// Convert response to buffer for async_write
|
||||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
||||
boost::beast::ostream(*write_buffer) << response;
|
||||
|
||||
ioPending_ = true;
|
||||
// Write the response asynchronously
|
||||
stream_ptr_->async_write(
|
||||
write_buffer->data(),
|
||||
boost::asio::bind_executor(
|
||||
strand_,
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
onHandshake(ec, bytes_transferred);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::onHandshake(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
ioPending_ = false;
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted || shutdown_)
|
||||
return tryAsyncShutdown();
|
||||
|
||||
if (ec)
|
||||
return fail("onHandshake", ec);
|
||||
|
||||
JLOG(journal_.debug()) << "InboundHandshake completed for "
|
||||
<< remoteEndpoint_
|
||||
<< ", bytes transferred: " << bytes_transferred;
|
||||
|
||||
// Handshake successful, create the peer
|
||||
createPeer();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::createPeer()
|
||||
{
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
app_,
|
||||
overlay_,
|
||||
std::move(slot_),
|
||||
std::move(stream_ptr_),
|
||||
consumer_,
|
||||
protocolVersion_,
|
||||
attributes_,
|
||||
publicKey_,
|
||||
id_,
|
||||
app_.cluster().member(publicKey_).value_or(""));
|
||||
|
||||
// Add the peer to the overlay
|
||||
overlay_.add_active(peer);
|
||||
JLOG(journal_.debug()) << "Created peer for " << remoteEndpoint_;
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::fail(std::string const& name, error_code ec)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::fail : strand in this thread");
|
||||
|
||||
JLOG(journal_.warn()) << name << " from "
|
||||
<< toBase58(TokenType::NodePublic, publicKey_)
|
||||
<< " at " << remoteEndpoint_.address().to_string()
|
||||
<< ": " << ec.message();
|
||||
|
||||
shutdown();
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
109
src/xrpld/overlay/detail/InboundHandshake.h
Normal file
109
src/xrpld/overlay/detail/InboundHandshake.h
Normal file
@@ -0,0 +1,109 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||
#define RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
|
||||
#include <xrpl/server/Handoff.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
/** Manages an inbound peer handshake. */
|
||||
class InboundHandshake : public OverlayImpl::Child,
|
||||
public std::enable_shared_from_this<InboundHandshake>
|
||||
{
|
||||
using error_code = boost::system::error_code;
|
||||
using endpoint_type = boost::asio::ip::tcp::endpoint;
|
||||
|
||||
private:
|
||||
Application& app_;
|
||||
std::uint32_t const id_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::Journal const journal_;
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
http_request_type request_;
|
||||
PublicKey publicKey_;
|
||||
ProtocolVersion protocolVersion_;
|
||||
Resource::Consumer consumer_;
|
||||
PeerAttributes attributes_;
|
||||
std::shared_ptr<PeerFinder::Slot> slot_;
|
||||
endpoint_type remoteEndpoint_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
bool shutdown_ = false;
|
||||
bool ioPending_ = false;
|
||||
bool shutdownStarted_ = false;
|
||||
|
||||
public:
|
||||
InboundHandshake(
|
||||
Application& app,
|
||||
std::uint32_t id,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& public_key,
|
||||
ProtocolVersion protocol_version,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
PeerAttributes const& attributes,
|
||||
endpoint_type const& remote_endpoint,
|
||||
OverlayImpl& overlay);
|
||||
|
||||
~InboundHandshake();
|
||||
|
||||
void
|
||||
stop() override;
|
||||
|
||||
void
|
||||
run();
|
||||
|
||||
private:
|
||||
void
|
||||
setTimer();
|
||||
|
||||
void
|
||||
onTimer(error_code ec);
|
||||
|
||||
void
|
||||
cancelTimer();
|
||||
|
||||
void
|
||||
shutdown();
|
||||
|
||||
void
|
||||
tryAsyncShutdown();
|
||||
|
||||
void
|
||||
onShutdown(error_code ec);
|
||||
|
||||
void
|
||||
onHandshake(error_code ec, std::size_t bytes_transferred);
|
||||
|
||||
void
|
||||
createPeer();
|
||||
|
||||
void
|
||||
fail(std::string const& name, error_code ec);
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -25,6 +25,7 @@
|
||||
#include <xrpld/app/rdb/Wallet.h>
|
||||
#include <xrpld/overlay/Cluster.h>
|
||||
#include <xrpld/overlay/detail/ConnectAttempt.h>
|
||||
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/overlay/detail/TrafficCount.h>
|
||||
#include <xrpld/overlay/detail/Tuning.h>
|
||||
@@ -39,6 +40,7 @@
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/server/SimpleWriter.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/asio/executor_work_guard.hpp>
|
||||
@@ -163,7 +165,7 @@ OverlayImpl::OverlayImpl(
|
||||
|
||||
Handoff
|
||||
OverlayImpl::onHandoff(
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<stream_type>&& ssl_stream_ptr,
|
||||
http_request_type&& request,
|
||||
endpoint_type remote_endpoint)
|
||||
{
|
||||
@@ -172,9 +174,7 @@ OverlayImpl::onHandoff(
|
||||
beast::Journal journal(sink);
|
||||
|
||||
Handoff handoff;
|
||||
if (processRequest(request, handoff))
|
||||
return handoff;
|
||||
if (!isPeerUpgrade(request))
|
||||
if (processRequest(request, handoff) || !isPeerUpgrade(request))
|
||||
return handoff;
|
||||
|
||||
handoff.moved = true;
|
||||
@@ -183,7 +183,7 @@ OverlayImpl::onHandoff(
|
||||
|
||||
error_code ec;
|
||||
auto const local_endpoint(
|
||||
stream_ptr->next_layer().socket().local_endpoint(ec));
|
||||
ssl_stream_ptr->next_layer().socket().local_endpoint(ec));
|
||||
if (ec)
|
||||
{
|
||||
JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
|
||||
@@ -195,16 +195,27 @@ OverlayImpl::onHandoff(
|
||||
if (consumer.disconnect(journal))
|
||||
return handoff;
|
||||
|
||||
auto const [slot, result] = m_peerFinder->new_inbound_slot(
|
||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||
|
||||
if (slot == nullptr)
|
||||
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
|
||||
if (!negotiatedVersion)
|
||||
{
|
||||
// connection refused either IP limit exceeded or self-connect
|
||||
handoff.moved = false;
|
||||
JLOG(journal.debug())
|
||||
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
|
||||
handoff.response = makeErrorResponse(
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Unable to agree on a protocol version");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
|
||||
auto stream_ptr =
|
||||
std::make_unique<ProductionStream>(std::move(ssl_stream_ptr));
|
||||
auto const sharedValue = stream_ptr->makeSharedValue(journal);
|
||||
if (!sharedValue)
|
||||
{
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
request, remote_endpoint.address(), "Incorrect security cookie");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
|
||||
@@ -217,38 +228,23 @@ OverlayImpl::onHandoff(
|
||||
}) == types.end())
|
||||
{
|
||||
handoff.moved = false;
|
||||
handoff.response =
|
||||
makeRedirectResponse(slot, request, remote_endpoint.address());
|
||||
handoff.response = makeErrorResponse(
|
||||
request, remote_endpoint.address(), "Invalid Peer Type");
|
||||
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
|
||||
return handoff;
|
||||
}
|
||||
}
|
||||
|
||||
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
|
||||
if (!negotiatedVersion)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
slot,
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Unable to agree on a protocol version");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
auto const [slot, result] = m_peerFinder->new_inbound_slot(
|
||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||
|
||||
auto const sharedValue = makeSharedValue(*stream_ptr, journal);
|
||||
if (!sharedValue)
|
||||
if (slot == nullptr)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
// connection refused either IP limit exceeded or self-connect
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
slot,
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Incorrect security cookie");
|
||||
handoff.keep_alive = false;
|
||||
JLOG(journal.debug())
|
||||
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
|
||||
return handoff;
|
||||
}
|
||||
|
||||
@@ -266,10 +262,12 @@ OverlayImpl::onHandoff(
|
||||
// The node gets a reserved slot if it is in our cluster
|
||||
// or if it has a reservation.
|
||||
bool const reserved =
|
||||
static_cast<bool>(app_.cluster().member(publicKey)) ||
|
||||
app_.cluster().member(publicKey).has_value() ||
|
||||
app_.peerReservations().contains(publicKey);
|
||||
|
||||
auto const result =
|
||||
m_peerFinder->activate(slot, publicKey, reserved);
|
||||
|
||||
if (result != PeerFinder::Result::success)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
@@ -283,7 +281,11 @@ OverlayImpl::onHandoff(
|
||||
}
|
||||
}
|
||||
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
// Extract peer attributes from the request before creating PeerImp
|
||||
auto const attributes =
|
||||
extractPeerAttributes(request, app_.config(), true);
|
||||
|
||||
auto const p = std::make_shared<InboundHandshake>(
|
||||
app_,
|
||||
id,
|
||||
slot,
|
||||
@@ -292,23 +294,14 @@ OverlayImpl::onHandoff(
|
||||
*negotiatedVersion,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
attributes,
|
||||
remote_endpoint,
|
||||
*this);
|
||||
{
|
||||
// As we are not on the strand, run() must be called
|
||||
// while holding the lock, otherwise new I/O can be
|
||||
// queued after a call to stop().
|
||||
std::lock_guard<decltype(mutex_)> lock(mutex_);
|
||||
{
|
||||
auto const result = m_peers.emplace(peer->slot(), peer);
|
||||
XRPL_ASSERT(
|
||||
result.second,
|
||||
"ripple::OverlayImpl::onHandoff : peer is inserted");
|
||||
(void)result.second;
|
||||
}
|
||||
list_.emplace(peer.get(), peer);
|
||||
|
||||
peer->run();
|
||||
}
|
||||
std::lock_guard lock(mutex_);
|
||||
list_.emplace(p.get(), p);
|
||||
p->run();
|
||||
|
||||
handoff.moved = true;
|
||||
return handoff;
|
||||
}
|
||||
@@ -319,8 +312,8 @@ OverlayImpl::onHandoff(
|
||||
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
slot, request, remote_endpoint.address(), e.what());
|
||||
handoff.response =
|
||||
makeErrorResponse(request, remote_endpoint.address(), e.what());
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
@@ -374,7 +367,6 @@ OverlayImpl::makeRedirectResponse(
|
||||
|
||||
std::shared_ptr<Writer>
|
||||
OverlayImpl::makeErrorResponse(
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type const& request,
|
||||
address_type remote_address,
|
||||
std::string text)
|
||||
|
||||
@@ -465,7 +465,6 @@ private:
|
||||
|
||||
std::shared_ptr<Writer>
|
||||
makeErrorResponse(
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type const& request,
|
||||
address_type remote_address,
|
||||
std::string msg);
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include <xrpld/app/misc/ValidatorList.h>
|
||||
#include <xrpld/app/tx/apply.h>
|
||||
#include <xrpld/overlay/Cluster.h>
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/overlay/detail/Tuning.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
@@ -39,6 +40,7 @@
|
||||
#include <xrpl/basics/safe_cast.h>
|
||||
#include <xrpl/protocol/TxFlags.h>
|
||||
#include <xrpl/protocol/digest.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/beast/core/ostream.hpp>
|
||||
@@ -48,6 +50,7 @@
|
||||
#include <mutex>
|
||||
#include <numeric>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -61,19 +64,102 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
|
||||
std::chrono::seconds constexpr peerTimerInterval{60};
|
||||
} // namespace
|
||||
|
||||
PeerAttributes
|
||||
extractPeerAttributes(
|
||||
boost::beast::http::fields const& headers,
|
||||
Config const& config,
|
||||
bool inbound)
|
||||
{
|
||||
PeerAttributes attributes;
|
||||
|
||||
// Extract feature flags
|
||||
attributes.compressionEnabled =
|
||||
peerFeatureEnabled(headers, FEATURE_COMPR, "lz4", config.COMPRESSION);
|
||||
|
||||
attributes.txReduceRelayEnabled = peerFeatureEnabled(
|
||||
headers, FEATURE_TXRR, config.TX_REDUCE_RELAY_ENABLE);
|
||||
|
||||
attributes.ledgerReplayEnabled = peerFeatureEnabled(
|
||||
headers, FEATURE_LEDGER_REPLAY, config.LEDGER_REPLAY);
|
||||
|
||||
attributes.vpReduceRelayEnabled = peerFeatureEnabled(
|
||||
headers, FEATURE_VPRR, config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
|
||||
|
||||
// Extract connection information
|
||||
if (auto const iter = headers.find("Crawl"); iter != headers.end())
|
||||
attributes.crawlEnabled = boost::iequals(iter->value(), "public");
|
||||
|
||||
if (inbound)
|
||||
{
|
||||
if (auto const iter = headers.find("User-Agent"); iter != headers.end())
|
||||
attributes.userAgent = std::string{iter->value()};
|
||||
}
|
||||
else
|
||||
{
|
||||
if (auto const iter = headers.find("Server"); iter != headers.end())
|
||||
attributes.serverInfo = std::string{iter->value()};
|
||||
}
|
||||
|
||||
if (auto const iter = headers.find("Network-ID"); iter != headers.end())
|
||||
attributes.networkId = std::string{iter->value()};
|
||||
|
||||
if (auto const iter = headers.find("Server-Domain"); iter != headers.end())
|
||||
attributes.serverDomain = std::string{iter->value()};
|
||||
|
||||
// Extract ledger information
|
||||
auto parseLedgerHash =
|
||||
[](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
|
||||
if (auto const s = base64_decode(value); s.size() == uint256::size())
|
||||
return uint256{s};
|
||||
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
bool hasClosedLedger = false;
|
||||
bool hasPreviousLedger = false;
|
||||
attributes.hasValidLedgerHashes = true;
|
||||
|
||||
if (auto const iter = headers.find("Closed-Ledger"); iter != headers.end())
|
||||
{
|
||||
hasClosedLedger = true;
|
||||
attributes.closedLedgerHash = parseLedgerHash(iter->value());
|
||||
if (!attributes.closedLedgerHash)
|
||||
attributes.hasValidLedgerHashes = false;
|
||||
}
|
||||
|
||||
if (auto const iter = headers.find("Previous-Ledger");
|
||||
iter != headers.end())
|
||||
{
|
||||
hasPreviousLedger = true;
|
||||
attributes.previousLedgerHash = parseLedgerHash(iter->value());
|
||||
if (!attributes.previousLedgerHash)
|
||||
attributes.hasValidLedgerHashes = false;
|
||||
}
|
||||
|
||||
// Validate ledger hash consistency
|
||||
if (hasPreviousLedger && !hasClosedLedger)
|
||||
attributes.hasValidLedgerHashes = false;
|
||||
|
||||
return attributes;
|
||||
}
|
||||
|
||||
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
||||
// release.
|
||||
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
id_t id,
|
||||
OverlayImpl& overlay,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
OverlayImpl& overlay)
|
||||
ProtocolVersion protocol,
|
||||
PeerAttributes const& attributes,
|
||||
PublicKey const& publicKey,
|
||||
id_t id,
|
||||
std::string const& name)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
@@ -82,10 +168,8 @@ PeerImp::PeerImp(
|
||||
, journal_(sink_)
|
||||
, p_journal_(p_sink_)
|
||||
, stream_ptr_(std::move(stream_ptr))
|
||||
, socket_(stream_ptr_->next_layer().socket())
|
||||
, stream_(*stream_ptr_)
|
||||
, strand_(boost::asio::make_strand(socket_.get_executor()))
|
||||
, timer_(waitable_timer{socket_.get_executor()})
|
||||
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||
, timer_(waitable_timer{stream_ptr_->get_executor()})
|
||||
, remote_address_(slot->remote_endpoint())
|
||||
, overlay_(overlay)
|
||||
, inbound_(true)
|
||||
@@ -93,41 +177,23 @@ PeerImp::PeerImp(
|
||||
, tracking_(Tracking::unknown)
|
||||
, trackingTime_(clock_type::now())
|
||||
, publicKey_(publicKey)
|
||||
, name_(name)
|
||||
, lastPingTime_(clock_type::now())
|
||||
, creationTime_(clock_type::now())
|
||||
, squelch_(app_.journal("Squelch"))
|
||||
, usage_(consumer)
|
||||
, fee_{Resource::feeTrivialPeer, ""}
|
||||
, slot_(slot)
|
||||
, request_(std::move(request))
|
||||
, headers_(request_)
|
||||
, compressionEnabled_(
|
||||
peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_COMPR,
|
||||
"lz4",
|
||||
app_.config().COMPRESSION)
|
||||
? Compressed::On
|
||||
: Compressed::Off)
|
||||
, txReduceRelayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_TXRR,
|
||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_LEDGER_REPLAY,
|
||||
app_.config().LEDGER_REPLAY))
|
||||
, attributes_(attributes)
|
||||
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
||||
{
|
||||
JLOG(journal_.info())
|
||||
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_VPRR,
|
||||
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
|
||||
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
JLOG(journal_.info()) << "compression enabled "
|
||||
<< attributes_.compressionEnabled
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< attributes_.vpReduceRelayEnabled
|
||||
<< " tx reduce-relay enabled "
|
||||
<< attributes_.txReduceRelayEnabled << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
}
|
||||
|
||||
PeerImp::~PeerImp()
|
||||
@@ -158,47 +224,16 @@ PeerImp::run()
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
|
||||
|
||||
auto parseLedgerHash =
|
||||
[](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
|
||||
if (auto const s = base64_decode(value); s.size() == uint256::size())
|
||||
return uint256{s};
|
||||
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
std::optional<uint256> closed;
|
||||
std::optional<uint256> previous;
|
||||
|
||||
if (auto const iter = headers_.find("Closed-Ledger");
|
||||
iter != headers_.end())
|
||||
{
|
||||
closed = parseLedgerHash(iter->value());
|
||||
|
||||
if (!closed)
|
||||
fail("Malformed handshake data (1)");
|
||||
}
|
||||
|
||||
if (auto const iter = headers_.find("Previous-Ledger");
|
||||
iter != headers_.end())
|
||||
{
|
||||
previous = parseLedgerHash(iter->value());
|
||||
|
||||
if (!previous)
|
||||
fail("Malformed handshake data (2)");
|
||||
}
|
||||
|
||||
if (previous && !closed)
|
||||
fail("Malformed handshake data (3)");
|
||||
// Validate ledger hash consistency
|
||||
if (!attributes_.hasValidLedgerHashes)
|
||||
fail("Malformed handshake data");
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
if (closed)
|
||||
closedLedgerHash_ = *closed;
|
||||
if (previous)
|
||||
previousLedgerHash_ = *previous;
|
||||
if (attributes_.closedLedgerHash)
|
||||
closedLedgerHash_ = *attributes_.closedLedgerHash;
|
||||
if (attributes_.previousLedgerHash)
|
||||
previousLedgerHash_ = *attributes_.previousLedgerHash;
|
||||
}
|
||||
|
||||
if (inbound_)
|
||||
@@ -215,7 +250,7 @@ PeerImp::stop()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
|
||||
if (socket_.is_open())
|
||||
if (socketOpen())
|
||||
{
|
||||
// The rationale for using different severity levels is that
|
||||
// outbound connections are under our control and may be logged
|
||||
@@ -251,19 +286,28 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
{
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::category::squelch_suppressed,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
static_cast<int>(
|
||||
m->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||
.size()));
|
||||
return;
|
||||
}
|
||||
|
||||
// report categorized outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
safe_cast<TrafficCount::category>(m->getCategory()),
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
static_cast<int>(
|
||||
m->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||
.size()));
|
||||
|
||||
// report total outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::category::total,
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
static_cast<int>(
|
||||
m->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||
.size()));
|
||||
|
||||
auto sendq_size = send_queue_.size();
|
||||
|
||||
@@ -287,17 +331,24 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
if (sendq_size != 0)
|
||||
return;
|
||||
|
||||
boost::asio::async_write(
|
||||
stream_,
|
||||
boost::asio::buffer(
|
||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
stream_ptr_->async_write(
|
||||
boost::asio::buffer(send_queue_.front()->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)),
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onWriteMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
PeerImp::socketOpen() const
|
||||
{
|
||||
return stream_ptr_->is_open();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -365,24 +416,21 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
|
||||
bool
|
||||
PeerImp::crawl() const
|
||||
{
|
||||
auto const iter = headers_.find("Crawl");
|
||||
if (iter == headers_.end())
|
||||
return false;
|
||||
return boost::iequals(iter->value(), "public");
|
||||
return attributes_.crawlEnabled;
|
||||
}
|
||||
|
||||
bool
|
||||
PeerImp::cluster() const
|
||||
{
|
||||
return static_cast<bool>(app_.cluster().member(publicKey_));
|
||||
return app_.cluster().member(publicKey_).has_value();
|
||||
}
|
||||
|
||||
std::string
|
||||
PeerImp::getVersion() const
|
||||
{
|
||||
if (inbound_)
|
||||
return headers_["User-Agent"];
|
||||
return headers_["Server"];
|
||||
return attributes_.userAgent.value_or("");
|
||||
return attributes_.serverInfo.value_or("");
|
||||
}
|
||||
|
||||
Json::Value
|
||||
@@ -408,8 +456,8 @@ PeerImp::json()
|
||||
if (auto const d = domain(); !d.empty())
|
||||
ret[jss::server_domain] = std::string{d};
|
||||
|
||||
if (auto const nid = headers_["Network-ID"]; !nid.empty())
|
||||
ret[jss::network_id] = std::string{nid};
|
||||
if (attributes_.networkId.has_value())
|
||||
ret[jss::network_id] = *attributes_.networkId;
|
||||
|
||||
ret[jss::load] = usage_.balance();
|
||||
|
||||
@@ -513,7 +561,7 @@ PeerImp::supportsFeature(ProtocolFeature f) const
|
||||
case ProtocolFeature::ValidatorList2Propagation:
|
||||
return protocol_ >= make_protocol(2, 2);
|
||||
case ProtocolFeature::LedgerReplay:
|
||||
return ledgerReplayEnabled_;
|
||||
return attributes_.ledgerReplayEnabled;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@@ -578,13 +626,13 @@ PeerImp::close()
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::close : strand in this thread");
|
||||
if (socket_.is_open())
|
||||
if (socketOpen())
|
||||
{
|
||||
detaching_ = true; // DEPRECATED
|
||||
try
|
||||
{
|
||||
timer_.cancel();
|
||||
socket_.close();
|
||||
stream_ptr_->close();
|
||||
}
|
||||
catch (boost::system::system_error const&)
|
||||
{
|
||||
@@ -613,7 +661,7 @@ PeerImp::fail(std::string const& reason)
|
||||
(void(Peer::*)(std::string const&)) & PeerImp::fail,
|
||||
shared_from_this(),
|
||||
reason));
|
||||
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
|
||||
if (journal_.active(beast::severities::kWarning) && socketOpen())
|
||||
{
|
||||
std::string const n = name();
|
||||
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
|
||||
@@ -628,7 +676,7 @@ PeerImp::fail(std::string const& name, error_code ec)
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::fail : strand in this thread");
|
||||
if (socket_.is_open())
|
||||
if (socketOpen())
|
||||
{
|
||||
JLOG(journal_.warn())
|
||||
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
|
||||
@@ -644,7 +692,7 @@ PeerImp::gracefulClose()
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::gracefulClose : strand in this thread");
|
||||
XRPL_ASSERT(
|
||||
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
|
||||
socketOpen(), "ripple::PeerImp::gracefulClose : socket is open");
|
||||
XRPL_ASSERT(
|
||||
!gracefulClose_,
|
||||
"ripple::PeerImp::gracefulClose : socket is not closing");
|
||||
@@ -652,7 +700,7 @@ PeerImp::gracefulClose()
|
||||
if (send_queue_.size() > 0)
|
||||
return;
|
||||
setTimer();
|
||||
stream_.async_shutdown(bind_executor(
|
||||
stream_ptr_->async_shutdown(bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
|
||||
@@ -703,7 +751,7 @@ PeerImp::makePrefix(id_t id)
|
||||
void
|
||||
PeerImp::onTimer(error_code const& ec)
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
if (!socketOpen())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
@@ -784,78 +832,28 @@ PeerImp::doAccept()
|
||||
read_buffer_.size() == 0,
|
||||
"ripple::PeerImp::doAccept : empty read buffer");
|
||||
|
||||
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
|
||||
|
||||
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
|
||||
|
||||
// This shouldn't fail since we already computed
|
||||
// the shared value successfully in OverlayImpl
|
||||
if (!sharedValue)
|
||||
return fail("makeSharedValue: Unexpected failure");
|
||||
|
||||
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
||||
JLOG(journal_.info()) << "Public Key: "
|
||||
<< toBase58(TokenType::NodePublic, publicKey_);
|
||||
|
||||
if (auto member = app_.cluster().member(publicKey_))
|
||||
{
|
||||
{
|
||||
std::unique_lock lock{nameMutex_};
|
||||
name_ = *member;
|
||||
}
|
||||
JLOG(journal_.info()) << "Cluster name: " << *member;
|
||||
JLOG(journal_.info()) << "Cluster name: " << name_;
|
||||
}
|
||||
|
||||
overlay_.activate(shared_from_this());
|
||||
|
||||
// XXX Set timer: connection is in grace period to be useful.
|
||||
// XXX Set timer: connection idle (idle may vary depending on connection
|
||||
// type.)
|
||||
|
||||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
||||
|
||||
boost::beast::ostream(*write_buffer) << makeResponse(
|
||||
!overlay_.peerFinder().config().peerPrivate,
|
||||
request_,
|
||||
overlay_.setup().public_ip,
|
||||
remote_address_.address(),
|
||||
*sharedValue,
|
||||
overlay_.setup().networkID,
|
||||
protocol_,
|
||||
app_);
|
||||
|
||||
// Write the whole buffer and only start protocol when that's done.
|
||||
boost::asio::async_write(
|
||||
stream_,
|
||||
write_buffer->data(),
|
||||
boost::asio::transfer_all(),
|
||||
bind_executor(
|
||||
strand_,
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if (ec)
|
||||
return fail("onWriteResponse", ec);
|
||||
if (write_buffer->size() == bytes_transferred)
|
||||
return doProtocolStart();
|
||||
return fail("Failed to write header");
|
||||
}));
|
||||
doProtocolStart();
|
||||
}
|
||||
|
||||
std::string
|
||||
PeerImp::name() const
|
||||
{
|
||||
std::shared_lock read_lock{nameMutex_};
|
||||
return name_;
|
||||
}
|
||||
}
|
||||
|
||||
std::string
|
||||
PeerImp::domain() const
|
||||
{
|
||||
return headers_["Server-Domain"];
|
||||
return attributes_.serverDomain.value_or("");
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -903,7 +901,11 @@ PeerImp::doProtocolStart()
|
||||
void
|
||||
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::onReadMessage : strand in this thread");
|
||||
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
@@ -943,7 +945,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
|
||||
if (ec)
|
||||
return fail("onReadMessage", ec);
|
||||
if (!socket_.is_open())
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (gracefulClose_)
|
||||
return;
|
||||
@@ -952,22 +954,27 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
read_buffer_.consume(bytes_consumed);
|
||||
}
|
||||
|
||||
auto self = shared_from_this();
|
||||
|
||||
// Timeout on writes only
|
||||
stream_.async_read_some(
|
||||
stream_ptr_->async_read_some(
|
||||
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onReadMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onReadMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::onWriteMessage : strand in this thread");
|
||||
|
||||
if (!socketOpen())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
@@ -980,7 +987,6 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
else
|
||||
stream << "onWriteMessage";
|
||||
}
|
||||
|
||||
metrics_.sent.add_message(bytes_transferred);
|
||||
|
||||
XRPL_ASSERT(
|
||||
@@ -990,27 +996,31 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
if (!send_queue_.empty())
|
||||
{
|
||||
// Timeout on writes only
|
||||
return boost::asio::async_write(
|
||||
stream_,
|
||||
boost::asio::buffer(
|
||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
return stream_ptr_->async_write(
|
||||
boost::asio::buffer(send_queue_.front()->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)),
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onWriteMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (gracefulClose_)
|
||||
{
|
||||
return stream_.async_shutdown(bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onShutdown,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
return stream_ptr_->async_shutdown([self](boost::beast::error_code ec) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(
|
||||
self->strand_, [self, ec]() { self->onShutdown(ec); });
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1298,8 +1308,8 @@ PeerImp::handleTransaction(
|
||||
auto stx = std::make_shared<STTx const>(sit);
|
||||
uint256 txID = stx->getTransactionID();
|
||||
|
||||
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
|
||||
// LCOV_EXCL_START
|
||||
// Charge strongly for attempting to relay a txn with
|
||||
// tfInnerBatchTxn LCOV_EXCL_START
|
||||
if (stx->isFlag(tfInnerBatchTxn) &&
|
||||
getCurrentTransactionRules()->enabled(featureBatch))
|
||||
{
|
||||
@@ -1322,8 +1332,9 @@ PeerImp::handleTransaction(
|
||||
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
|
||||
}
|
||||
|
||||
// Erase only if the server has seen this tx. If the server has not
|
||||
// seen this tx then the tx could not has been queued for this peer.
|
||||
// Erase only if the server has seen this tx. If the server has
|
||||
// not seen this tx then the tx could not has been queued for
|
||||
// this peer.
|
||||
else if (eraseTxQueue && txReduceRelayEnabled())
|
||||
removeTxQueue(txID);
|
||||
|
||||
@@ -1485,7 +1496,7 @@ void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||
{
|
||||
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
|
||||
if (!ledgerReplayEnabled_)
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "proof_path_request disabled");
|
||||
@@ -1523,7 +1534,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
||||
{
|
||||
if (!ledgerReplayEnabled_)
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "proof_path_response disabled");
|
||||
@@ -1540,7 +1551,7 @@ void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||
{
|
||||
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
|
||||
if (!ledgerReplayEnabled_)
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "replay_delta_request disabled");
|
||||
@@ -1578,7 +1589,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
||||
{
|
||||
if (!ledgerReplayEnabled_)
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "replay_delta_response disabled");
|
||||
@@ -1710,14 +1721,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive lookup
|
||||
// every time a spam packet is received
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||
|
||||
// If the operator has specified that untrusted proposals be dropped then
|
||||
// this happens here I.e. before further wasting CPU verifying the signature
|
||||
// of an untrusted key
|
||||
// If the operator has specified that untrusted proposals be dropped
|
||||
// then this happens here I.e. before further wasting CPU verifying the
|
||||
// signature of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// report untrusted proposal messages
|
||||
@@ -1746,8 +1757,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||
!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
|
||||
// receives within IDLED seconds since the message has been relayed.
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||
@@ -1828,8 +1840,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||
{
|
||||
bool outOfSync{false};
|
||||
{
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
||||
// guarded by recentLock_.
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||
// be guarded by recentLock_.
|
||||
std::lock_guard sl(recentLock_);
|
||||
if (!closedLedgerHash_.isZero())
|
||||
{
|
||||
@@ -1851,8 +1863,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||
m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
|
||||
|
||||
{
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
||||
// guarded by recentLock_.
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||
// be guarded by recentLock_.
|
||||
std::lock_guard sl(recentLock_);
|
||||
if (peerChangedLedgers)
|
||||
{
|
||||
@@ -2053,7 +2065,8 @@ PeerImp::onValidatorListMessage(
|
||||
std::vector<ValidatorBlobInfo> const& blobs)
|
||||
{
|
||||
// If there are no blobs, the message is malformed (possibly because of
|
||||
// ValidatorList class rules), so charge accordingly and skip processing.
|
||||
// ValidatorList class rules), so charge accordingly and skip
|
||||
// processing.
|
||||
if (blobs.empty())
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
|
||||
@@ -2110,7 +2123,8 @@ PeerImp::onValidatorListMessage(
|
||||
|
||||
XRPL_ASSERT(
|
||||
applyResult.publisherKey,
|
||||
"ripple::PeerImp::onValidatorListMessage : publisher key is "
|
||||
"ripple::PeerImp::onValidatorListMessage : publisher key "
|
||||
"is "
|
||||
"set");
|
||||
auto const& pubKey = *applyResult.publisherKey;
|
||||
#ifndef NDEBUG
|
||||
@@ -2119,7 +2133,8 @@ PeerImp::onValidatorListMessage(
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
iter->second < applyResult.sequence,
|
||||
"ripple::PeerImp::onValidatorListMessage : lower sequence");
|
||||
"ripple::PeerImp::onValidatorListMessage : lower "
|
||||
"sequence");
|
||||
}
|
||||
#endif
|
||||
publisherListSequences_[pubKey] = applyResult.sequence;
|
||||
@@ -2132,12 +2147,14 @@ PeerImp::onValidatorListMessage(
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
XRPL_ASSERT(
|
||||
applyResult.sequence && applyResult.publisherKey,
|
||||
"ripple::PeerImp::onValidatorListMessage : nonzero sequence "
|
||||
"ripple::PeerImp::onValidatorListMessage : nonzero "
|
||||
"sequence "
|
||||
"and set publisher key");
|
||||
XRPL_ASSERT(
|
||||
publisherListSequences_[*applyResult.publisherKey] <=
|
||||
applyResult.sequence,
|
||||
"ripple::PeerImp::onValidatorListMessage : maximum sequence");
|
||||
"ripple::PeerImp::onValidatorListMessage : maximum "
|
||||
"sequence");
|
||||
}
|
||||
#endif // !NDEBUG
|
||||
|
||||
@@ -2149,7 +2166,8 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid best list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid best "
|
||||
"list "
|
||||
"disposition");
|
||||
}
|
||||
|
||||
@@ -2193,7 +2211,8 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid worst list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid worst "
|
||||
"list "
|
||||
"disposition");
|
||||
}
|
||||
|
||||
@@ -2253,7 +2272,8 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid "
|
||||
"list "
|
||||
"disposition");
|
||||
}
|
||||
}
|
||||
@@ -2297,7 +2317,8 @@ PeerImp::onMessage(
|
||||
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "ValidatorListCollection: received validator list from peer "
|
||||
<< "ValidatorListCollection: received validator list from "
|
||||
"peer "
|
||||
<< "using protocol version " << to_string(protocol_)
|
||||
<< " which shouldn't support this feature.";
|
||||
fee_.update(Resource::feeUselessData, "unsupported peer");
|
||||
@@ -2306,7 +2327,8 @@ PeerImp::onMessage(
|
||||
else if (m->version() < 2)
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "ValidatorListCollection: received invalid validator list "
|
||||
<< "ValidatorListCollection: received invalid validator "
|
||||
"list "
|
||||
"version "
|
||||
<< m->version() << " from peer using protocol version "
|
||||
<< to_string(protocol_);
|
||||
@@ -2366,9 +2388,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
return;
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
// RH TODO: when isTrusted = false we should probably also cache a
|
||||
// key suppression for 30 seconds to avoid doing a relatively
|
||||
// expensive lookup every time a spam packet is received
|
||||
auto const isTrusted =
|
||||
app_.validators().trusted(val->getSignerPublic());
|
||||
|
||||
@@ -2393,9 +2415,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
|
||||
if (!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
// Count unique messages (Slots has it's own 'HashRouter'),
|
||||
// which a peer receives within IDLED seconds since the message
|
||||
// has been relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
|
||||
@@ -2888,8 +2910,8 @@ PeerImp::checkTransaction(
|
||||
|
||||
if (isPseudoTx(*stx))
|
||||
{
|
||||
// Don't do anything with pseudo transactions except put them in the
|
||||
// TransactionMaster cache
|
||||
// Don't do anything with pseudo transactions except put them in
|
||||
// the TransactionMaster cache
|
||||
std::string reason;
|
||||
auto tx = std::make_shared<Transaction>(stx, reason, app_);
|
||||
XRPL_ASSERT(
|
||||
@@ -3045,16 +3067,17 @@ PeerImp::checkValidation(
|
||||
return;
|
||||
}
|
||||
|
||||
// FIXME it should be safe to remove this try/catch. Investigate codepaths.
|
||||
// FIXME it should be safe to remove this try/catch. Investigate
|
||||
// codepaths.
|
||||
try
|
||||
{
|
||||
if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
|
||||
cluster())
|
||||
{
|
||||
// haveMessage contains peers, which are suppressed; i.e. the peers
|
||||
// are the source of the message, consequently the message should
|
||||
// not be relayed to these peers. But the message must be counted
|
||||
// as part of the squelch logic.
|
||||
// haveMessage contains peers, which are suppressed; i.e. the
|
||||
// peers are the source of the message, consequently the message
|
||||
// should not be relayed to these peers. But the message must be
|
||||
// counted as part of the squelch logic.
|
||||
auto haveMessage =
|
||||
overlay_.relay(*packet, key, val->getSignerPublic());
|
||||
if (!haveMessage.empty())
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/circular_buffer.hpp>
|
||||
#include <boost/endian/conversion.hpp>
|
||||
@@ -49,6 +50,37 @@ namespace ripple {
|
||||
struct ValidatorBlobInfo;
|
||||
class SHAMap;
|
||||
|
||||
/** Attributes extracted from peer HTTP headers */
|
||||
struct PeerAttributes
|
||||
{
|
||||
// Feature flags
|
||||
bool compressionEnabled = false;
|
||||
bool txReduceRelayEnabled = false;
|
||||
bool ledgerReplayEnabled = false;
|
||||
bool vpReduceRelayEnabled = false;
|
||||
|
||||
// Connection information
|
||||
bool crawlEnabled = false;
|
||||
std::optional<std::string> userAgent;
|
||||
std::optional<std::string> serverInfo;
|
||||
std::optional<std::string> networkId;
|
||||
std::optional<std::string> serverDomain;
|
||||
|
||||
// Ledger information
|
||||
std::optional<uint256> closedLedgerHash;
|
||||
std::optional<uint256> previousLedgerHash;
|
||||
|
||||
// Validation state
|
||||
bool hasValidLedgerHashes = false;
|
||||
};
|
||||
|
||||
/** Extract peer attributes from HTTP headers and application configuration */
|
||||
PeerAttributes
|
||||
extractPeerAttributes(
|
||||
boost::beast::http::fields const& headers,
|
||||
Config const& config,
|
||||
bool inbound);
|
||||
|
||||
class PeerImp : public Peer,
|
||||
public std::enable_shared_from_this<PeerImp>,
|
||||
public OverlayImpl::Child
|
||||
@@ -60,7 +92,6 @@ public:
|
||||
private:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using error_code = boost::system::error_code;
|
||||
using socket_type = boost::asio::ip::tcp::socket;
|
||||
using middle_type = boost::beast::tcp_stream;
|
||||
using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
using address_type = boost::asio::ip::address;
|
||||
@@ -69,55 +100,6 @@ private:
|
||||
boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
|
||||
using Compressed = compression::Compressed;
|
||||
|
||||
Application& app_;
|
||||
id_t const id_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::WrappedSink p_sink_;
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
std::unique_ptr<stream_type> stream_ptr_;
|
||||
socket_type& socket_;
|
||||
stream_type& stream_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
waitable_timer timer_;
|
||||
|
||||
// Updated at each stage of the connection process to reflect
|
||||
// the current conditions as closely as possible.
|
||||
beast::IP::Endpoint const remote_address_;
|
||||
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
OverlayImpl& overlay_;
|
||||
bool const inbound_;
|
||||
|
||||
// Protocol version to use for this link
|
||||
ProtocolVersion protocol_;
|
||||
|
||||
std::atomic<Tracking> tracking_;
|
||||
clock_type::time_point trackingTime_;
|
||||
bool detaching_ = false;
|
||||
// Node public key of peer.
|
||||
PublicKey const publicKey_;
|
||||
std::string name_;
|
||||
std::shared_mutex mutable nameMutex_;
|
||||
|
||||
// The indices of the smallest and largest ledgers this peer has available
|
||||
//
|
||||
LedgerIndex minLedger_ = 0;
|
||||
LedgerIndex maxLedger_ = 0;
|
||||
uint256 closedLedgerHash_;
|
||||
uint256 previousLedgerHash_;
|
||||
|
||||
boost::circular_buffer<uint256> recentLedgers_{128};
|
||||
boost::circular_buffer<uint256> recentTxSets_{128};
|
||||
|
||||
std::optional<std::chrono::milliseconds> latency_;
|
||||
std::optional<std::uint32_t> lastPingSeq_;
|
||||
clock_type::time_point lastPingTime_;
|
||||
clock_type::time_point const creationTime_;
|
||||
|
||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||
|
||||
// Notes on thread locking:
|
||||
//
|
||||
// During an audit it was noted that some member variables that looked
|
||||
@@ -165,37 +147,6 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
std::mutex mutable recentLock_;
|
||||
protocol::TMStatusChange last_status_;
|
||||
Resource::Consumer usage_;
|
||||
ChargeWithContext fee_;
|
||||
std::shared_ptr<PeerFinder::Slot> const slot_;
|
||||
boost::beast::multi_buffer read_buffer_;
|
||||
http_request_type request_;
|
||||
http_response_type response_;
|
||||
boost::beast::http::fields const& headers_;
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
// been sent to or received from this peer.
|
||||
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
||||
|
||||
Compressed compressionEnabled_ = Compressed::Off;
|
||||
|
||||
// Queue of transactions' hashes that have not been
|
||||
// relayed. The hashes are sent once a second to a peer
|
||||
// and the peer requests missing transactions from the node.
|
||||
hash_set<uint256> txQueue_;
|
||||
// true if tx reduce-relay feature is enabled on the peer.
|
||||
bool txReduceRelayEnabled_ = false;
|
||||
|
||||
bool ledgerReplayEnabled_ = false;
|
||||
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
class Metrics
|
||||
{
|
||||
public:
|
||||
@@ -229,6 +180,78 @@ private:
|
||||
Metrics recv;
|
||||
} metrics_;
|
||||
|
||||
Application& app_;
|
||||
id_t const id_;
|
||||
|
||||
beast::WrappedSink sink_;
|
||||
beast::WrappedSink p_sink_;
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
waitable_timer timer_;
|
||||
|
||||
// Updated at each stage of the connection process to reflect
|
||||
// the current conditions as closely as possible.
|
||||
beast::IP::Endpoint const remote_address_;
|
||||
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
OverlayImpl& overlay_;
|
||||
bool const inbound_;
|
||||
|
||||
// Protocol version to use for this link
|
||||
ProtocolVersion protocol_;
|
||||
|
||||
std::atomic<Tracking> tracking_;
|
||||
clock_type::time_point trackingTime_;
|
||||
bool detaching_ = false;
|
||||
// Node public key of peer.
|
||||
PublicKey const publicKey_;
|
||||
std::string name_;
|
||||
|
||||
// The indices of the smallest and largest ledgers this peer has available
|
||||
//
|
||||
LedgerIndex minLedger_ = 0;
|
||||
LedgerIndex maxLedger_ = 0;
|
||||
uint256 closedLedgerHash_;
|
||||
uint256 previousLedgerHash_;
|
||||
|
||||
boost::circular_buffer<uint256> recentLedgers_{128};
|
||||
boost::circular_buffer<uint256> recentTxSets_{128};
|
||||
|
||||
std::optional<std::chrono::milliseconds> latency_;
|
||||
std::optional<std::uint32_t> lastPingSeq_;
|
||||
clock_type::time_point lastPingTime_;
|
||||
clock_type::time_point const creationTime_;
|
||||
|
||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||
|
||||
std::mutex mutable recentLock_;
|
||||
protocol::TMStatusChange last_status_;
|
||||
Resource::Consumer usage_;
|
||||
ChargeWithContext fee_;
|
||||
std::shared_ptr<PeerFinder::Slot> const slot_;
|
||||
boost::beast::multi_buffer read_buffer_;
|
||||
PeerAttributes const attributes_;
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
// been sent to or received from this peer.
|
||||
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
||||
|
||||
// Queue of transactions' hashes that have not been
|
||||
// relayed. The hashes are sent once a second to a peer
|
||||
// and the peer requests missing transactions from the node.
|
||||
hash_set<uint256> txQueue_;
|
||||
|
||||
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
public:
|
||||
PeerImp(PeerImp const&) = delete;
|
||||
PeerImp&
|
||||
@@ -237,29 +260,30 @@ public:
|
||||
/** Create an active incoming peer from an established ssl connection. */
|
||||
PeerImp(
|
||||
Application& app,
|
||||
id_t id,
|
||||
OverlayImpl& overlay,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
OverlayImpl& overlay);
|
||||
ProtocolVersion protocol,
|
||||
PeerAttributes const& attributes,
|
||||
PublicKey const& publicKey,
|
||||
id_t id,
|
||||
std::string const& name);
|
||||
|
||||
/** Create outgoing, handshaked peer. */
|
||||
// VFALCO legacyPublicKey should be implied by the Slot
|
||||
template <class Buffers>
|
||||
PeerImp(
|
||||
Application& app,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Buffers const& buffers,
|
||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||
http_response_type&& response,
|
||||
Resource::Consumer usage,
|
||||
Resource::Consumer consumer,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
id_t id,
|
||||
OverlayImpl& overlay);
|
||||
PeerAttributes const& attributes,
|
||||
OverlayImpl& overlay,
|
||||
std::string const& name);
|
||||
|
||||
virtual ~PeerImp();
|
||||
|
||||
@@ -431,13 +455,13 @@ public:
|
||||
bool
|
||||
compressionEnabled() const override
|
||||
{
|
||||
return compressionEnabled_ == Compressed::On;
|
||||
return attributes_.compressionEnabled;
|
||||
}
|
||||
|
||||
bool
|
||||
txReduceRelayEnabled() const override
|
||||
{
|
||||
return txReduceRelayEnabled_;
|
||||
return attributes_.txReduceRelayEnabled;
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -519,6 +543,9 @@ private:
|
||||
handleHaveTransactions(
|
||||
std::shared_ptr<protocol::TMHaveTransactions> const& m);
|
||||
|
||||
bool
|
||||
socketOpen() const;
|
||||
|
||||
public:
|
||||
//--------------------------------------------------------------------------
|
||||
//
|
||||
@@ -650,15 +677,16 @@ private:
|
||||
template <class Buffers>
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Buffers const& buffers,
|
||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||
http_response_type&& response,
|
||||
Resource::Consumer usage,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
id_t id,
|
||||
OverlayImpl& overlay)
|
||||
PeerAttributes const& attributes,
|
||||
OverlayImpl& overlay,
|
||||
std::string const& name)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
@@ -667,10 +695,8 @@ PeerImp::PeerImp(
|
||||
, journal_(sink_)
|
||||
, p_journal_(p_sink_)
|
||||
, stream_ptr_(std::move(stream_ptr))
|
||||
, socket_(stream_ptr_->next_layer().socket())
|
||||
, stream_(*stream_ptr_)
|
||||
, strand_(boost::asio::make_strand(socket_.get_executor()))
|
||||
, timer_(waitable_timer{socket_.get_executor()})
|
||||
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||
, timer_(waitable_timer{stream_ptr_->get_executor()})
|
||||
, remote_address_(slot->remote_endpoint())
|
||||
, overlay_(overlay)
|
||||
, inbound_(false)
|
||||
@@ -678,43 +704,25 @@ PeerImp::PeerImp(
|
||||
, tracking_(Tracking::unknown)
|
||||
, trackingTime_(clock_type::now())
|
||||
, publicKey_(publicKey)
|
||||
, name_(name)
|
||||
, lastPingTime_(clock_type::now())
|
||||
, creationTime_(clock_type::now())
|
||||
, squelch_(app_.journal("Squelch"))
|
||||
, usage_(usage)
|
||||
, fee_{Resource::feeTrivialPeer}
|
||||
, slot_(std::move(slot))
|
||||
, response_(std::move(response))
|
||||
, headers_(response_)
|
||||
, compressionEnabled_(
|
||||
peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_COMPR,
|
||||
"lz4",
|
||||
app_.config().COMPRESSION)
|
||||
? Compressed::On
|
||||
: Compressed::Off)
|
||||
, txReduceRelayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_TXRR,
|
||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_LEDGER_REPLAY,
|
||||
app_.config().LEDGER_REPLAY))
|
||||
, attributes_(attributes)
|
||||
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
||||
{
|
||||
read_buffer_.commit(boost::asio::buffer_copy(
|
||||
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
|
||||
JLOG(journal_.info())
|
||||
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_VPRR,
|
||||
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
|
||||
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
JLOG(journal_.info()) << "compression enabled "
|
||||
<< attributes_.compressionEnabled
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< attributes_.vpReduceRelayEnabled
|
||||
<< " tx reduce-relay enabled "
|
||||
<< attributes_.txReduceRelayEnabled << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
}
|
||||
|
||||
template <class FwdIt, class>
|
||||
|
||||
144
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp
Normal file
144
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp
Normal file
@@ -0,0 +1,144 @@
|
||||
|
||||
#include <xrpld/app/consensus/RCLCxPeerPos.h>
|
||||
#include <xrpld/app/ledger/Ledger.h>
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/overlay/detail/handlers/ProtocolMessageHandler.h>
|
||||
#include <xrpld/shamap/SHAMap.h>
|
||||
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// Helper function to check for valid uint256 values in protobuf buffers
|
||||
static bool
|
||||
stringIsUint256Sized(std::string const& pBuffStr)
|
||||
{
|
||||
return pBuffStr.size() == uint256::size();
|
||||
}
|
||||
|
||||
void
|
||||
ProtocolMessageHandler::onMessage(
|
||||
std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
{
|
||||
protocol::TMProposeSet& set = *m;
|
||||
|
||||
auto const sig = makeSlice(set.signature());
|
||||
|
||||
// Preliminary check for the validity of the signature: A DER encoded
|
||||
// signature can't be longer than 72 bytes.
|
||||
if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
|
||||
(publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Proposal: malformed";
|
||||
fee_.update(
|
||||
Resource::feeInvalidSignature,
|
||||
" signature can't be longer than 72 bytes");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!stringIsUint256Sized(set.currenttxhash()) ||
|
||||
!stringIsUint256Sized(set.previousledger()))
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Proposal: malformed";
|
||||
fee_.update(Resource::feeMalformedRequest, "bad hashes");
|
||||
return;
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||
|
||||
// If the operator has specified that untrusted proposals be dropped
|
||||
// then this happens here I.e. before further wasting CPU verifying the
|
||||
// signature of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// report untrusted proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
|
||||
return;
|
||||
}
|
||||
|
||||
uint256 const proposeHash{set.currenttxhash()};
|
||||
uint256 const prevLedger{set.previousledger()};
|
||||
|
||||
NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
|
||||
|
||||
uint256 const suppression = proposalUniqueId(
|
||||
proposeHash,
|
||||
prevLedger,
|
||||
set.proposeseq(),
|
||||
closeTime,
|
||||
publicKey.slice(),
|
||||
sig);
|
||||
|
||||
if (auto [added, relayed] =
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||
!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||
|
||||
// report duplicate proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_duplicate,
|
||||
Message::messageSize(*m));
|
||||
|
||||
JLOG(p_journal_.trace()) << "Proposal: duplicate";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isTrusted)
|
||||
{
|
||||
if (tracking_.load() == Tracking::diverged)
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "Proposal: Dropping untrusted (peer divergence)";
|
||||
return;
|
||||
}
|
||||
|
||||
if (!cluster() && app_.getFeeTrack().isLoadedLocal())
|
||||
{
|
||||
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
JLOG(p_journal_.trace())
|
||||
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
|
||||
|
||||
auto proposal = RCLCxPeerPos(
|
||||
publicKey,
|
||||
sig,
|
||||
suppression,
|
||||
RCLCxPeerPos::Proposal{
|
||||
prevLedger,
|
||||
set.proposeseq(),
|
||||
proposeHash,
|
||||
closeTime,
|
||||
app_.timeKeeper().closeTime(),
|
||||
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
|
||||
|
||||
std::weak_ptr<PeerImp> weak = shared_from_this();
|
||||
app_.getJobQueue().addJob(
|
||||
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
|
||||
"recvPropose->checkPropose",
|
||||
[weak, isTrusted, m, proposal]() {
|
||||
if (auto peer = weak.lock())
|
||||
peer->checkPropose(isTrusted, m, proposal);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
142
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h
Normal file
142
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h
Normal file
@@ -0,0 +1,142 @@
|
||||
|
||||
#include <xrpld/app/consensus/RCLCxPeerPos.h>
|
||||
#include <xrpld/app/ledger/Ledger.h>
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/shamap/SHAMap.h>
|
||||
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class ProtocolMessageHandler
|
||||
{
|
||||
private:
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
OverlayImpl& overlay_;
|
||||
Application& app_;
|
||||
|
||||
public:
|
||||
void
|
||||
onMessageUnknown(std::uint16_t type);
|
||||
|
||||
void
|
||||
onMessageBegin(
|
||||
std::uint16_t type,
|
||||
std::shared_ptr<::google::protobuf::Message> const& m,
|
||||
std::size_t size,
|
||||
std::size_t uncompressed_size,
|
||||
bool isCompressed);
|
||||
|
||||
void
|
||||
onMessageEnd(
|
||||
std::uint16_t type,
|
||||
std::shared_ptr<::google::protobuf::Message> const& m);
|
||||
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMManifests> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMPing> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMCluster> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMEndpoints> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMTransaction> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMLedgerData> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMProposeSet> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMStatusChange> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMValidatorList> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMValidatorListCollection> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMTransactions> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
|
||||
|
||||
private:
|
||||
//--------------------------------------------------------------------------
|
||||
// lockedRecentLock is passed as a reminder to callers that recentLock_
|
||||
// must be locked.
|
||||
void
|
||||
addLedger(
|
||||
uint256 const& hash,
|
||||
std::lock_guard<std::mutex> const& lockedRecentLock);
|
||||
|
||||
void
|
||||
doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
|
||||
|
||||
void
|
||||
onValidatorListMessage(
|
||||
std::string const& messageType,
|
||||
std::string const& manifest,
|
||||
std::uint32_t version,
|
||||
std::vector<protocol::ValidatorBlobInfo> const& blobs);
|
||||
|
||||
/** Process peer's request to send missing transactions. The request is
|
||||
sent in response to TMHaveTransactions.
|
||||
@param packet protocol message containing missing transactions' hashes.
|
||||
*/
|
||||
void
|
||||
doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
|
||||
|
||||
void
|
||||
checkTransaction(
|
||||
HashRouterFlags flags,
|
||||
bool checkSignature,
|
||||
std::shared_ptr<STTx const> const& stx,
|
||||
bool batch);
|
||||
|
||||
void
|
||||
checkPropose(
|
||||
bool isTrusted,
|
||||
std::shared_ptr<protocol::TMProposeSet> const& packet,
|
||||
RCLCxPeerPos peerPos);
|
||||
|
||||
void
|
||||
checkValidation(
|
||||
std::shared_ptr<STValidation> const& val,
|
||||
uint256 const& key,
|
||||
std::shared_ptr<protocol::TMValidation> const& packet);
|
||||
|
||||
void
|
||||
sendLedgerBase(
|
||||
std::shared_ptr<Ledger const> const& ledger,
|
||||
protocol::TMLedgerData& ledgerData);
|
||||
|
||||
std::shared_ptr<Ledger const>
|
||||
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||
|
||||
std::shared_ptr<SHAMap const>
|
||||
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
|
||||
|
||||
void
|
||||
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
14
src/xrpld/server/detail/StreamInterface.cpp
Normal file
14
src/xrpld/server/detail/StreamInterface.cpp
Normal file
@@ -0,0 +1,14 @@
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
std::optional<base_uint<256>>
|
||||
ProductionStream::makeSharedValue(beast::Journal journal)
|
||||
{
|
||||
// Delegate to the existing Handshake module
|
||||
return ripple::makeSharedValue(*stream_, journal);
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
Reference in New Issue
Block a user