Merge develop into confidential MPT

This commit is contained in:
Peter Chen
2026-06-02 13:01:46 -04:00
parent dacd108657
commit 7471bfb182
1051 changed files with 27649 additions and 16775 deletions

View File

@@ -21,7 +21,7 @@ struct Handoff
bool moved = false;
// If response is set, this determines the keep alive
bool keep_alive = false;
bool keepAlive = false;
// When set, this will be sent back
std::shared_ptr<Writer> response;

View File

@@ -63,7 +63,7 @@ public:
static std::uint32_t
getLoadBase()
{
return kLFT_NORMAL_FEE;
return kLftNormalFee;
}
std::uint32_t
@@ -100,29 +100,29 @@ public:
isLoadedLocal() const
{
std::scoped_lock const sl(lock_);
return (raiseCount_ != 0) || (localTxnLoadFee_ != kLFT_NORMAL_FEE);
return (raiseCount_ != 0) || (localTxnLoadFee_ != kLftNormalFee);
}
bool
isLoadedCluster() const
{
std::scoped_lock const sl(lock_);
return (raiseCount_ != 0) || (localTxnLoadFee_ != kLFT_NORMAL_FEE) ||
(clusterTxnLoadFee_ != kLFT_NORMAL_FEE);
return (raiseCount_ != 0) || (localTxnLoadFee_ != kLftNormalFee) ||
(clusterTxnLoadFee_ != kLftNormalFee);
}
private:
static std::uint32_t constexpr kLFT_NORMAL_FEE = 256; // 256 is the minimum/normal load factor
static std::uint32_t constexpr kLFT_FEE_INC_FRACTION = 4; // increase fee by 1/4
static std::uint32_t constexpr kLFT_FEE_DEC_FRACTION = 4; // decrease fee by 1/4
static std::uint32_t constexpr kLFT_FEE_MAX = kLFT_NORMAL_FEE * 1000000;
static constexpr std::uint32_t kLftNormalFee = 256; // 256 is the minimum/normal load factor
static constexpr std::uint32_t kLftFeeIncFraction = 4; // increase fee by 1/4
static constexpr std::uint32_t kLftFeeDecFraction = 4; // decrease fee by 1/4
static constexpr std::uint32_t kLftFeeMax = kLftNormalFee * 1000000;
beast::Journal const j_;
std::mutex mutable lock_;
std::uint32_t localTxnLoadFee_{kLFT_NORMAL_FEE}; // Scale factor, lftNormalFee = normal fee
std::uint32_t remoteTxnLoadFee_{kLFT_NORMAL_FEE}; // Scale factor, lftNormalFee = normal fee
std::uint32_t clusterTxnLoadFee_{kLFT_NORMAL_FEE}; // Scale factor, lftNormalFee = normal fee
std::uint32_t localTxnLoadFee_{kLftNormalFee}; // Scale factor, lftNormalFee = normal fee
std::uint32_t remoteTxnLoadFee_{kLftNormalFee}; // Scale factor, lftNormalFee = normal fee
std::uint32_t clusterTxnLoadFee_{kLftNormalFee}; // Scale factor, lftNormalFee = normal fee
std::uint32_t raiseCount_{0};
};

View File

