diff --git a/src/ripple/app/misc/Manifest.h b/src/ripple/app/misc/Manifest.h index 52bebb1dd..532cb88d7 100644 --- a/src/ripple/app/misc/Manifest.h +++ b/src/ripple/app/misc/Manifest.h @@ -219,6 +219,8 @@ private: /** Master public keys stored by current ephemeral public key. */ hash_map signingToMasterKeys_; + std::atomic seq_{0}; + public: explicit ManifestCache( beast::Journal j = beast::Journal(beast::Journal::getNullSink())) @@ -226,6 +228,13 @@ public: { } + /** A monotonically increasing number used to detect new manifests. */ + std::uint32_t + sequence() const + { + return seq_.load(); + } + /** Returns master key's current signing key. @param pk Master public key diff --git a/src/ripple/app/misc/impl/Manifest.cpp b/src/ripple/app/misc/impl/Manifest.cpp index 319d09b2c..e69c5c49d 100644 --- a/src/ripple/app/misc/impl/Manifest.cpp +++ b/src/ripple/app/misc/impl/Manifest.cpp @@ -431,6 +431,9 @@ ManifestCache::applyManifest(Manifest m) iter->second = std::move(m); } + // Something has changed. Keep track of it. + seq_++; + return ManifestDisposition::accepted; } diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index db62bcb8c..489a69ed7 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -1263,6 +1263,36 @@ OverlayImpl::relay( return {}; } +std::shared_ptr +OverlayImpl::getManifestsMessage() +{ + std::lock_guard g(manifestLock_); + + if (auto seq = app_.validatorManifests().sequence(); + seq != manifestListSeq_) + { + protocol::TMManifests tm; + + app_.validatorManifests().for_each_manifest( + [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); }, + [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) { + tm.add_list()->set_stobject( + manifest.serialized.data(), manifest.serialized.size()); + hr.addSuppression(manifest.hash()); + }); + + manifestMessage_.reset(); + + if (tm.list_size() != 0) + manifestMessage_ = + std::make_shared(tm, protocol::mtMANIFESTS); + + manifestListSeq_ = seq; + } + + return manifestMessage_; +} + //------------------------------------------------------------------------------ void diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 527a22878..65f5ffc52 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,13 @@ private: squelch::Slots slots_; + // A message with the list of manifests we send to peers + std::shared_ptr manifestMessage_; + // Used to track whether we need to update the cached list of manifests + std::optional manifestListSeq_; + // Protects the message and the sequence list of manifests + std::mutex manifestLock_; + //-------------------------------------------------------------------------- public: @@ -218,6 +226,9 @@ public: uint256 const& uid, PublicKey const& validator) override; + std::shared_ptr + getManifestsMessage(); + //-------------------------------------------------------------------------- // // OverlayImpl diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 1bc94a3f1..63c1b1c5c 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -133,17 +133,15 @@ PeerImp::run() if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&PeerImp::run, shared_from_this())); - // We need to decipher auto parseLedgerHash = [](std::string const& value) -> boost::optional { - uint256 ret; - if (ret.SetHexExact(value)) - return {ret}; + if (uint256 ret; ret.SetHexExact(value)) + return ret; - auto const s = base64_decode(value); - if (s.size() != uint256::size()) - return boost::none; - return uint256{s}; + if (auto const s = base64_decode(value); s.size() == uint256::size()) + return uint256{s}; + + return boost::none; }; boost::optional closed; @@ -710,7 +708,6 @@ PeerImp::onShutdown(error_code ec) } //------------------------------------------------------------------------------ - void PeerImp::doAccept() { @@ -725,14 +722,6 @@ PeerImp::doAccept() if (!sharedValue) return fail("makeSharedValue: Unexpected failure"); - // TODO Apply headers to connection state. - - boost::beast::ostream(write_buffer_) << makeResponse( - !overlay_.peerFinder().config().peerPrivate, - request_, - remote_address_.address(), - *sharedValue); - JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey_); @@ -752,69 +741,54 @@ PeerImp::doAccept() // XXX Set timer: connection idle (idle may vary depending on connection // type.) - onWriteResponse(error_code(), 0); -} + auto write_buffer = [this, sharedValue]() { + auto buf = std::make_shared(); -http_response_type -PeerImp::makeResponse( - bool crawl, - http_request_type const& req, - beast::IP::Address remote_ip, - uint256 const& sharedValue) -{ - http_response_type resp; - resp.result(boost::beast::http::status::switching_protocols); - resp.version(req.version()); - resp.insert("Connection", "Upgrade"); - resp.insert("Upgrade", to_string(protocol_)); - resp.insert("Connect-As", "Peer"); - resp.insert("Server", BuildInfo::getFullVersionString()); - resp.insert("Crawl", crawl ? "public" : "private"); - if (req["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION) - resp.insert("X-Offer-Compression", "lz4"); + http_response_type resp; + resp.result(boost::beast::http::status::switching_protocols); + resp.version(request_.version()); + resp.insert("Connection", "Upgrade"); + resp.insert("Upgrade", to_string(protocol_)); + resp.insert("Connect-As", "Peer"); + resp.insert("Server", BuildInfo::getFullVersionString()); + resp.insert( + "Crawl", + overlay_.peerFinder().config().peerPrivate ? "private" : "public"); - buildHandshake( - resp, - sharedValue, - overlay_.setup().networkID, - overlay_.setup().public_ip, - remote_ip, - app_); + if (request_["X-Offer-Compression"] == "lz4" && + app_.config().COMPRESSION) + resp.insert("X-Offer-Compression", "lz4"); - return resp; -} + buildHandshake( + resp, + *sharedValue, + overlay_.setup().networkID, + overlay_.setup().public_ip, + remote_address_.address(), + app_); -// Called repeatedly to send the bytes in the response -void -PeerImp::onWriteResponse(error_code ec, std::size_t bytes_transferred) -{ - if (!socket_.is_open()) - return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec) - return fail("onWriteResponse", ec); - if (auto stream = journal_.trace()) - { - if (bytes_transferred > 0) - stream << "onWriteResponse: " << bytes_transferred << " bytes"; - else - stream << "onWriteResponse"; - } + boost::beast::ostream(*buf) << resp; - write_buffer_.consume(bytes_transferred); - if (write_buffer_.size() == 0) - return doProtocolStart(); + return buf; + }(); - stream_.async_write_some( - write_buffer_.data(), - bind_executor( - strand_, - std::bind( - &PeerImp::onWriteResponse, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); + // Write the whole buffer and only start protocol when that's done. + boost::asio::async_write( + stream_, + write_buffer->data(), + boost::asio::transfer_all(), + [this, write_buffer, self = shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + if (!socket_.is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; + if (ec) + return fail("onWriteResponse", ec); + if (write_buffer->size() == bytes_transferred) + return doProtocolStart(); + return fail("Failed to write header"); + }); } std::string @@ -860,30 +834,15 @@ PeerImp::doProtocolStart() << "Sending validator list for " << strHex(pubKey) << " with sequence " << sequence << " to " << remote_address_.to_string() << " (" << id_ << ")"; - auto m = std::make_shared(vl, protocol::mtVALIDATORLIST); - send(m); + send(std::make_shared(vl, protocol::mtVALIDATORLIST)); // Don't send it next time. app_.getHashRouter().addSuppressionPeer(hash, id_); setPublisherListSequence(pubKey, sequence); }); } - protocol::TMManifests tm; - - app_.validatorManifests().for_each_manifest( - [&tm](std::size_t s) { tm.mutable_list()->Reserve(s); }, - [&tm, &hr = app_.getHashRouter()](Manifest const& manifest) { - auto const& s = manifest.serialized; - auto& tm_e = *tm.add_list(); - tm_e.set_stobject(s.data(), s.size()); - hr.addSuppression(manifest.hash()); - }); - - if (tm.list_size() > 0) - { - auto m = std::make_shared(tm, protocol::mtMANIFESTS); + if (auto m = overlay_.getManifestsMessage()) send(m); - } } // Called repeatedly with protocol message data @@ -913,11 +872,13 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) read_buffer_.commit(bytes_transferred); + auto hint = Tuning::readBufferBytes; + while (read_buffer_.size() > 0) { std::size_t bytes_consumed; std::tie(bytes_consumed, ec) = - invokeProtocolMessage(read_buffer_.data(), *this); + invokeProtocolMessage(read_buffer_.data(), *this, hint); if (ec) return fail("onReadMessage", ec); if (!socket_.is_open()) @@ -928,9 +889,10 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred) break; read_buffer_.consume(bytes_consumed); } + // Timeout on writes only stream_.async_read_some( - read_buffer_.prepare(Tuning::readBufferBytes), + read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)), bind_executor( strand_, std::bind( diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index e68d3b98a..77baca9e2 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -57,8 +57,6 @@ public: RangeSet shardIndexes; }; - using ptr = std::shared_ptr; - private: using clock_type = std::chrono::steady_clock; using error_code = boost::system::error_code; @@ -156,7 +154,6 @@ private: http_request_type request_; http_response_type response_; boost::beast::http::fields const& headers_; - boost::beast::multi_buffer write_buffer_; std::queue> send_queue_; bool gracefulClose_ = false; int large_sendq_ = 0; @@ -435,16 +432,6 @@ private: void doAccept(); - http_response_type - makeResponse( - bool crawl, - http_request_type const& req, - beast::IP::Address remote_ip, - uint256 const& sharedValue); - - void - onWriteResponse(error_code ec, std::size_t bytes_transferred); - std::string name() const; diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index eb7dd18f0..f5114a245 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -258,11 +258,19 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler) If there is insufficient data to produce a complete protocol message, zero is returned for the number of bytes consumed. + @param buffers The buffer that contains the data we've received + @param handler The handler that will be used to process the message + @param hint If possible, a hint as to the amount of data to read next. The + returned value MAY be zero, which means "no hint" + @return The number of bytes consumed, or the error code if any. */ template std::pair -invokeProtocolMessage(Buffers const& buffers, Handler& handler) +invokeProtocolMessage( + Buffers const& buffers, + Handler& handler, + std::size_t& hint) { std::pair result = {0, {}}; @@ -303,7 +311,10 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler) // We don't have the whole message yet. This isn't an error but we have // nothing to do. if (header->total_wire_size > size) + { + hint = header->total_wire_size - size; return result; + } bool success; diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index 85a4465e3..4d3467ae4 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -27,9 +27,6 @@ namespace ripple { namespace Tuning { enum { - /** Size of buffer used to read from the socket. */ - readBufferBytes = 4096, - /** How many ledgers off a server can be and we will still consider it converged */ convergedLedgerLimit = 24, @@ -59,6 +56,9 @@ enum { checkIdlePeers = 4, }; +/** Size of buffer used to read from the socket. */ +std::size_t constexpr readBufferBytes = 16384; + } // namespace Tuning } // namespace ripple