From 29a48490240534ae985adea9fed4007ed4570414 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 7 Mar 2016 16:29:45 -0500 Subject: [PATCH] Improve socket writes in BaseHTTPPeer --- src/ripple/server/impl/BaseHTTPPeer.h | 104 ++++++++++++-------------- 1 file changed, 49 insertions(+), 55 deletions(-) diff --git a/src/ripple/server/impl/BaseHTTPPeer.h b/src/ripple/server/impl/BaseHTTPPeer.h index 490a4ab582..5183d4a312 100644 --- a/src/ripple/server/impl/BaseHTTPPeer.h +++ b/src/ripple/server/impl/BaseHTTPPeer.h @@ -38,10 +38,10 @@ #include #include #include -#include #include #include #include +#include namespace ripple { @@ -97,7 +97,8 @@ protected: boost::asio::streambuf read_buf_; beast::http::message message_; beast::http::body body_; - std::list write_queue_; + std::vector wq_; + std::vector wq2_; std::mutex mutex_; bool graceful_ = false; bool complete_ = false; @@ -150,7 +151,8 @@ protected: do_read (yield_context yield); void - do_write (yield_context yield); + on_write(error_code const& ec, + std::size_t bytes_transferred); void do_writer (std::shared_ptr const& writer, @@ -379,54 +381,41 @@ BaseHTTPPeer::do_read (yield_context yield) // Send everything in the write queue. // The write queue must not be empty upon entry. -template +template void -BaseHTTPPeer::do_write (yield_context yield) +BaseHTTPPeer::on_write(error_code const& ec, + std::size_t bytes_transferred) { - error_code ec; - std::size_t bytes = 0; - for(;;) + cancel_timer(); + if(ec) + return fail(ec, "write"); + bytes_out_ += bytes_transferred; { - bytes_out_ += bytes; - void const* data; - { - std::lock_guard lock (mutex_); - assert(! write_queue_.empty()); - buffer& b1 = write_queue_.front(); - b1.used += bytes; - if (b1.used >= b1.bytes) - { - write_queue_.pop_front(); - if (write_queue_.empty()) - break; - buffer& b2 = write_queue_.front(); - data = b2.data.get(); - bytes = b2.bytes; - } - else - { - data = b1.data.get() + b1.used; - bytes = b1.bytes - b1.used; - } - } - - start_timer(); - bytes = boost::asio::async_write (impl().stream_, - boost::asio::buffer (data, bytes), - boost::asio::transfer_at_least(1), yield[ec]); - cancel_timer(); - if (ec) - return fail (ec, "write"); + std::lock_guard lock(mutex_); + wq2_.clear(); + wq2_.reserve(wq_.size()); + std::swap(wq2_, wq_); + } + if(! wq2_.empty()) + { + std::vector v; + v.reserve(wq2_.size()); + for(auto const& b : wq2_) + v.emplace_back(b.data.get(), b.bytes); + start_timer(); + using namespace beast::asio; + return boost::asio::async_write(impl().stream_, v, + strand_.wrap(std::bind(&BaseHTTPPeer::on_write, + impl().shared_from_this(), placeholders::error, + placeholders::bytes_transferred))); } - if (! complete_) return; - if (graceful_) return do_close(); - - boost::asio::spawn (strand_, std::bind (&BaseHTTPPeer::do_read, - impl().shared_from_this(), std::placeholders::_1)); + boost::asio::spawn(strand_, + std::bind (&BaseHTTPPeer::do_read, + impl().shared_from_this(), std::placeholders::_1)); } template @@ -473,21 +462,26 @@ BaseHTTPPeer::do_writer (std::shared_ptr const& writer, // Send a copy of the data. template void -BaseHTTPPeer::write (void const* buffer, std::size_t bytes) +BaseHTTPPeer::write( + void const* buffer, std::size_t bytes) { if (bytes == 0) return; - - bool empty; + if([&] + { + std::lock_guard lock(mutex_); + wq_.emplace_back(buffer, bytes); + return wq_.size() == 1 && wq2_.size() == 0; + }()) { - std::lock_guard lock (mutex_); - empty = write_queue_.empty(); - write_queue_.emplace_back (buffer, bytes); + if (strand_.running_in_this_thread()) + return strand_.post(std::bind( + &BaseHTTPPeer::on_write, + impl().shared_from_this(), + error_code{}, 0)); + else + return on_write(error_code{}, 0); } - - if (empty) - boost::asio::spawn (strand_, std::bind (&BaseHTTPPeer::do_write, - impl().shared_from_this(), std::placeholders::_1)); } template @@ -524,7 +518,7 @@ BaseHTTPPeer::complete() { std::lock_guard lock(mutex_); - if (! write_queue_.empty()) + if (! wq_.empty() && ! wq2_.empty()) return; } @@ -550,7 +544,7 @@ BaseHTTPPeer::close (bool graceful) graceful_ = true; { std::lock_guard lock(mutex_); - if (! write_queue_.empty()) + if (! wq_.empty() || ! wq2_.empty()) return; } return do_close();