mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-28 06:55:50 +00:00
Fix crash due to recurrent call to `Slot::deletePeer` (via `OverlayImpl::unsquelch`) when a peer is disconnected at just the wrong moment.
1612 lines
46 KiB
C++
1612 lines
46 KiB
C++
//------------------------------------------------------------------------------
|
|
/*
|
|
This file is part of rippled: https://github.com/ripple/rippled
|
|
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
|
|
|
Permission to use, copy, modify, and/or distribute this software for any
|
|
purpose with or without fee is hereby granted, provided that the above
|
|
copyright notice and this permission notice appear in all copies.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
//==============================================================================
|
|
|
|
#include <xrpld/app/misc/HashRouter.h>
|
|
#include <xrpld/app/misc/NetworkOPs.h>
|
|
#include <xrpld/app/misc/ValidatorList.h>
|
|
#include <xrpld/app/misc/ValidatorSite.h>
|
|
#include <xrpld/app/rdb/RelationalDatabase.h>
|
|
#include <xrpld/app/rdb/Wallet.h>
|
|
#include <xrpld/overlay/Cluster.h>
|
|
#include <xrpld/overlay/detail/ConnectAttempt.h>
|
|
#include <xrpld/overlay/detail/PeerImp.h>
|
|
#include <xrpld/overlay/detail/Tuning.h>
|
|
#include <xrpld/overlay/predicates.h>
|
|
#include <xrpld/peerfinder/make_Manager.h>
|
|
#include <xrpld/rpc/handlers/GetCounts.h>
|
|
#include <xrpld/rpc/json_body.h>
|
|
|
|
#include <xrpl/basics/base64.h>
|
|
#include <xrpl/basics/make_SSLContext.h>
|
|
#include <xrpl/basics/random.h>
|
|
#include <xrpl/beast/core/LexicalCast.h>
|
|
#include <xrpl/protocol/STTx.h>
|
|
#include <xrpl/server/SimpleWriter.h>
|
|
|
|
#include <boost/algorithm/string/predicate.hpp>
|
|
|
|
#include "xrpld/overlay/detail/TrafficCount.h"
|
|
|
|
namespace ripple {
|
|
|
|
/** Bit flags selecting which sections the "/crawl" endpoint may report.

    The flags are combined into a single mask. `Disabled` (no bit set)
    switches the endpoint off; every other enumerator enables one section
    of the response.
*/
namespace CrawlOptions {
enum {
    Disabled = 0x00,      // "/crawl" is not served at all
    Overlay = 0x01,       // include the "overlay" section
    ServerInfo = 0x02,    // include the "server" section
    ServerCounts = 0x04,  // include the "counts" section
    Unl = 0x08            // include the "unl" section
};
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
/** Remember the overlay this child belongs to.

    The reference is used by the destructor to report back to the overlay.

    @param overlay The overlay this child is attached to.
*/
OverlayImpl::Child::Child(OverlayImpl& overlay)
    : overlay_(overlay)
{
}
|
|
|
|
/** Tell the overlay that this child is going away. */
OverlayImpl::Child::~Child()
{
    // Hand ourselves to OverlayImpl::remove(Child&) so the overlay can
    // drop its bookkeeping entry for this child.
    overlay_.remove(*this);
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
/** Create the overlay's periodic timer.

    The underlying asio timer is bound to the overlay's io_service.

    @param overlay The overlay this timer works for.
*/
OverlayImpl::Timer::Timer(OverlayImpl& overlay)
    : Child(overlay)
    , timer_(overlay.io_service_)
{
}
|
|
|
|
void
|
|
OverlayImpl::Timer::stop()
|
|
{
|
|
// This method is only ever called from the same strand that calls
|
|
// Timer::on_timer, ensuring they never execute concurrently.
|
|
stopping_ = true;
|
|
timer_.cancel();
|
|
}
|
|
|
|
void
|
|
OverlayImpl::Timer::async_wait()
|
|
{
|
|
timer_.expires_after(std::chrono::seconds(1));
|
|
timer_.async_wait(overlay_.strand_.wrap(std::bind(
|
|
&Timer::on_timer, shared_from_this(), std::placeholders::_1)));
|
|
}
|
|
|
|
void
|
|
OverlayImpl::Timer::on_timer(error_code ec)
|
|
{
|
|
if (ec || stopping_)
|
|
{
|
|
if (ec && ec != boost::asio::error::operation_aborted)
|
|
{
|
|
JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
|
|
}
|
|
return;
|
|
}
|
|
|
|
overlay_.m_peerFinder->once_per_second();
|
|
overlay_.sendEndpoints();
|
|
overlay_.autoConnect();
|
|
if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
|
|
overlay_.sendTxQueue();
|
|
|
|
if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
|
|
overlay_.deleteIdlePeers();
|
|
|
|
async_wait();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
/** Construct the overlay.

    Stores the collaborating services, creates the PeerFinder manager,
    builds one TrafficGauges entry per traffic category known to
    `m_traffic`, and registers the PeerFinder as a PropertyStream source
    of this object.

    @param app             The owning application.
    @param setup           Overlay configuration.
    @param serverHandler   Server handler; its setup is queried in start().
    @param resourceManager Used to create resource consumers for endpoints.
    @param resolver        Used in start() to resolve configured peer names.
    @param io_service      Drives the strand, the timers and the PeerFinder.
    @param config          Configuration passed on to the PeerFinder.
    @param collector       Destination for metrics and traffic gauges.
*/
OverlayImpl::OverlayImpl(
    Application& app,
    Setup const& setup,
    ServerHandler& serverHandler,
    Resource::Manager& resourceManager,
    Resolver& resolver,
    boost::asio::io_service& io_service,
    BasicConfig const& config,
    beast::insight::Collector::ptr const& collector)
    : app_(app)
    , io_service_(io_service)
    , work_(std::in_place, std::ref(io_service_))
    , strand_(io_service_)
    , setup_(setup)
    , journal_(app_.journal("Overlay"))
    , serverHandler_(serverHandler)
    , m_resourceManager(resourceManager)
    , m_peerFinder(PeerFinder::make_Manager(
          io_service,
          stopwatch(),
          app_.journal("PeerFinder"),
          config,
          collector))
    , m_resolver(resolver)
    , next_id_(1)
    , timer_count_(0)
    , slots_(app.logs(), *this, app.config())
    , m_stats(
          std::bind(&OverlayImpl::collect_metrics, this),
          collector,
          // Immediately-invoked lambda: one gauge set per traffic category.
          [counts = m_traffic.getCounts(), collector]() {
              std::unordered_map<TrafficCount::category, TrafficGauges>
                  gauges;

              for (auto const& [category, stats] : counts)
              {
                  gauges.emplace(
                      category, TrafficGauges(stats.name, collector));
              }

              return gauges;
          }())
{
    beast::PropertyStream::Source::add(m_peerFinder.get());
}
|
|
|
|
/** Handle an HTTP request handed over by the server.

    The request is first offered to the plain HTTP endpoints (see
    processRequest). If none of them handles it and the request is a peer
    protocol upgrade, the peer handshake is attempted; on success a PeerImp
    is created, registered and started, and ownership of the stream moves
    to it.

    Every rejection that happens after a PeerFinder slot has been allocated
    releases that slot via `on_closed` before returning, so a refused
    connection does not keep its slot.

    @param stream_ptr      The connection's stream; moved into the new peer
                           only when the handshake succeeds.
    @param request         The HTTP request received on the connection.
    @param remote_endpoint Address of the remote party.
    @return A Handoff describing the outcome: `moved` tells whether the
            overlay took the connection, `response` (if set) is to be sent
            to the client, and `keep_alive` tells whether the connection
            may stay open afterwards.
*/
Handoff
OverlayImpl::onHandoff(
    std::unique_ptr<stream_type>&& stream_ptr,
    http_request_type&& request,
    endpoint_type remote_endpoint)
{
    auto const id = next_id_++;
    beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
    beast::Journal journal(sink);

    Handoff handoff;
    // Plain HTTP endpoints are answered directly.
    if (processRequest(request, handoff))
        return handoff;
    // Anything that is not a peer protocol upgrade is left untouched.
    if (!isPeerUpgrade(request))
        return handoff;

    handoff.moved = true;

    JLOG(journal.debug()) << "Peer connection upgrade from " << remote_endpoint;

    error_code ec;
    auto const local_endpoint(
        stream_ptr->next_layer().socket().local_endpoint(ec));
    if (ec)
    {
        JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
        return handoff;
    }

    auto consumer = m_resourceManager.newInboundEndpoint(
        beast::IPAddressConversion::from_asio(remote_endpoint));
    if (consumer.disconnect(journal))
        return handoff;

    auto const slot = m_peerFinder->new_inbound_slot(
        beast::IPAddressConversion::from_asio(local_endpoint),
        beast::IPAddressConversion::from_asio(remote_endpoint));

    if (slot == nullptr)
    {
        // self-connect, close
        handoff.moved = false;
        return handoff;
    }

    // Validate HTTP request

    {
        auto const types = beast::rfc2616::split_commas(request["Connect-As"]);
        if (std::find_if(types.begin(), types.end(), [](std::string const& s) {
                return boost::iequals(s, "peer");
            }) == types.end())
        {
            // The connection will not become a peer: give the slot back,
            // exactly as the other rejection paths below do. Without this
            // the slot allocated above would never be released.
            m_peerFinder->on_closed(slot);
            handoff.moved = false;
            handoff.response =
                makeRedirectResponse(slot, request, remote_endpoint.address());
            handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
            return handoff;
        }
    }

    auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
    if (!negotiatedVersion)
    {
        m_peerFinder->on_closed(slot);
        handoff.moved = false;
        handoff.response = makeErrorResponse(
            slot,
            request,
            remote_endpoint.address(),
            "Unable to agree on a protocol version");
        handoff.keep_alive = false;
        return handoff;
    }

    auto const sharedValue = makeSharedValue(*stream_ptr, journal);
    if (!sharedValue)
    {
        m_peerFinder->on_closed(slot);
        handoff.moved = false;
        handoff.response = makeErrorResponse(
            slot,
            request,
            remote_endpoint.address(),
            "Incorrect security cookie");
        handoff.keep_alive = false;
        return handoff;
    }

    try
    {
        // Throws std::exception on handshake failure; handled below.
        auto publicKey = verifyHandshake(
            request,
            *sharedValue,
            setup_.networkID,
            setup_.public_ip,
            remote_endpoint.address(),
            app_);

        {
            // The node gets a reserved slot if it is in our cluster
            // or if it has a reservation.
            bool const reserved =
                static_cast<bool>(app_.cluster().member(publicKey)) ||
                app_.peerReservations().contains(publicKey);
            auto const result =
                m_peerFinder->activate(slot, publicKey, reserved);
            if (result != PeerFinder::Result::success)
            {
                m_peerFinder->on_closed(slot);
                JLOG(journal.debug())
                    << "Peer " << remote_endpoint << " redirected, slots full";
                handoff.moved = false;
                handoff.response = makeRedirectResponse(
                    slot, request, remote_endpoint.address());
                handoff.keep_alive = false;
                return handoff;
            }
        }

        auto const peer = std::make_shared<PeerImp>(
            app_,
            id,
            slot,
            std::move(request),
            publicKey,
            *negotiatedVersion,
            consumer,
            std::move(stream_ptr),
            *this);
        {
            // As we are not on the strand, run() must be called
            // while holding the lock, otherwise new I/O can be
            // queued after a call to stop().
            std::lock_guard<decltype(mutex_)> lock(mutex_);
            {
                auto const result = m_peers.emplace(peer->slot(), peer);
                XRPL_ASSERT(
                    result.second,
                    "ripple::OverlayImpl::onHandoff : peer is inserted");
                (void)result.second;
            }
            list_.emplace(peer.get(), peer);

            peer->run();
        }
        handoff.moved = true;
        return handoff;
    }
    catch (std::exception const& e)
    {
        JLOG(journal.debug()) << "Peer " << remote_endpoint
                              << " fails handshake (" << e.what() << ")";

        m_peerFinder->on_closed(slot);
        handoff.moved = false;
        handoff.response = makeErrorResponse(
            slot, request, remote_endpoint.address(), e.what());
        handoff.keep_alive = false;
        return handoff;
    }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
bool
|
|
OverlayImpl::isPeerUpgrade(http_request_type const& request)
|
|
{
|
|
if (!is_upgrade(request))
|
|
return false;
|
|
auto const versions = parseProtocolVersions(request["Upgrade"]);
|
|
return !versions.empty();
|
|
}
|
|
|
|
std::string
|
|
OverlayImpl::makePrefix(std::uint32_t id)
|
|
{
|
|
std::stringstream ss;
|
|
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
|
|
return ss.str();
|
|
}
|
|
|
|
std::shared_ptr<Writer>
|
|
OverlayImpl::makeRedirectResponse(
|
|
std::shared_ptr<PeerFinder::Slot> const& slot,
|
|
http_request_type const& request,
|
|
address_type remote_address)
|
|
{
|
|
boost::beast::http::response<json_body> msg;
|
|
msg.version(request.version());
|
|
msg.result(boost::beast::http::status::service_unavailable);
|
|
msg.insert("Server", BuildInfo::getFullVersionString());
|
|
{
|
|
std::ostringstream ostr;
|
|
ostr << remote_address;
|
|
msg.insert("Remote-Address", ostr.str());
|
|
}
|
|
msg.insert("Content-Type", "application/json");
|
|
msg.insert(boost::beast::http::field::connection, "close");
|
|
msg.body() = Json::objectValue;
|
|
{
|
|
Json::Value& ips = (msg.body()["peer-ips"] = Json::arrayValue);
|
|
for (auto const& _ : m_peerFinder->redirect(slot))
|
|
ips.append(_.address.to_string());
|
|
}
|
|
msg.prepare_payload();
|
|
return std::make_shared<SimpleWriter>(msg);
|
|
}
|
|
|
|
std::shared_ptr<Writer>
|
|
OverlayImpl::makeErrorResponse(
|
|
std::shared_ptr<PeerFinder::Slot> const& slot,
|
|
http_request_type const& request,
|
|
address_type remote_address,
|
|
std::string text)
|
|
{
|
|
boost::beast::http::response<boost::beast::http::empty_body> msg;
|
|
msg.version(request.version());
|
|
msg.result(boost::beast::http::status::bad_request);
|
|
msg.reason("Bad Request (" + text + ")");
|
|
msg.insert("Server", BuildInfo::getFullVersionString());
|
|
msg.insert("Remote-Address", remote_address.to_string());
|
|
msg.insert(boost::beast::http::field::connection, "close");
|
|
msg.prepare_payload();
|
|
return std::make_shared<SimpleWriter>(msg);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
void
|
|
OverlayImpl::connect(beast::IP::Endpoint const& remote_endpoint)
|
|
{
|
|
XRPL_ASSERT(work_, "ripple::OverlayImpl::connect : work is set");
|
|
|
|
auto usage = resourceManager().newOutboundEndpoint(remote_endpoint);
|
|
if (usage.disconnect(journal_))
|
|
{
|
|
JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
|
|
return;
|
|
}
|
|
|
|
auto const slot = peerFinder().new_outbound_slot(remote_endpoint);
|
|
if (slot == nullptr)
|
|
{
|
|
JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint;
|
|
return;
|
|
}
|
|
|
|
auto const p = std::make_shared<ConnectAttempt>(
|
|
app_,
|
|
io_service_,
|
|
beast::IPAddressConversion::to_asio_endpoint(remote_endpoint),
|
|
usage,
|
|
setup_.context,
|
|
next_id_++,
|
|
slot,
|
|
app_.journal("Peer"),
|
|
*this);
|
|
|
|
std::lock_guard lock(mutex_);
|
|
list_.emplace(p.get(), p);
|
|
p->run();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// Adds a peer that is already handshaked and active
|
|
void
|
|
OverlayImpl::add_active(std::shared_ptr<PeerImp> const& peer)
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
|
|
{
|
|
auto const result = m_peers.emplace(peer->slot(), peer);
|
|
XRPL_ASSERT(
|
|
result.second,
|
|
"ripple::OverlayImpl::add_active : peer is inserted");
|
|
(void)result.second;
|
|
}
|
|
|
|
{
|
|
auto const result = ids_.emplace(
|
|
std::piecewise_construct,
|
|
std::make_tuple(peer->id()),
|
|
std::make_tuple(peer));
|
|
XRPL_ASSERT(
|
|
result.second,
|
|
"ripple::OverlayImpl::add_active : peer ID is inserted");
|
|
(void)result.second;
|
|
}
|
|
|
|
list_.emplace(peer.get(), peer);
|
|
|
|
JLOG(journal_.debug()) << "activated " << peer->getRemoteAddress() << " ("
|
|
<< peer->id() << ":"
|
|
<< toBase58(
|
|
TokenType::NodePublic, peer->getNodePublic())
|
|
<< ")";
|
|
|
|
// As we are not on the strand, run() must be called
|
|
// while holding the lock, otherwise new I/O can be
|
|
// queued after a call to stop().
|
|
peer->run();
|
|
}
|
|
|
|
void
|
|
OverlayImpl::remove(std::shared_ptr<PeerFinder::Slot> const& slot)
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
auto const iter = m_peers.find(slot);
|
|
XRPL_ASSERT(
|
|
iter != m_peers.end(), "ripple::OverlayImpl::remove : valid input");
|
|
m_peers.erase(iter);
|
|
}
|
|
|
|
void
|
|
OverlayImpl::start()
|
|
{
|
|
PeerFinder::Config config = PeerFinder::Config::makeConfig(
|
|
app_.config(),
|
|
serverHandler_.setup().overlay.port(),
|
|
app_.getValidationPublicKey().has_value(),
|
|
setup_.ipLimit);
|
|
|
|
m_peerFinder->setConfig(config);
|
|
m_peerFinder->start();
|
|
|
|
// Populate our boot cache: if there are no entries in [ips] then we use
|
|
// the entries in [ips_fixed].
|
|
auto bootstrapIps =
|
|
app_.config().IPS.empty() ? app_.config().IPS_FIXED : app_.config().IPS;
|
|
|
|
// If nothing is specified, default to several well-known high-capacity
|
|
// servers to serve as bootstrap:
|
|
if (bootstrapIps.empty())
|
|
{
|
|
// Pool of servers operated by Ripple Labs Inc. - https://ripple.com
|
|
bootstrapIps.push_back("r.ripple.com 51235");
|
|
|
|
// Pool of servers operated by ISRDC - https://isrdc.in
|
|
bootstrapIps.push_back("sahyadri.isrdc.in 51235");
|
|
|
|
// Pool of servers operated by @Xrpkuwait - https://xrpkuwait.com
|
|
bootstrapIps.push_back("hubs.xrpkuwait.com 51235");
|
|
|
|
// Pool of servers operated by XRPL Commons - https://xrpl-commons.org
|
|
bootstrapIps.push_back("hub.xrpl-commons.org 51235");
|
|
}
|
|
|
|
m_resolver.resolve(
|
|
bootstrapIps,
|
|
[this](
|
|
std::string const& name,
|
|
std::vector<beast::IP::Endpoint> const& addresses) {
|
|
std::vector<std::string> ips;
|
|
ips.reserve(addresses.size());
|
|
for (auto const& addr : addresses)
|
|
{
|
|
if (addr.port() == 0)
|
|
ips.push_back(to_string(addr.at_port(DEFAULT_PEER_PORT)));
|
|
else
|
|
ips.push_back(to_string(addr));
|
|
}
|
|
|
|
std::string const base("config: ");
|
|
if (!ips.empty())
|
|
m_peerFinder->addFallbackStrings(base + name, ips);
|
|
});
|
|
|
|
// Add the ips_fixed from the rippled.cfg file
|
|
if (!app_.config().standalone() && !app_.config().IPS_FIXED.empty())
|
|
{
|
|
m_resolver.resolve(
|
|
app_.config().IPS_FIXED,
|
|
[this](
|
|
std::string const& name,
|
|
std::vector<beast::IP::Endpoint> const& addresses) {
|
|
std::vector<beast::IP::Endpoint> ips;
|
|
ips.reserve(addresses.size());
|
|
|
|
for (auto& addr : addresses)
|
|
{
|
|
if (addr.port() == 0)
|
|
ips.emplace_back(addr.address(), DEFAULT_PEER_PORT);
|
|
else
|
|
ips.emplace_back(addr);
|
|
}
|
|
|
|
if (!ips.empty())
|
|
m_peerFinder->addFixedPeer(name, ips);
|
|
});
|
|
}
|
|
auto const timer = std::make_shared<Timer>(*this);
|
|
std::lock_guard lock(mutex_);
|
|
list_.emplace(timer.get(), timer);
|
|
timer_ = timer;
|
|
timer->async_wait();
|
|
}
|
|
|
|
void
|
|
OverlayImpl::stop()
|
|
{
|
|
strand_.dispatch(std::bind(&OverlayImpl::stopChildren, this));
|
|
{
|
|
std::unique_lock<decltype(mutex_)> lock(mutex_);
|
|
cond_.wait(lock, [this] { return list_.empty(); });
|
|
}
|
|
m_peerFinder->stop();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
//
|
|
// PropertyStream
|
|
//
|
|
//------------------------------------------------------------------------------
|
|
|
|
void
|
|
OverlayImpl::onWrite(beast::PropertyStream::Map& stream)
|
|
{
|
|
beast::PropertyStream::Set set("traffic", stream);
|
|
auto const stats = m_traffic.getCounts();
|
|
for (auto const& pair : stats)
|
|
{
|
|
beast::PropertyStream::Map item(set);
|
|
item["category"] = pair.second.name;
|
|
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
|
|
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
|
|
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
|
|
item["messages_out"] = std::to_string(pair.second.messagesOut.load());
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
/** A peer has connected successfully
|
|
This is called after the peer handshake has been completed and during
|
|
peer activation. At this point, the peer address and the public key
|
|
are known.
|
|
*/
|
|
void
|
|
OverlayImpl::activate(std::shared_ptr<PeerImp> const& peer)
|
|
{
|
|
// Now track this peer
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
auto const result(ids_.emplace(
|
|
std::piecewise_construct,
|
|
std::make_tuple(peer->id()),
|
|
std::make_tuple(peer)));
|
|
XRPL_ASSERT(
|
|
result.second,
|
|
"ripple::OverlayImpl::activate : peer ID is inserted");
|
|
(void)result.second;
|
|
}
|
|
|
|
JLOG(journal_.debug()) << "activated " << peer->getRemoteAddress() << " ("
|
|
<< peer->id() << ":"
|
|
<< toBase58(
|
|
TokenType::NodePublic, peer->getNodePublic())
|
|
<< ")";
|
|
|
|
// We just accepted this peer so we have non-zero active peers
|
|
XRPL_ASSERT(size(), "ripple::OverlayImpl::activate : nonzero peers");
|
|
}
|
|
|
|
void
|
|
OverlayImpl::onPeerDeactivate(Peer::id_t id)
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
ids_.erase(id);
|
|
}
|
|
|
|
void
|
|
OverlayImpl::onManifests(
|
|
std::shared_ptr<protocol::TMManifests> const& m,
|
|
std::shared_ptr<PeerImp> const& from)
|
|
{
|
|
auto const n = m->list_size();
|
|
auto const& journal = from->pjournal();
|
|
|
|
protocol::TMManifests relay;
|
|
|
|
for (std::size_t i = 0; i < n; ++i)
|
|
{
|
|
auto& s = m->list().Get(i).stobject();
|
|
|
|
if (auto mo = deserializeManifest(s))
|
|
{
|
|
auto const serialized = mo->serialized;
|
|
|
|
auto const result =
|
|
app_.validatorManifests().applyManifest(std::move(*mo));
|
|
|
|
if (result == ManifestDisposition::accepted)
|
|
{
|
|
relay.add_list()->set_stobject(s);
|
|
|
|
// N.B.: this is important; the applyManifest call above moves
|
|
// the loaded Manifest out of the optional so we need to
|
|
// reload it here.
|
|
mo = deserializeManifest(serialized);
|
|
XRPL_ASSERT(
|
|
mo,
|
|
"ripple::OverlayImpl::onManifests : manifest "
|
|
"deserialization succeeded");
|
|
|
|
app_.getOPs().pubManifest(*mo);
|
|
|
|
if (app_.validators().listed(mo->masterKey))
|
|
{
|
|
auto db = app_.getWalletDB().checkoutDb();
|
|
addValidatorManifest(*db, serialized);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
JLOG(journal.debug())
|
|
<< "Malformed manifest #" << i + 1 << ": " << strHex(s);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (!relay.list().empty())
|
|
for_each([m2 = std::make_shared<Message>(relay, protocol::mtMANIFESTS)](
|
|
std::shared_ptr<PeerImp>&& p) { p->send(m2); });
|
|
}
|
|
|
|
void
|
|
OverlayImpl::reportInboundTraffic(TrafficCount::category cat, int size)
|
|
{
|
|
m_traffic.addCount(cat, true, size);
|
|
}
|
|
|
|
void
|
|
OverlayImpl::reportOutboundTraffic(TrafficCount::category cat, int size)
|
|
{
|
|
m_traffic.addCount(cat, false, size);
|
|
}
|
|
/** The number of active peers on the network
|
|
Active peers are only those peers that have completed the handshake
|
|
and are running the Ripple protocol.
|
|
*/
|
|
std::size_t
|
|
OverlayImpl::size() const
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
return ids_.size();
|
|
}
|
|
|
|
int
|
|
OverlayImpl::limit()
|
|
{
|
|
return m_peerFinder->config().maxPeers;
|
|
}
|
|
|
|
/** Describe the active peers.

    @return An object whose "active" array has one entry per peer with its
            public key (base64), direction, uptime in seconds and, where
            available, version and ledger range. IP and port are included
            only for peers whose `crawl()` returns true.
*/
Json::Value
OverlayImpl::getOverlayInfo()
{
    using namespace std::chrono;

    Json::Value jv;
    jv["active"] = Json::Value(Json::arrayValue);
    auto& av = jv["active"];

    for_each([&av](std::shared_ptr<PeerImp>&& sp) {
        auto& pv = av.append(Json::Value(Json::objectValue));

        auto const& nodeKey = sp->getNodePublic();
        pv[jss::public_key] = base64_encode(nodeKey.data(), nodeKey.size());

        bool const inbound = sp->slot()->inbound();
        pv[jss::type] = inbound ? "in" : "out";

        auto const uptimeSeconds = duration_cast<seconds>(sp->uptime());
        pv[jss::uptime] = static_cast<std::uint32_t>(uptimeSeconds.count());

        if (sp->crawl())
        {
            pv[jss::ip] = sp->getRemoteAddress().address().to_string();

            if (!inbound)
            {
                // Outbound: the port we connected to, as a string.
                pv[jss::port] = std::to_string(sp->getRemoteAddress().port());
            }
            else if (auto port = sp->slot()->listening_port())
            {
                // Inbound: the listening port, when one is known.
                pv[jss::port] = *port;
            }
        }

        if (auto const version = sp->getVersion(); !version.empty())
        {
            // Could move here if Json::value supported moving from strings
            pv[jss::version] = std::string{version};
        }

        std::uint32_t minSeq, maxSeq;
        sp->ledgerRange(minSeq, maxSeq);
        bool const haveRange = (minSeq != 0 || maxSeq != 0);
        if (haveRange)
        {
            pv[jss::complete_ledgers] =
                std::to_string(minSeq) + "-" + std::to_string(maxSeq);
        }
    });

    return jv;
}
|
|
|
|
/** Produce a filtered copy of the server info.

    The info is requested in non-human-readable, non-admin form without
    counters, and a number of members are removed before it is returned.

    @return The filtered server info.
*/
Json::Value
OverlayImpl::getServerInfo()
{
    constexpr bool humanReadable = false;
    constexpr bool admin = false;
    constexpr bool counters = false;

    Json::Value server_info =
        app_.getOPs().getServerInfo(humanReadable, admin, counters);

    // Filter out some information
    for (auto const& member :
         {jss::hostid,
          jss::load_factor_fee_escalation,
          jss::load_factor_fee_queue,
          jss::validation_quorum})
    {
        server_info.removeMember(member);
    }

    if (server_info.isMember(jss::validated_ledger))
    {
        Json::Value& validated_ledger = server_info[jss::validated_ledger];

        for (auto const& member :
             {jss::base_fee, jss::reserve_base_xrp, jss::reserve_inc_xrp})
        {
            validated_ledger.removeMember(member);
        }
    }

    return server_info;
}
|
|
|
|
/** Produce the server counts.

    @return The result of `getCountsJson` called with a second argument
            of 10.
*/
Json::Value
OverlayImpl::getServerCounts()
{
    constexpr int countsArgument = 10;
    return getCountsJson(app_, countsArgument);
}
|
|
|
|
/** Produce a filtered view of the validator list information.

    Starts from the validators' JSON, strips the "list" member from every
    publisher list, removes signing keys, trusted validator keys and the
    validation quorum, and adds the validator sites when available.

    @return The filtered validator information.
*/
Json::Value
OverlayImpl::getUnlInfo()
{
    Json::Value validators = app_.validators().getJson();

    if (validators.isMember(jss::publisher_lists))
    {
        for (auto& publisher : validators[jss::publisher_lists])
            publisher.removeMember(jss::list);
    }

    validators.removeMember(jss::signing_keys);
    validators.removeMember(jss::trusted_validator_keys);
    validators.removeMember(jss::validation_quorum);

    Json::Value validatorSites = app_.validatorSites().getJson();

    if (validatorSites.isMember(jss::validator_sites))
    {
        // Move the sites over rather than copying them.
        Json::Value& sites = validatorSites[jss::validator_sites];
        validators[jss::validator_sites] = std::move(sites);
    }

    return validators;
}
|
|
|
|
// Returns information on verified peers.
|
|
Json::Value
|
|
OverlayImpl::json()
|
|
{
|
|
Json::Value json;
|
|
for (auto const& peer : getActivePeers())
|
|
{
|
|
json.append(peer->json());
|
|
}
|
|
return json;
|
|
}
|
|
|
|
bool
|
|
OverlayImpl::processCrawl(http_request_type const& req, Handoff& handoff)
|
|
{
|
|
if (req.target() != "/crawl" ||
|
|
setup_.crawlOptions == CrawlOptions::Disabled)
|
|
return false;
|
|
|
|
boost::beast::http::response<json_body> msg;
|
|
msg.version(req.version());
|
|
msg.result(boost::beast::http::status::ok);
|
|
msg.insert("Server", BuildInfo::getFullVersionString());
|
|
msg.insert("Content-Type", "application/json");
|
|
msg.insert("Connection", "close");
|
|
msg.body()["version"] = Json::Value(2u);
|
|
|
|
if (setup_.crawlOptions & CrawlOptions::Overlay)
|
|
{
|
|
msg.body()["overlay"] = getOverlayInfo();
|
|
}
|
|
if (setup_.crawlOptions & CrawlOptions::ServerInfo)
|
|
{
|
|
msg.body()["server"] = getServerInfo();
|
|
}
|
|
if (setup_.crawlOptions & CrawlOptions::ServerCounts)
|
|
{
|
|
msg.body()["counts"] = getServerCounts();
|
|
}
|
|
if (setup_.crawlOptions & CrawlOptions::Unl)
|
|
{
|
|
msg.body()["unl"] = getUnlInfo();
|
|
}
|
|
|
|
msg.prepare_payload();
|
|
handoff.response = std::make_shared<SimpleWriter>(msg);
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
OverlayImpl::processValidatorList(
|
|
http_request_type const& req,
|
|
Handoff& handoff)
|
|
{
|
|
// If the target is in the form "/vl/<validator_list_public_key>",
|
|
// return the most recent validator list for that key.
|
|
constexpr std::string_view prefix("/vl/");
|
|
|
|
if (!req.target().starts_with(prefix.data()) || !setup_.vlEnabled)
|
|
return false;
|
|
|
|
std::uint32_t version = 1;
|
|
|
|
boost::beast::http::response<json_body> msg;
|
|
msg.version(req.version());
|
|
msg.insert("Server", BuildInfo::getFullVersionString());
|
|
msg.insert("Content-Type", "application/json");
|
|
msg.insert("Connection", "close");
|
|
|
|
auto fail = [&msg, &handoff](auto status) {
|
|
msg.result(status);
|
|
msg.insert("Content-Length", "0");
|
|
|
|
msg.body() = Json::nullValue;
|
|
|
|
msg.prepare_payload();
|
|
handoff.response = std::make_shared<SimpleWriter>(msg);
|
|
return true;
|
|
};
|
|
|
|
std::string_view key = req.target().substr(prefix.size());
|
|
|
|
if (auto slash = key.find('/'); slash != std::string_view::npos)
|
|
{
|
|
auto verString = key.substr(0, slash);
|
|
if (!boost::conversion::try_lexical_convert(verString, version))
|
|
return fail(boost::beast::http::status::bad_request);
|
|
key = key.substr(slash + 1);
|
|
}
|
|
|
|
if (key.empty())
|
|
return fail(boost::beast::http::status::bad_request);
|
|
|
|
// find the list
|
|
auto vl = app_.validators().getAvailable(key, version);
|
|
|
|
if (!vl)
|
|
{
|
|
// 404 not found
|
|
return fail(boost::beast::http::status::not_found);
|
|
}
|
|
else if (!*vl)
|
|
{
|
|
return fail(boost::beast::http::status::bad_request);
|
|
}
|
|
else
|
|
{
|
|
msg.result(boost::beast::http::status::ok);
|
|
|
|
msg.body() = *vl;
|
|
|
|
msg.prepare_payload();
|
|
handoff.response = std::make_shared<SimpleWriter>(msg);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
bool
|
|
OverlayImpl::processHealth(http_request_type const& req, Handoff& handoff)
|
|
{
|
|
if (req.target() != "/health")
|
|
return false;
|
|
boost::beast::http::response<json_body> msg;
|
|
msg.version(req.version());
|
|
msg.insert("Server", BuildInfo::getFullVersionString());
|
|
msg.insert("Content-Type", "application/json");
|
|
msg.insert("Connection", "close");
|
|
|
|
auto info = getServerInfo();
|
|
|
|
int last_validated_ledger_age = -1;
|
|
if (info.isMember(jss::validated_ledger))
|
|
last_validated_ledger_age =
|
|
info[jss::validated_ledger][jss::age].asInt();
|
|
bool amendment_blocked = false;
|
|
if (info.isMember(jss::amendment_blocked))
|
|
amendment_blocked = true;
|
|
int number_peers = info[jss::peers].asInt();
|
|
std::string server_state = info[jss::server_state].asString();
|
|
auto load_factor = info[jss::load_factor_server].asDouble() /
|
|
info[jss::load_base].asDouble();
|
|
|
|
enum { healthy, warning, critical };
|
|
int health = healthy;
|
|
auto set_health = [&health](int state) {
|
|
if (health < state)
|
|
health = state;
|
|
};
|
|
|
|
msg.body()[jss::info] = Json::objectValue;
|
|
if (last_validated_ledger_age >= 7 || last_validated_ledger_age < 0)
|
|
{
|
|
msg.body()[jss::info][jss::validated_ledger] =
|
|
last_validated_ledger_age;
|
|
if (last_validated_ledger_age < 20)
|
|
set_health(warning);
|
|
else
|
|
set_health(critical);
|
|
}
|
|
|
|
if (amendment_blocked)
|
|
{
|
|
msg.body()[jss::info][jss::amendment_blocked] = true;
|
|
set_health(critical);
|
|
}
|
|
|
|
if (number_peers <= 7)
|
|
{
|
|
msg.body()[jss::info][jss::peers] = number_peers;
|
|
if (number_peers != 0)
|
|
set_health(warning);
|
|
else
|
|
set_health(critical);
|
|
}
|
|
|
|
if (!(server_state == "full" || server_state == "validating" ||
|
|
server_state == "proposing"))
|
|
{
|
|
msg.body()[jss::info][jss::server_state] = server_state;
|
|
if (server_state == "syncing" || server_state == "tracking" ||
|
|
server_state == "connected")
|
|
{
|
|
set_health(warning);
|
|
}
|
|
else
|
|
set_health(critical);
|
|
}
|
|
|
|
if (load_factor > 100)
|
|
{
|
|
msg.body()[jss::info][jss::load_factor] = load_factor;
|
|
if (load_factor < 1000)
|
|
set_health(warning);
|
|
else
|
|
set_health(critical);
|
|
}
|
|
|
|
switch (health)
|
|
{
|
|
case healthy:
|
|
msg.result(boost::beast::http::status::ok);
|
|
break;
|
|
case warning:
|
|
msg.result(boost::beast::http::status::service_unavailable);
|
|
break;
|
|
case critical:
|
|
msg.result(boost::beast::http::status::internal_server_error);
|
|
break;
|
|
}
|
|
|
|
msg.prepare_payload();
|
|
handoff.response = std::make_shared<SimpleWriter>(msg);
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
OverlayImpl::processRequest(http_request_type const& req, Handoff& handoff)
|
|
{
|
|
// Take advantage of || short-circuiting
|
|
return processCrawl(req, handoff) || processValidatorList(req, handoff) ||
|
|
processHealth(req, handoff);
|
|
}
|
|
|
|
/** Collect the active peers.

    @return A sequence holding every peer visited by for_each.
*/
Overlay::PeerSequence
OverlayImpl::getActivePeers() const
{
    Overlay::PeerSequence peers;

    // Reserve up front for the current number of active peers.
    peers.reserve(size());

    for_each([&peers](std::shared_ptr<PeerImp>&& sp) {
        peers.emplace_back(std::move(sp));
    });

    return peers;
}
|
|
|
|
/** Collect the active peers that are not in a skip set, with statistics.

    @param toSkip        Ids of peers to leave out of the result.
    @param active        Set to the number of entries in the id index.
    @param disabled      Set to the number of live peers for which
                         `txReduceRelayEnabled()` is false.
    @param enabledInSkip Set to the number of skipped live peers for which
                         `txReduceRelayEnabled()` is true.
    @return The live peers whose id is not in `toSkip`.
*/
Overlay::PeerSequence
OverlayImpl::getActivePeers(
    std::set<Peer::id_t> const& toSkip,
    std::size_t& active,
    std::size_t& disabled,
    std::size_t& enabledInSkip) const
{
    Overlay::PeerSequence ret;
    std::lock_guard lock(mutex_);

    active = ids_.size();
    disabled = enabledInSkip = 0;
    ret.reserve(ids_.size());

    // NOTE The purpose of p is to delay the destruction of PeerImp
    std::shared_ptr<PeerImp> p;
    for (auto& [id, w] : ids_)
    {
        p = w.lock();
        if (p == nullptr)
            continue;  // the peer is already gone

        bool const reduceRelayEnabled = p->txReduceRelayEnabled();
        // tx reduced relay feature disabled
        if (!reduceRelayEnabled)
            ++disabled;

        bool const skipped = toSkip.find(id) != toSkip.end();
        if (!skipped)
            ret.emplace_back(std::move(p));
        else if (reduceRelayEnabled)
            ++enabledInSkip;
    }

    return ret;
}
|
|
|
|
void
|
|
OverlayImpl::checkTracking(std::uint32_t index)
|
|
{
|
|
for_each(
|
|
[index](std::shared_ptr<PeerImp>&& sp) { sp->checkTracking(index); });
|
|
}
|
|
|
|
std::shared_ptr<Peer>
|
|
OverlayImpl::findPeerByShortID(Peer::id_t const& id) const
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
auto const iter = ids_.find(id);
|
|
if (iter != ids_.end())
|
|
return iter->second.lock();
|
|
return {};
|
|
}
|
|
|
|
// A public key hash map was not used due to the peer connect/disconnect
|
|
// update overhead outweighing the performance of a small set linear search.
|
|
std::shared_ptr<Peer>
|
|
OverlayImpl::findPeerByPublicKey(PublicKey const& pubKey)
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
// NOTE The purpose of peer is to delay the destruction of PeerImp
|
|
std::shared_ptr<PeerImp> peer;
|
|
for (auto const& e : ids_)
|
|
{
|
|
if (peer = e.second.lock(); peer != nullptr)
|
|
{
|
|
if (peer->getNodePublic() == pubKey)
|
|
return peer;
|
|
}
|
|
}
|
|
return {};
|
|
}
|
|
|
|
void
|
|
OverlayImpl::broadcast(protocol::TMProposeSet& m)
|
|
{
|
|
auto const sm = std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER);
|
|
for_each([&](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
|
|
}
|
|
|
|
std::set<Peer::id_t>
|
|
OverlayImpl::relay(
|
|
protocol::TMProposeSet& m,
|
|
uint256 const& uid,
|
|
PublicKey const& validator)
|
|
{
|
|
if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
|
|
{
|
|
auto const sm =
|
|
std::make_shared<Message>(m, protocol::mtPROPOSE_LEDGER, validator);
|
|
for_each([&](std::shared_ptr<PeerImp>&& p) {
|
|
if (toSkip->find(p->id()) == toSkip->end())
|
|
p->send(sm);
|
|
});
|
|
return *toSkip;
|
|
}
|
|
return {};
|
|
}
|
|
|
|
void
|
|
OverlayImpl::broadcast(protocol::TMValidation& m)
|
|
{
|
|
auto const sm = std::make_shared<Message>(m, protocol::mtVALIDATION);
|
|
for_each([sm](std::shared_ptr<PeerImp>&& p) { p->send(sm); });
|
|
}
|
|
|
|
std::set<Peer::id_t>
|
|
OverlayImpl::relay(
|
|
protocol::TMValidation& m,
|
|
uint256 const& uid,
|
|
PublicKey const& validator)
|
|
{
|
|
if (auto const toSkip = app_.getHashRouter().shouldRelay(uid))
|
|
{
|
|
auto const sm =
|
|
std::make_shared<Message>(m, protocol::mtVALIDATION, validator);
|
|
for_each([&](std::shared_ptr<PeerImp>&& p) {
|
|
if (toSkip->find(p->id()) == toSkip->end())
|
|
p->send(sm);
|
|
});
|
|
return *toSkip;
|
|
}
|
|
return {};
|
|
}
|
|
|
|
std::shared_ptr<Message>
|
|
OverlayImpl::getManifestsMessage()
|
|
{
|
|
std::lock_guard g(manifestLock_);
|
|
|
|
if (auto seq = app_.validatorManifests().sequence();
|
|
seq != manifestListSeq_)
|
|
{
|
|
protocol::TMManifests tm;
|
|
|
|
app_.validatorManifests().for_each_manifest(
|
|
[&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
|
|
[&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
|
|
tm.add_list()->set_stobject(
|
|
manifest.serialized.data(), manifest.serialized.size());
|
|
hr.addSuppression(manifest.hash());
|
|
});
|
|
|
|
manifestMessage_.reset();
|
|
|
|
if (tm.list_size() != 0)
|
|
manifestMessage_ =
|
|
std::make_shared<Message>(tm, protocol::mtMANIFESTS);
|
|
|
|
manifestListSeq_ = seq;
|
|
}
|
|
|
|
return manifestMessage_;
|
|
}
|
|
|
|
void
|
|
OverlayImpl::relay(
|
|
uint256 const& hash,
|
|
std::optional<std::reference_wrapper<protocol::TMTransaction>> tx,
|
|
std::set<Peer::id_t> const& toSkip)
|
|
{
|
|
bool relay = tx.has_value();
|
|
if (relay)
|
|
{
|
|
auto& txn = tx->get();
|
|
SerialIter sit(makeSlice(txn.rawtransaction()));
|
|
relay = !isPseudoTx(STTx{sit});
|
|
}
|
|
|
|
Overlay::PeerSequence peers = {};
|
|
std::size_t total = 0;
|
|
std::size_t disabled = 0;
|
|
std::size_t enabledInSkip = 0;
|
|
|
|
if (!relay)
|
|
{
|
|
if (!app_.config().TX_REDUCE_RELAY_ENABLE)
|
|
return;
|
|
|
|
peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
|
|
JLOG(journal_.trace())
|
|
<< "not relaying tx, total peers " << peers.size();
|
|
for (auto const& p : peers)
|
|
p->addTxQueue(hash);
|
|
return;
|
|
}
|
|
|
|
auto& txn = tx->get();
|
|
auto const sm = std::make_shared<Message>(txn, protocol::mtTRANSACTION);
|
|
peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
|
|
auto const minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
|
|
|
|
if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
|
|
{
|
|
for (auto const& p : peers)
|
|
p->send(sm);
|
|
if (app_.config().TX_REDUCE_RELAY_ENABLE ||
|
|
app_.config().TX_REDUCE_RELAY_METRICS)
|
|
txMetrics_.addMetrics(total, toSkip.size(), 0);
|
|
return;
|
|
}
|
|
|
|
// We have more peers than the minimum (disabled + minimum enabled),
|
|
// relay to all disabled and some randomly selected enabled that
|
|
// do not have the transaction.
|
|
auto const enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
|
|
(total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;
|
|
|
|
txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
|
|
|
|
if (enabledTarget > enabledInSkip)
|
|
std::shuffle(peers.begin(), peers.end(), default_prng());
|
|
|
|
JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size()
|
|
<< " selected " << enabledTarget << " skip "
|
|
<< toSkip.size() << " disabled " << disabled;
|
|
|
|
// count skipped peers with the enabled feature towards the quota
|
|
std::uint16_t enabledAndRelayed = enabledInSkip;
|
|
for (auto const& p : peers)
|
|
{
|
|
// always relay to a peer with the disabled feature
|
|
if (!p->txReduceRelayEnabled())
|
|
{
|
|
p->send(sm);
|
|
}
|
|
else if (enabledAndRelayed < enabledTarget)
|
|
{
|
|
enabledAndRelayed++;
|
|
p->send(sm);
|
|
}
|
|
else
|
|
{
|
|
p->addTxQueue(hash);
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
void
|
|
OverlayImpl::remove(Child& child)
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
list_.erase(&child);
|
|
if (list_.empty())
|
|
cond_.notify_all();
|
|
}
|
|
|
|
void
|
|
OverlayImpl::stopChildren()
|
|
{
|
|
// Calling list_[].second->stop() may cause list_ to be modified
|
|
// (OverlayImpl::remove() may be called on this same thread). So
|
|
// iterating directly over list_ to call child->stop() could lead to
|
|
// undefined behavior.
|
|
//
|
|
// Therefore we copy all of the weak/shared ptrs out of list_ before we
|
|
// start calling stop() on them. That guarantees OverlayImpl::remove()
|
|
// won't be called until vector<> children leaves scope.
|
|
std::vector<std::shared_ptr<Child>> children;
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
if (!work_)
|
|
return;
|
|
work_ = std::nullopt;
|
|
|
|
children.reserve(list_.size());
|
|
for (auto const& element : list_)
|
|
{
|
|
children.emplace_back(element.second.lock());
|
|
}
|
|
} // lock released
|
|
|
|
for (auto const& child : children)
|
|
{
|
|
if (child != nullptr)
|
|
child->stop();
|
|
}
|
|
}
|
|
|
|
void
|
|
OverlayImpl::autoConnect()
|
|
{
|
|
auto const result = m_peerFinder->autoconnect();
|
|
for (auto addr : result)
|
|
connect(addr);
|
|
}
|
|
|
|
void
|
|
OverlayImpl::sendEndpoints()
|
|
{
|
|
auto const result = m_peerFinder->buildEndpointsForPeers();
|
|
for (auto const& e : result)
|
|
{
|
|
std::shared_ptr<PeerImp> peer;
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
auto const iter = m_peers.find(e.first);
|
|
if (iter != m_peers.end())
|
|
peer = iter->second.lock();
|
|
}
|
|
if (peer)
|
|
peer->sendEndpoints(e.second.begin(), e.second.end());
|
|
}
|
|
}
|
|
|
|
void
|
|
OverlayImpl::sendTxQueue()
|
|
{
|
|
for_each([](auto const& p) {
|
|
if (p->txReduceRelayEnabled())
|
|
p->sendTxQueue();
|
|
});
|
|
}
|
|
|
|
std::shared_ptr<Message>
|
|
makeSquelchMessage(
|
|
PublicKey const& validator,
|
|
bool squelch,
|
|
uint32_t squelchDuration)
|
|
{
|
|
protocol::TMSquelch m;
|
|
m.set_squelch(squelch);
|
|
m.set_validatorpubkey(validator.data(), validator.size());
|
|
if (squelch)
|
|
m.set_squelchduration(squelchDuration);
|
|
return std::make_shared<Message>(m, protocol::mtSQUELCH);
|
|
}
|
|
|
|
void
|
|
OverlayImpl::unsquelch(PublicKey const& validator, Peer::id_t id) const
|
|
{
|
|
if (auto peer = findPeerByShortID(id); peer)
|
|
{
|
|
// optimize - multiple message with different
|
|
// validator might be sent to the same peer
|
|
peer->send(makeSquelchMessage(validator, false, 0));
|
|
}
|
|
}
|
|
|
|
void
|
|
OverlayImpl::squelch(
|
|
PublicKey const& validator,
|
|
Peer::id_t id,
|
|
uint32_t squelchDuration) const
|
|
{
|
|
if (auto peer = findPeerByShortID(id); peer)
|
|
{
|
|
peer->send(makeSquelchMessage(validator, true, squelchDuration));
|
|
}
|
|
}
|
|
|
|
void
|
|
OverlayImpl::updateSlotAndSquelch(
|
|
uint256 const& key,
|
|
PublicKey const& validator,
|
|
std::set<Peer::id_t>&& peers,
|
|
protocol::MessageType type)
|
|
{
|
|
if (!slots_.baseSquelchReady())
|
|
return;
|
|
|
|
if (!strand_.running_in_this_thread())
|
|
return post(
|
|
strand_,
|
|
// Must capture copies of reference parameters (i.e. key, validator)
|
|
[this,
|
|
key = key,
|
|
validator = validator,
|
|
peers = std::move(peers),
|
|
type]() mutable {
|
|
updateSlotAndSquelch(key, validator, std::move(peers), type);
|
|
});
|
|
|
|
for (auto id : peers)
|
|
slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
|
|
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
|
|
});
|
|
}
|
|
|
|
void
|
|
OverlayImpl::updateSlotAndSquelch(
|
|
uint256 const& key,
|
|
PublicKey const& validator,
|
|
Peer::id_t peer,
|
|
protocol::MessageType type)
|
|
{
|
|
if (!slots_.baseSquelchReady())
|
|
return;
|
|
|
|
if (!strand_.running_in_this_thread())
|
|
return post(
|
|
strand_,
|
|
// Must capture copies of reference parameters (i.e. key, validator)
|
|
[this, key = key, validator = validator, peer, type]() {
|
|
updateSlotAndSquelch(key, validator, peer, type);
|
|
});
|
|
|
|
slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
|
|
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
|
|
});
|
|
}
|
|
|
|
void
|
|
OverlayImpl::deletePeer(Peer::id_t id)
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(strand_, std::bind(&OverlayImpl::deletePeer, this, id));
|
|
|
|
slots_.deletePeer(id, true);
|
|
}
|
|
|
|
void
|
|
OverlayImpl::deleteIdlePeers()
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(strand_, std::bind(&OverlayImpl::deleteIdlePeers, this));
|
|
|
|
slots_.deleteIdlePeers();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
/** Build the overlay setup from the configuration.

    Sections read:
    - [overlay]: `ip_limit` (must not be negative) and `public_ip` (when
      present it must parse and must not be a private address).
    - [crawl]: at most one bare value, interpreted as a bool that switches
      crawling on or off (on by default). When on, the keys `overlay`,
      `server`, `counts` and `unl` select the individual crawl options;
      their defaults are true, true, false and true respectively.
    - [vl]: `enabled`.
    - network_id (legacy value): a number, or one of the names `main`,
      `testnet`, `devnet`, which map to 0, 1 and 2.

    @param config The configuration to read.
    @return The populated setup.
    @throws std::runtime_error on any of the invalid values described above.
*/
Overlay::Setup
setup_Overlay(BasicConfig const& config)
{
    Overlay::Setup setup;

    // ---- [overlay] ----------------------------------------------------
    auto const& overlaySection = config.section("overlay");
    setup.context = make_SSLContext("");

    set(setup.ipLimit, "ip_limit", overlaySection);
    if (setup.ipLimit < 0)
        Throw<std::runtime_error>("Configured IP limit is invalid");

    std::string publicIp;
    set(publicIp, "public_ip", overlaySection);
    if (!publicIp.empty())
    {
        boost::system::error_code ec;
        setup.public_ip = beast::IP::Address::from_string(publicIp, ec);

        // The address is only inspected when parsing succeeded.
        bool const usable = !ec && !beast::IP::is_private(setup.public_ip);
        if (!usable)
            Throw<std::runtime_error>("Configured public IP is invalid");
    }

    // ---- [crawl] ------------------------------------------------------
    auto const& crawlSection = config.section("crawl");
    auto const& crawlValues = crawlSection.values();

    if (crawlValues.size() > 1)
    {
        Throw<std::runtime_error>(
            "Configured [crawl] section is invalid, too many values");
    }

    bool crawlEnabled = true;

    // A single bare value is accepted, and only if it is "0" or "1".
    if (crawlValues.size() == 1)
    {
        try
        {
            crawlEnabled = boost::lexical_cast<bool>(crawlValues.front());
        }
        catch (boost::bad_lexical_cast const&)
        {
            Throw<std::runtime_error>(
                "Configured [crawl] section has invalid value: " +
                crawlValues.front());
        }
    }

    if (crawlEnabled)
    {
        // Turns on one crawl option when its key (or the key's default)
        // evaluates to true.
        auto const enableIf = [&](char const* key, bool fallback, auto flag) {
            if (get<bool>(crawlSection, key, fallback))
                setup.crawlOptions |= flag;
        };

        enableIf("overlay", true, CrawlOptions::Overlay);
        enableIf("server", true, CrawlOptions::ServerInfo);
        enableIf("counts", false, CrawlOptions::ServerCounts);
        enableIf("unl", true, CrawlOptions::Unl);
    }

    // ---- [vl] ---------------------------------------------------------
    auto const& vlSection = config.section("vl");
    set(setup.vlEnabled, "enabled", vlSection);

    // ---- network_id ---------------------------------------------------
    try
    {
        auto id = config.legacy("network_id");

        if (!id.empty())
        {
            // Translate the well-known names to their numeric ids; anything
            // else must already be a number.
            if (id == "main")
                id = "0";
            else if (id == "testnet")
                id = "1";
            else if (id == "devnet")
                id = "2";

            setup.networkID = beast::lexicalCastThrow<std::uint32_t>(id);
        }
    }
    catch (...)
    {
        // Any failure while reading or converting the value is reported as
        // one configuration error.
        Throw<std::runtime_error>(
            "Configured [network_id] section is invalid: must be a number "
            "or one of the strings 'main', 'testnet' or 'devnet'.");
    }

    return setup;
}
|
|
|
|
std::unique_ptr<Overlay>
|
|
make_Overlay(
|
|
Application& app,
|
|
Overlay::Setup const& setup,
|
|
ServerHandler& serverHandler,
|
|
Resource::Manager& resourceManager,
|
|
Resolver& resolver,
|
|
boost::asio::io_service& io_service,
|
|
BasicConfig const& config,
|
|
beast::insight::Collector::ptr const& collector)
|
|
{
|
|
return std::make_unique<OverlayImpl>(
|
|
app,
|
|
setup,
|
|
serverHandler,
|
|
resourceManager,
|
|
resolver,
|
|
io_service,
|
|
config,
|
|
collector);
|
|
}
|
|
|
|
} // namespace ripple
|