@@ -30,19 +30,19 @@ struct Port
boost::asio::ip::address ip;
std::uint16_t port = 0;
std::set<std::string, boost::beast::iless> protocol;
std::vector<boost::asio::ip::network_v4> admin_nets_v4;
std::vector<boost::asio::ip::network_v6> admin_nets_v6;
std::vector<boost::asio::ip::network_v4> secure_gateway_nets_v4;
std::vector<boost::asio::ip::network_v6> secure_gateway_nets_v6;
std::vector<boost::asio::ip::network_v4> adminNetsV4;
std::vector<boost::asio::ip::network_v6> adminNetsV6;
std::vector<boost::asio::ip::network_v4> secureGatewayNetsV4;
std::vector<boost::asio::ip::network_v6> secureGatewayNetsV6;
std::string user;
std::string password;
std::string admin_user;
std::string admin_password;
std::string ssl_key;
std::string ssl_cert;
std::string ssl_chain;
std::string ssl_ciphers;
boost::beast::websocket::permessage_deflate pmd_options;
std::string adminUser;
std::string adminPassword;
std::string sslKey;
std::string sslCert;
std::string sslChain;
std::string sslCiphers;
boost::beast::websocket::permessage_deflate pmdOptions;
std::shared_ptr<boost::asio::ssl::context> context;
// How many incoming connections are allowed on this
@@ -50,7 +50,7 @@ struct Port
int limit = 0;
// Websocket disconnects if send queue exceeds this limit
std::uint16_t ws_queue_limit{};
std::uint16_t wsQueueLimit{};
// Returns `true` if any websocket protocols are specified
[[nodiscard]] bool
@@ -78,22 +78,22 @@ struct ParsedPort
std::set<std::string, boost::beast::iless> protocol;
std::string user;
std::string password;
std::string admin_user;
std::string admin_password;
std::string ssl_key;
std::string ssl_cert;
std::string ssl_chain;
std::string ssl_ciphers;
boost::beast::websocket::permessage_deflate pmd_options;
std::string adminUser;
std::string adminPassword;
std::string sslKey;
std::string sslCert;
std::string sslChain;
std::string sslCiphers;
boost::beast::websocket::permessage_deflate pmdOptions;
int limit = 0;
std::uint16_t ws_queue_limit{};
std::uint16_t wsQueueLimit{};
std::optional<boost::asio::ip::address> ip;
std::optional<std::uint16_t> port;
std::vector<boost::asio::ip::network_v4> admin_nets_v4;
std::vector<boost::asio::ip::network_v6> admin_nets_v6;
std::vector<boost::asio::ip::network_v4> secure_gateway_nets_v4;
std::vector<boost::asio::ip::network_v6> secure_gateway_nets_v6;
std::vector<boost::asio::ip::network_v4> adminNetsV4;
std::vector<boost::asio::ip::network_v6> adminNetsV6;
std::vector<boost::asio::ip::network_v4> secureGatewayNetsV4;
std::vector<boost::asio::ip::network_v6> secureGatewayNetsV6;
};
void

View File

