Prevent simultaneous outgoing async peer messages:

* Also, only send the validator list on incoming connections instead of both
  incoming and outgoing.
Edward Hennis
2020-12-23 14:19:25 -05:00
parent 746181cb33
commit 0dae22adf2
2 changed files with 39 additions and 35 deletions
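The change is easiest to see as one pattern: each peer's outgoing traffic goes through a single queue, and every completion handler runs on the peer's strand, so no two async writes to the same peer are ever in flight at once. The sketch below is a minimal, self-contained Boost.Asio illustration of that queue-plus-strand idea; the Writer class and everything in it are invented for the example and are not rippled code.

// Minimal, self-contained sketch (not rippled code) of the pattern applied
// here: a per-connection message queue whose writes are all serialized
// through one boost::asio strand, so two async sends can never be in
// flight at the same time.
#include <boost/asio.hpp>
#include <deque>
#include <memory>
#include <string>

namespace asio = boost::asio;

class Writer : public std::enable_shared_from_this<Writer>
{
    asio::ip::tcp::socket socket_;
    asio::strand<asio::any_io_executor> strand_;
    std::deque<std::string> queue_;

public:
    explicit Writer(asio::ip::tcp::socket socket)
        : socket_(std::move(socket))
        , strand_(asio::make_strand(socket_.get_executor()))
    {
    }

    // Safe to call from any thread: the work is posted to the strand, so
    // queue_ is only ever touched by strand-serialized handlers.
    void
    send(std::string msg)
    {
        asio::post(
            strand_,
            [self = shared_from_this(), msg = std::move(msg)]() mutable {
                bool const writing = !self->queue_.empty();
                self->queue_.push_back(std::move(msg));
                if (!writing)
                    self->doWrite();
            });
    }

private:
    void
    doWrite()
    {
        // bind_executor keeps the completion handler on the strand, just as
        // the doAccept() change below does for the peer's write handler.
        asio::async_write(
            socket_,
            asio::buffer(queue_.front()),
            asio::bind_executor(
                strand_,
                [self = shared_from_this()](
                    boost::system::error_code ec, std::size_t) {
                    if (ec)
                        return;  // a real peer would fail/close here
                    self->queue_.pop_front();
                    if (!self->queue_.empty())
                        self->doWrite();  // next message, one at a time
                }));
    }
};

In the diff itself the same idea appears as the bind_executor(strand_, ...) wrapper around the doAccept() write handler and as moving the first protocol messages (validator lists, manifests, the shard-info request, the timer) into doProtocolStart().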


@@ -1480,8 +1480,6 @@ ApplicationImp::setup()
             return false;
         }
 
-    validatorSites_->start();
-
     // start first consensus round
     if (!m_networkOPs->beginConsensus(
             m_ledgerMaster->getClosedLedger()->info().hash))
@@ -1595,6 +1593,7 @@ ApplicationImp::setup()
         }
     }
 
+    RPC::ShardArchiveHandler* shardArchiveHandler = nullptr;
     if (shardStore_)
     {
         try
@@ -1606,15 +1605,7 @@ ApplicationImp::setup()
             // Recovery is needed.
             if (handler)
-            {
-                if (!handler->start())
-                {
-                    JLOG(m_journal.fatal())
-                        << "Failed to start ShardArchiveHandler.";
-
-                    return false;
-                }
-            }
+                shardArchiveHandler = handler;
         }
         catch (std::exception const& e)
         {
@@ -1627,6 +1618,15 @@ ApplicationImp::setup()
         }
     }
 
+    if (shardArchiveHandler && !shardArchiveHandler->start())
+    {
+        JLOG(m_journal.fatal()) << "Failed to start ShardArchiveHandler.";
+
+        return false;
+    }
+
+    validatorSites_->start();
+
     return true;
 }
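One way to read the two setup() hunks above: the start() calls are now deferred until every remaining step that can return false has passed, so a failed setup presumably no longer leaves the shard archive handler or the validator sites already running. A toy sketch of that shape, with invented names (ShardRecovery, ValidatorSites, setupSketch) rather than the real ApplicationImp members:

// Hypothetical, simplified shape of the reordered setup(): anything that
// needs start() is only remembered while the fallible steps run, and is
// started just before the final return.
#include <memory>

struct ShardRecovery
{
    bool start() { return true; }
};

struct ValidatorSites
{
    void start() {}
};

bool
setupSketch(bool recoveryNeeded)
{
    ValidatorSites validatorSites;
    std::unique_ptr<ShardRecovery> recovery;

    try
    {
        if (recoveryNeeded)
            recovery = std::make_unique<ShardRecovery>();  // not started yet
    }
    catch (...)
    {
        return false;
    }

    // ... further steps that may still return false ...

    if (recovery && !recovery->start())
        return false;

    validatorSites.start();  // last thing before reporting success
    return true;
}

int
main()
{
    return setupSketch(true) ? 0 : 1;
}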


@@ -182,12 +182,8 @@ PeerImp::run()
     else
         doProtocolStart();
 
-    // Request shard info from peer
-    protocol::TMGetPeerShardInfo tmGPS;
-    tmGPS.set_hops(0);
-    send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO));
-
-    setTimer();
+    // Anything else that needs to be done with the connection should be
+    // done in doProtocolStart
 }
 
 void
@@ -244,13 +240,12 @@ PeerImp::send(std::shared_ptr<Message> const& m)
         // a small senq periodically
         large_sendq_ = 0;
     }
-    else if (
-        journal_.active(beast::severities::kDebug) &&
-        (sendq_size % Tuning::sendQueueLogFreq) == 0)
+    else if (auto sink = journal_.debug();
+             sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
     {
         std::string const n = name();
-        JLOG(journal_.debug()) << (n.empty() ? remote_address_.to_string() : n)
-                               << " sendq: " << sendq_size;
+        sink << (n.empty() ? remote_address_.to_string() : n)
+             << " sendq: " << sendq_size;
     }
 
     send_queue_.push(m);
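The send() hunk above also switches the log check to a C++17 if-statement initializer: the debug sink is obtained once, tested, and then reused for the message, instead of calling journal_.active(kDebug) and then constructing the sink again inside JLOG. A small stand-alone example of the same construct; the Journal and Sink types here are simplified stand-ins, not beast::Journal:

// Illustrative stand-ins only, not beast::Journal: shows the
// if/else-if initializer that scopes the sink to the condition.
#include <cstddef>
#include <iostream>
#include <string>

struct Sink
{
    bool enabled = false;
    explicit operator bool() const { return enabled; }

    template <class T>
    Sink const& operator<<(T const& t) const
    {
        if (enabled)
            std::cerr << t;
        return *this;
    }
};

struct Journal
{
    bool debugEnabled = false;
    Sink debug() const { return Sink{debugEnabled}; }
};

int
main()
{
    Journal journal_{true};
    std::size_t sendq_size = 128;
    constexpr std::size_t sendQueueLogFreq = 64;

    // The sink exists only within this else-if chain, mirroring the
    // new condition in PeerImp::send().
    if (sendq_size == 0)
    {
        // nothing queued, nothing to log
    }
    else if (auto sink = journal_.debug();
             sink && (sendq_size % sendQueueLogFreq) == 0)
    {
        sink << "sendq: " << sendq_size << '\n';
    }
}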
@@ -780,18 +775,20 @@ PeerImp::doAccept()
         stream_,
         write_buffer->data(),
         boost::asio::transfer_all(),
-        [this, write_buffer, self = shared_from_this()](
-            error_code ec, std::size_t bytes_transferred) {
-            if (!socket_.is_open())
-                return;
-            if (ec == boost::asio::error::operation_aborted)
-                return;
-            if (ec)
-                return fail("onWriteResponse", ec);
-            if (write_buffer->size() == bytes_transferred)
-                return doProtocolStart();
-            return fail("Failed to write header");
-        });
+        bind_executor(
+            strand_,
+            [this, write_buffer, self = shared_from_this()](
+                error_code ec, std::size_t bytes_transferred) {
+                if (!socket_.is_open())
+                    return;
+                if (ec == boost::asio::error::operation_aborted)
+                    return;
+                if (ec)
+                    return fail("onWriteResponse", ec);
+                if (write_buffer->size() == bytes_transferred)
+                    return doProtocolStart();
+                return fail("Failed to write header");
+            }));
 }
 
 std::string
@@ -817,7 +814,7 @@ PeerImp::doProtocolStart()
     onReadMessage(error_code(), 0);
 
     // Send all the validator lists that have been loaded
-    if (supportsFeature(ProtocolFeature::ValidatorListPropagation))
+    if (inbound_ && supportsFeature(ProtocolFeature::ValidatorListPropagation))
     {
         app_.validators().for_each_available(
             [&](std::string const& manifest,
@@ -844,6 +841,13 @@ PeerImp::doProtocolStart()
     if (auto m = overlay_.getManifestsMessage())
         send(m);
 
+    // Request shard info from peer
+    protocol::TMGetPeerShardInfo tmGPS;
+    tmGPS.set_hops(0);
+    send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO));
+
+    setTimer();
 }
 
 // Called repeatedly with protocol message data