#ifndef XRPL_SERVER_BASEHTTPPEER_H_INCLUDED
#define XRPL_SERVER_BASEHTTPPEER_H_INCLUDED

// NOTE(review): in the text under review everything that was written between
// angle brackets appears to have been lost: the targets of the #include
// directives below, and every template parameter/argument list (for example
// `template class BaseHTTPPeer`, `std::unique_ptr data`, `static_cast(this)`).
// The tokens are reproduced exactly as received; restore the missing pieces
// from the canonical header before compiling.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace ripple {

/** Represents an active connection.

    Common base for HTTP peers. It implements the Session interface on top
    of a stream owned by the derived class: every access to the stream goes
    through `impl().stream_`, where impl() downcasts `this` to `Impl`.

    Derived classes must provide do_request() (called by do_read() after a
    message has been read without error) and do_close().
*/
template class BaseHTTPPeer : public io_list::work, public Session
{
protected:
    using clock_type = std::chrono::system_clock;
    using error_code = boost::system::error_code;
    using endpoint_type = boost::asio::ip::tcp::endpoint;
    using yield_context = boost::asio::yield_context;

    enum {
        // Size of our read/write buffer
        // (passed to the writer's prepare() in do_writer())
        bufferSize = 4 * 1024,

        // Max seconds without completing a message
        timeoutSeconds = 30,
        timeoutSecondsLocal = 3  // used for localhost clients
    };

    // One queued outgoing chunk. The constructor allocates `len` bytes and
    // copies the caller's data into them, so the caller's memory need not
    // outlive the call to write().
    struct buffer
    {
        buffer(void const* ptr, std::size_t len)
            : data(new char[len]), bytes(len), used(0)
        {
            memcpy(data.get(), ptr, len);
        }

        std::unique_ptr data;  // owned copy of the payload
        std::size_t bytes;     // size of the payload
        std::size_t used;  // initialised to 0; not referenced elsewhere in
                           // this header
    };

    Port const& port_;
    Handler& handler_;  // receives onClose() from the destructor
    boost::asio::executor_work_guard work_;
    boost::asio::strand strand_;  // close(), complete() and the write path
                                  // re-post themselves onto this strand
    endpoint_type remote_address_;
    beast::Journal const journal_;

    std::string id_;   // log prefix of the form "#<nid_> "
    std::size_t nid_;  // sequence number taken from a counter shared by all
                       // instances (see constructor)

    boost::asio::streambuf read_buf_;  // seeded by the constructor, then
                                       // used by do_read()
    http_request_type message_;        // request being read / handled
    std::vector wq_;   // chunks queued by write(), not yet picked up
    std::vector wq2_;  // batch picked up by on_write() for sending
    std::mutex mutex_;  // locked around accesses to wq_/wq2_ in write(),
                        // on_write(), complete() and close(bool)
    bool graceful_ = false;  // set by close(true); checked by on_write()
    bool complete_ = false;  // set by complete()/close(bool); cleared by
                             // do_read()
    boost::system::error_code ec_;  // first error recorded by fail();
                                    // reported to handler_.onClose()
    int request_count_ = 0;      // only read (destructor log) in this header;
                                 // presumably updated by derived classes
    std::size_t bytes_in_ = 0;   // not touched in this header -- TODO confirm
                                 // where it is maintained
    std::size_t bytes_out_ = 0;  // total reported by completed writes in
                                 // on_write()

    //--------------------------------------------------------------------------

public:
    // `buffers` holds bytes that are copied into read_buf_ before the first
    // read (see the constructor definition).
    template BaseHTTPPeer(
        Port const& port,
        Handler& handler,
        boost::asio::executor const& executor,
        beast::Journal journal,
        endpoint_type remote_address,
        ConstBufferSequence const& buffers);

    virtual ~BaseHTTPPeer();

    Session&
    session()
    {
        return *this;
    }

    // Closes the lowest layer of the stream (on the strand).
    void
    close() override;

protected:
    // Access to the derived object, which owns `stream_` and provides
    // shared_from_this().
    Impl&
    impl()
    {
        return *static_cast(this);
    }

    void
    fail(error_code ec, char const* what);

    void
    start_timer();

    void
    cancel_timer();

    void
    on_timer();

    void
    do_read(yield_context do_yield);

    void
    on_write(error_code const& ec, std::size_t bytes_transferred);

    void
    do_writer(
        std::shared_ptr const& writer,
        bool keep_alive,
        yield_context do_yield);

    // Called by do_read() once a message has been read without error.
    virtual void
    do_request() = 0;

    // Called on end-of-stream, after a graceful close, or when a writer
    // finishes without keep-alive.
    virtual void
    do_close() = 0;

    // Session

    beast::Journal
    journal() override
    {
        return journal_;
    }

    Port const&
    port() override
    {
        return port_;
    }

    beast::IP::Endpoint
    remoteAddress() override
    {
        return beast::IPAddressConversion::from_asio(remote_address_);
    }

    http_request_type&
    request() override
    {
        return message_;
    }

    void
    write(void const* buffer, std::size_t bytes) override;

    void
    write(std::shared_ptr const& writer, bool keep_alive) override;

    std::shared_ptr
    detach() override;

    void
    complete() override;

    void
    close(bool graceful) override;
};