@@ -30,7 +30,7 @@ namespace xrpl {
/** Represents an active connection. */
template <class Handler, class Impl>
class BaseHTTPPeer : public IoList::Work, public Session
class BaseHTTPPeer : public IOList::Work, public Session
{
protected:
using clock_type = std::chrono::system_clock;
@@ -38,16 +38,9 @@ protected:
using endpoint_type = boost::asio::ip::tcp::endpoint;
using yield_context = boost::asio::yield_context;
// Need to be named before converting
// NOLINTNEXTLINE(cppcoreguidelines-use-enum-class)
enum {
// Size of our read/write buffer
BufferSize = 4 * 1024,
// Max seconds without completing a message
TimeoutSeconds = 30,
TimeoutSecondsLocal = 3 // used for localhost clients
};
static constexpr auto kBufferSize = 4 * 1024; // size of read/write buffer
static constexpr auto kTimeoutSeconds = 30; // max seconds without completing a message
static constexpr auto kTimeoutSecondsLocal = 3; // used for localhost clients
struct Buffer
{
@@ -65,13 +58,13 @@ protected:
Handler& handler_;
boost::asio::executor_work_guard<boost::asio::executor> work_;
boost::asio::strand<boost::asio::executor> strand_;
endpoint_type remote_address_;
endpoint_type remoteAddress_;
beast::Journal const journal_;
std::string id_;
std::size_t nid_;
boost::asio::streambuf read_buf_;
boost::asio::streambuf readBuf_;
http_request_type message_;
std::vector<Buffer> wq_;
std::vector<Buffer> wq2_;
@@ -80,9 +73,9 @@ protected:
bool complete_ = false;
boost::system::error_code ec_;
int request_count_ = 0;
std::size_t bytes_in_ = 0;
std::size_t bytes_out_ = 0;
int requestCount_ = 0;
std::size_t bytesIn_ = 0;
std::size_t bytesOut_ = 0;
//--------------------------------------------------------------------------
@@ -158,7 +151,7 @@ protected:
beast::IP::Endpoint
remoteAddress() override
{
return beast::IPAddressConversion::fromAsio(remote_address_);
return beast::IPAddressConversion::fromAsio(remoteAddress_);
}
http_request_type&
@@ -198,23 +191,23 @@ BaseHTTPPeer<Handler, Impl>::BaseHTTPPeer(
, handler_(handler)
, work_(boost::asio::make_work_guard(executor))
, strand_(boost::asio::make_strand(executor))
, remote_address_(std::move(remoteAddress))
, remoteAddress_(std::move(remoteAddress))
, journal_(journal)
{
read_buf_.commit(
boost::asio::buffer_copy(read_buf_.prepare(boost::asio::buffer_size(buffers)), buffers));
static std::atomic<int> kSID;
nid_ = ++kSID;
readBuf_.commit(
boost::asio::buffer_copy(readBuf_.prepare(boost::asio::buffer_size(buffers)), buffers));
static std::atomic<int> kSid;
nid_ = ++kSid;
id_ = std::string("#") + std::to_string(nid_) + " ";
JLOG(journal_.trace()) << id_ << "accept: " << remote_address_.address();
JLOG(journal_.trace()) << id_ << "accept: " << remoteAddress_.address();
}
template <class Handler, class Impl>
BaseHTTPPeer<Handler, Impl>::~BaseHTTPPeer()
{
handler_.onClose(session(), ec_);
JLOG(journal_.trace()) << id_ << "destroyed: " << request_count_
<< ((request_count_ == 1) ? " request" : " requests");
JLOG(journal_.trace()) << id_ << "destroyed: " << requestCount_
<< ((requestCount_ == 1) ? " request" : " requests");
}
template <class Handler, class Impl>
@@ -252,7 +245,7 @@ BaseHTTPPeer<Handler, Impl>::startTimer()
boost::beast::get_lowest_layer(impl().stream_)
.expires_after(
std::chrono::seconds(
remote_address_.address().is_loopback() ? TimeoutSecondsLocal : TimeoutSeconds));
remoteAddress_.address().is_loopback() ? kTimeoutSecondsLocal : kTimeoutSeconds));
}
// Convenience for discarding the error code
@@ -281,7 +274,7 @@ BaseHTTPPeer<Handler, Impl>::doRead(yield_context doYield)
complete_ = false;
error_code ec;
startTimer();
boost::beast::http::async_read(impl().stream_, read_buf_, message_, doYield[ec]);
boost::beast::http::async_read(impl().stream_, readBuf_, message_, doYield[ec]);
cancelTimer();
if (ec == boost::beast::http::error::end_of_stream)
return doClose();
@@ -303,7 +296,7 @@ BaseHTTPPeer<Handler, Impl>::onWrite(error_code const& ec, std::size_t bytesTran
return onTimer();
if (ec)
return fail(ec, "write");
bytes_out_ += bytesTransferred;
bytesOut_ += bytesTransferred;
{
std::scoped_lock const lock(mutex_);
wq2_.clear();
@@ -364,7 +357,7 @@ BaseHTTPPeer<Handler, Impl>::doWriter(
for (;;)
{
if (!writer->prepare(BufferSize, resume))
if (!writer->prepare(kBufferSize, resume))
return;
error_code ec;
auto const bytesTransferred = boost::asio::async_write(

View File

@@ -17,7 +17,7 @@ namespace xrpl {
// Common part of all peers
template <class Handler, class Impl>
class BasePeer : public IoList::Work
class BasePeer : public IOList::Work
{
protected:
using clock_type = std::chrono::system_clock;
@@ -27,7 +27,7 @@ protected:
Port const& port_;
Handler& handler_;
endpoint_type remote_address_;
endpoint_type remoteAddress_;
beast::WrappedSink sink_;
beast::Journal const j_;
@@ -65,7 +65,7 @@ BasePeer<Handler, Impl>::BasePeer(
beast::Journal journal)
: port_(port)
, handler_(handler)
, remote_address_(std::move(remoteAddress))
, remoteAddress_(std::move(remoteAddress))
, sink_(
journal.sink(),
[] {

View File

@@ -42,15 +42,15 @@ private:
/// The socket has been closed, or will close after the next write
/// finishes. Do not do any more writes, and don't try to close
/// again.
bool do_close_ = false;
bool doClose_ = false;
boost::beast::websocket::close_reason cr_;
waitable_timer timer_;
bool close_on_timer_ = false;
bool ping_active_ = false;
bool closeOnTimer_ = false;
bool pingActive_ = false;
boost::beast::websocket::ping_data payload_;
error_code ec_;
std::function<void(boost::beast::websocket::frame_type, boost::beast::string_view)>
control_callback_;
controlCallback_;
public:
template <class Body, class Headers>
@@ -85,7 +85,7 @@ public:
[[nodiscard]] boost::asio::ip::tcp::endpoint const&
remoteEndpoint() const override
{
return this->remote_address_;
return this->remoteAddress_;
}
void
@@ -173,14 +173,14 @@ BaseWSPeer<Handler, Impl>::run()
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
impl().ws_.set_option(port().pmd_options);
impl().ws_.set_option(port().pmdOptions);
// Must manage the control callback memory outside of the `control_callback`
// function
control_callback_ =
controlCallback_ =
std::bind(&BaseWSPeer::onPingPong, this, std::placeholders::_1, std::placeholders::_2);
impl().ws_.control_callback(control_callback_);
impl().ws_.control_callback(controlCallback_);
startTimer();
close_on_timer_ = true;
closeOnTimer_ = true;
impl().ws_.set_option(boost::beast::websocket::stream_base::decorator([](auto& res) {
res.set(boost::beast::http::field::server, BuildInfo::getFullVersionString());
}));
@@ -198,9 +198,9 @@ BaseWSPeer<Handler, Impl>::send(std::shared_ptr<WSMsg> w)
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
if (do_close_)
if (doClose_)
return;
if (wq_.size() > port().ws_queue_limit)
if (wq_.size() > port().wsQueueLimit)
{
cr_.code = safeCast<decltype(cr_.code)>(boost::beast::websocket::close_code::policy_error);
cr_.reason = "Policy error: client is too slow.";
@@ -227,9 +227,9 @@ BaseWSPeer<Handler, Impl>::close(boost::beast::websocket::close_reason const& re
{
if (!strand_.running_in_this_thread())
return post(strand_, [self = impl().shared_from_this(), reason] { self->close(reason); });
if (do_close_)
if (doClose_)
return;
do_close_ = true;
doClose_ = true;
if (wq_.empty())
{
impl().ws_.async_close(
@@ -260,7 +260,7 @@ BaseWSPeer<Handler, Impl>::onWsHandshake(error_code const& ec)
{
if (ec)
return fail(ec, "on_ws_handshake");
close_on_timer_ = false;
closeOnTimer_ = false;
doRead();
}
@@ -313,7 +313,7 @@ BaseWSPeer<Handler, Impl>::onWriteFin(error_code const& ec)
if (ec)
return fail(ec, "write_fin");
wq_.pop_front();
if (do_close_)
if (doClose_)
{
impl().ws_.async_close(
cr_,
@@ -368,12 +368,12 @@ void
BaseWSPeer<Handler, Impl>::startTimer()
{
// Max seconds without completing a message
static constexpr std::chrono::seconds kTIMEOUT{30};
static constexpr std::chrono::seconds kTIMEOUT_LOCAL{3};
static constexpr std::chrono::seconds kTimeout{30};
static constexpr std::chrono::seconds kTimeoutLocal{3};
try
{
timer_.expires_after(remoteEndpoint().address().is_loopback() ? kTIMEOUT_LOCAL : kTIMEOUT);
timer_.expires_after(remoteEndpoint().address().is_loopback() ? kTimeoutLocal : kTimeout);
}
catch (boost::system::system_error const& e)
{
@@ -409,7 +409,7 @@ BaseWSPeer<Handler, Impl>::onPing(error_code const& ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
ping_active_ = false;
pingActive_ = false;
if (!ec)
return;
fail(ec, "on_ping");
@@ -426,7 +426,7 @@ BaseWSPeer<Handler, Impl>::onPingPong(
boost::beast::string_view const p(payload_.begin());
if (payload == p)
{
close_on_timer_ = false;
closeOnTimer_ = false;
JLOG(this->j_.trace()) << "got matching pong";
}
else
@@ -444,11 +444,11 @@ BaseWSPeer<Handler, Impl>::onTimer(error_code ec)
return;
if (!ec)
{
if (!close_on_timer_ || !ping_active_)
if (!closeOnTimer_ || !pingActive_)
{
startTimer();
close_on_timer_ = true;
ping_active_ = true;
closeOnTimer_ = true;
pingActive_ = true;
// cryptographic is probably overkill..
beast::rngfill(payload_.begin(), payload_.size(), cryptoPrng());
impl().ws_.async_ping(

View File

@@ -23,7 +23,6 @@
#include <sys/resource.h>
#include <dirent.h>
#include <unistd.h>
#endif
#include <algorithm>
@@ -39,7 +38,7 @@ namespace xrpl {
/** A listening socket. */
template <class Handler>
class Door : public IoList::Work, public std::enable_shared_from_this<Door<Handler>>
class Door : public IOList::Work, public std::enable_shared_from_this<Door<Handler>>
{
private:
using clock_type = std::chrono::steady_clock;
@@ -53,7 +52,7 @@ private:
using stream_type = boost::beast::tcp_stream;
// Detects SSL on a socket
class Detector : public IoList::Work, public std::enable_shared_from_this<Detector>
class Detector : public IOList::Work, public std::enable_shared_from_this<Detector>
{
private:
Port const& port_;
@@ -61,7 +60,7 @@ private:
boost::asio::io_context& ioc_;
stream_type stream_;
socket_type& socket_;
endpoint_type remote_address_;
endpoint_type remoteAddress_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
beast::Journal const j_;
@@ -90,16 +89,19 @@ private:
acceptor_type acceptor_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
bool ssl_{
port_.protocol.count("https") > 0 || port_.protocol.count("wss") > 0 ||
port_.protocol.count("wss2") > 0 || port_.protocol.count("peer") > 0};
port_.protocol.contains("https") || port_.protocol.contains("wss") ||
port_.protocol.contains("wss2") || port_.protocol.contains("peer")};
bool plain_{
port_.protocol.count("http") > 0 || port_.protocol.count("ws") > 0 ||
(port_.protocol.count("ws2") != 0u)};
static constexpr std::chrono::milliseconds kINITIAL_ACCEPT_DELAY{50};
static constexpr std::chrono::milliseconds kMAX_ACCEPT_DELAY{2000};
std::chrono::milliseconds accept_delay_{kINITIAL_ACCEPT_DELAY};
boost::asio::steady_timer backoff_timer_;
static constexpr double kFREE_FD_THRESHOLD = 0.70;
port_.protocol.contains("http") || port_.protocol.contains("ws") ||
(port_.protocol.contains("ws2"))};
static constexpr std::chrono::milliseconds kInitialAcceptDelay{50};
static constexpr std::chrono::milliseconds kMaxAcceptDelay{2000};
std::chrono::milliseconds acceptDelay_{kInitialAcceptDelay};
boost::asio::steady_timer backoffTimer_;
static constexpr std::uint64_t kMaxUsedFdPercent = 70;
static constexpr std::chrono::milliseconds kFdSampleInterval{250};
clock_type::time_point fdSampleAt_;
bool cachedThrottle_{false};
struct FDStats
{
@@ -164,7 +166,7 @@ Door<Handler>::Detector::Detector(
, ioc_(ioc)
, stream_(std::move(stream))
, socket_(stream_.socket())
, remote_address_(std::move(remoteAddress))
, remoteAddress_(std::move(remoteAddress))
, strand_(boost::asio::make_strand(ioc_))
, j_(j)
{
@@ -199,18 +201,18 @@ Door<Handler>::Detector::doDetect(boost::asio::yield_context doYield)
if (ssl)
{
if (auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
port_, handler_, ioc_, j_, remote_address_, buf.data(), std::move(stream_)))
port_, handler_, ioc_, j_, remoteAddress_, buf.data(), std::move(stream_)))
sp->run();
return;
}
if (auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
port_, handler_, ioc_, j_, remote_address_, buf.data(), std::move(stream_)))
port_, handler_, ioc_, j_, remoteAddress_, buf.data(), std::move(stream_)))
sp->run();
return;
}
if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.trace()) << "Error detecting ssl: " << ec.message() << " from " << remote_address_;
JLOG(j_.trace()) << "Error detecting ssl: " << ec.message() << " from " << remoteAddress_;
}
}
@@ -279,7 +281,8 @@ Door<Handler>::Door(
, ioc_(ioContext)
, acceptor_(ioContext)
, strand_(boost::asio::make_strand(ioContext))
, backoff_timer_(ioContext)
, backoffTimer_(ioContext)
, fdSampleAt_(clock_type::now() - kFdSampleInterval)
{
reOpen();
}
@@ -302,7 +305,7 @@ Door<Handler>::close()
return boost::asio::post(
strand_, std::bind(&Door<Handler>::close, this->shared_from_this()));
}
backoff_timer_.cancel();
backoffTimer_.cancel();
error_code ec;
acceptor_.close(ec);
}
@@ -338,11 +341,11 @@ Door<Handler>::doAccept(boost::asio::yield_context doYield)
{
if (shouldThrottleForFds())
{
backoff_timer_.expires_after(accept_delay_);
JLOG(j_.warn()) << "Throttling do_accept for " << acceptDelay_.count() << "ms.";
backoffTimer_.expires_after(acceptDelay_);
boost::system::error_code tec;
backoff_timer_.async_wait(doYield[tec]);
accept_delay_ = std::min(accept_delay_ * 2, kMAX_ACCEPT_DELAY);
JLOG(j_.warn()) << "Throttling do_accept for " << accept_delay_.count() << "ms.";
backoffTimer_.async_wait(doYield[tec]);
acceptDelay_ = std::min(acceptDelay_ * 2, kMaxAcceptDelay);
continue;
}
@@ -359,14 +362,17 @@ Door<Handler>::doAccept(boost::asio::yield_context doYield)
if (ec == boost::asio::error::no_descriptors ||
ec == boost::asio::error::no_buffer_space)
{
JLOG(j_.warn()) << "accept: Too many open files. Pausing for "
<< accept_delay_.count() << "ms.";
char const* const cause = (ec == boost::asio::error::no_descriptors)
? "too many open files"
: "kernel buffer space exhausted";
JLOG(j_.warn()) << "accept: " << cause << ". Pausing for " << acceptDelay_.count()
<< "ms.";
backoff_timer_.expires_after(accept_delay_);
backoffTimer_.expires_after(acceptDelay_);
boost::system::error_code tec;
backoff_timer_.async_wait(doYield[tec]);
backoffTimer_.async_wait(doYield[tec]);
accept_delay_ = std::min(accept_delay_ * 2, kMAX_ACCEPT_DELAY);
acceptDelay_ = std::min(acceptDelay_ * 2, kMaxAcceptDelay);
}
else
{
@@ -375,7 +381,7 @@ Door<Handler>::doAccept(boost::asio::yield_context doYield)
continue;
}
accept_delay_ = kINITIAL_ACCEPT_DELAY;
acceptDelay_ = kInitialAcceptDelay;
if (ssl_ && plain_)
{
@@ -403,11 +409,11 @@ Door<Handler>::queryFdStats() const
return std::nullopt;
s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
#if BOOST_OS_LINUX
constexpr char const* kFD_DIR = "/proc/self/fd";
static constexpr char const* kFdDir = "/proc/self/fd";
#else
constexpr char const* kFD_DIR = "/dev/fd";
static constexpr char const* kFdDir = "/dev/fd";
#endif
if (DIR* d = ::opendir(kFD_DIR))
if (DIR* d = ::opendir(kFdDir))
{
std::uint64_t cnt = 0;
while (::readdir(d) != nullptr)
@@ -428,14 +434,15 @@ Door<Handler>::shouldThrottleForFds()
#if BOOST_OS_WINDOWS
return false;
#else
auto const stats = queryFdStats();
if (!stats || stats->limit == 0)
return false;
auto const now = clock_type::now();
if (now - fdSampleAt_ < kFdSampleInterval)
return cachedThrottle_;
auto const& s = *stats;
auto const free = (s.limit > s.used) ? (s.limit - s.used) : 0ull;
double const freeRatio = static_cast<double>(free) / static_cast<double>(s.limit);
return freeRatio < kFREE_FD_THRESHOLD;
fdSampleAt_ = now;
auto const stats = queryFdStats();
cachedThrottle_ =
stats && stats->limit > 0 && stats->used * 100 > stats->limit * kMaxUsedFdPercent;
return cachedThrottle_;
#endif
}

View File

@@ -82,7 +82,7 @@ template <class Handler>
void
PlainHTTPPeer<Handler>::run()
{
if (!this->handler_.onAccept(this->session(), this->remote_address_))
if (!this->handler_.onAccept(this->session(), this->remoteAddress_))
{
util::spawn(this->strand_, std::bind(&PlainHTTPPeer::doClose, this->shared_from_this()));
return;
@@ -103,7 +103,7 @@ PlainHTTPPeer<Handler>::websocketUpgrade()
auto ws = this->ios().template emplace<PlainWSPeer<Handler>>(
this->port_,
this->handler_,
this->remote_address_,
this->remoteAddress_,
std::move(this->message_),
std::move(stream_),
this->journal_);
@@ -114,20 +114,20 @@ template <class Handler>
void
PlainHTTPPeer<Handler>::doRequest()
{
++this->request_count_;
++this->requestCount_;
auto const what =
this->handler_.onHandoff(this->session(), std::move(this->message_), this->remote_address_);
this->handler_.onHandoff(this->session(), std::move(this->message_), this->remoteAddress_);
if (what.moved)
return;
boost::system::error_code ec;
if (what.response)
{
// half-close on Connection: close
if (!what.keep_alive)
if (!what.keepAlive)
socket_.shutdown(socket_type::shutdown_receive, ec);
if (ec)
return this->fail(ec, "request");
return this->write(what.response, what.keep_alive);
return this->write(what.response, what.keepAlive);
}
// Perform half-close when Connection: close and not SSL

View File

@@ -26,7 +26,7 @@ private:
using yield_context = boost::asio::yield_context;
using error_code = boost::system::error_code;
std::unique_ptr<stream_type> stream_ptr_;
std::unique_ptr<stream_type> streamPtr_;
stream_type& stream_;
socket_type& socket_;
@@ -80,8 +80,8 @@ SSLHTTPPeer<Handler>::SSLHTTPPeer(
journal,
remoteAddress,
buffers)
, stream_ptr_(std::make_unique<stream_type>(middle_type(std::move(stream)), *port.context))
, stream_(*stream_ptr_)
, streamPtr_(std::make_unique<stream_type>(middle_type(std::move(stream)), *port.context))
, stream_(*streamPtr_)
, socket_(stream_.next_layer().socket())
{
}
@@ -91,7 +91,7 @@ template <class Handler>
void
SSLHTTPPeer<Handler>::run()
{
if (!this->handler_.onAccept(this->session(), this->remote_address_))
if (!this->handler_.onAccept(this->session(), this->remoteAddress_))
{
util::spawn(this->strand_, std::bind(&SSLHTTPPeer::doClose, this->shared_from_this()));
return;
@@ -110,9 +110,9 @@ SSLHTTPPeer<Handler>::websocketUpgrade()
auto ws = this->ios().template emplace<SSLWSPeer<Handler>>(
this->port_,
this->handler_,
this->remote_address_,
this->remoteAddress_,
std::move(this->message_),
std::move(this->stream_ptr_),
std::move(this->streamPtr_),
this->journal_);
return ws;
}
@@ -124,8 +124,8 @@ SSLHTTPPeer<Handler>::doHandshake(yield_context doYield)
boost::system::error_code ec;
stream_.set_verify_mode(boost::asio::ssl::verify_none);
this->startTimer();
this->read_buf_.consume(
stream_.async_handshake(stream_type::server, this->read_buf_.data(), doYield[ec]));
this->readBuf_.consume(
stream_.async_handshake(stream_type::server, this->readBuf_.data(), doYield[ec]));
this->cancelTimer();
if (ec == boost::beast::error::timeout)
return this->onTimer();
@@ -148,13 +148,13 @@ template <class Handler>
void
SSLHTTPPeer<Handler>::doRequest()
{
++this->request_count_;
++this->requestCount_;
auto const what = this->handler_.onHandoff(
this->session(), std::move(stream_ptr_), std::move(this->message_), this->remote_address_);
this->session(), std::move(streamPtr_), std::move(this->message_), this->remoteAddress_);
if (what.moved)
return;
if (what.response)
return this->write(what.response, what.keep_alive);
return this->write(what.response, what.keepAlive);
// legacy
this->handler_.onRequest(this->session());
}

View File

@@ -28,7 +28,7 @@ class SSLWSPeer : public BaseWSPeer<Handler, SSLWSPeer<Handler>>,
using stream_type = boost::beast::ssl_stream<socket_type>;
using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
std::unique_ptr<stream_type> stream_ptr_;
std::unique_ptr<stream_type> streamPtr_;
boost::beast::websocket::stream<stream_type&> ws_;
public:
@@ -61,8 +61,8 @@ SSLWSPeer<Handler>::SSLWSPeer(
remoteEndpoint,
std::move(request),
journal)
, stream_ptr_(std::move(streamPtr))
, ws_(*stream_ptr_)
, streamPtr_(std::move(streamPtr))
, ws_(*streamPtr_)
{
}

View File

@@ -62,13 +62,11 @@ class ServerImpl : public Server
private:
using clock_type = std::chrono::system_clock;
// Need to be named before converting
// NOLINTNEXTLINE(cppcoreguidelines-use-enum-class)
enum { HistorySize = 100 };
static constexpr auto kHistorySize = 100;
Handler& handler_;
beast::Journal const j_;
boost::asio::io_context& io_context_;
boost::asio::io_context& ioContext_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
@@ -78,7 +76,7 @@ private:
int high_ = 0;
std::array<std::size_t, 64> hist_{};
IoList ios_;
IOList ios_;
public:
ServerImpl(Handler& handler, boost::asio::io_context& ioContext, beast::Journal journal);
@@ -97,7 +95,7 @@ public:
void
close() override;
IoList&
IOList&
ios()
{
return ios_;
@@ -106,7 +104,7 @@ public:
boost::asio::io_context&
getIoContext()
{
return io_context_;
return ioContext_;
}
bool
@@ -124,9 +122,9 @@ ServerImpl<Handler>::ServerImpl(
beast::Journal journal)
: handler_(handler)
, j_(journal)
, io_context_(ioContext)
, strand_(boost::asio::make_strand(io_context_))
, work_(std::in_place, boost::asio::make_work_guard(io_context_))
, ioContext_(ioContext)
, strand_(boost::asio::make_strand(ioContext_))
, work_(std::in_place, boost::asio::make_work_guard(ioContext_))
{
}
@@ -152,7 +150,7 @@ ServerImpl<Handler>::ports(std::vector<Port> const& ports)
{
ports_.push_back(port);
auto& internalPort = ports_.back();
if (auto sp = ios_.emplace<Door<Handler>>(handler_, io_context_, internalPort, j_))
if (auto sp = ios_.emplace<Door<Handler>>(handler_, ioContext_, internalPort, j_))
{
list_.push_back(sp);

View File

@@ -25,7 +25,7 @@ concept IsStrand = std::
*
* @param ePtr The exception that was caught on the coroutine
*/
inline constexpr auto kPROPAGATE_EXCEPTIONS = [](std::exception_ptr ePtr) {
inline constexpr auto kPropagateExceptions = [](std::exception_ptr ePtr) {
if (ePtr)
{
try
@@ -50,7 +50,7 @@ inline constexpr auto kPROPAGATE_EXCEPTIONS = [](std::exception_ptr ePtr) {
/**
* @brief Spawns a coroutine using `boost::asio::spawn`
*
* @note This uses kPROPAGATE_EXCEPTIONS to force asio to propagate exceptions
* @note This uses kPropagateExceptions to force asio to propagate exceptions
* through `io_context`
* @note Since implicit strand was removed from boost::asio::spawn this helper
* function adds the strand back
@@ -68,14 +68,14 @@ spawn(Ctx&& ctx, F&& func)
if constexpr (impl::IsStrand<Ctx>)
{
boost::asio::spawn(
std::forward<Ctx>(ctx), std::forward<F>(func), impl::kPROPAGATE_EXCEPTIONS);
std::forward<Ctx>(ctx), std::forward<F>(func), impl::kPropagateExceptions);
}
else
{
boost::asio::spawn(
boost::asio::make_strand(boost::asio::get_associated_executor(std::forward<Ctx>(ctx))),
std::forward<F>(func),
impl::kPROPAGATE_EXCEPTIONS);
impl::kPropagateExceptions);
}
}

View File

@@ -12,7 +12,7 @@
namespace xrpl {
/** Manages a set of objects performing asynchronous I/O. */
class IoList final
class IOList final
{
public:
class Work
@@ -21,8 +21,8 @@ public:
void
destroy();
friend class IoList;
IoList* ios_ = nullptr;
friend class IOList;
IOList* ios_ = nullptr;
public:
virtual ~Work()
@@ -30,13 +30,13 @@ public:
destroy();
}
/** Return the IoList associated with the work.
/** Return the IOList associated with the work.
Requirements:
The call to IoList::emplace to
The call to IOList::emplace to
create the work has already returned.
*/
IoList&
IOList&
ios()
{
return *ios_;
@@ -59,17 +59,17 @@ private:
std::function<void(void)> f_;
public:
IoList() = default;
IOList() = default;
/** Destroy the list.
Effects:
Closes the IoList if it was not previously
Closes the IOList if it was not previously
closed. No finisher is invoked in this case.
Blocks until all work is destroyed.
*/
~IoList()
~IOList()
{
destroy();
}
@@ -159,7 +159,7 @@ public:
template <class>
void
IoList::Work::destroy()
IOList::Work::destroy()
{
if (!ios_)
return;
@@ -179,7 +179,7 @@ IoList::Work::destroy()
template <class>
void
IoList::destroy()
IOList::destroy()
{
close();
join();
@@ -187,9 +187,9 @@ IoList::destroy()
template <class T, class... Args>
std::shared_ptr<T>
IoList::emplace(Args&&... args)
IOList::emplace(Args&&... args)
{
static_assert(std::is_base_of_v<Work, T>, "T must derive from IoList::Work");
static_assert(std::is_base_of_v<Work, T>, "T must derive from IOList::Work");
if (closed_)
return nullptr;
auto sp = std::make_shared<T>(std::forward<Args>(args)...);
@@ -211,7 +211,7 @@ IoList::emplace(Args&&... args)
template <class Finisher>
void
IoList::close(Finisher&& f)
IOList::close(Finisher&& f)
{
std::unique_lock<std::mutex> lock(m_);
if (closed_)
@@ -237,7 +237,7 @@ IoList::close(Finisher&& f)
template <class>
void
IoList::join()
IOList::join()
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&] { return closed_ && n_ == 0; });