From 0dae22adf24761f5d19d174f1b218ec86cf8daf1 Mon Sep 17 00:00:00 2001 From: Edward Hennis Date: Wed, 23 Dec 2020 14:19:25 -0500 Subject: [PATCH] Prevent simultaneous outgoing async peer messages: * Also, only send validator list on incoming connection instead of both incoming and outgoing. --- src/ripple/app/main/Application.cpp | 22 ++++++------ src/ripple/overlay/impl/PeerImp.cpp | 52 ++++++++++++++++------------- 2 files changed, 39 insertions(+), 35 deletions(-) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index fb090088b..29af2f865 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -1480,8 +1480,6 @@ ApplicationImp::setup() return false; } - validatorSites_->start(); - // start first consensus round if (!m_networkOPs->beginConsensus( m_ledgerMaster->getClosedLedger()->info().hash)) @@ -1595,6 +1593,7 @@ ApplicationImp::setup() } } + RPC::ShardArchiveHandler* shardArchiveHandler = nullptr; if (shardStore_) { try @@ -1606,15 +1605,7 @@ ApplicationImp::setup() // Recovery is needed. if (handler) - { - if (!handler->start()) - { - JLOG(m_journal.fatal()) - << "Failed to start ShardArchiveHandler."; - - return false; - } - } + shardArchiveHandler = handler; } catch (std::exception const& e) { @@ -1627,6 +1618,15 @@ ApplicationImp::setup() } } + if (shardArchiveHandler && !shardArchiveHandler->start()) + { + JLOG(m_journal.fatal()) << "Failed to start ShardArchiveHandler."; + + return false; + } + + validatorSites_->start(); + return true; } diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 364884d9c..0ecc8967a 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -182,12 +182,8 @@ PeerImp::run() else doProtocolStart(); - // Request shard info from peer - protocol::TMGetPeerShardInfo tmGPS; - tmGPS.set_hops(0); - send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO)); - - setTimer(); + // Anything else that needs to be done with the connection should be + // done in doProtocolStart } void @@ -244,13 +240,12 @@ PeerImp::send(std::shared_ptr const& m) // a small senq periodically large_sendq_ = 0; } - else if ( - journal_.active(beast::severities::kDebug) && - (sendq_size % Tuning::sendQueueLogFreq) == 0) + else if (auto sink = journal_.debug(); + sink && (sendq_size % Tuning::sendQueueLogFreq) == 0) { std::string const n = name(); - JLOG(journal_.debug()) << (n.empty() ? remote_address_.to_string() : n) - << " sendq: " << sendq_size; + sink << (n.empty() ? remote_address_.to_string() : n) + << " sendq: " << sendq_size; } send_queue_.push(m); @@ -780,18 +775,20 @@ PeerImp::doAccept() stream_, write_buffer->data(), boost::asio::transfer_all(), - [this, write_buffer, self = shared_from_this()]( - error_code ec, std::size_t bytes_transferred) { - if (!socket_.is_open()) - return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec) - return fail("onWriteResponse", ec); - if (write_buffer->size() == bytes_transferred) - return doProtocolStart(); - return fail("Failed to write header"); - }); + bind_executor( + strand_, + [this, write_buffer, self = shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + if (!socket_.is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; + if (ec) + return fail("onWriteResponse", ec); + if (write_buffer->size() == bytes_transferred) + return doProtocolStart(); + return fail("Failed to write header"); + })); } std::string @@ -817,7 +814,7 @@ PeerImp::doProtocolStart() onReadMessage(error_code(), 0); // Send all the validator lists that have been loaded - if (supportsFeature(ProtocolFeature::ValidatorListPropagation)) + if (inbound_ && supportsFeature(ProtocolFeature::ValidatorListPropagation)) { app_.validators().for_each_available( [&](std::string const& manifest, @@ -844,6 +841,13 @@ PeerImp::doProtocolStart() if (auto m = overlay_.getManifestsMessage()) send(m); + + // Request shard info from peer + protocol::TMGetPeerShardInfo tmGPS; + tmGPS.set_hops(0); + send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO)); + + setTimer(); } // Called repeatedly with protocol message data