#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace xrpl { /** Represents an active connection. */ template class BaseHTTPPeer : public IOList::Work, public Session { protected: using clock_type = std::chrono::system_clock; using error_code = boost::system::error_code; using endpoint_type = boost::asio::ip::tcp::endpoint; using yield_context = boost::asio::yield_context; static constexpr auto kBufferSize = 4 * 1024; // size of read/write buffer static constexpr auto kTimeoutSeconds = 30; // max seconds without completing a message static constexpr auto kTimeoutSecondsLocal = 3; // used for localhost clients struct Buffer { Buffer(void const* ptr, std::size_t len) : data(new char[len]), bytes(len) { memcpy(data.get(), ptr, len); } std::unique_ptr data; std::size_t bytes; std::size_t used{0}; }; Port const& port_; Handler& handler_; boost::asio::executor_work_guard work_; boost::asio::strand strand_; endpoint_type remoteAddress_; beast::Journal const journal_; std::string id_; std::size_t nid_; boost::asio::streambuf readBuf_; http_request_type message_; std::vector wq_; std::vector wq2_; std::mutex mutex_; bool graceful_ = false; bool complete_ = false; boost::system::error_code ec_; int requestCount_ = 0; std::size_t bytesIn_ = 0; std::size_t bytesOut_ = 0; //-------------------------------------------------------------------------- public: template BaseHTTPPeer( Port const& port, Handler& handler, boost::asio::executor const& executor, beast::Journal journal, endpoint_type remoteAddress, ConstBufferSequence const& buffers); ~BaseHTTPPeer() override; Session& session() { return *this; } void close() override; protected: Impl& impl() { return *static_cast(this); } void fail(error_code ec, char const* what); void startTimer(); void cancelTimer(); void onTimer(); void doRead(yield_context doYield); void onWrite(error_code const& ec, std::size_t bytesTransferred); void doWriter(std::shared_ptr const& writer, bool keepAlive, yield_context doYield); virtual void doRequest() = 0; virtual void doClose() = 0; // Session beast::Journal journal() override { return journal_; } Port const& port() override { return port_; } beast::IP::Endpoint remoteAddress() override { return beast::IPAddressConversion::fromAsio(remoteAddress_); } http_request_type& request() override { return message_; } void write(void const* buffer, std::size_t bytes) override; void write(std::shared_ptr const& writer, bool keepAlive) override; std::shared_ptr detach() override; void complete() override; void close(bool graceful) override; }; //------------------------------------------------------------------------------ template template BaseHTTPPeer::BaseHTTPPeer( Port const& port, Handler& handler, boost::asio::executor const& executor, beast::Journal journal, endpoint_type remoteAddress, ConstBufferSequence const& buffers) : port_(port) , handler_(handler) , work_(boost::asio::make_work_guard(executor)) , strand_(boost::asio::make_strand(executor)) , remoteAddress_(std::move(remoteAddress)) , journal_(journal) { readBuf_.commit( boost::asio::buffer_copy(readBuf_.prepare(boost::asio::buffer_size(buffers)), buffers)); static std::atomic kSid; nid_ = ++kSid; id_ = std::string("#") + std::to_string(nid_) + " "; JLOG(journal_.trace()) << id_ << "accept: " << remoteAddress_.address(); } template BaseHTTPPeer::~BaseHTTPPeer() { handler_.onClose(session(), ec_); JLOG(journal_.trace()) << id_ << "destroyed: " << requestCount_ << ((requestCount_ == 1) ? " request" : " requests"); } template void BaseHTTPPeer::close() { if (!strand_.running_in_this_thread()) { return post( strand_, std::bind( (void (BaseHTTPPeer::*)(void))&BaseHTTPPeer::close, impl().shared_from_this())); } boost::beast::get_lowest_layer(impl().stream_).close(); } //------------------------------------------------------------------------------ template void BaseHTTPPeer::fail(error_code ec, char const* what) { if (!ec_ && ec != boost::asio::error::operation_aborted) { ec_ = ec; JLOG(journal_.trace()) << id_ << std::string(what) << ": " << ec.message(); boost::beast::get_lowest_layer(impl().stream_).close(); } } template void BaseHTTPPeer::startTimer() { boost::beast::get_lowest_layer(impl().stream_) .expires_after( std::chrono::seconds( remoteAddress_.address().is_loopback() ? kTimeoutSecondsLocal : kTimeoutSeconds)); } // Convenience for discarding the error code template void BaseHTTPPeer::cancelTimer() { boost::beast::get_lowest_layer(impl().stream_).expires_never(); } // Called when session times out template void BaseHTTPPeer::onTimer() { auto ec = boost::system::errc::make_error_code(boost::system::errc::timed_out); fail(ec, "timer"); } //------------------------------------------------------------------------------ template void BaseHTTPPeer::doRead(yield_context doYield) { complete_ = false; error_code ec; startTimer(); boost::beast::http::async_read(impl().stream_, readBuf_, message_, doYield[ec]); cancelTimer(); if (ec == boost::beast::http::error::end_of_stream) return doClose(); if (ec == boost::beast::error::timeout) return onTimer(); if (ec) return fail(ec, "http::read"); doRequest(); } // Send everything in the write queue. // The write queue must not be empty upon entry. template void BaseHTTPPeer::onWrite(error_code const& ec, std::size_t bytesTransferred) { cancelTimer(); if (ec == boost::beast::error::timeout) return onTimer(); if (ec) return fail(ec, "write"); bytesOut_ += bytesTransferred; { std::scoped_lock const lock(mutex_); wq2_.clear(); wq2_.reserve(wq_.size()); std::swap(wq2_, wq_); } if (!wq2_.empty()) { std::vector v; v.reserve(wq2_.size()); for (auto const& b : wq2_) v.emplace_back(b.data.get(), b.bytes); startTimer(); return boost::asio::async_write( impl().stream_, v, bind_executor( strand_, std::bind( &BaseHTTPPeer::onWrite, impl().shared_from_this(), std::placeholders::_1, std::placeholders::_2))); } if (!complete_) return; if (graceful_) return doClose(); util::spawn( strand_, std::bind( &BaseHTTPPeer::doRead, impl().shared_from_this(), std::placeholders::_1)); } template void BaseHTTPPeer::doWriter( std::shared_ptr const& writer, bool keepAlive, yield_context doYield) { std::function resume; { auto const p = impl().shared_from_this(); resume = std::function([this, p, writer, keepAlive]() { util::spawn( strand_, std::bind( &BaseHTTPPeer::doWriter, p, writer, keepAlive, std::placeholders::_1)); }); } for (;;) { if (!writer->prepare(kBufferSize, resume)) return; error_code ec; auto const bytesTransferred = boost::asio::async_write( impl().stream_, writer->data(), boost::asio::transfer_at_least(1), doYield[ec]); if (ec) return fail(ec, "writer"); writer->consume(bytesTransferred); if (writer->complete()) break; } if (!keepAlive) return doClose(); util::spawn( strand_, std::bind( &BaseHTTPPeer::doRead, impl().shared_from_this(), std::placeholders::_1)); } //------------------------------------------------------------------------------ // Send a copy of the data. template void BaseHTTPPeer::write(void const* buf, std::size_t bytes) { if (bytes == 0) return; if ([&] { std::scoped_lock const lock(mutex_); wq_.emplace_back(buf, bytes); return wq_.size() == 1 && wq2_.size() == 0; }()) { if (!strand_.running_in_this_thread()) { return post( strand_, std::bind(&BaseHTTPPeer::onWrite, impl().shared_from_this(), error_code{}, 0)); } return onWrite(error_code{}, 0); } } template void BaseHTTPPeer::write(std::shared_ptr const& writer, bool keepAlive) { util::spawn( strand_, std::bind( &BaseHTTPPeer::doWriter, impl().shared_from_this(), writer, keepAlive, std::placeholders::_1)); } // DEPRECATED // Make the Session asynchronous template std::shared_ptr BaseHTTPPeer::detach() { return impl().shared_from_this(); } // DEPRECATED // Called to indicate the response has been written(but not sent) template void BaseHTTPPeer::complete() { if (!strand_.running_in_this_thread()) { return post( strand_, std::bind(&BaseHTTPPeer::complete, impl().shared_from_this())); } message_ = {}; complete_ = true; { std::scoped_lock const lock(mutex_); if (!wq_.empty() && !wq2_.empty()) return; } // keep-alive util::spawn( strand_, std::bind( &BaseHTTPPeer::doRead, impl().shared_from_this(), std::placeholders::_1)); } // DEPRECATED // Called from the Handler to close the session. template void BaseHTTPPeer::close(bool graceful) { if (!strand_.running_in_this_thread()) { return post( strand_, std::bind( (void (BaseHTTPPeer::*)(bool))&BaseHTTPPeer::close, impl().shared_from_this(), graceful)); } complete_ = true; if (graceful) { graceful_ = true; { std::scoped_lock const lock(mutex_); if (!wq_.empty() || !wq2_.empty()) return; } return doClose(); } boost::beast::get_lowest_layer(impl().stream_).close(); } } // namespace xrpl