Improve socket writes in BaseHTTPPeer

This commit is contained in:
Vinnie Falco
2016-03-07 16:29:45 -05:00
parent 0a1731c4c9
commit 29a4849024

View File

@@ -38,10 +38,10 @@
#include <cassert>
#include <chrono>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <type_traits>
#include <vector>
namespace ripple {
@@ -97,7 +97,8 @@ protected:
boost::asio::streambuf read_buf_;
beast::http::message message_;
beast::http::body body_;
std::list <buffer> write_queue_;
std::vector<buffer> wq_;
std::vector<buffer> wq2_;
std::mutex mutex_;
bool graceful_ = false;
bool complete_ = false;
@@ -150,7 +151,8 @@ protected:
do_read (yield_context yield);
void
do_write (yield_context yield);
on_write(error_code const& ec,
std::size_t bytes_transferred);
void
do_writer (std::shared_ptr <Writer> const& writer,
@@ -379,54 +381,41 @@ BaseHTTPPeer<Impl>::do_read (yield_context yield)
// Send everything in the write queue.
// The write queue must not be empty upon entry.
template <class Impl>
template<class Impl>
void
BaseHTTPPeer<Impl>::do_write (yield_context yield)
BaseHTTPPeer<Impl>::on_write(error_code const& ec,
std::size_t bytes_transferred)
{
error_code ec;
std::size_t bytes = 0;
for(;;)
cancel_timer();
if(ec)
return fail(ec, "write");
bytes_out_ += bytes_transferred;
{
bytes_out_ += bytes;
void const* data;
{
std::lock_guard <std::mutex> lock (mutex_);
assert(! write_queue_.empty());
buffer& b1 = write_queue_.front();
b1.used += bytes;
if (b1.used >= b1.bytes)
{
write_queue_.pop_front();
if (write_queue_.empty())
break;
buffer& b2 = write_queue_.front();
data = b2.data.get();
bytes = b2.bytes;
}
else
{
data = b1.data.get() + b1.used;
bytes = b1.bytes - b1.used;
}
}
start_timer();
bytes = boost::asio::async_write (impl().stream_,
boost::asio::buffer (data, bytes),
boost::asio::transfer_at_least(1), yield[ec]);
cancel_timer();
if (ec)
return fail (ec, "write");
std::lock_guard<std::mutex> lock(mutex_);
wq2_.clear();
wq2_.reserve(wq_.size());
std::swap(wq2_, wq_);
}
if(! wq2_.empty())
{
std::vector<boost::asio::const_buffer> v;
v.reserve(wq2_.size());
for(auto const& b : wq2_)
v.emplace_back(b.data.get(), b.bytes);
start_timer();
using namespace beast::asio;
return boost::asio::async_write(impl().stream_, v,
strand_.wrap(std::bind(&BaseHTTPPeer::on_write,
impl().shared_from_this(), placeholders::error,
placeholders::bytes_transferred)));
}
if (! complete_)
return;
if (graceful_)
return do_close();
boost::asio::spawn (strand_, std::bind (&BaseHTTPPeer<Impl>::do_read,
impl().shared_from_this(), std::placeholders::_1));
boost::asio::spawn(strand_,
std::bind (&BaseHTTPPeer<Impl>::do_read,
impl().shared_from_this(), std::placeholders::_1));
}
template <class Impl>
@@ -473,21 +462,26 @@ BaseHTTPPeer<Impl>::do_writer (std::shared_ptr <Writer> const& writer,
// Send a copy of the data.
template <class Impl>
void
BaseHTTPPeer<Impl>::write (void const* buffer, std::size_t bytes)
BaseHTTPPeer<Impl>::write(
void const* buffer, std::size_t bytes)
{
if (bytes == 0)
return;
bool empty;
if([&]
{
std::lock_guard<std::mutex> lock(mutex_);
wq_.emplace_back(buffer, bytes);
return wq_.size() == 1 && wq2_.size() == 0;
}())
{
std::lock_guard <std::mutex> lock (mutex_);
empty = write_queue_.empty();
write_queue_.emplace_back (buffer, bytes);
if (strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseHTTPPeer::on_write,
impl().shared_from_this(),
error_code{}, 0));
else
return on_write(error_code{}, 0);
}
if (empty)
boost::asio::spawn (strand_, std::bind (&BaseHTTPPeer<Impl>::do_write,
impl().shared_from_this(), std::placeholders::_1));
}
template <class Impl>
@@ -524,7 +518,7 @@ BaseHTTPPeer<Impl>::complete()
{
std::lock_guard<std::mutex> lock(mutex_);
if (! write_queue_.empty())
if (! wq_.empty() && ! wq2_.empty())
return;
}
@@ -550,7 +544,7 @@ BaseHTTPPeer<Impl>::close (bool graceful)
graceful_ = true;
{
std::lock_guard<std::mutex> lock(mutex_);
if (! write_queue_.empty())
if (! wq_.empty() || ! wq2_.empty())
return;
}
return do_close();