Optimize peer I/O:

- Limit the lifetime of a buffer that was only used in the early
  phases of peer connection establishment but which lived on as
  long as the peer was active.
- Cache the message used to transfer manifests, so it can be reused
  instead of recreated for every peer connection.
- Improve the reading of partial messages by passing a hint to the
  I/O layer if the number of bytes needed to complete the message
  is known.
This commit is contained in:
Nik Bougalis
2020-11-17 23:20:10 -08:00
parent 8c386ae07e
commit 57ffc58613
8 changed files with 123 additions and 110 deletions

View File

@@ -219,6 +219,8 @@ private:
/** Master public keys stored by current ephemeral public key. */ /** Master public keys stored by current ephemeral public key. */
hash_map<PublicKey, PublicKey> signingToMasterKeys_; hash_map<PublicKey, PublicKey> signingToMasterKeys_;
std::atomic<std::uint32_t> seq_{0};
public: public:
explicit ManifestCache( explicit ManifestCache(
beast::Journal j = beast::Journal(beast::Journal::getNullSink())) beast::Journal j = beast::Journal(beast::Journal::getNullSink()))
@@ -226,6 +228,13 @@ public:
{ {
} }
/** A monotonically increasing number used to detect new manifests. */
std::uint32_t
sequence() const
{
return seq_.load();
}
/** Returns master key's current signing key. /** Returns master key's current signing key.
@param pk Master public key @param pk Master public key

View File

@@ -431,6 +431,9 @@ ManifestCache::applyManifest(Manifest m)
iter->second = std::move(m); iter->second = std::move(m);
} }
// Something has changed. Keep track of it.
seq_++;
return ManifestDisposition::accepted; return ManifestDisposition::accepted;
} }

View File

@@ -1263,6 +1263,36 @@ OverlayImpl::relay(
return {}; return {};
} }
std::shared_ptr<Message>
OverlayImpl::getManifestsMessage()
{
std::lock_guard g(manifestLock_);
if (auto seq = app_.validatorManifests().sequence();
seq != manifestListSeq_)
{
protocol::TMManifests tm;
app_.validatorManifests().for_each_manifest(
[&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
[&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
tm.add_list()->set_stobject(
manifest.serialized.data(), manifest.serialized.size());
hr.addSuppression(manifest.hash());
});
manifestMessage_.reset();
if (tm.list_size() != 0)
manifestMessage_ =
std::make_shared<Message>(tm, protocol::mtMANIFESTS);
manifestListSeq_ = seq;
}
return manifestMessage_;
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void

View File

@@ -25,6 +25,7 @@
#include <ripple/basics/UnorderedContainers.h> #include <ripple/basics/UnorderedContainers.h>
#include <ripple/basics/chrono.h> #include <ripple/basics/chrono.h>
#include <ripple/core/Job.h> #include <ripple/core/Job.h>
#include <ripple/overlay/Message.h>
#include <ripple/overlay/Overlay.h> #include <ripple/overlay/Overlay.h>
#include <ripple/overlay/Slot.h> #include <ripple/overlay/Slot.h>
#include <ripple/overlay/impl/Handshake.h> #include <ripple/overlay/impl/Handshake.h>
@@ -127,6 +128,13 @@ private:
squelch::Slots<UptimeClock> slots_; squelch::Slots<UptimeClock> slots_;
// A message with the list of manifests we send to peers
std::shared_ptr<Message> manifestMessage_;
// Used to track whether we need to update the cached list of manifests
std::optional<std::uint32_t> manifestListSeq_;
// Protects the message and the sequence list of manifests
std::mutex manifestLock_;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
public: public:
@@ -218,6 +226,9 @@ public:
uint256 const& uid, uint256 const& uid,
PublicKey const& validator) override; PublicKey const& validator) override;
std::shared_ptr<Message>
getManifestsMessage();
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
// OverlayImpl // OverlayImpl

View File

@@ -133,17 +133,15 @@ PeerImp::run()
if (!strand_.running_in_this_thread()) if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::run, shared_from_this())); return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
// We need to decipher
auto parseLedgerHash = auto parseLedgerHash =
[](std::string const& value) -> boost::optional<uint256> { [](std::string const& value) -> boost::optional<uint256> {
uint256 ret; if (uint256 ret; ret.SetHexExact(value))
if (ret.SetHexExact(value)) return ret;
return {ret};
auto const s = base64_decode(value); if (auto const s = base64_decode(value); s.size() == uint256::size())
if (s.size() != uint256::size()) return uint256{s};
return boost::none;
return uint256{s}; return boost::none;
}; };
boost::optional<uint256> closed; boost::optional<uint256> closed;
@@ -710,7 +708,6 @@ PeerImp::onShutdown(error_code ec)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void
PeerImp::doAccept() PeerImp::doAccept()
{ {
@@ -725,14 +722,6 @@ PeerImp::doAccept()
if (!sharedValue) if (!sharedValue)
return fail("makeSharedValue: Unexpected failure"); return fail("makeSharedValue: Unexpected failure");
// TODO Apply headers to connection state.
boost::beast::ostream(write_buffer_) << makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
remote_address_.address(),
*sharedValue);
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: " JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_); << toBase58(TokenType::NodePublic, publicKey_);
@@ -752,69 +741,54 @@ PeerImp::doAccept()
// XXX Set timer: connection idle (idle may vary depending on connection // XXX Set timer: connection idle (idle may vary depending on connection
// type.) // type.)
onWriteResponse(error_code(), 0); auto write_buffer = [this, sharedValue]() {
} auto buf = std::make_shared<boost::beast::multi_buffer>();
http_response_type http_response_type resp;
PeerImp::makeResponse( resp.result(boost::beast::http::status::switching_protocols);
bool crawl, resp.version(request_.version());
http_request_type const& req, resp.insert("Connection", "Upgrade");
beast::IP::Address remote_ip, resp.insert("Upgrade", to_string(protocol_));
uint256 const& sharedValue) resp.insert("Connect-As", "Peer");
{ resp.insert("Server", BuildInfo::getFullVersionString());
http_response_type resp; resp.insert(
resp.result(boost::beast::http::status::switching_protocols); "Crawl",
resp.version(req.version()); overlay_.peerFinder().config().peerPrivate ? "private" : "public");
resp.insert("Connection", "Upgrade");
resp.insert("Upgrade", to_string(protocol_));
resp.insert("Connect-As", "Peer");
resp.insert("Server", BuildInfo::getFullVersionString());
resp.insert("Crawl", crawl ? "public" : "private");
if (req["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION)
resp.insert("X-Offer-Compression", "lz4");
buildHandshake( if (request_["X-Offer-Compression"] == "lz4" &&
resp, app_.config().COMPRESSION)
sharedValue, resp.insert("X-Offer-Compression", "lz4");
overlay_.setup().networkID,
overlay_.setup().public_ip,
remote_ip,
app_);
return resp; buildHandshake(
} resp,
*sharedValue,
overlay_.setup().networkID,
overlay_.setup().public_ip,
remote_address_.address(),
app_);
// Called repeatedly to send the bytes in the response boost::beast::ostream(*buf) << resp;
void
PeerImp::onWriteResponse(error_code ec, std::size_t bytes_transferred)
{
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (auto stream = journal_.trace())
{
if (bytes_transferred > 0)
stream << "onWriteResponse: " << bytes_transferred << " bytes";
else
stream << "onWriteResponse";
}
write_buffer_.consume(bytes_transferred); return buf;
if (write_buffer_.size() == 0) }();
return doProtocolStart();
stream_.async_write_some( // Write the whole buffer and only start protocol when that's done.
write_buffer_.data(), boost::asio::async_write(
bind_executor( stream_,
strand_, write_buffer->data(),
std::bind( boost::asio::transfer_all(),
&PeerImp::onWriteResponse, [this, write_buffer, self = shared_from_this()](
shared_from_this(), error_code ec, std::size_t bytes_transferred) {
std::placeholders::_1, if (!socket_.is_open())
std::placeholders::_2))); return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return doProtocolStart();
return fail("Failed to write header");
});
} }
std::string std::string
@@ -860,30 +834,15 @@ PeerImp::doProtocolStart()
<< "Sending validator list for " << strHex(pubKey) << "Sending validator list for " << strHex(pubKey)
<< " with sequence " << sequence << " to " << " with sequence " << sequence << " to "
<< remote_address_.to_string() << " (" << id_ << ")"; << remote_address_.to_string() << " (" << id_ << ")";
auto m = std::make_shared<Message>(vl, protocol::mtVALIDATORLIST); send(std::make_shared<Message>(vl, protocol::mtVALIDATORLIST));
send(m);
// Don't send it next time. // Don't send it next time.
app_.getHashRouter().addSuppressionPeer(hash, id_); app_.getHashRouter().addSuppressionPeer(hash, id_);
setPublisherListSequence(pubKey, sequence); setPublisherListSequence(pubKey, sequence);
}); });
} }
protocol::TMManifests tm; if (auto m = overlay_.getManifestsMessage())
app_.validatorManifests().for_each_manifest(
[&tm](std::size_t s) { tm.mutable_list()->Reserve(s); },
[&tm, &hr = app_.getHashRouter()](Manifest const& manifest) {
auto const& s = manifest.serialized;
auto& tm_e = *tm.add_list();
tm_e.set_stobject(s.data(), s.size());
hr.addSuppression(manifest.hash());
});
if (tm.list_size() > 0)
{
auto m = std::make_shared<Message>(tm, protocol::mtMANIFESTS);
send(m); send(m);
}
} }
// Called repeatedly with protocol message data // Called repeatedly with protocol message data
@@ -913,11 +872,13 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
read_buffer_.commit(bytes_transferred); read_buffer_.commit(bytes_transferred);
auto hint = Tuning::readBufferBytes;
while (read_buffer_.size() > 0) while (read_buffer_.size() > 0)
{ {
std::size_t bytes_consumed; std::size_t bytes_consumed;
std::tie(bytes_consumed, ec) = std::tie(bytes_consumed, ec) =
invokeProtocolMessage(read_buffer_.data(), *this); invokeProtocolMessage(read_buffer_.data(), *this, hint);
if (ec) if (ec)
return fail("onReadMessage", ec); return fail("onReadMessage", ec);
if (!socket_.is_open()) if (!socket_.is_open())
@@ -928,9 +889,10 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
break; break;
read_buffer_.consume(bytes_consumed); read_buffer_.consume(bytes_consumed);
} }
// Timeout on writes only // Timeout on writes only
stream_.async_read_some( stream_.async_read_some(
read_buffer_.prepare(Tuning::readBufferBytes), read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
bind_executor( bind_executor(
strand_, strand_,
std::bind( std::bind(

View File

@@ -57,8 +57,6 @@ public:
RangeSet<std::uint32_t> shardIndexes; RangeSet<std::uint32_t> shardIndexes;
}; };
using ptr = std::shared_ptr<PeerImp>;
private: private:
using clock_type = std::chrono::steady_clock; using clock_type = std::chrono::steady_clock;
using error_code = boost::system::error_code; using error_code = boost::system::error_code;
@@ -156,7 +154,6 @@ private:
http_request_type request_; http_request_type request_;
http_response_type response_; http_response_type response_;
boost::beast::http::fields const& headers_; boost::beast::http::fields const& headers_;
boost::beast::multi_buffer write_buffer_;
std::queue<std::shared_ptr<Message>> send_queue_; std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false; bool gracefulClose_ = false;
int large_sendq_ = 0; int large_sendq_ = 0;
@@ -435,16 +432,6 @@ private:
void void
doAccept(); doAccept();
http_response_type
makeResponse(
bool crawl,
http_request_type const& req,
beast::IP::Address remote_ip,
uint256 const& sharedValue);
void
onWriteResponse(error_code ec, std::size_t bytes_transferred);
std::string std::string
name() const; name() const;

View File

@@ -258,11 +258,19 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
If there is insufficient data to produce a complete protocol If there is insufficient data to produce a complete protocol
message, zero is returned for the number of bytes consumed. message, zero is returned for the number of bytes consumed.
@param buffers The buffer that contains the data we've received
@param handler The handler that will be used to process the message
@param hint If possible, a hint as to the amount of data to read next. The
returned value MAY be zero, which means "no hint"
@return The number of bytes consumed, or the error code if any. @return The number of bytes consumed, or the error code if any.
*/ */
template <class Buffers, class Handler> template <class Buffers, class Handler>
std::pair<std::size_t, boost::system::error_code> std::pair<std::size_t, boost::system::error_code>
invokeProtocolMessage(Buffers const& buffers, Handler& handler) invokeProtocolMessage(
Buffers const& buffers,
Handler& handler,
std::size_t& hint)
{ {
std::pair<std::size_t, boost::system::error_code> result = {0, {}}; std::pair<std::size_t, boost::system::error_code> result = {0, {}};
@@ -303,7 +311,10 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler)
// We don't have the whole message yet. This isn't an error but we have // We don't have the whole message yet. This isn't an error but we have
// nothing to do. // nothing to do.
if (header->total_wire_size > size) if (header->total_wire_size > size)
{
hint = header->total_wire_size - size;
return result; return result;
}
bool success; bool success;

View File

@@ -27,9 +27,6 @@ namespace ripple {
namespace Tuning { namespace Tuning {
enum { enum {
/** Size of buffer used to read from the socket. */
readBufferBytes = 4096,
/** How many ledgers off a server can be and we will /** How many ledgers off a server can be and we will
still consider it converged */ still consider it converged */
convergedLedgerLimit = 24, convergedLedgerLimit = 24,
@@ -59,6 +56,9 @@ enum {
checkIdlePeers = 4, checkIdlePeers = 4,
}; };
/** Size of buffer used to read from the socket. */
std::size_t constexpr readBufferBytes = 16384;
} // namespace Tuning } // namespace Tuning
} // namespace ripple } // namespace ripple