Compare commits

...

10 Commits

Author SHA1 Message Date
cyan317
8b0e68f48e chore: Add counter for total messages waiting to be sent (#1691) 2024-10-18 16:11:38 +01:00
Alex Kremer
189098d092 fix: Static linkage (#1551)
Fixes #1507
2024-10-18 16:11:38 +01:00
Alex Kremer
854749a05e fix: Add upper bound to limit 2024-09-11 15:15:17 +01:00
cyan317
f57706be3d fix: no restriction on type field (#1644)
'type' should not matter if 'full' or 'accounts' is false. Relax the
restriction on 'type'.
2024-09-11 14:44:20 +01:00
cyan317
bb0d912f2b fix: Add more restrictions to admin fields (#1643) 2024-09-10 15:05:23 +01:00
cyan317
d02d6affdb fix: Don't forward ledger API if 'full' is a string (#1640)
Fix #1635
2024-09-09 11:21:42 +01:00
cyan317
0054e4b64c fix: not forward admin API (#1629)
To merge into the 2.2.3 branch.
It is different from #1628; I think this branch still forwards the
feature API to rippled.
2024-09-06 10:37:08 +01:00
Sergey Kuznetsov
9fe9e7c9d2 fix: Subscription source bugs fix (#1626)
For #1620.

- Add timeouts for websocket operations on connections to rippled.
Without these timeouts, if a connection hangs for some reason, clio
wouldn't know that the connection is hanging.
- Fix a potential data race in choosing the new subscription source that
will forward messages to users.
- Optimise switching between subscription sources.
2024-09-05 14:58:06 +01:00
Alex Kremer
2e2740d4c5 feat: Published subscription message counters (#1618)
This PR adds counters to track the number of published messages for each
subscription stream.
2024-08-29 16:48:04 +01:00
Sergey Kuznetsov
5004dc4e15 fix: Fix logging in SubscriptionSource (#1617)
For #1616. Should later be ported to develop as well.
2024-08-29 15:59:02 +01:00
27 changed files with 460 additions and 128 deletions

View File

@@ -13,12 +13,15 @@ jobs:
include:
- os: macos14
build_type: Release
static: false
- os: heavy
build_type: Release
static: true
container:
image: rippleci/clio_ci:latest
- os: heavy
build_type: Debug
static: true
container:
image: rippleci/clio_ci:latest
runs-on: [self-hosted, "${{ matrix.os }}"]
@@ -50,6 +53,7 @@ jobs:
conan_profile: ${{ steps.conan.outputs.conan_profile }}
conan_cache_hit: ${{ steps.restore_cache.outputs.conan_cache_hit }}
build_type: ${{ matrix.build_type }}
static: ${{ matrix.static }}
- name: Build Clio
uses: ./.github/actions/build_clio

View File

@@ -109,10 +109,13 @@ LoadBalancer::LoadBalancer(
validatedLedgers,
forwardingTimeout,
[this]() {
if (not hasForwardingSource_)
if (not hasForwardingSource_.lock().get())
chooseForwardingSource();
},
[this](bool wasForwarding) {
if (wasForwarding)
chooseForwardingSource();
},
[this]() { chooseForwardingSource(); },
[this]() {
if (forwardingCache_.has_value())
forwardingCache_->invalidate();
@@ -314,11 +317,13 @@ LoadBalancer::getETLState() noexcept
void
LoadBalancer::chooseForwardingSource()
{
hasForwardingSource_ = false;
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
auto hasForwardingSourceLock = hasForwardingSource_.lock();
hasForwardingSourceLock.get() = false;
for (auto& source : sources_) {
if (not hasForwardingSource_ and source->isConnected()) {
if (not hasForwardingSourceLock.get() and source->isConnected()) {
source->setForwarding(true);
hasForwardingSource_ = true;
hasForwardingSourceLock.get() = true;
} else {
source->setForwarding(false);
}
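
Here hasForwardingSource_ becomes a util::Mutex<bool> (see the header change below) rather than an std::atomic_bool, so reading the flag and electing the new forwarding source happen inside one critical section. A minimal sketch of such a wrapper, assuming an interface matching the lock().get() calls above; clio's real util::Mutex may differ:

```cpp
#include <mutex>
#include <utility>

// Minimal Mutex<T> sketch: lock() returns a guard whose get() exposes the
// protected value; the std::unique_lock keeps the data locked for as long as
// the guard is alive, mirroring hasForwardingSource_.lock().get() above.
template <typename T>
class Mutex {
    std::mutex mtx_;
    T value_;

public:
    explicit Mutex(T value) : value_(std::move(value)) {}

    class Lock {
        std::unique_lock<std::mutex> lock_;
        T& value_;

    public:
        Lock(std::mutex& mtx, T& value) : lock_(mtx), value_(value) {}

        T&
        get()
        {
            return value_;
        }
    };

    Lock
    lock()
    {
        return Lock{mtx_, value_};
    }
};

// Usage as in chooseForwardingSource(): the whole scan over sources_ runs
// under a single lock, so no second thread can observe a half-updated flag.
// Mutex<bool> hasForwardingSource{false};
// auto guard = hasForwardingSource.lock();
// guard.get() = false;  // reset, then set to true for exactly one source
```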

View File

@@ -25,6 +25,7 @@
#include "etl/Source.hpp"
#include "etl/impl/ForwardingCache.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp"
@@ -38,7 +39,6 @@
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <ripple/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
@@ -74,7 +74,10 @@ private:
std::optional<ETLState> etlState_;
std::uint32_t downloadRanges_ =
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
std::atomic_bool hasForwardingSource_{false};
// Using a mutex instead of atomic_bool because choosing a new source to
// forward messages should be done under mutual exclusion; otherwise there will be a race condition
util::Mutex<bool> hasForwardingSource_{false};
public:
/**

View File

@@ -51,7 +51,7 @@ namespace etl {
class SourceBase {
public:
using OnConnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void(bool)>;
using OnLedgerClosedHook = std::function<void()>;
virtual ~SourceBase() = default;

View File

@@ -46,7 +46,7 @@
namespace etl::impl {
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
: log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
{
try {
boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)};

View File

@@ -24,6 +24,8 @@
#include "rpc/JS.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "util/requests/Types.hpp"
#include <boost/algorithm/string/classification.hpp>
@@ -66,22 +68,28 @@ SubscriptionSource::SubscriptionSource(
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout,
std::chrono::steady_clock::duration const wsTimeout,
std::chrono::steady_clock::duration const retryDelay
)
: log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort))
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
, wsConnectionBuilder_(ip, wsPort)
, validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions))
, strand_(boost::asio::make_strand(ioContext))
, wsTimeout_(wsTimeout)
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
, onConnect_(std::move(onConnect))
, onDisconnect_(std::move(onDisconnect))
, onLedgerClosed_(std::move(onLedgerClosed))
, lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt(
"subscription_source_last_message_time",
util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}),
"Seconds since epoch of the last message received from rippled subscription streams"
))
{
wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
.addHeader({"X-User", "clio-client"})
.setConnectionTimeout(connectionTimeout);
.setConnectionTimeout(wsTimeout_);
}
SubscriptionSource::~SubscriptionSource()
@@ -133,6 +141,7 @@ void
SubscriptionSource::setForwarding(bool isForwarding)
{
isForwarding_ = isForwarding;
LOG(log_.info()) << "Forwarding set to " << isForwarding_;
}
std::chrono::steady_clock::time_point
@@ -166,20 +175,22 @@ SubscriptionSource::subscribe()
}
wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
if (writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
return;
}
isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();
retry_.reset();
while (!stop_) {
auto const message = wsConnection_->read(yield);
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
@@ -224,10 +235,11 @@ SubscriptionSource::handleMessage(std::string const& message)
auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
setValidatedRange(std::move(validatedLedgers));
}
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
<< object;
if (object.contains(JS(ledger_index))) {
ledgerIndex = object.at(JS(ledger_index)).as_int64();
}
@@ -245,10 +257,13 @@ SubscriptionSource::handleMessage(std::string const& message)
// 2 - Validated transaction
// Only forward proposed transaction, validated transactions are sent by Clio itself
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
subscriptions_->forwardProposedTransaction(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
LOG(log_.debug()) << "Forwarding validation: " << object;
subscriptions_->forwardValidation(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
}
}
@@ -261,7 +276,7 @@ SubscriptionSource::handleMessage(std::string const& message)
return std::nullopt;
} catch (std::exception const& e) {
LOG(log_.error()) << "Exception in handleMessage : " << e.what();
LOG(log_.error()) << "Exception in handleMessage: " << e.what();
return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
}
}
@@ -270,16 +285,14 @@ void
SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield)
{
isConnected_ = false;
isForwarding_ = false;
bool const wasForwarding = isForwarding_.exchange(false);
if (not stop_) {
onDisconnect_();
LOG(log_.info()) << "Disconnected";
onDisconnect_(wasForwarding);
}
if (wsConnection_ != nullptr) {
auto const err = wsConnection_->close(yield);
if (err) {
LOG(log_.error()) << "Error closing websocket connection: " << err->message();
}
wsConnection_->close(yield);
wsConnection_.reset();
}
@@ -306,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const
void
SubscriptionSource::setLastMessageTime()
{
lastMessageTime_.lock().get() = std::chrono::steady_clock::now();
lastMessageTimeSecondsSinceEpoch_.get().set(
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count()
);
auto lock = lastMessageTime_.lock();
lock.get() = std::chrono::steady_clock::now();
}
void
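
handleError() above atomically swaps isForwarding_ to false and hands the previous value to the onDisconnect hook, so the LoadBalancer re-elects a forwarding source only when the dropped connection was actually the forwarding one. A self-contained sketch of that read-and-clear pattern (illustrative names, not clio's types):

```cpp
#include <atomic>
#include <functional>
#include <iostream>

struct SourceSketch {
    std::atomic_bool isForwarding_{false};
    std::function<void(bool)> onDisconnect_;

    void
    handleError()
    {
        // exchange() reads and clears the flag in one atomic step, so two
        // concurrent disconnects cannot both report wasForwarding == true.
        bool const wasForwarding = isForwarding_.exchange(false);
        onDisconnect_(wasForwarding);
    }
};

int
main()
{
    SourceSketch source;
    source.onDisconnect_ = [](bool wasForwarding) {
        if (wasForwarding)
            std::cout << "choose a new forwarding source\n";
    };
    source.isForwarding_ = true;
    source.handleError();  // prints: choose a new forwarding source
}
```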

View File

@@ -19,12 +19,13 @@
#pragma once
#include "etl/ETLHelpers.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/requests/Types.hpp"
#include "util/requests/WsConnection.hpp"
@@ -37,6 +38,7 @@
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <optional>
@@ -71,6 +73,8 @@ private:
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::chrono::steady_clock::duration wsTimeout_;
util::Retry retry_;
OnConnectHook onConnect_;
@@ -83,9 +87,11 @@ private:
util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_;
std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
std::future<void> runFuture_;
static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30};
static constexpr std::chrono::seconds WS_TIMEOUT{30};
static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
static constexpr std::chrono::seconds RETRY_DELAY{1};
@@ -103,7 +109,7 @@ public:
* @param onNewLedger The onNewLedger hook. Called when a new ledger is received
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
* forwarding
* @param connectionTimeout The connection timeout. Defaults to 30 seconds
* @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
* @param retryDelay The retry delay. Defaults to 1 second
*/
SubscriptionSource(
@@ -115,7 +121,7 @@ public:
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT,
std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
);

View File

@@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
notified_.clear();
signal_.emit(pubMsg);
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts.
// However, if the same connection subscribes to both the stream and an account, it will still receive the message
// twice. notified_ could be cleared before signal_ emits to improve this, but let's keep it as is for now, since
// rippled acts like this.
notified_.clear();
for (auto const& account : affectedAccounts)
accountSignal_.emit(account, pubMsg);
++pubCount_.get();
});
}
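
The comment above describes the dedup scheme: a per-publish "notified" set keyed by connection lets a connection that watches several affected accounts receive the message only once. An illustrative sketch of the idea (simplified standalone types, not clio's):

```cpp
#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

int
main()
{
    // One message affects two accounts, both watched by connection 1.
    std::vector<std::pair<std::string, int>> accountSubscribers = {
        {"alice", 1}, {"bob", 1}, {"carol", 2}
    };

    std::set<int> notified;  // connections already sent this message
    for (auto const& [account, connectionId] : accountSubscribers) {
        // insert().second is false on the second hit for connection 1,
        // so the duplicate delivery is skipped.
        if (notified.insert(connectionId).second)
            std::cout << account << " -> connection " << connectionId << '\n';
    }
    // Prints two lines: connections 1 and 2 each get the message once.
}
```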

View File

@@ -24,6 +24,7 @@
#include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp>
@@ -54,6 +55,7 @@ class ProposedTransactionFeed {
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_;
TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_;
@@ -67,7 +69,7 @@ public:
: strand_(boost::asio::make_strand(ioContext))
, subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
, subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))
, pubCount_(getPublishedMessagesCounterInt("tx_proposed"))
{
}

View File

@@ -36,7 +36,10 @@
namespace feed::impl {
SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
: strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
: strand_(boost::asio::make_strand(ioContext))
, subCount_(getSubscriptionsGaugeInt(name))
, pubCount_(getPublishedMessagesCounterInt(name))
, name_(name)
{
}
@@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const
boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
auto const msgPtr = std::make_shared<std::string>(std::move(msg));
signal_.emit(msgPtr);
++pubCount_.get();
});
}

View File

@@ -22,6 +22,7 @@
#include "feed/Types.hpp"
#include "feed/impl/TrackableSignal.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp>
@@ -40,6 +41,7 @@ namespace feed::impl {
class SingleFeedBase {
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
util::Logger logger_{"Subscriptions"};
std::string name_;

View File

@@ -284,23 +284,29 @@ TransactionFeed::pub(
affectedBooks = std::move(affectedBooks)]() {
notified_.clear();
signal_.emit(allVersionsMsgs);
// clear the notified set. If the same connection subscribes to both transactions + proposed_transactions,
// rippled SENDS the same message twice
notified_.clear();
txProposedsignal_.emit(allVersionsMsgs);
notified_.clear();
// check duplicates for account and proposed_account; this prevents sending the same message multiple times
// if it affects multiple accounts watched by the same connection
for (auto const& account : affectedAccounts) {
accountSignal_.emit(account, allVersionsMsgs);
accountProposedSignal_.emit(account, allVersionsMsgs);
}
notified_.clear();
// check duplicates for books; this prevents sending the same message multiple times if it affects multiple
// books watched by the same connection
for (auto const& book : affectedBooks) {
bookSignal_.emit(book, allVersionsMsgs);
}
++pubCount_.get();
}
);
}

View File

@@ -26,6 +26,7 @@
#include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp>
@@ -67,6 +68,7 @@ class TransactionFeed {
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_;
TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
@@ -89,6 +91,7 @@ public:
, subAllCount_(getSubscriptionsGaugeInt("tx"))
, subAccountCount_(getSubscriptionsGaugeInt("account"))
, subBookCount_(getSubscriptionsGaugeInt("book"))
, pubCount_(getPublishedMessagesCounterInt("tx"))
{
}

View File

@@ -19,6 +19,7 @@
#pragma once
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
@@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName)
fmt::format("Current subscribers number on the {} stream", counterName)
);
}
inline util::prometheus::CounterInt&
getPublishedMessagesCounterInt(std::string const& counterName)
{
return PrometheusService::counterInt(
"subscriptions_published_count",
util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}),
fmt::format("Total published messages on the {} stream", counterName)
);
}
} // namespace feed::impl
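
A hypothetical usage sketch of this new helper, following the wiring used by SingleFeedBase and TransactionFeed above: look the counter up once in the constructor, then bump it per published message. ExampleFeed is illustrative only, and the include path assumes this helper lives in a header like feed/impl/Util.hpp (the file name is not shown in this diff):

```cpp
#include "feed/impl/Util.hpp"  // assumed location of getPublishedMessagesCounterInt

#include <functional>

class ExampleFeed {
    std::reference_wrapper<util::prometheus::CounterInt> pubCount_;

public:
    ExampleFeed() : pubCount_(feed::impl::getPublishedMessagesCounterInt("example"))
    {
    }

    void
    pub()
    {
        // ... emit the message to subscribers ...
        ++pubCount_.get();  // counts toward subscriptions_published_count{stream="example"}
    }
};
```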

View File

@@ -11,18 +11,16 @@ target_sources(clio_server PRIVATE Main.cpp)
target_link_libraries(clio_server PRIVATE clio)
if (static)
target_link_options(clio_server PRIVATE -static)
if (is_gcc AND NOT san)
if (san)
message(FATAL_ERROR "Static linkage not allowed when using sanitizers")
elseif (is_appleclang)
message(FATAL_ERROR "Static linkage not supported on AppleClang")
else ()
target_link_options(
# For now let's assume that we are only using libstdc++ under gcc.
# Note: -static-libstdc++ can statically link both libstdc++ and libc++
clio_server PRIVATE -static-libstdc++ -static-libgcc
)
endif ()
if (is_appleclang)
message(FATAL_ERROR "Static linkage not supported on AppleClang")
endif ()
endif ()
set_target_properties(clio_server PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})

View File

@@ -22,6 +22,7 @@
#include "data/BackendInterface.hpp"
#include "rpc/Counters.hpp"
#include "rpc/Errors.hpp"
#include "rpc/RPCHelpers.hpp"
#include "rpc/WorkQueue.hpp"
#include "rpc/common/HandlerProvider.hpp"
#include "rpc/common/Types.hpp"
@@ -134,8 +135,13 @@ public:
Result
buildResponse(web::Context const& ctx)
{
if (forwardingProxy_.shouldForward(ctx))
if (forwardingProxy_.shouldForward(ctx)) {
// Disallow forwarding of the admin API; only the user API is allowed, for security reasons.
if (isAdminCmd(ctx.method, ctx.params))
return Result{Status{RippledError::rpcNO_PERMISSION}};
return forwardingProxy_.forward(ctx);
}
if (backend_->isTooBusy()) {
LOG(log_.error()) << "Database is too busy. Rejecting request";

View File

@@ -36,6 +36,7 @@
#include <boost/json/array.hpp>
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/string.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_to.hpp>
@@ -49,6 +50,7 @@
#include <ripple/basics/chrono.h>
#include <ripple/basics/strHex.h>
#include <ripple/beast/utility/Zero.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/AccountID.h>
#include <ripple/protocol/Book.h>
@@ -1273,6 +1275,31 @@ specifiesCurrentOrClosedLedger(boost::json::object const& request)
return false;
}
bool
isAdminCmd(std::string const& method, boost::json::object const& request)
{
if (method == JS(ledger)) {
auto const requestStr = boost::json::serialize(request);
Json::Value jv;
Json::Reader{}.parse(requestStr, jv);
// rippled considers a string / non-zero int / non-empty array / non-empty json as true.
// Use rippled's asBool API to get the same result.
// https://github.com/XRPLF/rippled/issues/5119
auto const isFieldSet = [&jv](auto const field) { return jv.isMember(field) and jv[field].asBool(); };
// According to the docs
// https://xrpl.org/docs/references/http-websocket-apis/public-api-methods/ledger-methods/ledger,
// full/accounts/type are admin only, but type only has an effect when full/accounts are set, so we don't need to
// check type.
if (isFieldSet(JS(full)) or isFieldSet(JS(accounts)))
return true;
}
if (method == JS(feature) and request.contains(JS(vetoed)))
return true;
return false;
}
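
A few illustrative calls showing what the checks above classify as admin-only; the expected results match the parameterized tests further below, and the rpc namespace is assumed from the rpc/ header paths:

```cpp
#include "rpc/RPCHelpers.hpp"

#include <boost/json/parse.hpp>

#include <cassert>

int
main()
{
    using boost::json::parse;
    using rpc::isAdminCmd;  // assumed namespace, per the rpc/ headers above

    // 'full' is truthy per rippled's asBool: a non-empty string counts as true.
    assert(isAdminCmd("ledger", parse(R"({"full": "yes"})").as_object()));
    assert(not isAdminCmd("ledger", parse(R"({"full": false})").as_object()));

    // 'feature' with 'vetoed' present is admin-only regardless of its value.
    assert(isAdminCmd("feature", parse(R"({"vetoed": false})").as_object()));

    // Other methods and falsy fields stay user-level.
    assert(not isAdminCmd("ledger_entry", parse(R"({"type": false})").as_object()));
}
```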
std::variant<ripple::uint256, Status>
getNFTID(boost::json::object const& request)
{

View File

@@ -557,6 +557,16 @@ parseIssue(boost::json::object const& issue);
bool
specifiesCurrentOrClosedLedger(boost::json::object const& request);
/**
* @brief Check whether a request requires administrative privileges on the rippled side.
*
* @param method The method name to check
* @param request The request to check
* @return true if the request requires the ADMIN role
*/
bool
isAdminCmd(std::string const& method, boost::json::object const& request);
/**
* @brief Get the NFTID from the request
*

View File

@@ -39,7 +39,6 @@
#include <ripple/protocol/jss.h>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>
@@ -57,8 +56,8 @@ class AccountTxHandler {
std::shared_ptr<BackendInterface> sharedPtrBackend_;
public:
// no max limit
static auto constexpr LIMIT_MIN = 1;
static auto constexpr LIMIT_MAX = 1000;
static auto constexpr LIMIT_DEFAULT = 200;
/**
@@ -133,7 +132,7 @@ public:
{JS(limit),
validation::Type<uint32_t>{},
validation::Min(1u),
modifiers::Clamp<int32_t>{LIMIT_MIN, std::numeric_limits<int32_t>::max()}},
modifiers::Clamp<int32_t>{LIMIT_MIN, LIMIT_MAX}},
{JS(marker),
meta::WithCustomError{
validation::Type<boost::json::object>{},

View File

@@ -39,8 +39,10 @@
#include <boost/beast/websocket/stream_base.hpp>
#include <boost/system/errc.hpp>
#include <atomic>
#include <chrono>
#include <expected>
#include <memory>
#include <optional>
#include <string>
#include <utility>
@@ -123,15 +125,20 @@ private:
static void
withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
{
auto isCompleted = std::make_shared<bool>(false);
boost::asio::cancellation_signal cancellationSignal;
auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);
boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};
timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) {
if (!errorCode)
// The timer's handler below can be invoked with no error code even if the operation completed before the
// timeout, so we need an additional flag here
timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) {
if (!errorCode and not*isCompleted)
cancellationSignal.emit(boost::asio::cancellation_type::terminal);
});
operation(cyield);
*isCompleted = true;
}
static boost::system::error_code
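
A self-contained sketch of the cancellation pattern above, using a deliberately slow timer as a stand-in for the websocket read/write. The shared isCompleted flag keeps a late-firing timeout from emitting a cancellation after the operation has already finished. It mirrors the two-argument boost::asio::spawn form used by the tests below; Boost.Asio with coroutine support is assumed:

```cpp
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

#include <chrono>
#include <iostream>
#include <memory>

int
main()
{
    boost::asio::io_context ioc;

    boost::asio::spawn(ioc, [](boost::asio::yield_context yield) {
        auto isCompleted = std::make_shared<bool>(false);
        boost::asio::cancellation_signal cancellationSignal;
        auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);

        // Arm the timeout: emit a terminal cancellation unless the operation
        // has already completed by the time the timer fires.
        boost::asio::steady_timer timeout{boost::asio::get_associated_executor(cyield)};
        timeout.expires_after(std::chrono::milliseconds{50});
        timeout.async_wait([&cancellationSignal, isCompleted](boost::system::error_code ec) {
            if (!ec && !*isCompleted)
                cancellationSignal.emit(boost::asio::cancellation_type::terminal);
        });

        // Stand-in for the websocket operation: slower than the timeout,
        // so the bound cancellation slot aborts it.
        boost::asio::steady_timer slowOperation{boost::asio::get_associated_executor(cyield)};
        slowOperation.expires_after(std::chrono::seconds{10});
        try {
            slowOperation.async_wait(cyield);
        } catch (boost::system::system_error const& e) {
            std::cout << "operation aborted: " << e.code().message() << '\n';
        }
        *isCompleted = true;  // from here on, a firing timer must not cancel
    });

    ioc.run();
}
```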

View File

@@ -69,10 +69,8 @@ namespace web {
* @tparam HandlerType The executor to handle the requests
*/
template <
template <typename>
class PlainSessionType,
template <typename>
class SslSessionType,
template <typename> class PlainSessionType,
template <typename> class SslSessionType,
SomeServerHandler HandlerType>
class Detector : public std::enable_shared_from_this<Detector<PlainSessionType, SslSessionType, HandlerType>> {
using std::enable_shared_from_this<Detector<PlainSessionType, SslSessionType, HandlerType>>::shared_from_this;
@@ -191,10 +189,8 @@ public:
* @tparam HandlerType The handler to process the request and return response.
*/
template <
template <typename>
class PlainSessionType,
template <typename>
class SslSessionType,
template <typename> class PlainSessionType,
template <typename> class SslSessionType,
SomeServerHandler HandlerType>
class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslSessionType, HandlerType>> {
using std::enable_shared_from_this<Server<PlainSessionType, SslSessionType, HandlerType>>::shared_from_this;

View File

@@ -23,6 +23,9 @@
#include "rpc/common/Types.hpp"
#include "util/Taggable.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "web/DOSGuard.hpp"
#include "web/interface/Concepts.hpp"
#include "web/interface/ConnectionBase.hpp"
@@ -71,6 +74,8 @@ template <template <typename> typename Derived, SomeServerHandler HandlerType>
class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
std::reference_wrapper<util::prometheus::GaugeInt> messagesLength_;
boost::beast::flat_buffer buffer_;
std::reference_wrapper<web::DOSGuard> dosGuard_;
bool sending_ = false;
@@ -103,15 +108,26 @@ public:
std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer
)
: ConnectionBase(tagFactory, ip), buffer_(std::move(buffer)), dosGuard_(dosGuard), handler_(handler)
: ConnectionBase(tagFactory, ip)
, messagesLength_(PrometheusService::gaugeInt(
"ws_messages_length",
util::prometheus::Labels(),
"The total length of messages in the queue"
))
, buffer_(std::move(buffer))
, dosGuard_(dosGuard)
, handler_(handler)
{
upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init)
LOG(perfLog_.debug()) << tag() << "session created";
}
~WsBase() override
{
LOG(perfLog_.debug()) << tag() << "session closed";
if (!messages_.empty())
messagesLength_.get() -= messages_.size();
dosGuard_.get().decrement(clientIp);
}
@@ -135,6 +151,7 @@ public:
onWrite(boost::system::error_code ec, std::size_t)
{
messages_.pop();
--messagesLength_.get();
sending_ = false;
if (ec) {
wsFail(ec, "Failed to write");
@@ -165,6 +182,7 @@ public:
derived().ws().get_executor(),
[this, self = derived().shared_from_this(), msg = std::move(msg)]() {
messages_.push(msg);
++messagesLength_.get();
maybeSendNext();
}
);
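
The accounting above keeps the new ws_messages_length gauge equal to the send-queue size: +1 on enqueue, -1 after each completed write, and the whole remainder subtracted when the session is destroyed. A simplified sketch of that invariant with plain standalone types (not clio's gauge or session classes):

```cpp
#include <cstdint>
#include <queue>
#include <string>
#include <utility>

// Stand-in for the Prometheus gauge; only the operations used here.
struct GaugeInt {
    std::int64_t value{0};

    void operator++() { ++value; }
    void operator--() { --value; }
    void operator-=(std::int64_t n) { value -= n; }
};

class WsSessionSketch {
    GaugeInt& messagesLength_;
    std::queue<std::string> messages_;

public:
    explicit WsSessionSketch(GaugeInt& gauge) : messagesLength_(gauge)
    {
    }

    ~WsSessionSketch()
    {
        // Session closed with messages still queued: remove the whole
        // backlog from the gauge so it keeps matching the live queue sizes.
        if (!messages_.empty())
            messagesLength_ -= static_cast<std::int64_t>(messages_.size());
    }

    void
    send(std::string msg)
    {
        messages_.push(std::move(msg));
        ++messagesLength_;
    }

    void
    onWrite()  // one queued message was flushed to the socket
    {
        messages_.pop();
        --messagesLength_;
    }
};
```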

View File

@@ -271,15 +271,12 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0Disconnects)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(true);
}
TEST_F(LoadBalancerOnDisconnectHookTests, source1Disconnects)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();
sourceFactory_.callbacksAt(1).onDisconnect(false);
}
TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
@@ -288,29 +285,25 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(true);
sourceFactory_.callbacksAt(0).onConnect();
}
TEST_F(LoadBalancerOnDisconnectHookTests, source1DisconnectsAndConnectsBack)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();
sourceFactory_.callbacksAt(1).onDisconnect(false);
sourceFactory_.callbacksAt(1).onConnect();
}
TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).Times(2).WillRepeatedly(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).Times(2).WillRepeatedly(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)).Times(2);
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(1).onDisconnect();
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(0).onDisconnect(true);
sourceFactory_.callbacksAt(1).onDisconnect(false);
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
@@ -353,12 +346,7 @@ TEST_F(LoadBalancer3SourcesTests, forwardingUpdate)
sourceFactory_.callbacksAt(1).onConnect();
// Source 0 got disconnected
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(2), setForwarding(false)); // only source 1 must be forwarding
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(false);
}
struct LoadBalancerLoadInitialLedgerTests : LoadBalancerOnConnectHookTests {

View File

@@ -18,31 +18,35 @@
//==============================================================================
#include "etl/impl/SubscriptionSource.hpp"
#include "util/AssignRandomPort.hpp"
#include "util/Fixtures.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp"
#include "util/TestWsServer.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/serialize.hpp>
#include <fmt/core.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <optional>
#include <string>
#include <thread>
#include <utility>
using namespace etl::impl;
using testing::MockFunction;
using testing::StrictMock;
struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
SubscriptionSourceConnectionTests()
struct SubscriptionSourceConnectionTestsBase : public NoLoggerFixture {
SubscriptionSourceConnectionTestsBase()
{
subscriptionSource_.run();
}
@@ -54,7 +58,7 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
StrictMockSubscriptionManagerSharedPtr subscriptionManager_;
StrictMock<MockFunction<void()>> onConnectHook_;
StrictMock<MockFunction<void()>> onDisconnectHook_;
StrictMock<MockFunction<void(bool)>> onDisconnectHook_;
StrictMock<MockFunction<void()>> onLedgerClosedHook_;
SubscriptionSource subscriptionSource_{
@@ -66,8 +70,8 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
onConnectHook_.AsStdFunction(),
onDisconnectHook_.AsStdFunction(),
onLedgerClosedHook_.AsStdFunction(),
std::chrono::milliseconds(1),
std::chrono::milliseconds(1)
std::chrono::milliseconds(5),
std::chrono::milliseconds(5)
};
[[maybe_unused]] TestWsConnection
@@ -92,15 +96,17 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
}
};
struct SubscriptionSourceConnectionTests : util::prometheus::WithPrometheus, SubscriptionSourceConnectionTestsBase {};
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed)
{
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed)
{
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -112,7 +118,19 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceConnectionTests, ReadTimeout)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
std::this_thread::sleep_for(std::chrono::milliseconds{10});
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -126,7 +144,7 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect)
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -139,14 +157,14 @@ TEST_F(SubscriptionSourceConnectionTests, IsConnected)
});
EXPECT_CALL(onConnectHook_, Call()).WillOnce([this]() { EXPECT_TRUE(subscriptionSource_.isConnected()); });
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() {
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() {
EXPECT_FALSE(subscriptionSource_.isConnected());
subscriptionSource_.stop();
});
ioContext_.run();
}
struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests {
struct SubscriptionSourceReadTestsBase : public SubscriptionSourceConnectionTestsBase {
[[maybe_unused]] TestWsConnection
connectAndSendMessage(std::string const message, boost::asio::yield_context yield)
{
@@ -157,6 +175,8 @@ struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests {
}
};
struct SubscriptionSourceReadTests : util::prometheus::WithPrometheus, SubscriptionSourceReadTestsBase {};
TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
@@ -167,7 +187,7 @@ TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -179,7 +199,7 @@ TEST_F(SubscriptionSourceReadTests, GotResult)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -191,7 +211,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndex)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
}
@@ -206,7 +226,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect)
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -220,7 +240,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconn
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -242,7 +262,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_TRUE(subscriptionSource_.hasLedger(123));
@@ -268,7 +288,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reco
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -286,7 +306,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
@@ -306,7 +326,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosed)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -321,7 +341,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onLedgerClosedHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() {
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() {
EXPECT_FALSE(subscriptionSource_.isForwarding());
subscriptionSource_.stop();
});
@@ -336,7 +356,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
}
@@ -351,7 +371,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Recon
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -365,7 +385,7 @@ TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -382,7 +402,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_FALSE(subscriptionSource_.hasLedger(0));
@@ -406,7 +426,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLe
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
@@ -425,7 +445,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingFalse)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -440,7 +460,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message));
ioContext_.run();
}
@@ -456,7 +476,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0);
ioContext_.run();
}
@@ -469,7 +489,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -484,7 +504,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardValidation(message));
ioContext_.run();
}
@@ -497,7 +517,7 @@ TEST_F(SubscriptionSourceReadTests, GotManiefstReceivedIsForwardingFalse)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
@@ -512,7 +532,7 @@ TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardManifest(message));
ioContext_.run();
}
@@ -525,7 +545,7 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime)
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
auto const actualLastTimeMessage = subscriptionSource_.lastMessageTime();
@@ -533,3 +553,27 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime)
auto const diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - actualLastTimeMessage);
EXPECT_LT(diff, std::chrono::milliseconds(100));
}
struct SubscriptionSourcePrometheusCounterTests : util::prometheus::WithMockPrometheus,
SubscriptionSourceReadTestsBase {};
TEST_F(SubscriptionSourcePrometheusCounterTests, LastMessageTime)
{
auto& lastMessageTimeMock = makeMock<util::prometheus::GaugeInt>(
"subscription_source_last_message_time", fmt::format("{{source=\"127.0.0.1:{}\"}}", wsServer_.port())
);
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage("some_message", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(lastMessageTimeMock, set).WillOnce([](int64_t value) {
auto const now =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
EXPECT_LE(now - value, 1);
});
ioContext_.run();
}

View File

@@ -24,6 +24,7 @@
#include "rpc/common/Types.hpp"
#include "util/Fixtures.hpp"
#include "util/MockPrometheus.hpp"
#include "util/NameGenerator.hpp"
#include "util/TestObject.hpp"
#include <boost/asio/impl/spawn.hpp>
@@ -538,3 +539,67 @@ TEST_F(RPCHelpersTest, ParseIssue)
std::runtime_error
);
}
struct IsAdminCmdParamTestCaseBundle {
std::string testName;
std::string method;
std::string testJson;
bool expected;
};
struct IsAdminCmdParameterTest : public TestWithParam<IsAdminCmdParamTestCaseBundle> {};
static auto
generateTestValuesForParametersTest()
{
return std::vector<IsAdminCmdParamTestCaseBundle>{
{"ledgerEntry", "ledger_entry", R"({"type": false})", false},
{"featureVetoedTrue", "feature", R"({"vetoed": true, "feature": "foo"})", true},
{"featureVetoedFalse", "feature", R"({"vetoed": false, "feature": "foo"})", true},
{"featureVetoedIsStr", "feature", R"({"vetoed": "String"})", true},
{"ledger", "ledger", R"({})", false},
{"ledgerWithType", "ledger", R"({"type": "fee"})", false},
{"ledgerFullTrue", "ledger", R"({"full": true})", true},
{"ledgerFullFalse", "ledger", R"({"full": false})", false},
{"ledgerFullIsStr", "ledger", R"({"full": "String"})", true},
{"ledgerFullIsEmptyStr", "ledger", R"({"full": ""})", false},
{"ledgerFullIsNumber1", "ledger", R"({"full": 1})", true},
{"ledgerFullIsNumber0", "ledger", R"({"full": 0})", false},
{"ledgerFullIsNull", "ledger", R"({"full": null})", false},
{"ledgerFullIsFloat0", "ledger", R"({"full": 0.0})", false},
{"ledgerFullIsFloat1", "ledger", R"({"full": 0.1})", true},
{"ledgerFullIsArray", "ledger", R"({"full": [1]})", true},
{"ledgerFullIsEmptyArray", "ledger", R"({"full": []})", false},
{"ledgerFullIsObject", "ledger", R"({"full": {"key": 1}})", true},
{"ledgerFullIsEmptyObject", "ledger", R"({"full": {}})", false},
{"ledgerAccountsTrue", "ledger", R"({"accounts": true})", true},
{"ledgerAccountsFalse", "ledger", R"({"accounts": false})", false},
{"ledgerAccountsIsStr", "ledger", R"({"accounts": "String"})", true},
{"ledgerAccountsIsEmptyStr", "ledger", R"({"accounts": ""})", false},
{"ledgerAccountsIsNumber1", "ledger", R"({"accounts": 1})", true},
{"ledgerAccountsIsNumber0", "ledger", R"({"accounts": 0})", false},
{"ledgerAccountsIsNull", "ledger", R"({"accounts": null})", false},
{"ledgerAccountsIsFloat0", "ledger", R"({"accounts": 0.0})", false},
{"ledgerAccountsIsFloat1", "ledger", R"({"accounts": 0.1})", true},
{"ledgerAccountsIsArray", "ledger", R"({"accounts": [1]})", true},
{"ledgerAccountsIsEmptyArray", "ledger", R"({"accounts": []})", false},
{"ledgerAccountsIsObject", "ledger", R"({"accounts": {"key": 1}})", true},
{"ledgerAccountsIsEmptyObject", "ledger", R"({"accounts": {}})", false},
};
}
INSTANTIATE_TEST_CASE_P(
IsAdminCmdTest,
IsAdminCmdParameterTest,
ValuesIn(generateTestValuesForParametersTest()),
tests::util::NameGenerator
);
TEST_P(IsAdminCmdParameterTest, Test)
{
auto const testBundle = GetParam();
EXPECT_EQ(isAdminCmd(testBundle.method, boost::json::parse(testBundle.testJson).as_object()), testBundle.expected);
}

View File

@@ -769,14 +769,13 @@ TEST_F(RPCAccountTxHandlerTest, LimitAndMarker)
auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
ON_CALL(*backend, fetchAccountTransactions).WillByDefault(Return(transCursor));
EXPECT_CALL(
*backend,
fetchAccountTransactions(
testing::_, testing::_, false, testing::Optional(testing::Eq(TransactionsCursor{10, 11})), testing::_
)
)
.Times(1);
.WillOnce(Return(transCursor));
runSpawn([&, this](auto yield) {
auto const handler = AnyHandler{AccountTxHandler{backend}};
@@ -804,6 +803,73 @@ TEST_F(RPCAccountTxHandlerTest, LimitAndMarker)
});
}
TEST_F(RPCAccountTxHandlerTest, LimitIsCapped)
{
backend->setRange(MINSEQ, MAXSEQ);
auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
EXPECT_CALL(*backend, fetchAccountTransactions(testing::_, testing::_, false, testing::_, testing::_))
.WillOnce(Return(transCursor));
runSpawn([&, this](auto yield) {
auto const handler = AnyHandler{AccountTxHandler{backend}};
auto static const input = json::parse(fmt::format(
R"({{
"account": "{}",
"ledger_index_min": {},
"ledger_index_max": {},
"limit": 100000,
"forward": false
}})",
ACCOUNT,
-1,
-1
));
auto const output = handler.process(input, Context{yield});
ASSERT_TRUE(output);
EXPECT_EQ(output.result->at("account").as_string(), ACCOUNT);
EXPECT_EQ(output.result->at("ledger_index_min").as_uint64(), MINSEQ);
EXPECT_EQ(output.result->at("ledger_index_max").as_uint64(), MAXSEQ);
EXPECT_EQ(output.result->at("limit").as_uint64(), AccountTxHandler::LIMIT_MAX);
EXPECT_EQ(output.result->at("transactions").as_array().size(), 2);
});
}
TEST_F(RPCAccountTxHandlerTest, LimitAllowedUpToCap)
{
backend->setRange(MINSEQ, MAXSEQ);
auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
EXPECT_CALL(*backend, fetchAccountTransactions(testing::_, testing::_, false, testing::_, testing::_))
.WillOnce(Return(transCursor));
runSpawn([&, this](auto yield) {
auto const handler = AnyHandler{AccountTxHandler{backend}};
auto static const input = json::parse(fmt::format(
R"({{
"account": "{}",
"ledger_index_min": {},
"ledger_index_max": {},
"limit": {},
"forward": false
}})",
ACCOUNT,
-1,
-1,
AccountTxHandler::LIMIT_MAX - 1
));
auto const output = handler.process(input, Context{yield});
ASSERT_TRUE(output);
EXPECT_EQ(output.result->at("account").as_string(), ACCOUNT);
EXPECT_EQ(output.result->at("ledger_index_min").as_uint64(), MINSEQ);
EXPECT_EQ(output.result->at("ledger_index_max").as_uint64(), MAXSEQ);
EXPECT_EQ(output.result->at("limit").as_uint64(), AccountTxHandler::LIMIT_MAX - 1);
EXPECT_EQ(output.result->at("transactions").as_array().size(), 2);
});
}
TEST_F(RPCAccountTxHandlerTest, SpecificLedgerIndex)
{
backend->setRange(MINSEQ, MAXSEQ);

View File

@@ -22,6 +22,7 @@
#include "util/MockPrometheus.hpp"
#include "util/TestHttpSyncClient.hpp"
#include "util/config/Config.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "web/DOSGuard.hpp"
@@ -42,6 +43,7 @@
#include <boost/json/parse.hpp>
#include <boost/system/system_error.hpp>
#include <fmt/core.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
@@ -202,6 +204,8 @@ private:
std::optional<std::thread> runner;
};
struct WebServerTestsWithMockPrometheus : WebServerTest, prometheus::WithMockPrometheus {};
class EchoExecutor {
public:
void
@@ -263,7 +267,7 @@ makeServerSync(
} // namespace
TEST_F(WebServerTest, Http)
TEST_F(WebServerTestsWithMockPrometheus, Http)
{
auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
@@ -271,8 +275,13 @@ TEST_F(WebServerTest, Http)
EXPECT_EQ(res, R"({"Hello":1})");
}
TEST_F(WebServerTest, Ws)
TEST_F(WebServerTestsWithMockPrometheus, Ws)
{
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
WebSocketSyncClient wsClient;
@@ -282,7 +291,7 @@ TEST_F(WebServerTest, Ws)
wsClient.disconnect();
}
TEST_F(WebServerTest, HttpInternalError)
TEST_F(WebServerTestsWithMockPrometheus, HttpInternalError)
{
auto e = std::make_shared<ExceptionExecutor>();
auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
@@ -293,8 +302,13 @@ TEST_F(WebServerTest, HttpInternalError)
);
}
TEST_F(WebServerTest, WsInternalError)
TEST_F(WebServerTestsWithMockPrometheus, WsInternalError)
{
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<ExceptionExecutor>();
auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
WebSocketSyncClient wsClient;
@@ -307,8 +321,13 @@ TEST_F(WebServerTest, WsInternalError)
);
}
TEST_F(WebServerTest, WsInternalErrorNotJson)
TEST_F(WebServerTestsWithMockPrometheus, WsInternalErrorNotJson)
{
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<ExceptionExecutor>();
auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
WebSocketSyncClient wsClient;
@@ -321,7 +340,7 @@ TEST_F(WebServerTest, WsInternalErrorNotJson)
);
}
TEST_F(WebServerTest, Https)
TEST_F(WebServerTestsWithMockPrometheus, Https)
{
auto e = std::make_shared<EchoExecutor>();
auto sslCtx = parseCertsForTest();
@@ -331,8 +350,13 @@ TEST_F(WebServerTest, Https)
EXPECT_EQ(res, R"({"Hello":1})");
}
TEST_F(WebServerTest, Wss)
TEST_F(WebServerTestsWithMockPrometheus, Wss)
{
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<EchoExecutor>();
auto sslCtx = parseCertsForTest();
auto const ctxSslRef = sslCtx ? std::optional<std::reference_wrapper<ssl::context>>{sslCtx.value()} : std::nullopt;
@@ -345,7 +369,7 @@ TEST_F(WebServerTest, Wss)
wsClient.disconnect();
}
TEST_F(WebServerTest, HttpRequestOverload)
TEST_F(WebServerTestsWithMockPrometheus, HttpRequestOverload)
{
auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
@@ -358,8 +382,13 @@ TEST_F(WebServerTest, HttpRequestOverload)
);
}
TEST_F(WebServerTest, WsRequestOverload)
TEST_F(WebServerTestsWithMockPrometheus, WsRequestOverload)
{
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1)).Times(2);
EXPECT_CALL(wsMessagesCounterMock, add(-1)).Times(2);
auto e = std::make_shared<EchoExecutor>();
auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
WebSocketSyncClient wsClient;
@@ -377,7 +406,7 @@ TEST_F(WebServerTest, WsRequestOverload)
);
}
TEST_F(WebServerTest, HttpPayloadOverload)
TEST_F(WebServerTestsWithMockPrometheus, HttpPayloadOverload)
{
std::string const s100(100, 'a');
auto e = std::make_shared<EchoExecutor>();
@@ -389,8 +418,13 @@ TEST_F(WebServerTest, HttpPayloadOverload)
);
}
TEST_F(WebServerTest, WsPayloadOverload)
TEST_F(WebServerTestsWithMockPrometheus, WsPayloadOverload)
{
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
std::string const s100(100, 'a');
auto e = std::make_shared<EchoExecutor>();
auto server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
@@ -404,7 +438,7 @@ TEST_F(WebServerTest, WsPayloadOverload)
);
}
TEST_F(WebServerTest, WsTooManyConnection)
TEST_F(WebServerTestsWithMockPrometheus, WsTooManyConnection)
{
auto e = std::make_shared<EchoExecutor>();
auto server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
@@ -510,10 +544,17 @@ struct WebServerAdminTestParams {
std::string expectedResponse;
};
class WebServerAdminTest : public WebServerTest, public ::testing::WithParamInterface<WebServerAdminTestParams> {};
class WebServerAdminTest : public WebServerTest,
public ::testing::WithParamInterface<WebServerAdminTestParams>,
public prometheus::WithMockPrometheus {};
TEST_P(WebServerAdminTest, WsAdminCheck)
{
::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
EXPECT_CALL(wsMessagesCounterMock, add(1));
EXPECT_CALL(wsMessagesCounterMock, add(-1));
auto e = std::make_shared<AdminCheckExecutor>();
Config const serverConfig{parse(GetParam().config)};
auto server = makeServerSync(serverConfig, ctx, std::nullopt, dosGuardOverload, e);