//------------------------------------------------------------------------------

// Stores the collaborators, creates the work guard and strand from
// `executor`, and copies `buffers` into read_buf_ so that those bytes are seen
// by the first read.
template template BaseHTTPPeer::BaseHTTPPeer(
    Port const& port,
    Handler& handler,
    boost::asio::executor const& executor,
    beast::Journal journal,
    endpoint_type remote_address,
    ConstBufferSequence const& buffers)
    : port_(port)
    , handler_(handler)
    , work_(boost::asio::make_work_guard(executor))
    , strand_(boost::asio::make_strand(executor))
    , remote_address_(remote_address)
    , journal_(journal)
{
    // prepare() reserves room for all of `buffers`; commit() publishes the
    // number of bytes buffer_copy() actually copied.
    read_buf_.commit(boost::asio::buffer_copy(
        read_buf_.prepare(boost::asio::buffer_size(buffers)), buffers));
    // Function-local static: a single counter shared by every peer created
    // through this constructor.
    static std::atomic sid;
    nid_ = ++sid;
    id_ = std::string("#") + std::to_string(nid_) + " ";
    JLOG(journal_.trace()) << id_ << "accept: " << remote_address_.address();
}

// Reports the recorded error (if any) to the handler, then logs how many
// requests were counted.
template BaseHTTPPeer::~BaseHTTPPeer()
{
    handler_.onClose(session(), ec_);
    JLOG(journal_.trace()) << id_ << "destroyed: " << request_count_
                           << ((request_count_ == 1) ? " request"
                                                     : " requests");
}

// Closes the lowest layer of the stream. When called off the strand the call
// is re-posted onto strand_; the bound shared_from_this() keeps the object
// alive until the posted call runs.
template void BaseHTTPPeer::close()
{
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(
                // the cast selects the zero-argument overload of close
                (void(BaseHTTPPeer::*)(void)) & BaseHTTPPeer::close,
                impl().shared_from_this()));
    boost::beast::get_lowest_layer(impl().stream_).close();
}

//------------------------------------------------------------------------------

// Records `ec` and closes the stream. Only the first error is kept, and
// operation_aborted is ignored entirely. `what` labels the failing operation
// in the trace log.
template void BaseHTTPPeer::fail(error_code ec, char const* what)
{
    if (!ec_ && ec != boost::asio::error::operation_aborted)
    {
        ec_ = ec;
        JLOG(journal_.trace())
            << id_ << std::string(what) << ": " << ec.message();
        boost::beast::get_lowest_layer(impl().stream_).close();
    }
}

// Sets the expiry on the lowest layer of the stream: timeoutSecondsLocal for
// loopback peers, timeoutSeconds for everyone else.
template void BaseHTTPPeer::start_timer()
{
    boost::beast::get_lowest_layer(impl().stream_)
        .expires_after(std::chrono::seconds(
            remote_address_.address().is_loopback() ? timeoutSecondsLocal
                                                    : timeoutSeconds));
}

// Removes the expiry set by start_timer() by calling expires_never() on the
// lowest layer of the stream.
template void BaseHTTPPeer::cancel_timer()
{
    boost::beast::get_lowest_layer(impl().stream_).expires_never();
}

// Called when session times out.
// Converts the timeout into a generic timed_out error and routes it through
// fail().
template void BaseHTTPPeer::on_timer()
{
    auto ec =
        boost::system::errc::make_error_code(boost::system::errc::timed_out);
    fail(ec, "timer");
}

//------------------------------------------------------------------------------

