mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-19 18:45:52 +00:00
Compare commits
5 Commits
ximinez/le
...
tapanito/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3bfbaef80 | ||
|
|
2dc49e30c4 | ||
|
|
da617007ac | ||
|
|
ad770a443b | ||
|
|
0ac24f5f73 |
163
include/xrpl/server/detail/StreamInterface.h
Normal file
163
include/xrpl/server/detail/StreamInterface.h
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
#ifndef RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||||
|
#define RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||||
|
|
||||||
|
#include <xrpl/basics/Log.h>
|
||||||
|
#include <xrpl/basics/base_uint.h>
|
||||||
|
|
||||||
|
#include <boost/asio/buffer.hpp>
|
||||||
|
#include <boost/asio/executor.hpp>
|
||||||
|
#include <boost/asio/write.hpp>
|
||||||
|
#include <boost/beast/core/error.hpp>
|
||||||
|
#include <boost/beast/core/multi_buffer.hpp>
|
||||||
|
#include <boost/beast/core/tcp_stream.hpp>
|
||||||
|
#include <boost/beast/ssl/ssl_stream.hpp>
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
#include <optional>
|
||||||
|
|
||||||
|
namespace ripple {
|
||||||
|
|
||||||
|
// Forward declarations
|
||||||
|
using socket_type = boost::beast::tcp_stream;
|
||||||
|
using concrete_stream_type = boost::beast::ssl_stream<socket_type>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Minimal interface for stream operations needed by PeerImp
|
||||||
|
*/
|
||||||
|
class StreamInterface
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual ~StreamInterface() = default;
|
||||||
|
|
||||||
|
// Executor access for ASIO operations
|
||||||
|
virtual boost::asio::any_io_executor
|
||||||
|
get_executor() = 0;
|
||||||
|
|
||||||
|
// Connection status checking
|
||||||
|
virtual bool
|
||||||
|
is_open() const = 0;
|
||||||
|
|
||||||
|
// Stream lifecycle operations
|
||||||
|
virtual void
|
||||||
|
close() = 0;
|
||||||
|
|
||||||
|
virtual void
|
||||||
|
cancel() = 0;
|
||||||
|
|
||||||
|
// Async I/O operations
|
||||||
|
virtual void
|
||||||
|
async_read_some(
|
||||||
|
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||||
|
|
||||||
|
virtual void
|
||||||
|
async_write_some(
|
||||||
|
boost::asio::const_buffer buffer,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||||
|
|
||||||
|
virtual void
|
||||||
|
async_write(
|
||||||
|
boost::asio::const_buffer buffer,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||||
|
|
||||||
|
virtual void
|
||||||
|
async_write(
|
||||||
|
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||||
|
|
||||||
|
virtual void
|
||||||
|
async_shutdown(std::function<void(boost::beast::error_code)> handler) = 0;
|
||||||
|
|
||||||
|
// SSL handshake support
|
||||||
|
virtual std::optional<base_uint<256>>
|
||||||
|
makeSharedValue(beast::Journal journal) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Production implementation wrapping boost::beast::ssl_stream
|
||||||
|
*/
|
||||||
|
class ProductionStream : public StreamInterface
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
std::unique_ptr<concrete_stream_type> stream_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit ProductionStream(std::unique_ptr<concrete_stream_type> stream)
|
||||||
|
: stream_(std::move(stream))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
boost::asio::any_io_executor
|
||||||
|
get_executor() override
|
||||||
|
{
|
||||||
|
return stream_->get_executor();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
is_open() const override
|
||||||
|
{
|
||||||
|
return stream_->next_layer().socket().is_open();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
close() override
|
||||||
|
{
|
||||||
|
stream_->lowest_layer().close();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
cancel() override
|
||||||
|
{
|
||||||
|
stream_->lowest_layer().cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
async_read_some(
|
||||||
|
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||||
|
override
|
||||||
|
{
|
||||||
|
stream_->async_read_some(buffers, std::move(handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
async_write_some(
|
||||||
|
boost::asio::const_buffer buffer,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||||
|
override
|
||||||
|
{
|
||||||
|
stream_->async_write_some(buffer, std::move(handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
async_write(
|
||||||
|
boost::asio::const_buffer buffer,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||||
|
override
|
||||||
|
{
|
||||||
|
boost::asio::async_write(*stream_, buffer, std::move(handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
async_write(
|
||||||
|
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||||
|
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||||
|
override
|
||||||
|
{
|
||||||
|
boost::asio::async_write(*stream_, buffers, std::move(handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
async_shutdown(
|
||||||
|
std::function<void(boost::beast::error_code)> handler) override
|
||||||
|
{
|
||||||
|
stream_->async_shutdown(std::move(handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<base_uint<256>>
|
||||||
|
makeSharedValue(beast::Journal journal) override;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace ripple
|
||||||
|
|
||||||
|
#endif
|
||||||
@@ -1,289 +1,299 @@
|
|||||||
//------------------------------------------------------------------------------
|
// //------------------------------------------------------------------------------
|
||||||
/*
|
// /*
|
||||||
This file is part of rippled: https://github.com/ripple/rippled
|
// This file is part of rippled: https://github.com/ripple/rippled
|
||||||
Copyright 2020 Ripple Labs Inc.
|
// Copyright 2020 Ripple Labs Inc.
|
||||||
|
|
||||||
Permission to use, copy, modify, and/or distribute this software for any
|
// Permission to use, copy, modify, and/or distribute this software for any
|
||||||
purpose with or without fee is hereby granted, provided that the above
|
// purpose with or without fee is hereby granted, provided that the above
|
||||||
copyright notice and this permission notice appear in all copies.
|
// copyright notice and this permission notice appear in all copies.
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
// ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
*/
|
// */
|
||||||
//==============================================================================
|
// //==============================================================================
|
||||||
|
|
||||||
#include <test/jtx.h>
|
// #include <test/jtx.h>
|
||||||
#include <test/jtx/Env.h>
|
// #include <test/jtx/Env.h>
|
||||||
|
|
||||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
// #include <xrpld/overlay/detail/OverlayImpl.h>
|
||||||
#include <xrpld/overlay/detail/PeerImp.h>
|
// #include <xrpld/overlay/detail/PeerImp.h>
|
||||||
#include <xrpld/peerfinder/detail/SlotImp.h>
|
// #include <xrpld/peerfinder/detail/SlotImp.h>
|
||||||
|
|
||||||
#include <xrpl/basics/make_SSLContext.h>
|
// #include <xrpl/basics/make_SSLContext.h>
|
||||||
#include <xrpl/beast/unit_test.h>
|
// #include <xrpl/beast/unit_test.h>
|
||||||
|
|
||||||
namespace ripple {
|
// namespace ripple {
|
||||||
|
|
||||||
namespace test {
|
// namespace test {
|
||||||
|
|
||||||
class tx_reduce_relay_test : public beast::unit_test::suite
|
// class tx_reduce_relay_test : public beast::unit_test::suite
|
||||||
{
|
// {
|
||||||
public:
|
// public:
|
||||||
using socket_type = boost::asio::ip::tcp::socket;
|
// using socket_type = boost::asio::ip::tcp::socket;
|
||||||
using middle_type = boost::beast::tcp_stream;
|
// using middle_type = boost::beast::tcp_stream;
|
||||||
using stream_type = boost::beast::ssl_stream<middle_type>;
|
// using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||||
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
// using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
||||||
|
|
||||||
private:
|
// private:
|
||||||
void
|
// void
|
||||||
doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
// doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
||||||
{
|
// {
|
||||||
testcase(msg);
|
// testcase(msg);
|
||||||
f(log);
|
// f(log);
|
||||||
}
|
// }
|
||||||
|
|
||||||
void
|
// void
|
||||||
testConfig(bool log)
|
// testConfig(bool log)
|
||||||
{
|
// {
|
||||||
doTest("Config Test", log, [&](bool log) {
|
// doTest("Config Test", log, [&](bool log) {
|
||||||
auto test = [&](bool enable,
|
// auto test = [&](bool enable,
|
||||||
bool metrics,
|
// bool metrics,
|
||||||
std::uint16_t min,
|
// std::uint16_t min,
|
||||||
std::uint16_t pct,
|
// std::uint16_t pct,
|
||||||
bool success = true) {
|
// bool success = true) {
|
||||||
std::stringstream str("[reduce_relay]");
|
// std::stringstream str("[reduce_relay]");
|
||||||
str << "[reduce_relay]\n"
|
// str << "[reduce_relay]\n"
|
||||||
<< "tx_enable=" << static_cast<int>(enable) << "\n"
|
// << "tx_enable=" << static_cast<int>(enable) << "\n"
|
||||||
<< "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
// << "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
||||||
<< "tx_min_peers=" << min << "\n"
|
// << "tx_min_peers=" << min << "\n"
|
||||||
<< "tx_relay_percentage=" << pct << "\n";
|
// << "tx_relay_percentage=" << pct << "\n";
|
||||||
Config c;
|
// Config c;
|
||||||
try
|
// try
|
||||||
{
|
// {
|
||||||
c.loadFromString(str.str());
|
// c.loadFromString(str.str());
|
||||||
|
|
||||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
// BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
||||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
// BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
||||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
// BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
||||||
BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
// BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
||||||
if (success)
|
// if (success)
|
||||||
pass();
|
// pass();
|
||||||
else
|
// else
|
||||||
fail();
|
// fail();
|
||||||
}
|
// }
|
||||||
catch (...)
|
// catch (...)
|
||||||
{
|
// {
|
||||||
if (success)
|
// if (success)
|
||||||
fail();
|
// fail();
|
||||||
else
|
// else
|
||||||
pass();
|
// pass();
|
||||||
}
|
// }
|
||||||
};
|
// };
|
||||||
|
|
||||||
test(true, true, 20, 25);
|
// test(true, true, 20, 25);
|
||||||
test(false, false, 20, 25);
|
// test(false, false, 20, 25);
|
||||||
test(false, false, 20, 0, false);
|
// test(false, false, 20, 0, false);
|
||||||
test(false, false, 20, 101, false);
|
// test(false, false, 20, 101, false);
|
||||||
test(false, false, 9, 10, false);
|
// test(false, false, 9, 10, false);
|
||||||
test(false, false, 10, 9, false);
|
// test(false, false, 10, 9, false);
|
||||||
});
|
// });
|
||||||
}
|
// }
|
||||||
|
|
||||||
class PeerTest : public PeerImp
|
// class PeerTest : public PeerImp
|
||||||
{
|
// {
|
||||||
public:
|
// public:
|
||||||
PeerTest(
|
// PeerTest(
|
||||||
Application& app,
|
// Application& app,
|
||||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
// std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||||
http_request_type&& request,
|
// http_request_type&& request,
|
||||||
PublicKey const& publicKey,
|
// PublicKey const& publicKey,
|
||||||
ProtocolVersion protocol,
|
// ProtocolVersion protocol,
|
||||||
Resource::Consumer consumer,
|
// Resource::Consumer consumer,
|
||||||
std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
// std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
||||||
OverlayImpl& overlay)
|
// OverlayImpl& overlay)
|
||||||
: PeerImp(
|
// : PeerImp(
|
||||||
app,
|
// app,
|
||||||
sid_,
|
// sid_,
|
||||||
slot,
|
// slot,
|
||||||
std::move(request),
|
// std::move(request),
|
||||||
publicKey,
|
// publicKey,
|
||||||
protocol,
|
// protocol,
|
||||||
consumer,
|
// consumer,
|
||||||
std::move(stream_ptr),
|
// std::move(stream_ptr),
|
||||||
overlay)
|
// overlay)
|
||||||
{
|
// {
|
||||||
sid_++;
|
// sid_++;
|
||||||
}
|
// }
|
||||||
~PeerTest() = default;
|
// ~PeerTest() = default;
|
||||||
|
|
||||||
void
|
// void
|
||||||
run() override
|
// run() override
|
||||||
{
|
// {
|
||||||
}
|
// }
|
||||||
void
|
// void
|
||||||
send(std::shared_ptr<Message> const&) override
|
// send(std::shared_ptr<Message> const&) override
|
||||||
{
|
// {
|
||||||
sendTx_++;
|
// sendTx_++;
|
||||||
}
|
// }
|
||||||
void
|
// void
|
||||||
addTxQueue(uint256 const& hash) override
|
// addTxQueue(uint256 const& hash) override
|
||||||
{
|
// {
|
||||||
queueTx_++;
|
// queueTx_++;
|
||||||
}
|
// }
|
||||||
static void
|
// static void
|
||||||
init()
|
// init()
|
||||||
{
|
// {
|
||||||
queueTx_ = 0;
|
// queueTx_ = 0;
|
||||||
sendTx_ = 0;
|
// sendTx_ = 0;
|
||||||
sid_ = 0;
|
// sid_ = 0;
|
||||||
}
|
// }
|
||||||
inline static std::size_t sid_ = 0;
|
// inline static std::size_t sid_ = 0;
|
||||||
inline static std::uint16_t queueTx_ = 0;
|
// inline static std::uint16_t queueTx_ = 0;
|
||||||
inline static std::uint16_t sendTx_ = 0;
|
// inline static std::uint16_t sendTx_ = 0;
|
||||||
};
|
// };
|
||||||
|
|
||||||
std::uint16_t lid_{0};
|
// std::uint16_t lid_{0};
|
||||||
std::uint16_t rid_{1};
|
// std::uint16_t rid_{1};
|
||||||
shared_context context_;
|
// shared_context context_;
|
||||||
ProtocolVersion protocolVersion_;
|
// ProtocolVersion protocolVersion_;
|
||||||
boost::beast::multi_buffer read_buf_;
|
// boost::beast::multi_buffer read_buf_;
|
||||||
|
|
||||||
public:
|
// public:
|
||||||
tx_reduce_relay_test()
|
// tx_reduce_relay_test()
|
||||||
: context_(make_SSLContext("")), protocolVersion_{1, 7}
|
// : context_(make_SSLContext("")), protocolVersion_{1, 7}
|
||||||
{
|
// {
|
||||||
}
|
// }
|
||||||
|
|
||||||
private:
|
// private:
|
||||||
void
|
// void
|
||||||
addPeer(
|
// addPeer(
|
||||||
jtx::Env& env,
|
// jtx::Env& env,
|
||||||
std::vector<std::shared_ptr<PeerTest>>& peers,
|
// std::vector<std::shared_ptr<PeerTest>>& peers,
|
||||||
std::uint16_t& nDisabled)
|
// std::uint16_t& nDisabled)
|
||||||
{
|
// {
|
||||||
auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
// auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
||||||
boost::beast::http::request<boost::beast::http::dynamic_body> request;
|
// boost::beast::http::request<boost::beast::http::dynamic_body>
|
||||||
(nDisabled == 0)
|
// request; (nDisabled == 0)
|
||||||
? (void)request.insert(
|
// ? (void)request.insert(
|
||||||
"X-Protocol-Ctl",
|
// "X-Protocol-Ctl",
|
||||||
makeFeaturesRequestHeader(false, false, true, false))
|
// makeFeaturesRequestHeader(false, false, true, false))
|
||||||
: (void)nDisabled--;
|
// : (void)nDisabled--;
|
||||||
auto stream_ptr = std::make_unique<stream_type>(
|
// auto stream_ptr = std::make_unique<stream_type>(
|
||||||
socket_type(std::forward<boost::asio::io_context&>(
|
// socket_type(std::forward<boost::asio::io_context&>(
|
||||||
env.app().getIOContext())),
|
// env.app().getIOContext())),
|
||||||
*context_);
|
// *context_);
|
||||||
beast::IP::Endpoint local(
|
// beast::IP::Endpoint local(
|
||||||
boost::asio::ip::make_address("172.1.1." + std::to_string(lid_)));
|
// boost::asio::ip::make_address("172.1.1." +
|
||||||
beast::IP::Endpoint remote(
|
// std::to_string(lid_)));
|
||||||
boost::asio::ip::make_address("172.1.1." + std::to_string(rid_)));
|
// beast::IP::Endpoint remote(
|
||||||
PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
// boost::asio::ip::make_address("172.1.1." +
|
||||||
auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
// std::to_string(rid_)));
|
||||||
auto [slot, _] = overlay.peerFinder().new_inbound_slot(local, remote);
|
// PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
||||||
auto const peer = std::make_shared<PeerTest>(
|
// auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
||||||
env.app(),
|
// auto [slot, _] = overlay.peerFinder().new_inbound_slot(local,
|
||||||
slot,
|
// remote); auto const peer = std::make_shared<PeerTest>(
|
||||||
std::move(request),
|
// env.app(),
|
||||||
key,
|
// slot,
|
||||||
protocolVersion_,
|
// std::move(request),
|
||||||
consumer,
|
// key,
|
||||||
std::move(stream_ptr),
|
// protocolVersion_,
|
||||||
overlay);
|
// consumer,
|
||||||
BEAST_EXPECT(
|
// std::move(stream_ptr),
|
||||||
overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
// overlay);
|
||||||
overlay.add_active(peer);
|
// BEAST_EXPECT(
|
||||||
BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
// overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
||||||
peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
// overlay.add_active(peer);
|
||||||
lid_ += 2;
|
// BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
||||||
rid_ += 2;
|
// peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
||||||
assert(lid_ <= 254);
|
// lid_ += 2;
|
||||||
}
|
// rid_ += 2;
|
||||||
|
// assert(lid_ <= 254);
|
||||||
|
// }
|
||||||
|
|
||||||
void
|
// void
|
||||||
testRelay(
|
// testRelay(
|
||||||
std::string const& test,
|
// std::string const& test,
|
||||||
bool txRREnabled,
|
// bool txRREnabled,
|
||||||
std::uint16_t nPeers,
|
// std::uint16_t nPeers,
|
||||||
std::uint16_t nDisabled,
|
// std::uint16_t nDisabled,
|
||||||
std::uint16_t minPeers,
|
// std::uint16_t minPeers,
|
||||||
std::uint16_t relayPercentage,
|
// std::uint16_t relayPercentage,
|
||||||
std::uint16_t expectRelay,
|
// std::uint16_t expectRelay,
|
||||||
std::uint16_t expectQueue,
|
// std::uint16_t expectQueue,
|
||||||
std::set<Peer::id_t> const& toSkip = {})
|
// std::set<Peer::id_t> const& toSkip = {})
|
||||||
{
|
// {
|
||||||
testcase(test);
|
// testcase(test);
|
||||||
jtx::Env env(*this);
|
// jtx::Env env(*this);
|
||||||
std::vector<std::shared_ptr<PeerTest>> peers;
|
// std::vector<std::shared_ptr<PeerTest>> peers;
|
||||||
env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
// env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
||||||
env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
// env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
||||||
env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
// env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
||||||
PeerTest::init();
|
// PeerTest::init();
|
||||||
lid_ = 0;
|
// lid_ = 0;
|
||||||
rid_ = 0;
|
// rid_ = 0;
|
||||||
for (int i = 0; i < nPeers; i++)
|
// for (int i = 0; i < nPeers; i++)
|
||||||
addPeer(env, peers, nDisabled);
|
// addPeer(env, peers, nDisabled);
|
||||||
|
|
||||||
auto const jtx = env.jt(noop(env.master));
|
// auto const jtx = env.jt(noop(env.master));
|
||||||
if (BEAST_EXPECT(jtx.stx))
|
// if (BEAST_EXPECT(jtx.stx))
|
||||||
{
|
// {
|
||||||
protocol::TMTransaction m;
|
// protocol::TMTransaction m;
|
||||||
Serializer s;
|
// Serializer s;
|
||||||
jtx.stx->add(s);
|
// jtx.stx->add(s);
|
||||||
m.set_rawtransaction(s.data(), s.size());
|
// m.set_rawtransaction(s.data(), s.size());
|
||||||
m.set_deferred(false);
|
// m.set_deferred(false);
|
||||||
m.set_status(protocol::TransactionStatus::tsNEW);
|
// m.set_status(protocol::TransactionStatus::tsNEW);
|
||||||
env.app().overlay().relay(uint256{0}, m, toSkip);
|
// env.app().overlay().relay(uint256{0}, m, toSkip);
|
||||||
BEAST_EXPECT(
|
// BEAST_EXPECT(
|
||||||
PeerTest::sendTx_ == expectRelay &&
|
// PeerTest::sendTx_ == expectRelay &&
|
||||||
PeerTest::queueTx_ == expectQueue);
|
// PeerTest::queueTx_ == expectQueue);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
void
|
// void
|
||||||
run() override
|
// run() override
|
||||||
{
|
// {
|
||||||
bool log = false;
|
// bool log = false;
|
||||||
std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
// std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
||||||
testConfig(log);
|
// testConfig(log);
|
||||||
// relay to all peers, no hash queue
|
// // relay to all peers, no hash queue
|
||||||
testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
// testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
||||||
// relay to nPeers - skip (10-5=5)
|
// // relay to nPeers - skip (10-5=5)
|
||||||
testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip);
|
// testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0,
|
||||||
// relay to all peers because min is greater than nPeers
|
// skip);
|
||||||
testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
// // relay to all peers because min is greater than nPeers
|
||||||
// relay to all peers because min + disabled is greater thant nPeers
|
// testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
||||||
testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
// // relay to all peers because min + disabled is greater thant nPeers
|
||||||
// relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
// testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
||||||
// queue the rest (30)
|
// // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
||||||
testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
// // queue the rest (30)
|
||||||
// relay to minPeers + 25% of (nPeers - nPeers) - skip
|
// testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
||||||
// (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed
|
// // relay to minPeers + 25% of (nPeers - nPeers) - skip
|
||||||
// (60-25-5=30)
|
// // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards
|
||||||
testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
// relayed
|
||||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed)
|
// // (60-25-5=30)
|
||||||
// (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
// testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
||||||
testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||||
// relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers
|
// disalbed)
|
||||||
// - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
// // (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
||||||
// towards relayed (70-35-5=30))
|
// testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
||||||
testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
// // relay to minPeers + disabled-not-in-skip + 25% of (nPeers -
|
||||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
// minPeers
|
||||||
// - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
// // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
||||||
// towards relayed (15-5-10=0)
|
// // towards relayed (70-35-5=30))
|
||||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
// testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
||||||
testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip);
|
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
// disabled)
|
||||||
// - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
// // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
||||||
// towards relayed (20-14=6)
|
// // towards relayed (15-5-10=0)
|
||||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||||
testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip);
|
// testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0,
|
||||||
}
|
// skip);
|
||||||
};
|
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||||
|
// disabled)
|
||||||
|
// // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
||||||
|
// // towards relayed (20-14=6)
|
||||||
|
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
||||||
|
// testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6,
|
||||||
|
// skip);
|
||||||
|
// }
|
||||||
|
// };
|
||||||
|
|
||||||
BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
// BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
||||||
} // namespace test
|
// } // namespace test
|
||||||
} // namespace ripple
|
// } // namespace ripple
|
||||||
|
|||||||
@@ -23,6 +23,7 @@
|
|||||||
#include <xrpld/overlay/detail/ProtocolVersion.h>
|
#include <xrpld/overlay/detail/ProtocolVersion.h>
|
||||||
|
|
||||||
#include <xrpl/json/json_reader.h>
|
#include <xrpl/json/json_reader.h>
|
||||||
|
#include <xrpl/server/detail/StreamInterface.h>
|
||||||
|
|
||||||
namespace ripple {
|
namespace ripple {
|
||||||
|
|
||||||
@@ -410,17 +411,22 @@ ConnectAttempt::processResponse()
|
|||||||
if (result != PeerFinder::Result::success)
|
if (result != PeerFinder::Result::success)
|
||||||
return fail("Outbound " + std::string(to_string(result)));
|
return fail("Outbound " + std::string(to_string(result)));
|
||||||
|
|
||||||
|
// Extract peer attributes from the response before creating PeerImp
|
||||||
|
auto const attributes =
|
||||||
|
extractPeerAttributes(response_, app_.config(), false);
|
||||||
|
|
||||||
auto const peer = std::make_shared<PeerImp>(
|
auto const peer = std::make_shared<PeerImp>(
|
||||||
app_,
|
app_,
|
||||||
std::move(stream_ptr_),
|
std::make_unique<ProductionStream>(std::move(stream_ptr_)),
|
||||||
read_buf_.data(),
|
read_buf_.data(),
|
||||||
std::move(slot_),
|
std::move(slot_),
|
||||||
std::move(response_),
|
|
||||||
usage_,
|
usage_,
|
||||||
publicKey,
|
publicKey,
|
||||||
*negotiatedProtocol,
|
*negotiatedProtocol,
|
||||||
id_,
|
id_,
|
||||||
overlay_);
|
attributes,
|
||||||
|
overlay_,
|
||||||
|
app_.cluster().member(publicKey).value_or(""));
|
||||||
|
|
||||||
overlay_.add_active(peer);
|
overlay_.add_active(peer);
|
||||||
}
|
}
|
||||||
|
|||||||
234
src/xrpld/overlay/detail/InboundHandshake.cpp
Normal file
234
src/xrpld/overlay/detail/InboundHandshake.cpp
Normal file
@@ -0,0 +1,234 @@
|
|||||||
|
//------------------------------------------------------------------------------
|
||||||
|
/*
|
||||||
|
This file is part of rippled: https://github.com/ripple/rippled
|
||||||
|
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||||
|
|
||||||
|
Permission to use, copy, modify, and/or distribute this software for any
|
||||||
|
purpose with or without fee is hereby granted, provided that the above
|
||||||
|
copyright notice and this permission notice appear in all copies.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
*/
|
||||||
|
//==============================================================================
|
||||||
|
|
||||||
|
#include <xrpld/overlay/Cluster.h>
|
||||||
|
#include <xrpld/overlay/detail/Handshake.h>
|
||||||
|
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||||
|
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||||
|
#include <xrpld/overlay/detail/PeerImp.h>
|
||||||
|
|
||||||
|
#include <boost/beast/core/ostream.hpp>
|
||||||
|
|
||||||
|
namespace ripple {
|
||||||
|
|
||||||
|
InboundHandshake::InboundHandshake(
|
||||||
|
Application& app,
|
||||||
|
std::uint32_t id,
|
||||||
|
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||||
|
http_request_type&& request,
|
||||||
|
PublicKey const& publicKey,
|
||||||
|
ProtocolVersion protocolVersion,
|
||||||
|
Resource::Consumer consumer,
|
||||||
|
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||||
|
PeerAttributes const& attributes,
|
||||||
|
endpoint_type const& remoteEndpoint,
|
||||||
|
OverlayImpl& overlay)
|
||||||
|
: Child(overlay)
|
||||||
|
, app_(app)
|
||||||
|
, id_(id)
|
||||||
|
, sink_(app_.logs()["Peer"], OverlayImpl::makePrefix(id))
|
||||||
|
, journal_(sink_)
|
||||||
|
, stream_ptr_(std::move(stream_ptr))
|
||||||
|
, request_(std::move(request))
|
||||||
|
, publicKey_(publicKey)
|
||||||
|
, protocolVersion_(protocolVersion)
|
||||||
|
, consumer_(consumer)
|
||||||
|
, attributes_(attributes)
|
||||||
|
, slot_(slot)
|
||||||
|
, remoteEndpoint_(remoteEndpoint)
|
||||||
|
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
InboundHandshake::~InboundHandshake()
|
||||||
|
{
|
||||||
|
if (slot_ != nullptr)
|
||||||
|
overlay_.peerFinder().on_closed(slot_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::stop()
|
||||||
|
{
|
||||||
|
if (!strand_.running_in_this_thread())
|
||||||
|
return boost::asio::post(
|
||||||
|
strand_, std::bind(&InboundHandshake::stop, shared_from_this()));
|
||||||
|
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::shutdown()
|
||||||
|
{
|
||||||
|
XRPL_ASSERT(
|
||||||
|
strand_.running_in_this_thread(),
|
||||||
|
"ripple::InboundHandshake::shutdown : strand in this thread");
|
||||||
|
|
||||||
|
if (!stream_ptr_->is_open() || shutdown_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
shutdown_ = true;
|
||||||
|
|
||||||
|
stream_ptr_->cancel();
|
||||||
|
|
||||||
|
tryAsyncShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::tryAsyncShutdown()
|
||||||
|
{
|
||||||
|
XRPL_ASSERT(
|
||||||
|
strand_.running_in_this_thread(),
|
||||||
|
"ripple::InboundHandshake::tryAsyncShutdown : strand in this thread");
|
||||||
|
|
||||||
|
if (!stream_ptr_->is_open())
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (shutdown_ || shutdownStarted_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (ioPending_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
shutdownStarted_ = true;
|
||||||
|
|
||||||
|
return stream_ptr_->async_shutdown(boost::asio::bind_executor(
|
||||||
|
strand_,
|
||||||
|
std::bind(
|
||||||
|
&InboundHandshake::onShutdown,
|
||||||
|
shared_from_this(),
|
||||||
|
std::placeholders::_1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::onShutdown(error_code ec)
|
||||||
|
{
|
||||||
|
XRPL_ASSERT(
|
||||||
|
strand_.running_in_this_thread(),
|
||||||
|
"ripple::InboundHandshake::onShutdown : strand in this thread");
|
||||||
|
|
||||||
|
if (!stream_ptr_->is_open())
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (ec == boost::asio::error::operation_aborted)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (ec)
|
||||||
|
{
|
||||||
|
JLOG(journal_.warn()) << "onShutdown: " << ec.message();
|
||||||
|
}
|
||||||
|
|
||||||
|
stream_ptr_->close();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::run()
|
||||||
|
{
|
||||||
|
if (!strand_.running_in_this_thread())
|
||||||
|
return boost::asio::post(
|
||||||
|
strand_, std::bind(&InboundHandshake::run, shared_from_this()));
|
||||||
|
|
||||||
|
// TODO: implement fail overload to handle strings
|
||||||
|
auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
|
||||||
|
if (!sharedValue)
|
||||||
|
return fail("makeSharedValue", boost::system::error_code{});
|
||||||
|
|
||||||
|
// Create the handshake response
|
||||||
|
auto const response = makeResponse(
|
||||||
|
!overlay_.peerFinder().config().peerPrivate,
|
||||||
|
request_,
|
||||||
|
overlay_.setup().public_ip,
|
||||||
|
remoteEndpoint_.address(),
|
||||||
|
*sharedValue,
|
||||||
|
overlay_.setup().networkID,
|
||||||
|
protocolVersion_,
|
||||||
|
app_);
|
||||||
|
|
||||||
|
// Convert response to buffer for async_write
|
||||||
|
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
||||||
|
boost::beast::ostream(*write_buffer) << response;
|
||||||
|
|
||||||
|
ioPending_ = true;
|
||||||
|
// Write the response asynchronously
|
||||||
|
stream_ptr_->async_write(
|
||||||
|
write_buffer->data(),
|
||||||
|
boost::asio::bind_executor(
|
||||||
|
strand_,
|
||||||
|
[this, write_buffer, self = shared_from_this()](
|
||||||
|
error_code ec, std::size_t bytes_transferred) {
|
||||||
|
onHandshake(ec, bytes_transferred);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::onHandshake(error_code ec, std::size_t bytes_transferred)
|
||||||
|
{
|
||||||
|
ioPending_ = false;
|
||||||
|
if (!stream_ptr_->is_open())
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (ec == boost::asio::error::operation_aborted || shutdown_)
|
||||||
|
return tryAsyncShutdown();
|
||||||
|
|
||||||
|
if (ec)
|
||||||
|
return fail("onHandshake", ec);
|
||||||
|
|
||||||
|
JLOG(journal_.debug()) << "InboundHandshake completed for "
|
||||||
|
<< remoteEndpoint_
|
||||||
|
<< ", bytes transferred: " << bytes_transferred;
|
||||||
|
|
||||||
|
// Handshake successful, create the peer
|
||||||
|
createPeer();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::createPeer()
|
||||||
|
{
|
||||||
|
auto const peer = std::make_shared<PeerImp>(
|
||||||
|
app_,
|
||||||
|
overlay_,
|
||||||
|
std::move(slot_),
|
||||||
|
std::move(stream_ptr_),
|
||||||
|
consumer_,
|
||||||
|
protocolVersion_,
|
||||||
|
attributes_,
|
||||||
|
publicKey_,
|
||||||
|
id_,
|
||||||
|
app_.cluster().member(publicKey_).value_or(""));
|
||||||
|
|
||||||
|
// Add the peer to the overlay
|
||||||
|
overlay_.add_active(peer);
|
||||||
|
JLOG(journal_.debug()) << "Created peer for " << remoteEndpoint_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
InboundHandshake::fail(std::string const& name, error_code ec)
|
||||||
|
{
|
||||||
|
XRPL_ASSERT(
|
||||||
|
strand_.running_in_this_thread(),
|
||||||
|
"ripple::InboundHandshake::fail : strand in this thread");
|
||||||
|
|
||||||
|
JLOG(journal_.warn()) << name << " from "
|
||||||
|
<< toBase58(TokenType::NodePublic, publicKey_)
|
||||||
|
<< " at " << remoteEndpoint_.address().to_string()
|
||||||
|
<< ": " << ec.message();
|
||||||
|
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ripple
|
||||||
109
src/xrpld/overlay/detail/InboundHandshake.h
Normal file
109
src/xrpld/overlay/detail/InboundHandshake.h
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
//------------------------------------------------------------------------------
|
||||||
|
/*
|
||||||
|
This file is part of rippled: https://github.com/ripple/rippled
|
||||||
|
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||||
|
|
||||||
|
Permission to use, copy, modify, and/or distribute this software for any
|
||||||
|
purpose with or without fee is hereby granted, provided that the above
|
||||||
|
copyright notice and this permission notice appear in all copies.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
*/
|
||||||
|
//==============================================================================
|
||||||
|
|
||||||
|
#ifndef RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||||
|
#define RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||||
|
|
||||||
|
#include <xrpld/overlay/detail/Handshake.h>
|
||||||
|
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||||
|
#include <xrpld/overlay/detail/PeerImp.h>
|
||||||
|
|
||||||
|
#include <xrpl/server/Handoff.h>
|
||||||
|
|
||||||
|
namespace ripple {
|
||||||
|
|
||||||
|
/** Manages an inbound peer handshake. */
|
||||||
|
class InboundHandshake : public OverlayImpl::Child,
|
||||||
|
public std::enable_shared_from_this<InboundHandshake>
|
||||||
|
{
|
||||||
|
using error_code = boost::system::error_code;
|
||||||
|
using endpoint_type = boost::asio::ip::tcp::endpoint;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Application& app_;
|
||||||
|
std::uint32_t const id_;
|
||||||
|
beast::WrappedSink sink_;
|
||||||
|
beast::Journal const journal_;
|
||||||
|
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||||
|
http_request_type request_;
|
||||||
|
PublicKey publicKey_;
|
||||||
|
ProtocolVersion protocolVersion_;
|
||||||
|
Resource::Consumer consumer_;
|
||||||
|
PeerAttributes attributes_;
|
||||||
|
std::shared_ptr<PeerFinder::Slot> slot_;
|
||||||
|
endpoint_type remoteEndpoint_;
|
||||||
|
boost::asio::strand<boost::asio::executor> strand_;
|
||||||
|
bool shutdown_ = false;
|
||||||
|
bool ioPending_ = false;
|
||||||
|
bool shutdownStarted_ = false;
|
||||||
|
|
||||||
|
public:
|
||||||
|
InboundHandshake(
|
||||||
|
Application& app,
|
||||||
|
std::uint32_t id,
|
||||||
|
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||||
|
http_request_type&& request,
|
||||||
|
PublicKey const& public_key,
|
||||||
|
ProtocolVersion protocol_version,
|
||||||
|
Resource::Consumer consumer,
|
||||||
|
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||||
|
PeerAttributes const& attributes,
|
||||||
|
endpoint_type const& remote_endpoint,
|
||||||
|
OverlayImpl& overlay);
|
||||||
|
|
||||||
|
~InboundHandshake();
|
||||||
|
|
||||||
|
void
|
||||||
|
stop() override;
|
||||||
|
|
||||||
|
void
|
||||||
|
run();
|
||||||
|
|
||||||
|
private:
|
||||||
|
void
|
||||||
|
setTimer();
|
||||||
|
|
||||||
|
void
|
||||||
|
onTimer(error_code ec);
|
||||||
|
|
||||||
|
void
|
||||||
|
cancelTimer();
|
||||||
|
|
||||||
|
void
|
||||||
|
shutdown();
|
||||||
|
|
||||||
|
void
|
||||||
|
tryAsyncShutdown();
|
||||||
|
|
||||||
|
void
|
||||||
|
onShutdown(error_code ec);
|
||||||
|
|
||||||
|
void
|
||||||
|
onHandshake(error_code ec, std::size_t bytes_transferred);
|
||||||
|
|
||||||
|
void
|
||||||
|
createPeer();
|
||||||
|
|
||||||
|
void
|
||||||
|
fail(std::string const& name, error_code ec);
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace ripple
|
||||||
|
|
||||||
|
#endif
|
||||||
@@ -25,6 +25,7 @@
|
|||||||
#include <xrpld/app/rdb/Wallet.h>
|
#include <xrpld/app/rdb/Wallet.h>
|
||||||
#include <xrpld/overlay/Cluster.h>
|
#include <xrpld/overlay/Cluster.h>
|
||||||
#include <xrpld/overlay/detail/ConnectAttempt.h>
|
#include <xrpld/overlay/detail/ConnectAttempt.h>
|
||||||
|
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||||
#include <xrpld/overlay/detail/PeerImp.h>
|
#include <xrpld/overlay/detail/PeerImp.h>
|
||||||
#include <xrpld/overlay/detail/TrafficCount.h>
|
#include <xrpld/overlay/detail/TrafficCount.h>
|
||||||
#include <xrpld/overlay/detail/Tuning.h>
|
#include <xrpld/overlay/detail/Tuning.h>
|
||||||
@@ -39,6 +40,7 @@
|
|||||||
#include <xrpl/beast/core/LexicalCast.h>
|
#include <xrpl/beast/core/LexicalCast.h>
|
||||||
#include <xrpl/protocol/STTx.h>
|
#include <xrpl/protocol/STTx.h>
|
||||||
#include <xrpl/server/SimpleWriter.h>
|
#include <xrpl/server/SimpleWriter.h>
|
||||||
|
#include <xrpl/server/detail/StreamInterface.h>
|
||||||
|
|
||||||
#include <boost/algorithm/string/predicate.hpp>
|
#include <boost/algorithm/string/predicate.hpp>
|
||||||
#include <boost/asio/executor_work_guard.hpp>
|
#include <boost/asio/executor_work_guard.hpp>
|
||||||
@@ -163,7 +165,7 @@ OverlayImpl::OverlayImpl(
|
|||||||
|
|
||||||
Handoff
|
Handoff
|
||||||
OverlayImpl::onHandoff(
|
OverlayImpl::onHandoff(
|
||||||
std::unique_ptr<stream_type>&& stream_ptr,
|
std::unique_ptr<stream_type>&& ssl_stream_ptr,
|
||||||
http_request_type&& request,
|
http_request_type&& request,
|
||||||
endpoint_type remote_endpoint)
|
endpoint_type remote_endpoint)
|
||||||
{
|
{
|
||||||
@@ -172,9 +174,7 @@ OverlayImpl::onHandoff(
|
|||||||
beast::Journal journal(sink);
|
beast::Journal journal(sink);
|
||||||
|
|
||||||
Handoff handoff;
|
Handoff handoff;
|
||||||
if (processRequest(request, handoff))
|
if (processRequest(request, handoff) || !isPeerUpgrade(request))
|
||||||
return handoff;
|
|
||||||
if (!isPeerUpgrade(request))
|
|
||||||
return handoff;
|
return handoff;
|
||||||
|
|
||||||
handoff.moved = true;
|
handoff.moved = true;
|
||||||
@@ -183,7 +183,7 @@ OverlayImpl::onHandoff(
|
|||||||
|
|
||||||
error_code ec;
|
error_code ec;
|
||||||
auto const local_endpoint(
|
auto const local_endpoint(
|
||||||
stream_ptr->next_layer().socket().local_endpoint(ec));
|
ssl_stream_ptr->next_layer().socket().local_endpoint(ec));
|
||||||
if (ec)
|
if (ec)
|
||||||
{
|
{
|
||||||
JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
|
JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
|
||||||
@@ -195,16 +195,27 @@ OverlayImpl::onHandoff(
|
|||||||
if (consumer.disconnect(journal))
|
if (consumer.disconnect(journal))
|
||||||
return handoff;
|
return handoff;
|
||||||
|
|
||||||
auto const [slot, result] = m_peerFinder->new_inbound_slot(
|
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
|
||||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
if (!negotiatedVersion)
|
||||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
|
||||||
|
|
||||||
if (slot == nullptr)
|
|
||||||
{
|
{
|
||||||
// connection refused either IP limit exceeded or self-connect
|
|
||||||
handoff.moved = false;
|
handoff.moved = false;
|
||||||
JLOG(journal.debug())
|
handoff.response = makeErrorResponse(
|
||||||
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
|
request,
|
||||||
|
remote_endpoint.address(),
|
||||||
|
"Unable to agree on a protocol version");
|
||||||
|
handoff.keep_alive = false;
|
||||||
|
return handoff;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto stream_ptr =
|
||||||
|
std::make_unique<ProductionStream>(std::move(ssl_stream_ptr));
|
||||||
|
auto const sharedValue = stream_ptr->makeSharedValue(journal);
|
||||||
|
if (!sharedValue)
|
||||||
|
{
|
||||||
|
handoff.moved = false;
|
||||||
|
handoff.response = makeErrorResponse(
|
||||||
|
request, remote_endpoint.address(), "Incorrect security cookie");
|
||||||
|
handoff.keep_alive = false;
|
||||||
return handoff;
|
return handoff;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -217,38 +228,23 @@ OverlayImpl::onHandoff(
|
|||||||
}) == types.end())
|
}) == types.end())
|
||||||
{
|
{
|
||||||
handoff.moved = false;
|
handoff.moved = false;
|
||||||
handoff.response =
|
handoff.response = makeErrorResponse(
|
||||||
makeRedirectResponse(slot, request, remote_endpoint.address());
|
request, remote_endpoint.address(), "Invalid Peer Type");
|
||||||
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
|
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
|
||||||
return handoff;
|
return handoff;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
|
auto const [slot, result] = m_peerFinder->new_inbound_slot(
|
||||||
if (!negotiatedVersion)
|
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||||
{
|
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||||
m_peerFinder->on_closed(slot);
|
|
||||||
handoff.moved = false;
|
|
||||||
handoff.response = makeErrorResponse(
|
|
||||||
slot,
|
|
||||||
request,
|
|
||||||
remote_endpoint.address(),
|
|
||||||
"Unable to agree on a protocol version");
|
|
||||||
handoff.keep_alive = false;
|
|
||||||
return handoff;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto const sharedValue = makeSharedValue(*stream_ptr, journal);
|
if (slot == nullptr)
|
||||||
if (!sharedValue)
|
|
||||||
{
|
{
|
||||||
m_peerFinder->on_closed(slot);
|
// connection refused either IP limit exceeded or self-connect
|
||||||
handoff.moved = false;
|
handoff.moved = false;
|
||||||
handoff.response = makeErrorResponse(
|
JLOG(journal.debug())
|
||||||
slot,
|
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
|
||||||
request,
|
|
||||||
remote_endpoint.address(),
|
|
||||||
"Incorrect security cookie");
|
|
||||||
handoff.keep_alive = false;
|
|
||||||
return handoff;
|
return handoff;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -266,10 +262,12 @@ OverlayImpl::onHandoff(
|
|||||||
// The node gets a reserved slot if it is in our cluster
|
// The node gets a reserved slot if it is in our cluster
|
||||||
// or if it has a reservation.
|
// or if it has a reservation.
|
||||||
bool const reserved =
|
bool const reserved =
|
||||||
static_cast<bool>(app_.cluster().member(publicKey)) ||
|
app_.cluster().member(publicKey).has_value() ||
|
||||||
app_.peerReservations().contains(publicKey);
|
app_.peerReservations().contains(publicKey);
|
||||||
|
|
||||||
auto const result =
|
auto const result =
|
||||||
m_peerFinder->activate(slot, publicKey, reserved);
|
m_peerFinder->activate(slot, publicKey, reserved);
|
||||||
|
|
||||||
if (result != PeerFinder::Result::success)
|
if (result != PeerFinder::Result::success)
|
||||||
{
|
{
|
||||||
m_peerFinder->on_closed(slot);
|
m_peerFinder->on_closed(slot);
|
||||||
@@ -283,7 +281,11 @@ OverlayImpl::onHandoff(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto const peer = std::make_shared<PeerImp>(
|
// Extract peer attributes from the request before creating PeerImp
|
||||||
|
auto const attributes =
|
||||||
|
extractPeerAttributes(request, app_.config(), true);
|
||||||
|
|
||||||
|
auto const p = std::make_shared<InboundHandshake>(
|
||||||
app_,
|
app_,
|
||||||
id,
|
id,
|
||||||
slot,
|
slot,
|
||||||
@@ -292,23 +294,14 @@ OverlayImpl::onHandoff(
|
|||||||
*negotiatedVersion,
|
*negotiatedVersion,
|
||||||
consumer,
|
consumer,
|
||||||
std::move(stream_ptr),
|
std::move(stream_ptr),
|
||||||
|
attributes,
|
||||||
|
remote_endpoint,
|
||||||
*this);
|
*this);
|
||||||
{
|
|
||||||
// As we are not on the strand, run() must be called
|
|
||||||
// while holding the lock, otherwise new I/O can be
|
|
||||||
// queued after a call to stop().
|
|
||||||
std::lock_guard<decltype(mutex_)> lock(mutex_);
|
|
||||||
{
|
|
||||||
auto const result = m_peers.emplace(peer->slot(), peer);
|
|
||||||
XRPL_ASSERT(
|
|
||||||
result.second,
|
|
||||||
"ripple::OverlayImpl::onHandoff : peer is inserted");
|
|
||||||
(void)result.second;
|
|
||||||
}
|
|
||||||
list_.emplace(peer.get(), peer);
|
|
||||||
|
|
||||||
peer->run();
|
std::lock_guard lock(mutex_);
|
||||||
}
|
list_.emplace(p.get(), p);
|
||||||
|
p->run();
|
||||||
|
|
||||||
handoff.moved = true;
|
handoff.moved = true;
|
||||||
return handoff;
|
return handoff;
|
||||||
}
|
}
|
||||||
@@ -319,8 +312,8 @@ OverlayImpl::onHandoff(
|
|||||||
|
|
||||||
m_peerFinder->on_closed(slot);
|
m_peerFinder->on_closed(slot);
|
||||||
handoff.moved = false;
|
handoff.moved = false;
|
||||||
handoff.response = makeErrorResponse(
|
handoff.response =
|
||||||
slot, request, remote_endpoint.address(), e.what());
|
makeErrorResponse(request, remote_endpoint.address(), e.what());
|
||||||
handoff.keep_alive = false;
|
handoff.keep_alive = false;
|
||||||
return handoff;
|
return handoff;
|
||||||
}
|
}
|
||||||
@@ -374,7 +367,6 @@ OverlayImpl::makeRedirectResponse(
|
|||||||
|
|
||||||
std::shared_ptr<Writer>
|
std::shared_ptr<Writer>
|
||||||
OverlayImpl::makeErrorResponse(
|
OverlayImpl::makeErrorResponse(
|
||||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
|
||||||
http_request_type const& request,
|
http_request_type const& request,
|
||||||
address_type remote_address,
|
address_type remote_address,
|
||||||
std::string text)
|
std::string text)
|
||||||
|
|||||||
@@ -465,7 +465,6 @@ private:
|
|||||||
|
|
||||||
std::shared_ptr<Writer>
|
std::shared_ptr<Writer>
|
||||||
makeErrorResponse(
|
makeErrorResponse(
|
||||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
|
||||||
http_request_type const& request,
|
http_request_type const& request,
|
||||||
address_type remote_address,
|
address_type remote_address,
|
||||||
std::string msg);
|
std::string msg);
|
||||||
|
|||||||
@@ -29,6 +29,7 @@
|
|||||||
#include <xrpld/app/misc/ValidatorList.h>
|
#include <xrpld/app/misc/ValidatorList.h>
|
||||||
#include <xrpld/app/tx/apply.h>
|
#include <xrpld/app/tx/apply.h>
|
||||||
#include <xrpld/overlay/Cluster.h>
|
#include <xrpld/overlay/Cluster.h>
|
||||||
|
#include <xrpld/overlay/detail/Handshake.h>
|
||||||
#include <xrpld/overlay/detail/PeerImp.h>
|
#include <xrpld/overlay/detail/PeerImp.h>
|
||||||
#include <xrpld/overlay/detail/Tuning.h>
|
#include <xrpld/overlay/detail/Tuning.h>
|
||||||
#include <xrpld/perflog/PerfLog.h>
|
#include <xrpld/perflog/PerfLog.h>
|
||||||
@@ -39,6 +40,7 @@
|
|||||||
#include <xrpl/basics/safe_cast.h>
|
#include <xrpl/basics/safe_cast.h>
|
||||||
#include <xrpl/protocol/TxFlags.h>
|
#include <xrpl/protocol/TxFlags.h>
|
||||||
#include <xrpl/protocol/digest.h>
|
#include <xrpl/protocol/digest.h>
|
||||||
|
#include <xrpl/server/detail/StreamInterface.h>
|
||||||
|
|
||||||
#include <boost/algorithm/string/predicate.hpp>
|
#include <boost/algorithm/string/predicate.hpp>
|
||||||
#include <boost/beast/core/ostream.hpp>
|
#include <boost/beast/core/ostream.hpp>
|
||||||
@@ -48,6 +50,7 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <numeric>
|
#include <numeric>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
@@ -61,19 +64,102 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
|
|||||||
std::chrono::seconds constexpr peerTimerInterval{60};
|
std::chrono::seconds constexpr peerTimerInterval{60};
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
PeerAttributes
|
||||||
|
extractPeerAttributes(
|
||||||
|
boost::beast::http::fields const& headers,
|
||||||
|
Config const& config,
|
||||||
|
bool inbound)
|
||||||
|
{
|
||||||
|
PeerAttributes attributes;
|
||||||
|
|
||||||
|
// Extract feature flags
|
||||||
|
attributes.compressionEnabled =
|
||||||
|
peerFeatureEnabled(headers, FEATURE_COMPR, "lz4", config.COMPRESSION);
|
||||||
|
|
||||||
|
attributes.txReduceRelayEnabled = peerFeatureEnabled(
|
||||||
|
headers, FEATURE_TXRR, config.TX_REDUCE_RELAY_ENABLE);
|
||||||
|
|
||||||
|
attributes.ledgerReplayEnabled = peerFeatureEnabled(
|
||||||
|
headers, FEATURE_LEDGER_REPLAY, config.LEDGER_REPLAY);
|
||||||
|
|
||||||
|
attributes.vpReduceRelayEnabled = peerFeatureEnabled(
|
||||||
|
headers, FEATURE_VPRR, config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
|
||||||
|
|
||||||
|
// Extract connection information
|
||||||
|
if (auto const iter = headers.find("Crawl"); iter != headers.end())
|
||||||
|
attributes.crawlEnabled = boost::iequals(iter->value(), "public");
|
||||||
|
|
||||||
|
if (inbound)
|
||||||
|
{
|
||||||
|
if (auto const iter = headers.find("User-Agent"); iter != headers.end())
|
||||||
|
attributes.userAgent = std::string{iter->value()};
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (auto const iter = headers.find("Server"); iter != headers.end())
|
||||||
|
attributes.serverInfo = std::string{iter->value()};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto const iter = headers.find("Network-ID"); iter != headers.end())
|
||||||
|
attributes.networkId = std::string{iter->value()};
|
||||||
|
|
||||||
|
if (auto const iter = headers.find("Server-Domain"); iter != headers.end())
|
||||||
|
attributes.serverDomain = std::string{iter->value()};
|
||||||
|
|
||||||
|
// Extract ledger information
|
||||||
|
auto parseLedgerHash =
|
||||||
|
[](std::string_view value) -> std::optional<uint256> {
|
||||||
|
if (uint256 ret; ret.parseHex(value))
|
||||||
|
return ret;
|
||||||
|
|
||||||
|
if (auto const s = base64_decode(value); s.size() == uint256::size())
|
||||||
|
return uint256{s};
|
||||||
|
|
||||||
|
return std::nullopt;
|
||||||
|
};
|
||||||
|
|
||||||
|
bool hasClosedLedger = false;
|
||||||
|
bool hasPreviousLedger = false;
|
||||||
|
attributes.hasValidLedgerHashes = true;
|
||||||
|
|
||||||
|
if (auto const iter = headers.find("Closed-Ledger"); iter != headers.end())
|
||||||
|
{
|
||||||
|
hasClosedLedger = true;
|
||||||
|
attributes.closedLedgerHash = parseLedgerHash(iter->value());
|
||||||
|
if (!attributes.closedLedgerHash)
|
||||||
|
attributes.hasValidLedgerHashes = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto const iter = headers.find("Previous-Ledger");
|
||||||
|
iter != headers.end())
|
||||||
|
{
|
||||||
|
hasPreviousLedger = true;
|
||||||
|
attributes.previousLedgerHash = parseLedgerHash(iter->value());
|
||||||
|
if (!attributes.previousLedgerHash)
|
||||||
|
attributes.hasValidLedgerHashes = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate ledger hash consistency
|
||||||
|
if (hasPreviousLedger && !hasClosedLedger)
|
||||||
|
attributes.hasValidLedgerHashes = false;
|
||||||
|
|
||||||
|
return attributes;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
||||||
// release.
|
// release.
|
||||||
|
|
||||||
PeerImp::PeerImp(
|
PeerImp::PeerImp(
|
||||||
Application& app,
|
Application& app,
|
||||||
id_t id,
|
OverlayImpl& overlay,
|
||||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||||
http_request_type&& request,
|
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||||
PublicKey const& publicKey,
|
|
||||||
ProtocolVersion protocol,
|
|
||||||
Resource::Consumer consumer,
|
Resource::Consumer consumer,
|
||||||
std::unique_ptr<stream_type>&& stream_ptr,
|
ProtocolVersion protocol,
|
||||||
OverlayImpl& overlay)
|
PeerAttributes const& attributes,
|
||||||
|
PublicKey const& publicKey,
|
||||||
|
id_t id,
|
||||||
|
std::string const& name)
|
||||||
: Child(overlay)
|
: Child(overlay)
|
||||||
, app_(app)
|
, app_(app)
|
||||||
, id_(id)
|
, id_(id)
|
||||||
@@ -82,10 +168,8 @@ PeerImp::PeerImp(
|
|||||||
, journal_(sink_)
|
, journal_(sink_)
|
||||||
, p_journal_(p_sink_)
|
, p_journal_(p_sink_)
|
||||||
, stream_ptr_(std::move(stream_ptr))
|
, stream_ptr_(std::move(stream_ptr))
|
||||||
, socket_(stream_ptr_->next_layer().socket())
|
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||||
, stream_(*stream_ptr_)
|
, timer_(waitable_timer{stream_ptr_->get_executor()})
|
||||||
, strand_(boost::asio::make_strand(socket_.get_executor()))
|
|
||||||
, timer_(waitable_timer{socket_.get_executor()})
|
|
||||||
, remote_address_(slot->remote_endpoint())
|
, remote_address_(slot->remote_endpoint())
|
||||||
, overlay_(overlay)
|
, overlay_(overlay)
|
||||||
, inbound_(true)
|
, inbound_(true)
|
||||||
@@ -93,41 +177,23 @@ PeerImp::PeerImp(
|
|||||||
, tracking_(Tracking::unknown)
|
, tracking_(Tracking::unknown)
|
||||||
, trackingTime_(clock_type::now())
|
, trackingTime_(clock_type::now())
|
||||||
, publicKey_(publicKey)
|
, publicKey_(publicKey)
|
||||||
|
, name_(name)
|
||||||
, lastPingTime_(clock_type::now())
|
, lastPingTime_(clock_type::now())
|
||||||
, creationTime_(clock_type::now())
|
, creationTime_(clock_type::now())
|
||||||
, squelch_(app_.journal("Squelch"))
|
, squelch_(app_.journal("Squelch"))
|
||||||
, usage_(consumer)
|
, usage_(consumer)
|
||||||
, fee_{Resource::feeTrivialPeer, ""}
|
, fee_{Resource::feeTrivialPeer, ""}
|
||||||
, slot_(slot)
|
, slot_(slot)
|
||||||
, request_(std::move(request))
|
, attributes_(attributes)
|
||||||
, headers_(request_)
|
|
||||||
, compressionEnabled_(
|
|
||||||
peerFeatureEnabled(
|
|
||||||
headers_,
|
|
||||||
FEATURE_COMPR,
|
|
||||||
"lz4",
|
|
||||||
app_.config().COMPRESSION)
|
|
||||||
? Compressed::On
|
|
||||||
: Compressed::Off)
|
|
||||||
, txReduceRelayEnabled_(peerFeatureEnabled(
|
|
||||||
headers_,
|
|
||||||
FEATURE_TXRR,
|
|
||||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
|
||||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
|
||||||
headers_,
|
|
||||||
FEATURE_LEDGER_REPLAY,
|
|
||||||
app_.config().LEDGER_REPLAY))
|
|
||||||
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
||||||
{
|
{
|
||||||
JLOG(journal_.info())
|
JLOG(journal_.info()) << "compression enabled "
|
||||||
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
|
<< attributes_.compressionEnabled
|
||||||
<< " vp reduce-relay base squelch enabled "
|
<< " vp reduce-relay base squelch enabled "
|
||||||
<< peerFeatureEnabled(
|
<< attributes_.vpReduceRelayEnabled
|
||||||
headers_,
|
<< " tx reduce-relay enabled "
|
||||||
FEATURE_VPRR,
|
<< attributes_.txReduceRelayEnabled << " on "
|
||||||
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
|
<< remote_address_ << " " << id_;
|
||||||
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
|
|
||||||
<< remote_address_ << " " << id_;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
PeerImp::~PeerImp()
|
PeerImp::~PeerImp()
|
||||||
@@ -158,47 +224,16 @@ PeerImp::run()
|
|||||||
if (!strand_.running_in_this_thread())
|
if (!strand_.running_in_this_thread())
|
||||||
return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
|
return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
|
||||||
|
|
||||||
auto parseLedgerHash =
|
// Validate ledger hash consistency
|
||||||
[](std::string_view value) -> std::optional<uint256> {
|
if (!attributes_.hasValidLedgerHashes)
|
||||||
if (uint256 ret; ret.parseHex(value))
|
fail("Malformed handshake data");
|
||||||
return ret;
|
|
||||||
|
|
||||||
if (auto const s = base64_decode(value); s.size() == uint256::size())
|
|
||||||
return uint256{s};
|
|
||||||
|
|
||||||
return std::nullopt;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::optional<uint256> closed;
|
|
||||||
std::optional<uint256> previous;
|
|
||||||
|
|
||||||
if (auto const iter = headers_.find("Closed-Ledger");
|
|
||||||
iter != headers_.end())
|
|
||||||
{
|
|
||||||
closed = parseLedgerHash(iter->value());
|
|
||||||
|
|
||||||
if (!closed)
|
|
||||||
fail("Malformed handshake data (1)");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (auto const iter = headers_.find("Previous-Ledger");
|
|
||||||
iter != headers_.end())
|
|
||||||
{
|
|
||||||
previous = parseLedgerHash(iter->value());
|
|
||||||
|
|
||||||
if (!previous)
|
|
||||||
fail("Malformed handshake data (2)");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (previous && !closed)
|
|
||||||
fail("Malformed handshake data (3)");
|
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> sl(recentLock_);
|
std::lock_guard<std::mutex> sl(recentLock_);
|
||||||
if (closed)
|
if (attributes_.closedLedgerHash)
|
||||||
closedLedgerHash_ = *closed;
|
closedLedgerHash_ = *attributes_.closedLedgerHash;
|
||||||
if (previous)
|
if (attributes_.previousLedgerHash)
|
||||||
previousLedgerHash_ = *previous;
|
previousLedgerHash_ = *attributes_.previousLedgerHash;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inbound_)
|
if (inbound_)
|
||||||
@@ -215,7 +250,7 @@ PeerImp::stop()
|
|||||||
{
|
{
|
||||||
if (!strand_.running_in_this_thread())
|
if (!strand_.running_in_this_thread())
|
||||||
return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
|
return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
|
||||||
if (socket_.is_open())
|
if (socketOpen())
|
||||||
{
|
{
|
||||||
// The rationale for using different severity levels is that
|
// The rationale for using different severity levels is that
|
||||||
// outbound connections are under our control and may be logged
|
// outbound connections are under our control and may be logged
|
||||||
@@ -251,19 +286,28 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
|||||||
{
|
{
|
||||||
overlay_.reportOutboundTraffic(
|
overlay_.reportOutboundTraffic(
|
||||||
TrafficCount::category::squelch_suppressed,
|
TrafficCount::category::squelch_suppressed,
|
||||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
static_cast<int>(
|
||||||
|
m->getBuffer(
|
||||||
|
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||||
|
.size()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// report categorized outgoing traffic
|
// report categorized outgoing traffic
|
||||||
overlay_.reportOutboundTraffic(
|
overlay_.reportOutboundTraffic(
|
||||||
safe_cast<TrafficCount::category>(m->getCategory()),
|
safe_cast<TrafficCount::category>(m->getCategory()),
|
||||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
static_cast<int>(
|
||||||
|
m->getBuffer(
|
||||||
|
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||||
|
.size()));
|
||||||
|
|
||||||
// report total outgoing traffic
|
// report total outgoing traffic
|
||||||
overlay_.reportOutboundTraffic(
|
overlay_.reportOutboundTraffic(
|
||||||
TrafficCount::category::total,
|
TrafficCount::category::total,
|
||||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
static_cast<int>(
|
||||||
|
m->getBuffer(
|
||||||
|
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||||
|
.size()));
|
||||||
|
|
||||||
auto sendq_size = send_queue_.size();
|
auto sendq_size = send_queue_.size();
|
||||||
|
|
||||||
@@ -287,17 +331,24 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
|||||||
if (sendq_size != 0)
|
if (sendq_size != 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
boost::asio::async_write(
|
// Capture shared_ptr to ensure object lifetime
|
||||||
stream_,
|
auto self = shared_from_this();
|
||||||
boost::asio::buffer(
|
|
||||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
stream_ptr_->async_write(
|
||||||
bind_executor(
|
boost::asio::buffer(send_queue_.front()->getBuffer(
|
||||||
strand_,
|
compressionEnabled() ? Compressed::On : Compressed::Off)),
|
||||||
std::bind(
|
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||||
&PeerImp::onWriteMessage,
|
// Post completion to the strand to ensure thread safety
|
||||||
shared_from_this(),
|
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||||
std::placeholders::_1,
|
self->onWriteMessage(ec, bytes);
|
||||||
std::placeholders::_2)));
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
PeerImp::socketOpen() const
|
||||||
|
{
|
||||||
|
return stream_ptr_->is_open();
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -365,24 +416,21 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
|
|||||||
bool
|
bool
|
||||||
PeerImp::crawl() const
|
PeerImp::crawl() const
|
||||||
{
|
{
|
||||||
auto const iter = headers_.find("Crawl");
|
return attributes_.crawlEnabled;
|
||||||
if (iter == headers_.end())
|
|
||||||
return false;
|
|
||||||
return boost::iequals(iter->value(), "public");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
PeerImp::cluster() const
|
PeerImp::cluster() const
|
||||||
{
|
{
|
||||||
return static_cast<bool>(app_.cluster().member(publicKey_));
|
return app_.cluster().member(publicKey_).has_value();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string
|
std::string
|
||||||
PeerImp::getVersion() const
|
PeerImp::getVersion() const
|
||||||
{
|
{
|
||||||
if (inbound_)
|
if (inbound_)
|
||||||
return headers_["User-Agent"];
|
return attributes_.userAgent.value_or("");
|
||||||
return headers_["Server"];
|
return attributes_.serverInfo.value_or("");
|
||||||
}
|
}
|
||||||
|
|
||||||
Json::Value
|
Json::Value
|
||||||
@@ -408,8 +456,8 @@ PeerImp::json()
|
|||||||
if (auto const d = domain(); !d.empty())
|
if (auto const d = domain(); !d.empty())
|
||||||
ret[jss::server_domain] = std::string{d};
|
ret[jss::server_domain] = std::string{d};
|
||||||
|
|
||||||
if (auto const nid = headers_["Network-ID"]; !nid.empty())
|
if (attributes_.networkId.has_value())
|
||||||
ret[jss::network_id] = std::string{nid};
|
ret[jss::network_id] = *attributes_.networkId;
|
||||||
|
|
||||||
ret[jss::load] = usage_.balance();
|
ret[jss::load] = usage_.balance();
|
||||||
|
|
||||||
@@ -513,7 +561,7 @@ PeerImp::supportsFeature(ProtocolFeature f) const
|
|||||||
case ProtocolFeature::ValidatorList2Propagation:
|
case ProtocolFeature::ValidatorList2Propagation:
|
||||||
return protocol_ >= make_protocol(2, 2);
|
return protocol_ >= make_protocol(2, 2);
|
||||||
case ProtocolFeature::LedgerReplay:
|
case ProtocolFeature::LedgerReplay:
|
||||||
return ledgerReplayEnabled_;
|
return attributes_.ledgerReplayEnabled;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -578,13 +626,13 @@ PeerImp::close()
|
|||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
strand_.running_in_this_thread(),
|
strand_.running_in_this_thread(),
|
||||||
"ripple::PeerImp::close : strand in this thread");
|
"ripple::PeerImp::close : strand in this thread");
|
||||||
if (socket_.is_open())
|
if (socketOpen())
|
||||||
{
|
{
|
||||||
detaching_ = true; // DEPRECATED
|
detaching_ = true; // DEPRECATED
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
timer_.cancel();
|
timer_.cancel();
|
||||||
socket_.close();
|
stream_ptr_->close();
|
||||||
}
|
}
|
||||||
catch (boost::system::system_error const&)
|
catch (boost::system::system_error const&)
|
||||||
{
|
{
|
||||||
@@ -613,7 +661,7 @@ PeerImp::fail(std::string const& reason)
|
|||||||
(void(Peer::*)(std::string const&)) & PeerImp::fail,
|
(void(Peer::*)(std::string const&)) & PeerImp::fail,
|
||||||
shared_from_this(),
|
shared_from_this(),
|
||||||
reason));
|
reason));
|
||||||
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
|
if (journal_.active(beast::severities::kWarning) && socketOpen())
|
||||||
{
|
{
|
||||||
std::string const n = name();
|
std::string const n = name();
|
||||||
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
|
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
|
||||||
@@ -628,7 +676,7 @@ PeerImp::fail(std::string const& name, error_code ec)
|
|||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
strand_.running_in_this_thread(),
|
strand_.running_in_this_thread(),
|
||||||
"ripple::PeerImp::fail : strand in this thread");
|
"ripple::PeerImp::fail : strand in this thread");
|
||||||
if (socket_.is_open())
|
if (socketOpen())
|
||||||
{
|
{
|
||||||
JLOG(journal_.warn())
|
JLOG(journal_.warn())
|
||||||
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
|
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
|
||||||
@@ -644,7 +692,7 @@ PeerImp::gracefulClose()
|
|||||||
strand_.running_in_this_thread(),
|
strand_.running_in_this_thread(),
|
||||||
"ripple::PeerImp::gracefulClose : strand in this thread");
|
"ripple::PeerImp::gracefulClose : strand in this thread");
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
|
socketOpen(), "ripple::PeerImp::gracefulClose : socket is open");
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
!gracefulClose_,
|
!gracefulClose_,
|
||||||
"ripple::PeerImp::gracefulClose : socket is not closing");
|
"ripple::PeerImp::gracefulClose : socket is not closing");
|
||||||
@@ -652,7 +700,7 @@ PeerImp::gracefulClose()
|
|||||||
if (send_queue_.size() > 0)
|
if (send_queue_.size() > 0)
|
||||||
return;
|
return;
|
||||||
setTimer();
|
setTimer();
|
||||||
stream_.async_shutdown(bind_executor(
|
stream_ptr_->async_shutdown(bind_executor(
|
||||||
strand_,
|
strand_,
|
||||||
std::bind(
|
std::bind(
|
||||||
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
|
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
|
||||||
@@ -703,7 +751,7 @@ PeerImp::makePrefix(id_t id)
|
|||||||
void
|
void
|
||||||
PeerImp::onTimer(error_code const& ec)
|
PeerImp::onTimer(error_code const& ec)
|
||||||
{
|
{
|
||||||
if (!socket_.is_open())
|
if (!socketOpen())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (ec == boost::asio::error::operation_aborted)
|
if (ec == boost::asio::error::operation_aborted)
|
||||||
@@ -784,78 +832,28 @@ PeerImp::doAccept()
|
|||||||
read_buffer_.size() == 0,
|
read_buffer_.size() == 0,
|
||||||
"ripple::PeerImp::doAccept : empty read buffer");
|
"ripple::PeerImp::doAccept : empty read buffer");
|
||||||
|
|
||||||
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
|
|
||||||
|
|
||||||
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
|
|
||||||
|
|
||||||
// This shouldn't fail since we already computed
|
|
||||||
// the shared value successfully in OverlayImpl
|
|
||||||
if (!sharedValue)
|
|
||||||
return fail("makeSharedValue: Unexpected failure");
|
|
||||||
|
|
||||||
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
||||||
JLOG(journal_.info()) << "Public Key: "
|
JLOG(journal_.info()) << "Public Key: "
|
||||||
<< toBase58(TokenType::NodePublic, publicKey_);
|
<< toBase58(TokenType::NodePublic, publicKey_);
|
||||||
|
|
||||||
if (auto member = app_.cluster().member(publicKey_))
|
if (auto member = app_.cluster().member(publicKey_))
|
||||||
{
|
{
|
||||||
{
|
JLOG(journal_.info()) << "Cluster name: " << name_;
|
||||||
std::unique_lock lock{nameMutex_};
|
|
||||||
name_ = *member;
|
|
||||||
}
|
|
||||||
JLOG(journal_.info()) << "Cluster name: " << *member;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
overlay_.activate(shared_from_this());
|
doProtocolStart();
|
||||||
|
|
||||||
// XXX Set timer: connection is in grace period to be useful.
|
|
||||||
// XXX Set timer: connection idle (idle may vary depending on connection
|
|
||||||
// type.)
|
|
||||||
|
|
||||||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
|
||||||
|
|
||||||
boost::beast::ostream(*write_buffer) << makeResponse(
|
|
||||||
!overlay_.peerFinder().config().peerPrivate,
|
|
||||||
request_,
|
|
||||||
overlay_.setup().public_ip,
|
|
||||||
remote_address_.address(),
|
|
||||||
*sharedValue,
|
|
||||||
overlay_.setup().networkID,
|
|
||||||
protocol_,
|
|
||||||
app_);
|
|
||||||
|
|
||||||
// Write the whole buffer and only start protocol when that's done.
|
|
||||||
boost::asio::async_write(
|
|
||||||
stream_,
|
|
||||||
write_buffer->data(),
|
|
||||||
boost::asio::transfer_all(),
|
|
||||||
bind_executor(
|
|
||||||
strand_,
|
|
||||||
[this, write_buffer, self = shared_from_this()](
|
|
||||||
error_code ec, std::size_t bytes_transferred) {
|
|
||||||
if (!socket_.is_open())
|
|
||||||
return;
|
|
||||||
if (ec == boost::asio::error::operation_aborted)
|
|
||||||
return;
|
|
||||||
if (ec)
|
|
||||||
return fail("onWriteResponse", ec);
|
|
||||||
if (write_buffer->size() == bytes_transferred)
|
|
||||||
return doProtocolStart();
|
|
||||||
return fail("Failed to write header");
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string
|
std::string
|
||||||
PeerImp::name() const
|
PeerImp::name() const
|
||||||
{
|
{
|
||||||
std::shared_lock read_lock{nameMutex_};
|
|
||||||
return name_;
|
return name_;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string
|
std::string
|
||||||
PeerImp::domain() const
|
PeerImp::domain() const
|
||||||
{
|
{
|
||||||
return headers_["Server-Domain"];
|
return attributes_.serverDomain.value_or("");
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
@@ -903,7 +901,11 @@ PeerImp::doProtocolStart()
|
|||||||
void
|
void
|
||||||
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||||
{
|
{
|
||||||
if (!socket_.is_open())
|
XRPL_ASSERT(
|
||||||
|
strand_.running_in_this_thread(),
|
||||||
|
"ripple::PeerImp::onReadMessage : strand in this thread");
|
||||||
|
|
||||||
|
if (!socketOpen())
|
||||||
return;
|
return;
|
||||||
if (ec == boost::asio::error::operation_aborted)
|
if (ec == boost::asio::error::operation_aborted)
|
||||||
return;
|
return;
|
||||||
@@ -943,7 +945,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
|
|
||||||
if (ec)
|
if (ec)
|
||||||
return fail("onReadMessage", ec);
|
return fail("onReadMessage", ec);
|
||||||
if (!socket_.is_open())
|
if (!socketOpen())
|
||||||
return;
|
return;
|
||||||
if (gracefulClose_)
|
if (gracefulClose_)
|
||||||
return;
|
return;
|
||||||
@@ -952,22 +954,27 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
read_buffer_.consume(bytes_consumed);
|
read_buffer_.consume(bytes_consumed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto self = shared_from_this();
|
||||||
|
|
||||||
// Timeout on writes only
|
// Timeout on writes only
|
||||||
stream_.async_read_some(
|
stream_ptr_->async_read_some(
|
||||||
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
|
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
|
||||||
bind_executor(
|
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||||
strand_,
|
// Post completion to the strand to ensure thread safety
|
||||||
std::bind(
|
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||||
&PeerImp::onReadMessage,
|
self->onReadMessage(ec, bytes);
|
||||||
shared_from_this(),
|
});
|
||||||
std::placeholders::_1,
|
});
|
||||||
std::placeholders::_2)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||||
{
|
{
|
||||||
if (!socket_.is_open())
|
XRPL_ASSERT(
|
||||||
|
strand_.running_in_this_thread(),
|
||||||
|
"ripple::PeerImp::onWriteMessage : strand in this thread");
|
||||||
|
|
||||||
|
if (!socketOpen())
|
||||||
return;
|
return;
|
||||||
if (ec == boost::asio::error::operation_aborted)
|
if (ec == boost::asio::error::operation_aborted)
|
||||||
return;
|
return;
|
||||||
@@ -980,7 +987,6 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
else
|
else
|
||||||
stream << "onWriteMessage";
|
stream << "onWriteMessage";
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics_.sent.add_message(bytes_transferred);
|
metrics_.sent.add_message(bytes_transferred);
|
||||||
|
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
@@ -990,27 +996,31 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
|||||||
if (!send_queue_.empty())
|
if (!send_queue_.empty())
|
||||||
{
|
{
|
||||||
// Timeout on writes only
|
// Timeout on writes only
|
||||||
return boost::asio::async_write(
|
|
||||||
stream_,
|
// Capture shared_ptr to ensure object lifetime
|
||||||
boost::asio::buffer(
|
auto self = shared_from_this();
|
||||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
|
||||||
bind_executor(
|
return stream_ptr_->async_write(
|
||||||
strand_,
|
boost::asio::buffer(send_queue_.front()->getBuffer(
|
||||||
std::bind(
|
compressionEnabled() ? Compressed::On : Compressed::Off)),
|
||||||
&PeerImp::onWriteMessage,
|
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||||
shared_from_this(),
|
// Post completion to the strand to ensure thread safety
|
||||||
std::placeholders::_1,
|
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||||
std::placeholders::_2)));
|
self->onWriteMessage(ec, bytes);
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (gracefulClose_)
|
if (gracefulClose_)
|
||||||
{
|
{
|
||||||
return stream_.async_shutdown(bind_executor(
|
// Capture shared_ptr to ensure object lifetime
|
||||||
strand_,
|
auto self = shared_from_this();
|
||||||
std::bind(
|
|
||||||
&PeerImp::onShutdown,
|
return stream_ptr_->async_shutdown([self](boost::beast::error_code ec) {
|
||||||
shared_from_this(),
|
// Post completion to the strand to ensure thread safety
|
||||||
std::placeholders::_1)));
|
boost::asio::post(
|
||||||
|
self->strand_, [self, ec]() { self->onShutdown(ec); });
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1298,8 +1308,8 @@ PeerImp::handleTransaction(
|
|||||||
auto stx = std::make_shared<STTx const>(sit);
|
auto stx = std::make_shared<STTx const>(sit);
|
||||||
uint256 txID = stx->getTransactionID();
|
uint256 txID = stx->getTransactionID();
|
||||||
|
|
||||||
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
|
// Charge strongly for attempting to relay a txn with
|
||||||
// LCOV_EXCL_START
|
// tfInnerBatchTxn LCOV_EXCL_START
|
||||||
if (stx->isFlag(tfInnerBatchTxn) &&
|
if (stx->isFlag(tfInnerBatchTxn) &&
|
||||||
getCurrentTransactionRules()->enabled(featureBatch))
|
getCurrentTransactionRules()->enabled(featureBatch))
|
||||||
{
|
{
|
||||||
@@ -1322,8 +1332,9 @@ PeerImp::handleTransaction(
|
|||||||
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
|
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Erase only if the server has seen this tx. If the server has not
|
// Erase only if the server has seen this tx. If the server has
|
||||||
// seen this tx then the tx could not has been queued for this peer.
|
// not seen this tx then the tx could not has been queued for
|
||||||
|
// this peer.
|
||||||
else if (eraseTxQueue && txReduceRelayEnabled())
|
else if (eraseTxQueue && txReduceRelayEnabled())
|
||||||
removeTxQueue(txID);
|
removeTxQueue(txID);
|
||||||
|
|
||||||
@@ -1485,7 +1496,7 @@ void
|
|||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
|
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
|
||||||
if (!ledgerReplayEnabled_)
|
if (!attributes_.ledgerReplayEnabled)
|
||||||
{
|
{
|
||||||
fee_.update(
|
fee_.update(
|
||||||
Resource::feeMalformedRequest, "proof_path_request disabled");
|
Resource::feeMalformedRequest, "proof_path_request disabled");
|
||||||
@@ -1523,7 +1534,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
||||||
{
|
{
|
||||||
if (!ledgerReplayEnabled_)
|
if (!attributes_.ledgerReplayEnabled)
|
||||||
{
|
{
|
||||||
fee_.update(
|
fee_.update(
|
||||||
Resource::feeMalformedRequest, "proof_path_response disabled");
|
Resource::feeMalformedRequest, "proof_path_response disabled");
|
||||||
@@ -1540,7 +1551,7 @@ void
|
|||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
|
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
|
||||||
if (!ledgerReplayEnabled_)
|
if (!attributes_.ledgerReplayEnabled)
|
||||||
{
|
{
|
||||||
fee_.update(
|
fee_.update(
|
||||||
Resource::feeMalformedRequest, "replay_delta_request disabled");
|
Resource::feeMalformedRequest, "replay_delta_request disabled");
|
||||||
@@ -1578,7 +1589,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
||||||
{
|
{
|
||||||
if (!ledgerReplayEnabled_)
|
if (!attributes_.ledgerReplayEnabled)
|
||||||
{
|
{
|
||||||
fee_.update(
|
fee_.update(
|
||||||
Resource::feeMalformedRequest, "replay_delta_response disabled");
|
Resource::feeMalformedRequest, "replay_delta_response disabled");
|
||||||
@@ -1710,14 +1721,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||||
// suppression for 30 seconds to avoid doing a relatively expensive lookup
|
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||||
// every time a spam packet is received
|
// lookup every time a spam packet is received
|
||||||
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||||
|
|
||||||
// If the operator has specified that untrusted proposals be dropped then
|
// If the operator has specified that untrusted proposals be dropped
|
||||||
// this happens here I.e. before further wasting CPU verifying the signature
|
// then this happens here I.e. before further wasting CPU verifying the
|
||||||
// of an untrusted key
|
// signature of an untrusted key
|
||||||
if (!isTrusted)
|
if (!isTrusted)
|
||||||
{
|
{
|
||||||
// report untrusted proposal messages
|
// report untrusted proposal messages
|
||||||
@@ -1746,8 +1757,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
|||||||
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||||
!added)
|
!added)
|
||||||
{
|
{
|
||||||
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
|
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||||
// receives within IDLED seconds since the message has been relayed.
|
// peer receives within IDLED seconds since the message has been
|
||||||
|
// relayed.
|
||||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||||
overlay_.updateSlotAndSquelch(
|
overlay_.updateSlotAndSquelch(
|
||||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||||
@@ -1828,8 +1840,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
|||||||
{
|
{
|
||||||
bool outOfSync{false};
|
bool outOfSync{false};
|
||||||
{
|
{
|
||||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||||
// guarded by recentLock_.
|
// be guarded by recentLock_.
|
||||||
std::lock_guard sl(recentLock_);
|
std::lock_guard sl(recentLock_);
|
||||||
if (!closedLedgerHash_.isZero())
|
if (!closedLedgerHash_.isZero())
|
||||||
{
|
{
|
||||||
@@ -1851,8 +1863,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
|||||||
m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
|
m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
|
||||||
|
|
||||||
{
|
{
|
||||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||||
// guarded by recentLock_.
|
// be guarded by recentLock_.
|
||||||
std::lock_guard sl(recentLock_);
|
std::lock_guard sl(recentLock_);
|
||||||
if (peerChangedLedgers)
|
if (peerChangedLedgers)
|
||||||
{
|
{
|
||||||
@@ -2053,7 +2065,8 @@ PeerImp::onValidatorListMessage(
|
|||||||
std::vector<ValidatorBlobInfo> const& blobs)
|
std::vector<ValidatorBlobInfo> const& blobs)
|
||||||
{
|
{
|
||||||
// If there are no blobs, the message is malformed (possibly because of
|
// If there are no blobs, the message is malformed (possibly because of
|
||||||
// ValidatorList class rules), so charge accordingly and skip processing.
|
// ValidatorList class rules), so charge accordingly and skip
|
||||||
|
// processing.
|
||||||
if (blobs.empty())
|
if (blobs.empty())
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
|
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
|
||||||
@@ -2110,7 +2123,8 @@ PeerImp::onValidatorListMessage(
|
|||||||
|
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
applyResult.publisherKey,
|
applyResult.publisherKey,
|
||||||
"ripple::PeerImp::onValidatorListMessage : publisher key is "
|
"ripple::PeerImp::onValidatorListMessage : publisher key "
|
||||||
|
"is "
|
||||||
"set");
|
"set");
|
||||||
auto const& pubKey = *applyResult.publisherKey;
|
auto const& pubKey = *applyResult.publisherKey;
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
@@ -2119,7 +2133,8 @@ PeerImp::onValidatorListMessage(
|
|||||||
{
|
{
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
iter->second < applyResult.sequence,
|
iter->second < applyResult.sequence,
|
||||||
"ripple::PeerImp::onValidatorListMessage : lower sequence");
|
"ripple::PeerImp::onValidatorListMessage : lower "
|
||||||
|
"sequence");
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
publisherListSequences_[pubKey] = applyResult.sequence;
|
publisherListSequences_[pubKey] = applyResult.sequence;
|
||||||
@@ -2132,12 +2147,14 @@ PeerImp::onValidatorListMessage(
|
|||||||
std::lock_guard<std::mutex> sl(recentLock_);
|
std::lock_guard<std::mutex> sl(recentLock_);
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
applyResult.sequence && applyResult.publisherKey,
|
applyResult.sequence && applyResult.publisherKey,
|
||||||
"ripple::PeerImp::onValidatorListMessage : nonzero sequence "
|
"ripple::PeerImp::onValidatorListMessage : nonzero "
|
||||||
|
"sequence "
|
||||||
"and set publisher key");
|
"and set publisher key");
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
publisherListSequences_[*applyResult.publisherKey] <=
|
publisherListSequences_[*applyResult.publisherKey] <=
|
||||||
applyResult.sequence,
|
applyResult.sequence,
|
||||||
"ripple::PeerImp::onValidatorListMessage : maximum sequence");
|
"ripple::PeerImp::onValidatorListMessage : maximum "
|
||||||
|
"sequence");
|
||||||
}
|
}
|
||||||
#endif // !NDEBUG
|
#endif // !NDEBUG
|
||||||
|
|
||||||
@@ -2149,7 +2166,8 @@ PeerImp::onValidatorListMessage(
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
UNREACHABLE(
|
UNREACHABLE(
|
||||||
"ripple::PeerImp::onValidatorListMessage : invalid best list "
|
"ripple::PeerImp::onValidatorListMessage : invalid best "
|
||||||
|
"list "
|
||||||
"disposition");
|
"disposition");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2193,7 +2211,8 @@ PeerImp::onValidatorListMessage(
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
UNREACHABLE(
|
UNREACHABLE(
|
||||||
"ripple::PeerImp::onValidatorListMessage : invalid worst list "
|
"ripple::PeerImp::onValidatorListMessage : invalid worst "
|
||||||
|
"list "
|
||||||
"disposition");
|
"disposition");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2253,7 +2272,8 @@ PeerImp::onValidatorListMessage(
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
UNREACHABLE(
|
UNREACHABLE(
|
||||||
"ripple::PeerImp::onValidatorListMessage : invalid list "
|
"ripple::PeerImp::onValidatorListMessage : invalid "
|
||||||
|
"list "
|
||||||
"disposition");
|
"disposition");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2297,7 +2317,8 @@ PeerImp::onMessage(
|
|||||||
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.debug())
|
JLOG(p_journal_.debug())
|
||||||
<< "ValidatorListCollection: received validator list from peer "
|
<< "ValidatorListCollection: received validator list from "
|
||||||
|
"peer "
|
||||||
<< "using protocol version " << to_string(protocol_)
|
<< "using protocol version " << to_string(protocol_)
|
||||||
<< " which shouldn't support this feature.";
|
<< " which shouldn't support this feature.";
|
||||||
fee_.update(Resource::feeUselessData, "unsupported peer");
|
fee_.update(Resource::feeUselessData, "unsupported peer");
|
||||||
@@ -2306,7 +2327,8 @@ PeerImp::onMessage(
|
|||||||
else if (m->version() < 2)
|
else if (m->version() < 2)
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.debug())
|
JLOG(p_journal_.debug())
|
||||||
<< "ValidatorListCollection: received invalid validator list "
|
<< "ValidatorListCollection: received invalid validator "
|
||||||
|
"list "
|
||||||
"version "
|
"version "
|
||||||
<< m->version() << " from peer using protocol version "
|
<< m->version() << " from peer using protocol version "
|
||||||
<< to_string(protocol_);
|
<< to_string(protocol_);
|
||||||
@@ -2366,9 +2388,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
// RH TODO: when isTrusted = false we should probably also cache a
|
||||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
// key suppression for 30 seconds to avoid doing a relatively
|
||||||
// lookup every time a spam packet is received
|
// expensive lookup every time a spam packet is received
|
||||||
auto const isTrusted =
|
auto const isTrusted =
|
||||||
app_.validators().trusted(val->getSignerPublic());
|
app_.validators().trusted(val->getSignerPublic());
|
||||||
|
|
||||||
@@ -2393,9 +2415,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
|||||||
|
|
||||||
if (!added)
|
if (!added)
|
||||||
{
|
{
|
||||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
// Count unique messages (Slots has it's own 'HashRouter'),
|
||||||
// peer receives within IDLED seconds since the message has been
|
// which a peer receives within IDLED seconds since the message
|
||||||
// relayed.
|
// has been relayed.
|
||||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||||
overlay_.updateSlotAndSquelch(
|
overlay_.updateSlotAndSquelch(
|
||||||
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
|
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
|
||||||
@@ -2888,8 +2910,8 @@ PeerImp::checkTransaction(
|
|||||||
|
|
||||||
if (isPseudoTx(*stx))
|
if (isPseudoTx(*stx))
|
||||||
{
|
{
|
||||||
// Don't do anything with pseudo transactions except put them in the
|
// Don't do anything with pseudo transactions except put them in
|
||||||
// TransactionMaster cache
|
// the TransactionMaster cache
|
||||||
std::string reason;
|
std::string reason;
|
||||||
auto tx = std::make_shared<Transaction>(stx, reason, app_);
|
auto tx = std::make_shared<Transaction>(stx, reason, app_);
|
||||||
XRPL_ASSERT(
|
XRPL_ASSERT(
|
||||||
@@ -3045,16 +3067,17 @@ PeerImp::checkValidation(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME it should be safe to remove this try/catch. Investigate codepaths.
|
// FIXME it should be safe to remove this try/catch. Investigate
|
||||||
|
// codepaths.
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
|
if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
|
||||||
cluster())
|
cluster())
|
||||||
{
|
{
|
||||||
// haveMessage contains peers, which are suppressed; i.e. the peers
|
// haveMessage contains peers, which are suppressed; i.e. the
|
||||||
// are the source of the message, consequently the message should
|
// peers are the source of the message, consequently the message
|
||||||
// not be relayed to these peers. But the message must be counted
|
// should not be relayed to these peers. But the message must be
|
||||||
// as part of the squelch logic.
|
// counted as part of the squelch logic.
|
||||||
auto haveMessage =
|
auto haveMessage =
|
||||||
overlay_.relay(*packet, key, val->getSignerPublic());
|
overlay_.relay(*packet, key, val->getSignerPublic());
|
||||||
if (!haveMessage.empty())
|
if (!haveMessage.empty())
|
||||||
|
|||||||
@@ -35,6 +35,7 @@
|
|||||||
#include <xrpl/protocol/STTx.h>
|
#include <xrpl/protocol/STTx.h>
|
||||||
#include <xrpl/protocol/STValidation.h>
|
#include <xrpl/protocol/STValidation.h>
|
||||||
#include <xrpl/resource/Fees.h>
|
#include <xrpl/resource/Fees.h>
|
||||||
|
#include <xrpl/server/detail/StreamInterface.h>
|
||||||
|
|
||||||
#include <boost/circular_buffer.hpp>
|
#include <boost/circular_buffer.hpp>
|
||||||
#include <boost/endian/conversion.hpp>
|
#include <boost/endian/conversion.hpp>
|
||||||
@@ -49,6 +50,37 @@ namespace ripple {
|
|||||||
struct ValidatorBlobInfo;
|
struct ValidatorBlobInfo;
|
||||||
class SHAMap;
|
class SHAMap;
|
||||||
|
|
||||||
|
/** Attributes extracted from peer HTTP headers */
|
||||||
|
struct PeerAttributes
|
||||||
|
{
|
||||||
|
// Feature flags
|
||||||
|
bool compressionEnabled = false;
|
||||||
|
bool txReduceRelayEnabled = false;
|
||||||
|
bool ledgerReplayEnabled = false;
|
||||||
|
bool vpReduceRelayEnabled = false;
|
||||||
|
|
||||||
|
// Connection information
|
||||||
|
bool crawlEnabled = false;
|
||||||
|
std::optional<std::string> userAgent;
|
||||||
|
std::optional<std::string> serverInfo;
|
||||||
|
std::optional<std::string> networkId;
|
||||||
|
std::optional<std::string> serverDomain;
|
||||||
|
|
||||||
|
// Ledger information
|
||||||
|
std::optional<uint256> closedLedgerHash;
|
||||||
|
std::optional<uint256> previousLedgerHash;
|
||||||
|
|
||||||
|
// Validation state
|
||||||
|
bool hasValidLedgerHashes = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
/** Extract peer attributes from HTTP headers and application configuration */
|
||||||
|
PeerAttributes
|
||||||
|
extractPeerAttributes(
|
||||||
|
boost::beast::http::fields const& headers,
|
||||||
|
Config const& config,
|
||||||
|
bool inbound);
|
||||||
|
|
||||||
class PeerImp : public Peer,
|
class PeerImp : public Peer,
|
||||||
public std::enable_shared_from_this<PeerImp>,
|
public std::enable_shared_from_this<PeerImp>,
|
||||||
public OverlayImpl::Child
|
public OverlayImpl::Child
|
||||||
@@ -60,7 +92,6 @@ public:
|
|||||||
private:
|
private:
|
||||||
using clock_type = std::chrono::steady_clock;
|
using clock_type = std::chrono::steady_clock;
|
||||||
using error_code = boost::system::error_code;
|
using error_code = boost::system::error_code;
|
||||||
using socket_type = boost::asio::ip::tcp::socket;
|
|
||||||
using middle_type = boost::beast::tcp_stream;
|
using middle_type = boost::beast::tcp_stream;
|
||||||
using stream_type = boost::beast::ssl_stream<middle_type>;
|
using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||||
using address_type = boost::asio::ip::address;
|
using address_type = boost::asio::ip::address;
|
||||||
@@ -69,55 +100,6 @@ private:
|
|||||||
boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
|
boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
|
||||||
using Compressed = compression::Compressed;
|
using Compressed = compression::Compressed;
|
||||||
|
|
||||||
Application& app_;
|
|
||||||
id_t const id_;
|
|
||||||
beast::WrappedSink sink_;
|
|
||||||
beast::WrappedSink p_sink_;
|
|
||||||
beast::Journal const journal_;
|
|
||||||
beast::Journal const p_journal_;
|
|
||||||
std::unique_ptr<stream_type> stream_ptr_;
|
|
||||||
socket_type& socket_;
|
|
||||||
stream_type& stream_;
|
|
||||||
boost::asio::strand<boost::asio::executor> strand_;
|
|
||||||
waitable_timer timer_;
|
|
||||||
|
|
||||||
// Updated at each stage of the connection process to reflect
|
|
||||||
// the current conditions as closely as possible.
|
|
||||||
beast::IP::Endpoint const remote_address_;
|
|
||||||
|
|
||||||
// These are up here to prevent warnings about order of initializations
|
|
||||||
//
|
|
||||||
OverlayImpl& overlay_;
|
|
||||||
bool const inbound_;
|
|
||||||
|
|
||||||
// Protocol version to use for this link
|
|
||||||
ProtocolVersion protocol_;
|
|
||||||
|
|
||||||
std::atomic<Tracking> tracking_;
|
|
||||||
clock_type::time_point trackingTime_;
|
|
||||||
bool detaching_ = false;
|
|
||||||
// Node public key of peer.
|
|
||||||
PublicKey const publicKey_;
|
|
||||||
std::string name_;
|
|
||||||
std::shared_mutex mutable nameMutex_;
|
|
||||||
|
|
||||||
// The indices of the smallest and largest ledgers this peer has available
|
|
||||||
//
|
|
||||||
LedgerIndex minLedger_ = 0;
|
|
||||||
LedgerIndex maxLedger_ = 0;
|
|
||||||
uint256 closedLedgerHash_;
|
|
||||||
uint256 previousLedgerHash_;
|
|
||||||
|
|
||||||
boost::circular_buffer<uint256> recentLedgers_{128};
|
|
||||||
boost::circular_buffer<uint256> recentTxSets_{128};
|
|
||||||
|
|
||||||
std::optional<std::chrono::milliseconds> latency_;
|
|
||||||
std::optional<std::uint32_t> lastPingSeq_;
|
|
||||||
clock_type::time_point lastPingTime_;
|
|
||||||
clock_type::time_point const creationTime_;
|
|
||||||
|
|
||||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
|
||||||
|
|
||||||
// Notes on thread locking:
|
// Notes on thread locking:
|
||||||
//
|
//
|
||||||
// During an audit it was noted that some member variables that looked
|
// During an audit it was noted that some member variables that looked
|
||||||
@@ -165,37 +147,6 @@ private:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
std::mutex mutable recentLock_;
|
|
||||||
protocol::TMStatusChange last_status_;
|
|
||||||
Resource::Consumer usage_;
|
|
||||||
ChargeWithContext fee_;
|
|
||||||
std::shared_ptr<PeerFinder::Slot> const slot_;
|
|
||||||
boost::beast::multi_buffer read_buffer_;
|
|
||||||
http_request_type request_;
|
|
||||||
http_response_type response_;
|
|
||||||
boost::beast::http::fields const& headers_;
|
|
||||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
|
||||||
bool gracefulClose_ = false;
|
|
||||||
int large_sendq_ = 0;
|
|
||||||
std::unique_ptr<LoadEvent> load_event_;
|
|
||||||
// The highest sequence of each PublisherList that has
|
|
||||||
// been sent to or received from this peer.
|
|
||||||
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
|
||||||
|
|
||||||
Compressed compressionEnabled_ = Compressed::Off;
|
|
||||||
|
|
||||||
// Queue of transactions' hashes that have not been
|
|
||||||
// relayed. The hashes are sent once a second to a peer
|
|
||||||
// and the peer requests missing transactions from the node.
|
|
||||||
hash_set<uint256> txQueue_;
|
|
||||||
// true if tx reduce-relay feature is enabled on the peer.
|
|
||||||
bool txReduceRelayEnabled_ = false;
|
|
||||||
|
|
||||||
bool ledgerReplayEnabled_ = false;
|
|
||||||
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
|
||||||
|
|
||||||
friend class OverlayImpl;
|
|
||||||
|
|
||||||
class Metrics
|
class Metrics
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -229,6 +180,78 @@ private:
|
|||||||
Metrics recv;
|
Metrics recv;
|
||||||
} metrics_;
|
} metrics_;
|
||||||
|
|
||||||
|
Application& app_;
|
||||||
|
id_t const id_;
|
||||||
|
|
||||||
|
beast::WrappedSink sink_;
|
||||||
|
beast::WrappedSink p_sink_;
|
||||||
|
beast::Journal const journal_;
|
||||||
|
beast::Journal const p_journal_;
|
||||||
|
|
||||||
|
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||||
|
boost::asio::strand<boost::asio::executor> strand_;
|
||||||
|
waitable_timer timer_;
|
||||||
|
|
||||||
|
// Updated at each stage of the connection process to reflect
|
||||||
|
// the current conditions as closely as possible.
|
||||||
|
beast::IP::Endpoint const remote_address_;
|
||||||
|
|
||||||
|
// These are up here to prevent warnings about order of initializations
|
||||||
|
//
|
||||||
|
OverlayImpl& overlay_;
|
||||||
|
bool const inbound_;
|
||||||
|
|
||||||
|
// Protocol version to use for this link
|
||||||
|
ProtocolVersion protocol_;
|
||||||
|
|
||||||
|
std::atomic<Tracking> tracking_;
|
||||||
|
clock_type::time_point trackingTime_;
|
||||||
|
bool detaching_ = false;
|
||||||
|
// Node public key of peer.
|
||||||
|
PublicKey const publicKey_;
|
||||||
|
std::string name_;
|
||||||
|
|
||||||
|
// The indices of the smallest and largest ledgers this peer has available
|
||||||
|
//
|
||||||
|
LedgerIndex minLedger_ = 0;
|
||||||
|
LedgerIndex maxLedger_ = 0;
|
||||||
|
uint256 closedLedgerHash_;
|
||||||
|
uint256 previousLedgerHash_;
|
||||||
|
|
||||||
|
boost::circular_buffer<uint256> recentLedgers_{128};
|
||||||
|
boost::circular_buffer<uint256> recentTxSets_{128};
|
||||||
|
|
||||||
|
std::optional<std::chrono::milliseconds> latency_;
|
||||||
|
std::optional<std::uint32_t> lastPingSeq_;
|
||||||
|
clock_type::time_point lastPingTime_;
|
||||||
|
clock_type::time_point const creationTime_;
|
||||||
|
|
||||||
|
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||||
|
|
||||||
|
std::mutex mutable recentLock_;
|
||||||
|
protocol::TMStatusChange last_status_;
|
||||||
|
Resource::Consumer usage_;
|
||||||
|
ChargeWithContext fee_;
|
||||||
|
std::shared_ptr<PeerFinder::Slot> const slot_;
|
||||||
|
boost::beast::multi_buffer read_buffer_;
|
||||||
|
PeerAttributes const attributes_;
|
||||||
|
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||||
|
bool gracefulClose_ = false;
|
||||||
|
int large_sendq_ = 0;
|
||||||
|
std::unique_ptr<LoadEvent> load_event_;
|
||||||
|
// The highest sequence of each PublisherList that has
|
||||||
|
// been sent to or received from this peer.
|
||||||
|
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
||||||
|
|
||||||
|
// Queue of transactions' hashes that have not been
|
||||||
|
// relayed. The hashes are sent once a second to a peer
|
||||||
|
// and the peer requests missing transactions from the node.
|
||||||
|
hash_set<uint256> txQueue_;
|
||||||
|
|
||||||
|
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
||||||
|
|
||||||
|
friend class OverlayImpl;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
PeerImp(PeerImp const&) = delete;
|
PeerImp(PeerImp const&) = delete;
|
||||||
PeerImp&
|
PeerImp&
|
||||||
@@ -237,29 +260,30 @@ public:
|
|||||||
/** Create an active incoming peer from an established ssl connection. */
|
/** Create an active incoming peer from an established ssl connection. */
|
||||||
PeerImp(
|
PeerImp(
|
||||||
Application& app,
|
Application& app,
|
||||||
id_t id,
|
OverlayImpl& overlay,
|
||||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||||
http_request_type&& request,
|
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||||
PublicKey const& publicKey,
|
|
||||||
ProtocolVersion protocol,
|
|
||||||
Resource::Consumer consumer,
|
Resource::Consumer consumer,
|
||||||
std::unique_ptr<stream_type>&& stream_ptr,
|
ProtocolVersion protocol,
|
||||||
OverlayImpl& overlay);
|
PeerAttributes const& attributes,
|
||||||
|
PublicKey const& publicKey,
|
||||||
|
id_t id,
|
||||||
|
std::string const& name);
|
||||||
|
|
||||||
/** Create outgoing, handshaked peer. */
|
/** Create outgoing, handshaked peer. */
|
||||||
// VFALCO legacyPublicKey should be implied by the Slot
|
|
||||||
template <class Buffers>
|
template <class Buffers>
|
||||||
PeerImp(
|
PeerImp(
|
||||||
Application& app,
|
Application& app,
|
||||||
std::unique_ptr<stream_type>&& stream_ptr,
|
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||||
Buffers const& buffers,
|
Buffers const& buffers,
|
||||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||||
http_response_type&& response,
|
Resource::Consumer consumer,
|
||||||
Resource::Consumer usage,
|
|
||||||
PublicKey const& publicKey,
|
PublicKey const& publicKey,
|
||||||
ProtocolVersion protocol,
|
ProtocolVersion protocol,
|
||||||
id_t id,
|
id_t id,
|
||||||
OverlayImpl& overlay);
|
PeerAttributes const& attributes,
|
||||||
|
OverlayImpl& overlay,
|
||||||
|
std::string const& name);
|
||||||
|
|
||||||
virtual ~PeerImp();
|
virtual ~PeerImp();
|
||||||
|
|
||||||
@@ -431,13 +455,13 @@ public:
|
|||||||
bool
|
bool
|
||||||
compressionEnabled() const override
|
compressionEnabled() const override
|
||||||
{
|
{
|
||||||
return compressionEnabled_ == Compressed::On;
|
return attributes_.compressionEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
txReduceRelayEnabled() const override
|
txReduceRelayEnabled() const override
|
||||||
{
|
{
|
||||||
return txReduceRelayEnabled_;
|
return attributes_.txReduceRelayEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@@ -519,6 +543,9 @@ private:
|
|||||||
handleHaveTransactions(
|
handleHaveTransactions(
|
||||||
std::shared_ptr<protocol::TMHaveTransactions> const& m);
|
std::shared_ptr<protocol::TMHaveTransactions> const& m);
|
||||||
|
|
||||||
|
bool
|
||||||
|
socketOpen() const;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
//
|
//
|
||||||
@@ -650,15 +677,16 @@ private:
|
|||||||
template <class Buffers>
|
template <class Buffers>
|
||||||
PeerImp::PeerImp(
|
PeerImp::PeerImp(
|
||||||
Application& app,
|
Application& app,
|
||||||
std::unique_ptr<stream_type>&& stream_ptr,
|
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||||
Buffers const& buffers,
|
Buffers const& buffers,
|
||||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||||
http_response_type&& response,
|
|
||||||
Resource::Consumer usage,
|
Resource::Consumer usage,
|
||||||
PublicKey const& publicKey,
|
PublicKey const& publicKey,
|
||||||
ProtocolVersion protocol,
|
ProtocolVersion protocol,
|
||||||
id_t id,
|
id_t id,
|
||||||
OverlayImpl& overlay)
|
PeerAttributes const& attributes,
|
||||||
|
OverlayImpl& overlay,
|
||||||
|
std::string const& name)
|
||||||
: Child(overlay)
|
: Child(overlay)
|
||||||
, app_(app)
|
, app_(app)
|
||||||
, id_(id)
|
, id_(id)
|
||||||
@@ -667,10 +695,8 @@ PeerImp::PeerImp(
|
|||||||
, journal_(sink_)
|
, journal_(sink_)
|
||||||
, p_journal_(p_sink_)
|
, p_journal_(p_sink_)
|
||||||
, stream_ptr_(std::move(stream_ptr))
|
, stream_ptr_(std::move(stream_ptr))
|
||||||
, socket_(stream_ptr_->next_layer().socket())
|
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||||
, stream_(*stream_ptr_)
|
, timer_(waitable_timer{stream_ptr_->get_executor()})
|
||||||
, strand_(boost::asio::make_strand(socket_.get_executor()))
|
|
||||||
, timer_(waitable_timer{socket_.get_executor()})
|
|
||||||
, remote_address_(slot->remote_endpoint())
|
, remote_address_(slot->remote_endpoint())
|
||||||
, overlay_(overlay)
|
, overlay_(overlay)
|
||||||
, inbound_(false)
|
, inbound_(false)
|
||||||
@@ -678,43 +704,25 @@ PeerImp::PeerImp(
|
|||||||
, tracking_(Tracking::unknown)
|
, tracking_(Tracking::unknown)
|
||||||
, trackingTime_(clock_type::now())
|
, trackingTime_(clock_type::now())
|
||||||
, publicKey_(publicKey)
|
, publicKey_(publicKey)
|
||||||
|
, name_(name)
|
||||||
, lastPingTime_(clock_type::now())
|
, lastPingTime_(clock_type::now())
|
||||||
, creationTime_(clock_type::now())
|
, creationTime_(clock_type::now())
|
||||||
, squelch_(app_.journal("Squelch"))
|
, squelch_(app_.journal("Squelch"))
|
||||||
, usage_(usage)
|
, usage_(usage)
|
||||||
, fee_{Resource::feeTrivialPeer}
|
, fee_{Resource::feeTrivialPeer}
|
||||||
, slot_(std::move(slot))
|
, slot_(std::move(slot))
|
||||||
, response_(std::move(response))
|
, attributes_(attributes)
|
||||||
, headers_(response_)
|
|
||||||
, compressionEnabled_(
|
|
||||||
peerFeatureEnabled(
|
|
||||||
headers_,
|
|
||||||
FEATURE_COMPR,
|
|
||||||
"lz4",
|
|
||||||
app_.config().COMPRESSION)
|
|
||||||
? Compressed::On
|
|
||||||
: Compressed::Off)
|
|
||||||
, txReduceRelayEnabled_(peerFeatureEnabled(
|
|
||||||
headers_,
|
|
||||||
FEATURE_TXRR,
|
|
||||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
|
||||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
|
||||||
headers_,
|
|
||||||
FEATURE_LEDGER_REPLAY,
|
|
||||||
app_.config().LEDGER_REPLAY))
|
|
||||||
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
||||||
{
|
{
|
||||||
read_buffer_.commit(boost::asio::buffer_copy(
|
read_buffer_.commit(boost::asio::buffer_copy(
|
||||||
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
|
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
|
||||||
JLOG(journal_.info())
|
JLOG(journal_.info()) << "compression enabled "
|
||||||
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
|
<< attributes_.compressionEnabled
|
||||||
<< " vp reduce-relay base squelch enabled "
|
<< " vp reduce-relay base squelch enabled "
|
||||||
<< peerFeatureEnabled(
|
<< attributes_.vpReduceRelayEnabled
|
||||||
headers_,
|
<< " tx reduce-relay enabled "
|
||||||
FEATURE_VPRR,
|
<< attributes_.txReduceRelayEnabled << " on "
|
||||||
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
|
<< remote_address_ << " " << id_;
|
||||||
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
|
|
||||||
<< remote_address_ << " " << id_;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class FwdIt, class>
|
template <class FwdIt, class>
|
||||||
|
|||||||
144
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp
Normal file
144
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.cpp
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
|
||||||
|
#include <xrpld/app/consensus/RCLCxPeerPos.h>
|
||||||
|
#include <xrpld/app/ledger/Ledger.h>
|
||||||
|
#include <xrpld/app/misc/HashRouter.h>
|
||||||
|
#include <xrpld/overlay/detail/handlers/ProtocolMessageHandler.h>
|
||||||
|
#include <xrpld/shamap/SHAMap.h>
|
||||||
|
|
||||||
|
#include <xrpl/protocol/STTx.h>
|
||||||
|
#include <xrpl/protocol/STValidation.h>
|
||||||
|
#include <xrpl/protocol/messages.h>
|
||||||
|
|
||||||
|
namespace ripple {
|
||||||
|
|
||||||
|
// Helper function to check for valid uint256 values in protobuf buffers
|
||||||
|
static bool
|
||||||
|
stringIsUint256Sized(std::string const& pBuffStr)
|
||||||
|
{
|
||||||
|
return pBuffStr.size() == uint256::size();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
ProtocolMessageHandler::onMessage(
|
||||||
|
std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||||
|
{
|
||||||
|
protocol::TMProposeSet& set = *m;
|
||||||
|
|
||||||
|
auto const sig = makeSlice(set.signature());
|
||||||
|
|
||||||
|
// Preliminary check for the validity of the signature: A DER encoded
|
||||||
|
// signature can't be longer than 72 bytes.
|
||||||
|
if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
|
||||||
|
(publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
|
||||||
|
{
|
||||||
|
JLOG(p_journal_.warn()) << "Proposal: malformed";
|
||||||
|
fee_.update(
|
||||||
|
Resource::feeInvalidSignature,
|
||||||
|
" signature can't be longer than 72 bytes");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!stringIsUint256Sized(set.currenttxhash()) ||
|
||||||
|
!stringIsUint256Sized(set.previousledger()))
|
||||||
|
{
|
||||||
|
JLOG(p_journal_.warn()) << "Proposal: malformed";
|
||||||
|
fee_.update(Resource::feeMalformedRequest, "bad hashes");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||||
|
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||||
|
// lookup every time a spam packet is received
|
||||||
|
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||||
|
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||||
|
|
||||||
|
// If the operator has specified that untrusted proposals be dropped
|
||||||
|
// then this happens here I.e. before further wasting CPU verifying the
|
||||||
|
// signature of an untrusted key
|
||||||
|
if (!isTrusted)
|
||||||
|
{
|
||||||
|
// report untrusted proposal messages
|
||||||
|
overlay_.reportInboundTraffic(
|
||||||
|
TrafficCount::category::proposal_untrusted,
|
||||||
|
Message::messageSize(*m));
|
||||||
|
|
||||||
|
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint256 const proposeHash{set.currenttxhash()};
|
||||||
|
uint256 const prevLedger{set.previousledger()};
|
||||||
|
|
||||||
|
NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
|
||||||
|
|
||||||
|
uint256 const suppression = proposalUniqueId(
|
||||||
|
proposeHash,
|
||||||
|
prevLedger,
|
||||||
|
set.proposeseq(),
|
||||||
|
closeTime,
|
||||||
|
publicKey.slice(),
|
||||||
|
sig);
|
||||||
|
|
||||||
|
if (auto [added, relayed] =
|
||||||
|
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||||
|
!added)
|
||||||
|
{
|
||||||
|
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||||
|
// peer receives within IDLED seconds since the message has been
|
||||||
|
// relayed.
|
||||||
|
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||||
|
overlay_.updateSlotAndSquelch(
|
||||||
|
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||||
|
|
||||||
|
// report duplicate proposal messages
|
||||||
|
overlay_.reportInboundTraffic(
|
||||||
|
TrafficCount::category::proposal_duplicate,
|
||||||
|
Message::messageSize(*m));
|
||||||
|
|
||||||
|
JLOG(p_journal_.trace()) << "Proposal: duplicate";
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isTrusted)
|
||||||
|
{
|
||||||
|
if (tracking_.load() == Tracking::diverged)
|
||||||
|
{
|
||||||
|
JLOG(p_journal_.debug())
|
||||||
|
<< "Proposal: Dropping untrusted (peer divergence)";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!cluster() && app_.getFeeTrack().isLoadedLocal())
|
||||||
|
{
|
||||||
|
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
JLOG(p_journal_.trace())
|
||||||
|
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
|
||||||
|
|
||||||
|
auto proposal = RCLCxPeerPos(
|
||||||
|
publicKey,
|
||||||
|
sig,
|
||||||
|
suppression,
|
||||||
|
RCLCxPeerPos::Proposal{
|
||||||
|
prevLedger,
|
||||||
|
set.proposeseq(),
|
||||||
|
proposeHash,
|
||||||
|
closeTime,
|
||||||
|
app_.timeKeeper().closeTime(),
|
||||||
|
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
|
||||||
|
|
||||||
|
std::weak_ptr<PeerImp> weak = shared_from_this();
|
||||||
|
app_.getJobQueue().addJob(
|
||||||
|
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
|
||||||
|
"recvPropose->checkPropose",
|
||||||
|
[weak, isTrusted, m, proposal]() {
|
||||||
|
if (auto peer = weak.lock())
|
||||||
|
peer->checkPropose(isTrusted, m, proposal);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ripple
|
||||||
142
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h
Normal file
142
src/xrpld/overlay/detail/handlers/ProtocolMessageHandler.h
Normal file
@@ -0,0 +1,142 @@
|
|||||||
|
|
||||||
|
#include <xrpld/app/consensus/RCLCxPeerPos.h>
|
||||||
|
#include <xrpld/app/ledger/Ledger.h>
|
||||||
|
#include <xrpld/app/misc/HashRouter.h>
|
||||||
|
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||||
|
#include <xrpld/shamap/SHAMap.h>
|
||||||
|
|
||||||
|
#include <xrpl/protocol/STTx.h>
|
||||||
|
#include <xrpl/protocol/STValidation.h>
|
||||||
|
#include <xrpl/protocol/messages.h>
|
||||||
|
|
||||||
|
namespace ripple {
|
||||||
|
|
||||||
|
class ProtocolMessageHandler
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
beast::Journal const journal_;
|
||||||
|
beast::Journal const p_journal_;
|
||||||
|
OverlayImpl& overlay_;
|
||||||
|
Application& app_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
void
|
||||||
|
onMessageUnknown(std::uint16_t type);
|
||||||
|
|
||||||
|
void
|
||||||
|
onMessageBegin(
|
||||||
|
std::uint16_t type,
|
||||||
|
std::shared_ptr<::google::protobuf::Message> const& m,
|
||||||
|
std::size_t size,
|
||||||
|
std::size_t uncompressed_size,
|
||||||
|
bool isCompressed);
|
||||||
|
|
||||||
|
void
|
||||||
|
onMessageEnd(
|
||||||
|
std::uint16_t type,
|
||||||
|
std::shared_ptr<::google::protobuf::Message> const& m);
|
||||||
|
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMManifests> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMPing> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMCluster> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMEndpoints> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMTransaction> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMLedgerData> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMProposeSet> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMStatusChange> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMValidatorList> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMValidatorListCollection> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMTransactions> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
|
||||||
|
void
|
||||||
|
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
|
||||||
|
|
||||||
|
private:
|
||||||
|
//--------------------------------------------------------------------------
|
||||||
|
// lockedRecentLock is passed as a reminder to callers that recentLock_
|
||||||
|
// must be locked.
|
||||||
|
void
|
||||||
|
addLedger(
|
||||||
|
uint256 const& hash,
|
||||||
|
std::lock_guard<std::mutex> const& lockedRecentLock);
|
||||||
|
|
||||||
|
void
|
||||||
|
doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
|
||||||
|
|
||||||
|
void
|
||||||
|
onValidatorListMessage(
|
||||||
|
std::string const& messageType,
|
||||||
|
std::string const& manifest,
|
||||||
|
std::uint32_t version,
|
||||||
|
std::vector<protocol::ValidatorBlobInfo> const& blobs);
|
||||||
|
|
||||||
|
/** Process peer's request to send missing transactions. The request is
|
||||||
|
sent in response to TMHaveTransactions.
|
||||||
|
@param packet protocol message containing missing transactions' hashes.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
|
||||||
|
|
||||||
|
void
|
||||||
|
checkTransaction(
|
||||||
|
HashRouterFlags flags,
|
||||||
|
bool checkSignature,
|
||||||
|
std::shared_ptr<STTx const> const& stx,
|
||||||
|
bool batch);
|
||||||
|
|
||||||
|
void
|
||||||
|
checkPropose(
|
||||||
|
bool isTrusted,
|
||||||
|
std::shared_ptr<protocol::TMProposeSet> const& packet,
|
||||||
|
RCLCxPeerPos peerPos);
|
||||||
|
|
||||||
|
void
|
||||||
|
checkValidation(
|
||||||
|
std::shared_ptr<STValidation> const& val,
|
||||||
|
uint256 const& key,
|
||||||
|
std::shared_ptr<protocol::TMValidation> const& packet);
|
||||||
|
|
||||||
|
void
|
||||||
|
sendLedgerBase(
|
||||||
|
std::shared_ptr<Ledger const> const& ledger,
|
||||||
|
protocol::TMLedgerData& ledgerData);
|
||||||
|
|
||||||
|
std::shared_ptr<Ledger const>
|
||||||
|
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||||
|
|
||||||
|
std::shared_ptr<SHAMap const>
|
||||||
|
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
|
||||||
|
|
||||||
|
void
|
||||||
|
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace ripple
|
||||||
14
src/xrpld/server/detail/StreamInterface.cpp
Normal file
14
src/xrpld/server/detail/StreamInterface.cpp
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
#include <xrpld/overlay/detail/Handshake.h>
|
||||||
|
|
||||||
|
#include <xrpl/server/detail/StreamInterface.h>
|
||||||
|
|
||||||
|
namespace ripple {
|
||||||
|
|
||||||
|
std::optional<base_uint<256>>
|
||||||
|
ProductionStream::makeSharedValue(beast::Journal journal)
|
||||||
|
{
|
||||||
|
// Delegate to the existing Handshake module
|
||||||
|
return ripple::makeSharedValue(*stream_, journal);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ripple
|
||||||
Reference in New Issue
Block a user