1#ifndef XRPL_SERVER_BASEHTTPPEER_H_INCLUDED
2#define XRPL_SERVER_BASEHTTPPEER_H_INCLUDED
4#include <xrpl/basics/Log.h>
5#include <xrpl/beast/net/IPAddressConversion.h>
6#include <xrpl/beast/utility/instrumentation.h>
7#include <xrpl/server/Session.h>
8#include <xrpl/server/detail/Spawn.h>
9#include <xrpl/server/detail/io_list.h>
11#include <boost/asio/ip/tcp.hpp>
12#include <boost/asio/spawn.hpp>
13#include <boost/asio/ssl/stream.hpp>
14#include <boost/asio/strand.hpp>
15#include <boost/asio/streambuf.hpp>
16#include <boost/beast/core/stream_traits.hpp>
17#include <boost/beast/http/dynamic_body.hpp>
18#include <boost/beast/http/message.hpp>
19#include <boost/beast/http/parser.hpp>
20#include <boost/beast/http/read.hpp>
32template <
class Handler,
class Impl>
65 boost::asio::executor_work_guard<boost::asio::executor>
work_;
66 boost::asio::strand<boost::asio::executor>
strand_;
80 boost::system::error_code
ec_;
89 template <
class ConstBufferSequence>
93 boost::asio::executor
const& executor,
96 ConstBufferSequence
const& buffers);
113 return *
static_cast<Impl*
>(
this);
190template <
class Handler,
class Impl>
191template <
class ConstBufferSequence>
195 boost::asio::executor
const& executor,
198 ConstBufferSequence
const& buffers)
201 , work_(
boost::asio::make_work_guard(executor))
202 , strand_(
boost::asio::make_strand(executor))
203 , remote_address_(remote_address)
206 read_buf_.commit(boost::asio::buffer_copy(
207 read_buf_.prepare(boost::asio::buffer_size(buffers)), buffers));
214template <
class Handler,
class Impl>
217 handler_.onClose(session(), ec_);
218 JLOG(journal_.trace()) << id_ <<
"destroyed: " << request_count_
219 << ((request_count_ == 1) ?
" request"
223template <
class Handler,
class Impl>
227 if (!strand_.running_in_this_thread())
232 impl().shared_from_this()));
233 boost::beast::get_lowest_layer(impl().stream_).close();
238template <
class Handler,
class Impl>
242 if (!ec_ && ec != boost::asio::error::operation_aborted)
245 JLOG(journal_.trace())
246 << id_ <<
std::string(what) <<
": " << ec.message();
247 boost::beast::get_lowest_layer(impl().stream_).close();
251template <
class Handler,
class Impl>
255 boost::beast::get_lowest_layer(impl().stream_)
257 remote_address_.address().is_loopback() ? timeoutSecondsLocal
262template <
class Handler,
class Impl>
266 boost::beast::get_lowest_layer(impl().stream_).expires_never();
270template <
class Handler,
class Impl>
275 boost::system::errc::make_error_code(boost::system::errc::timed_out);
281template <
class Handler,
class Impl>
288 boost::beast::http::async_read(
289 impl().stream_, read_buf_, message_, do_yield[ec]);
291 if (ec == boost::beast::http::error::end_of_stream)
293 if (ec == boost::beast::error::timeout)
296 return fail(ec,
"http::read");
302template <
class Handler,
class Impl>
309 if (ec == boost::beast::error::timeout)
312 return fail(ec,
"write");
313 bytes_out_ += bytes_transferred;
317 wq2_.reserve(wq_.size());
324 for (
auto const& b : wq2_)
327 return boost::asio::async_write(
334 impl().shared_from_this(),
335 std::placeholders::_1,
336 std::placeholders::_2)));
346 impl().shared_from_this(),
347 std::placeholders::_1));
350template <
class Handler,
class Impl>
359 auto const p = impl().shared_from_this();
368 std::placeholders::_1));
374 if (!writer->prepare(bufferSize, resume))
377 auto const bytes_transferred = boost::asio::async_write(
380 boost::asio::transfer_at_least(1),
383 return fail(ec,
"writer");
384 writer->consume(bytes_transferred);
385 if (writer->complete())
396 impl().shared_from_this(),
397 std::placeholders::_1));
403template <
class Handler,
class Impl>
411 wq_.emplace_back(buf, bytes);
412 return wq_.size() == 1 && wq2_.size() == 0;
415 if (!strand_.running_in_this_thread())
420 impl().shared_from_this(),
428template <
class Handler,
class Impl>
438 impl().shared_from_this(),
441 std::placeholders::_1));
446template <
class Handler,
class Impl>
450 return impl().shared_from_this();
455template <
class Handler,
class Impl>
459 if (!strand_.running_in_this_thread())
464 impl().shared_from_this()));
471 if (!wq_.empty() && !wq2_.empty())
480 impl().shared_from_this(),
481 std::placeholders::_1));
486template <
class Handler,
class Impl>
490 if (!strand_.running_in_this_thread())
496 impl().shared_from_this(),
505 if (!wq_.empty() || !wq2_.empty())
511 boost::beast::get_lowest_layer(impl().stream_).close();
A version-independent IP address and port combination.
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
Represents an active connection.
BaseHTTPPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, beast::Journal journal, endpoint_type remote_address, ConstBufferSequence const &buffers)
boost::asio::yield_context yield_context
void write(void const *buffer, std::size_t bytes) override
boost::system::error_code error_code
Port const & port() override
Returns the Port settings for this connection.
http_request_type message_
std::vector< buffer > wq2_
boost::asio::strand< boost::asio::executor > strand_
void on_write(error_code const &ec, std::size_t bytes_transferred)
void write(std::shared_ptr< Writer > const &writer, bool keep_alive) override
virtual void do_close()=0
beast::Journal const journal_
endpoint_type remote_address_
boost::asio::ip::tcp::endpoint endpoint_type
virtual void do_request()=0
boost::asio::streambuf read_buf_
std::vector< buffer > wq_
boost::system::error_code ec_
beast::IP::Endpoint remoteAddress() override
Returns the remote address of the connection.
void close(bool graceful) override
Close the session.
std::shared_ptr< Session > detach() override
Detach the session.
http_request_type & request() override
Returns the current HTTP request.
void complete() override
Indicate that the response is complete.
void do_writer(std::shared_ptr< Writer > const &writer, bool keep_alive, yield_context do_yield)
void fail(error_code ec, char const *what)
void do_read(yield_context do_yield)
boost::asio::executor_work_guard< boost::asio::executor > work_
beast::Journal journal() override
Returns the Journal to use for logging.
Persistent state information for a connection session.
T emplace_back(T... args)
void spawn(Ctx &&ctx, F &&func)
Spawns a coroutine using boost::asio::spawn
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
buffer(void const *ptr, std::size_t len)
std::unique_ptr< char[]> data
Configuration information for a Server listening port.