// Coroutine body: reads one message into message_ and dispatches on the
// result.
template void BaseHTTPPeer::do_read(yield_context do_yield)
{
    // A new message is starting; complete()/close(bool) set this again.
    complete_ = false;
    error_code ec;
    start_timer();
    boost::beast::http::async_read(
        impl().stream_, read_buf_, message_, do_yield[ec]);
    cancel_timer();
    if (ec == boost::beast::http::error::end_of_stream)
        return do_close();
    if (ec == boost::beast::error::timeout)
        return on_timer();
    if (ec)
        return fail(ec, "http::read");
    do_request();
}

// Send everything in the write queue.
// The write queue must not be empty upon entry.
template void BaseHTTPPeer::on_write( error_code const& ec, std::size_t bytes_transferred) { cancel_timer(); if (ec == boost::beast::error::timeout) return on_timer(); if (ec) return fail(ec, "write"); bytes_out_ += bytes_transferred; { std::lock_guard lock(mutex_); wq2_.clear(); wq2_.reserve(wq_.size()); std::swap(wq2_, wq_); } if (!wq2_.empty()) { std::vector v; v.reserve(wq2_.size()); for (auto const& b : wq2_) v.emplace_back(b.data.get(), b.bytes); start_timer(); return boost::asio::async_write( impl().stream_, v, bind_executor( strand_, std::bind( &BaseHTTPPeer::on_write, impl().shared_from_this(), std::placeholders::_1, std::placeholders::_2))); } if (!complete_) return; if (graceful_) return do_close(); util::spawn( strand_, std::bind( &BaseHTTPPeer::do_read, impl().shared_from_this(), std::placeholders::_1)); } template void BaseHTTPPeer::do_writer( std::shared_ptr const& writer, bool keep_alive, yield_context do_yield) { std::function resume; { auto const p = impl().shared_from_this(); resume = std::function([this, p, writer, keep_alive]() { util::spawn( strand_, std::bind( &BaseHTTPPeer::do_writer, p, writer, keep_alive, std::placeholders::_1)); }); } for (;;) { if (!writer->prepare(bufferSize, resume)) return; error_code ec; auto const bytes_transferred = boost::asio::async_write( impl().stream_, writer->data(), boost::asio::transfer_at_least(1), do_yield[ec]); if (ec) return fail(ec, "writer"); writer->consume(bytes_transferred); if (writer->complete()) break; } if (!keep_alive) return do_close(); util::spawn( strand_, std::bind( &BaseHTTPPeer::do_read, impl().shared_from_this(), std::placeholders::_1)); } //------------------------------------------------------------------------------ // Send a copy of the data. 
template void BaseHTTPPeer::write(void const* buf, std::size_t bytes) { if (bytes == 0) return; if ([&] { std::lock_guard lock(mutex_); wq_.emplace_back(buf, bytes); return wq_.size() == 1 && wq2_.size() == 0; }()) { if (!strand_.running_in_this_thread()) return post( strand_, std::bind( &BaseHTTPPeer::on_write, impl().shared_from_this(), error_code{}, 0)); else return on_write(error_code{}, 0); } } template void BaseHTTPPeer::write( std::shared_ptr const& writer, bool keep_alive) { util::spawn( strand_, std::bind( &BaseHTTPPeer::do_writer, impl().shared_from_this(), writer, keep_alive, std::placeholders::_1)); } // DEPRECATED // Make the Session asynchronous template std::shared_ptr BaseHTTPPeer::detach() { return impl().shared_from_this(); } // DEPRECATED // Called to indicate the response has been written(but not sent) template void BaseHTTPPeer::complete() { if (!strand_.running_in_this_thread()) return post( strand_, std::bind( &BaseHTTPPeer::complete, impl().shared_from_this())); message_ = {}; complete_ = true; { std::lock_guard lock(mutex_); if (!wq_.empty() && !wq2_.empty()) return; } // keep-alive util::spawn( strand_, std::bind( &BaseHTTPPeer::do_read, impl().shared_from_this(), std::placeholders::_1)); } // DEPRECATED // Called from the Handler to close the session. template void BaseHTTPPeer::close(bool graceful) { if (!strand_.running_in_this_thread()) return post( strand_, std::bind( (void(BaseHTTPPeer::*)(bool)) & BaseHTTPPeer::close, impl().shared_from_this(), graceful)); complete_ = true; if (graceful) { graceful_ = true; { std::lock_guard lock(mutex_); if (!wq_.empty() || !wq2_.empty()) return; } return do_close(); } boost::beast::get_lowest_layer(impl().stream_).close(); } } // namespace ripple #endif