mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
Periodically reshare federator sigs:
* Periodically remove signatures for txns this federator is unaware of. * Reshare mainchain and sidechain signatures on a heartbeat timer. * Switch between sharing sidechain and mainchain signatures on each timeout, in an attempt to reduce network traffic.
This commit is contained in:
@@ -41,14 +41,17 @@
|
||||
#include <ripple/rpc/Role.h>
|
||||
#include <ripple/rpc/impl/RPCHelpers.h>
|
||||
#include <ripple/rpc/impl/TransactionSign.h>
|
||||
#include <mutex>
|
||||
#include <ripple.pb.h>
|
||||
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/asio/error.hpp>
|
||||
#include <boost/system/detail/errc.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <cmath>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
|
||||
namespace ripple {
|
||||
@@ -815,6 +818,13 @@ Federator::init(
|
||||
ticketRunner_.setRpcChannel(false, sidechainListener_);
|
||||
mainDoorKeeper_.setRpcChannel(mainchainListener_);
|
||||
sideDoorKeeper_.setRpcChannel(sidechainListener_);
|
||||
|
||||
heartbeatTimer =
|
||||
std::make_unique<boost::asio::deadline_timer>(ios, heartbeatInterval);
|
||||
heartbeatTimer->async_wait(
|
||||
[self = shared_from_this()](boost::system::error_code const& ec) {
|
||||
self->heartbeatTimerHandler(ec);
|
||||
});
|
||||
}
|
||||
|
||||
Federator::~Federator()
|
||||
@@ -853,6 +863,25 @@ Federator::stop()
|
||||
mainchainListener_->shutdown();
|
||||
}
|
||||
|
||||
void
|
||||
Federator::heartbeatTimerHandler(const boost::system::error_code& ec)
|
||||
{
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
|
||||
if (ec == boost::system::errc::success)
|
||||
{
|
||||
onEvent(event::HeartbeatTimer{});
|
||||
}
|
||||
|
||||
heartbeatTimer->expires_at(
|
||||
heartbeatTimer->expires_at() + heartbeatInterval);
|
||||
heartbeatTimer->async_wait(
|
||||
[self = shared_from_this()](boost::system::error_code const& ec) {
|
||||
self->heartbeatTimerHandler(ec);
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
Federator::push(FederatorEvent&& e)
|
||||
{
|
||||
@@ -1354,6 +1383,13 @@ Federator::onEvent(event::HeartbeatTimer const& e)
|
||||
"Federator::onEvent",
|
||||
jv("eventtype", "HeartbeatTimer"),
|
||||
jv("event", e.toJson()));
|
||||
|
||||
static bool shareMain = false;
|
||||
shareMain = !shareMain;
|
||||
if (shareMain)
|
||||
mainSigCollector_.reshareSigs();
|
||||
else
|
||||
sideSigCollector_.reshareSigs();
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -39,6 +39,8 @@
|
||||
#include <ripple/protocol/STAmount.h>
|
||||
#include <ripple/protocol/SecretKey.h>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/container/flat_map.hpp>
|
||||
|
||||
#include <atomic>
|
||||
@@ -140,6 +142,9 @@ private:
|
||||
DoorKeeper mainDoorKeeper_;
|
||||
DoorKeeper sideDoorKeeper_;
|
||||
|
||||
boost::posix_time::seconds const heartbeatInterval{5};
|
||||
std::unique_ptr<boost::asio::deadline_timer> heartbeatTimer;
|
||||
|
||||
struct PeerTxnSignature
|
||||
{
|
||||
Buffer sig;
|
||||
@@ -308,6 +313,9 @@ private:
|
||||
std::shared_ptr<MainchainListener>&& mainchainListener,
|
||||
std::shared_ptr<SidechainListener>&& sidechainListener);
|
||||
|
||||
void
|
||||
heartbeatTimerHandler(const boost::system::error_code& ec);
|
||||
|
||||
// Convert between the asset on the src chain to the asset on the other
|
||||
// chain. The `assetProps_` array controls how this conversion is done.
|
||||
// An empty option is returned if the from issue is not part of the map in
|
||||
|
||||
@@ -139,9 +139,37 @@ void
|
||||
SignatureCollector::expire()
|
||||
{
|
||||
std::lock_guard lock(mtx_);
|
||||
// Never expire collections with this server's sig or submitted txns
|
||||
for (auto i = messages_.begin(), e = messages_.end(); i != e; ++i)
|
||||
{
|
||||
auto const& multiSigMsg = i->second;
|
||||
if (multiSigMsg.submitted_ ||
|
||||
std::any_of(
|
||||
multiSigMsg.sigMaps_.begin(),
|
||||
multiSigMsg.sigMaps_.end(),
|
||||
[&](auto const& p) { return p.first == myPubKey_; }))
|
||||
{
|
||||
messages_.touch(i);
|
||||
}
|
||||
}
|
||||
beast::expire(messages_, messageExpire);
|
||||
}
|
||||
|
||||
void
|
||||
SignatureCollector::reshareSigs()
|
||||
{
|
||||
std::lock_guard lock(mtx_);
|
||||
for (auto const& [messageId, multiSigMsg] : messages_)
|
||||
{
|
||||
if (multiSigMsg.submitted_)
|
||||
continue;
|
||||
for (auto const& [pk, sig] : multiSigMsg.sigMaps_)
|
||||
{
|
||||
shareSig(messageId, sig);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
SignatureCollector::addSig(
|
||||
MessageId const& mId,
|
||||
|
||||
@@ -122,6 +122,9 @@ public:
|
||||
void
|
||||
setRpcChannel(std::shared_ptr<ChainListener> channel);
|
||||
|
||||
void
|
||||
reshareSigs() EXCLUDES(mtx_);
|
||||
|
||||
private:
|
||||
// verify a signature (if it is from a peer) and add to a collection
|
||||
bool
|
||||
|
||||
Reference in New Issue
Block a user