From 2084d61efa9b79f577e3b9d53acff6a0e37621ed Mon Sep 17 00:00:00 2001 From: seelabs Date: Sat, 9 Apr 2022 16:24:18 -0400 Subject: [PATCH] Periodically reshare federator sigs: * Periodically remove signatures for txns this federator is unaware of. * Reshare mainchain and sidechain signatures on a heartbeat timer. * Switch between sharing sidechain and mainchain signatures on each timeout, in an attempt to reduce network traffic. --- src/ripple/app/sidechain/Federator.cpp | 38 ++++++++++++++++++- src/ripple/app/sidechain/Federator.h | 8 ++++ .../app/sidechain/impl/SignatureCollector.cpp | 28 ++++++++++++++ .../app/sidechain/impl/SignatureCollector.h | 3 ++ 4 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/ripple/app/sidechain/Federator.cpp b/src/ripple/app/sidechain/Federator.cpp index 6faae715f6..0147d296db 100644 --- a/src/ripple/app/sidechain/Federator.cpp +++ b/src/ripple/app/sidechain/Federator.cpp @@ -41,14 +41,17 @@ #include #include #include -#include #include #include #include +#include +#include #include #include +#include +#include #include namespace ripple { @@ -815,6 +818,13 @@ Federator::init( ticketRunner_.setRpcChannel(false, sidechainListener_); mainDoorKeeper_.setRpcChannel(mainchainListener_); sideDoorKeeper_.setRpcChannel(sidechainListener_); + + heartbeatTimer = + std::make_unique(ios, heartbeatInterval); + heartbeatTimer->async_wait( + [self = shared_from_this()](boost::system::error_code const& ec) { + self->heartbeatTimerHandler(ec); + }); } Federator::~Federator() @@ -853,6 +863,25 @@ Federator::stop() mainchainListener_->shutdown(); } +void +Federator::heartbeatTimerHandler(const boost::system::error_code& ec) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec == boost::system::errc::success) + { + onEvent(event::HeartbeatTimer{}); + } + + heartbeatTimer->expires_at( + heartbeatTimer->expires_at() + heartbeatInterval); + heartbeatTimer->async_wait( + [self = shared_from_this()](boost::system::error_code const& ec) { + self->heartbeatTimerHandler(ec); + }); +} + void Federator::push(FederatorEvent&& e) { @@ -1354,6 +1383,13 @@ Federator::onEvent(event::HeartbeatTimer const& e) "Federator::onEvent", jv("eventtype", "HeartbeatTimer"), jv("event", e.toJson())); + + static bool shareMain = false; + shareMain = !shareMain; + if (shareMain) + mainSigCollector_.reshareSigs(); + else + sideSigCollector_.reshareSigs(); } void diff --git a/src/ripple/app/sidechain/Federator.h b/src/ripple/app/sidechain/Federator.h index e195006b2e..60e8522d3d 100644 --- a/src/ripple/app/sidechain/Federator.h +++ b/src/ripple/app/sidechain/Federator.h @@ -39,6 +39,8 @@ #include #include +#include +#include #include #include @@ -140,6 +142,9 @@ private: DoorKeeper mainDoorKeeper_; DoorKeeper sideDoorKeeper_; + boost::posix_time::seconds const heartbeatInterval{5}; + std::unique_ptr heartbeatTimer; + struct PeerTxnSignature { Buffer sig; @@ -308,6 +313,9 @@ private: std::shared_ptr&& mainchainListener, std::shared_ptr&& sidechainListener); + void + heartbeatTimerHandler(const boost::system::error_code& ec); + // Convert between the asset on the src chain to the asset on the other // chain. The `assetProps_` array controls how this conversion is done. // An empty option is returned if the from issue is not part of the map in diff --git a/src/ripple/app/sidechain/impl/SignatureCollector.cpp b/src/ripple/app/sidechain/impl/SignatureCollector.cpp index 6157f541f2..c24b4f39bf 100644 --- a/src/ripple/app/sidechain/impl/SignatureCollector.cpp +++ b/src/ripple/app/sidechain/impl/SignatureCollector.cpp @@ -139,9 +139,37 @@ void SignatureCollector::expire() { std::lock_guard lock(mtx_); + // Never expire collections with this server's sig or submitted txns + for (auto i = messages_.begin(), e = messages_.end(); i != e; ++i) + { + auto const& multiSigMsg = i->second; + if (multiSigMsg.submitted_ || + std::any_of( + multiSigMsg.sigMaps_.begin(), + multiSigMsg.sigMaps_.end(), + [&](auto const& p) { return p.first == myPubKey_; })) + { + messages_.touch(i); + } + } beast::expire(messages_, messageExpire); } +void +SignatureCollector::reshareSigs() +{ + std::lock_guard lock(mtx_); + for (auto const& [messageId, multiSigMsg] : messages_) + { + if (multiSigMsg.submitted_) + continue; + for (auto const& [pk, sig] : multiSigMsg.sigMaps_) + { + shareSig(messageId, sig); + } + } +} + bool SignatureCollector::addSig( MessageId const& mId, diff --git a/src/ripple/app/sidechain/impl/SignatureCollector.h b/src/ripple/app/sidechain/impl/SignatureCollector.h index ba7df10095..5a3f7d5905 100644 --- a/src/ripple/app/sidechain/impl/SignatureCollector.h +++ b/src/ripple/app/sidechain/impl/SignatureCollector.h @@ -122,6 +122,9 @@ public: void setRpcChannel(std::shared_ptr channel); + void + reshareSigs() EXCLUDES(mtx_); + private: // verify a signature (if it is from a peer) and add to a collection bool