Compare commits

...

12 Commits

Author SHA1 Message Date
Alex Kremer
854749a05e fix: Add upper bound to limit 2024-09-11 15:15:17 +01:00
cyan317
f57706be3d fix: no restriction on type field (#1644)
'type' should not matter if 'full' or 'accounts' is false. Relax the
restriction for 'type'
2024-09-11 14:44:20 +01:00
cyan317
bb0d912f2b fix: Add more restrictions to admin fields (#1643) 2024-09-10 15:05:23 +01:00
cyan317
d02d6affdb fix: Don't forward ledger API if 'full' is a string (#1640)
Fix #1635
2024-09-09 11:21:42 +01:00
cyan317
0054e4b64c fix: not forward admin API (#1629)
To merge to 2.2.3 branch.
It is different from the #1628 . I think this branch still forward
feature API to rippled.
2024-09-06 10:37:08 +01:00
Sergey Kuznetsov
9fe9e7c9d2 fix: Subscription source bugs fix (#1626)
For #1620.

- Add timeouts for websocket operations for connections to rippled.
Without these timeouts if connection hangs for some reason, clio
wouldn't know the connection is hanging.
- Fix potential data race in choosing new subscription source which will
forward messages to users.
- Optimise switching between subscription sources.
2024-09-05 14:58:06 +01:00
Alex Kremer
2e2740d4c5 feat: Published subscription message counters (#1618)
This PR adds counters to track the amount of published messages for each
subscription stream.
2024-08-29 16:48:04 +01:00
Sergey Kuznetsov
5004dc4e15 fix: Fix logging in SubscriptionSource (#1617)
For #1616. Later should be ported to develop as well.
2024-08-29 15:59:02 +01:00
cyan317
665fab183a fix: Add more account check (#1543)
Make sure all char is alphanumeric for account
2024-07-18 15:38:24 +01:00
Alex Kremer
b65ac67d17 fix: Relax error when full or accounts set to false (#1540)
Fixes #1537
2024-07-18 15:20:46 +01:00
Sergey Kuznetsov
7b18e28c47 fix: Fix extra brackets in warnings (#1519)
Fixes #1518
2024-07-05 12:05:14 +01:00
cyan317
4940d463dc Fix empty currency (#1481) 2024-06-21 13:01:14 +01:00
37 changed files with 652 additions and 148 deletions

View File

@@ -109,10 +109,13 @@ LoadBalancer::LoadBalancer(
validatedLedgers, validatedLedgers,
forwardingTimeout, forwardingTimeout,
[this]() { [this]() {
if (not hasForwardingSource_) if (not hasForwardingSource_.lock().get())
chooseForwardingSource();
},
[this](bool wasForwarding) {
if (wasForwarding)
chooseForwardingSource(); chooseForwardingSource();
}, },
[this]() { chooseForwardingSource(); },
[this]() { [this]() {
if (forwardingCache_.has_value()) if (forwardingCache_.has_value())
forwardingCache_->invalidate(); forwardingCache_->invalidate();
@@ -314,11 +317,13 @@ LoadBalancer::getETLState() noexcept
void void
LoadBalancer::chooseForwardingSource() LoadBalancer::chooseForwardingSource()
{ {
hasForwardingSource_ = false; LOG(log_.info()) << "Choosing a new source to forward subscriptions";
auto hasForwardingSourceLock = hasForwardingSource_.lock();
hasForwardingSourceLock.get() = false;
for (auto& source : sources_) { for (auto& source : sources_) {
if (not hasForwardingSource_ and source->isConnected()) { if (not hasForwardingSourceLock.get() and source->isConnected()) {
source->setForwarding(true); source->setForwarding(true);
hasForwardingSource_ = true; hasForwardingSourceLock.get() = true;
} else { } else {
source->setForwarding(false); source->setForwarding(false);
} }

View File

@@ -25,6 +25,7 @@
#include "etl/Source.hpp" #include "etl/Source.hpp"
#include "etl/impl/ForwardingCache.hpp" #include "etl/impl/ForwardingCache.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp"
#include "util/config/Config.hpp" #include "util/config/Config.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
@@ -38,7 +39,6 @@
#include <org/xrpl/rpc/v1/ledger.pb.h> #include <org/xrpl/rpc/v1/ledger.pb.h>
#include <ripple/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h> #include <ripple/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
@@ -74,7 +74,10 @@ private:
std::optional<ETLState> etlState_; std::optional<ETLState> etlState_;
std::uint32_t downloadRanges_ = std::uint32_t downloadRanges_ =
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
std::atomic_bool hasForwardingSource_{false};
// Using mutext instead of atomic_bool because choosing a new source to
// forward messages should be done with a mutual exclusion otherwise there will be a race condition
util::Mutex<bool> hasForwardingSource_{false};
public: public:
/** /**

View File

@@ -51,7 +51,7 @@ namespace etl {
class SourceBase { class SourceBase {
public: public:
using OnConnectHook = std::function<void()>; using OnConnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void()>; using OnDisconnectHook = std::function<void(bool)>;
using OnLedgerClosedHook = std::function<void()>; using OnLedgerClosedHook = std::function<void()>;
virtual ~SourceBase() = default; virtual ~SourceBase() = default;

View File

@@ -46,7 +46,7 @@
namespace etl::impl { namespace etl::impl {
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend) GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend)) : log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
{ {
try { try {
boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)}; boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)};

View File

@@ -24,6 +24,8 @@
#include "rpc/JS.hpp" #include "rpc/JS.hpp"
#include "util/Retry.hpp" #include "util/Retry.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "util/requests/Types.hpp" #include "util/requests/Types.hpp"
#include <boost/algorithm/string/classification.hpp> #include <boost/algorithm/string/classification.hpp>
@@ -66,22 +68,28 @@ SubscriptionSource::SubscriptionSource(
OnConnectHook onConnect, OnConnectHook onConnect,
OnDisconnectHook onDisconnect, OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed, OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout, std::chrono::steady_clock::duration const wsTimeout,
std::chrono::steady_clock::duration const retryDelay std::chrono::steady_clock::duration const retryDelay
) )
: log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort)) : log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
, wsConnectionBuilder_(ip, wsPort) , wsConnectionBuilder_(ip, wsPort)
, validatedLedgers_(std::move(validatedLedgers)) , validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions)) , subscriptions_(std::move(subscriptions))
, strand_(boost::asio::make_strand(ioContext)) , strand_(boost::asio::make_strand(ioContext))
, wsTimeout_(wsTimeout)
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_)) , retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
, onConnect_(std::move(onConnect)) , onConnect_(std::move(onConnect))
, onDisconnect_(std::move(onDisconnect)) , onDisconnect_(std::move(onDisconnect))
, onLedgerClosed_(std::move(onLedgerClosed)) , onLedgerClosed_(std::move(onLedgerClosed))
, lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt(
"subscription_source_last_message_time",
util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}),
"Seconds since epoch of the last message received from rippled subscription streams"
))
{ {
wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"}) wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
.addHeader({"X-User", "clio-client"}) .addHeader({"X-User", "clio-client"})
.setConnectionTimeout(connectionTimeout); .setConnectionTimeout(wsTimeout_);
} }
SubscriptionSource::~SubscriptionSource() SubscriptionSource::~SubscriptionSource()
@@ -133,6 +141,7 @@ void
SubscriptionSource::setForwarding(bool isForwarding) SubscriptionSource::setForwarding(bool isForwarding)
{ {
isForwarding_ = isForwarding; isForwarding_ = isForwarding;
LOG(log_.info()) << "Forwarding set to " << isForwarding_;
} }
std::chrono::steady_clock::time_point std::chrono::steady_clock::time_point
@@ -166,20 +175,22 @@ SubscriptionSource::subscribe()
} }
wsConnection_ = std::move(connection).value(); wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
auto const& subscribeCommand = getSubscribeCommandJson(); auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield); auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
if (writeErrorOpt) { if (writeErrorOpt) {
handleError(writeErrorOpt.value(), yield); handleError(writeErrorOpt.value(), yield);
return; return;
} }
isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();
retry_.reset(); retry_.reset();
while (!stop_) { while (!stop_) {
auto const message = wsConnection_->read(yield); auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) { if (not message) {
handleError(message.error(), yield); handleError(message.error(), yield);
return; return;
@@ -224,10 +235,11 @@ SubscriptionSource::handleMessage(std::string const& message)
auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers))); auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
setValidatedRange(std::move(validatedLedgers)); setValidatedRange(std::move(validatedLedgers));
} }
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object; LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) { } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object; LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
<< object;
if (object.contains(JS(ledger_index))) { if (object.contains(JS(ledger_index))) {
ledgerIndex = object.at(JS(ledger_index)).as_int64(); ledgerIndex = object.at(JS(ledger_index)).as_int64();
} }
@@ -245,10 +257,13 @@ SubscriptionSource::handleMessage(std::string const& message)
// 2 - Validated transaction // 2 - Validated transaction
// Only forward proposed transaction, validated transactions are sent by Clio itself // Only forward proposed transaction, validated transactions are sent by Clio itself
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) { if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
subscriptions_->forwardProposedTransaction(object); subscriptions_->forwardProposedTransaction(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) { } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
LOG(log_.debug()) << "Forwarding validation: " << object;
subscriptions_->forwardValidation(object); subscriptions_->forwardValidation(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) { } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object); subscriptions_->forwardManifest(object);
} }
} }
@@ -261,7 +276,7 @@ SubscriptionSource::handleMessage(std::string const& message)
return std::nullopt; return std::nullopt;
} catch (std::exception const& e) { } catch (std::exception const& e) {
LOG(log_.error()) << "Exception in handleMessage : " << e.what(); LOG(log_.error()) << "Exception in handleMessage: " << e.what();
return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())}; return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
} }
} }
@@ -270,16 +285,14 @@ void
SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield) SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield)
{ {
isConnected_ = false; isConnected_ = false;
isForwarding_ = false; bool const wasForwarding = isForwarding_.exchange(false);
if (not stop_) { if (not stop_) {
onDisconnect_(); LOG(log_.info()) << "Disconnected";
onDisconnect_(wasForwarding);
} }
if (wsConnection_ != nullptr) { if (wsConnection_ != nullptr) {
auto const err = wsConnection_->close(yield); wsConnection_->close(yield);
if (err) {
LOG(log_.error()) << "Error closing websocket connection: " << err->message();
}
wsConnection_.reset(); wsConnection_.reset();
} }
@@ -306,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const
void void
SubscriptionSource::setLastMessageTime() SubscriptionSource::setLastMessageTime()
{ {
lastMessageTime_.lock().get() = std::chrono::steady_clock::now(); lastMessageTimeSecondsSinceEpoch_.get().set(
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count()
);
auto lock = lastMessageTime_.lock();
lock.get() = std::chrono::steady_clock::now();
} }
void void

View File

@@ -19,12 +19,13 @@
#pragma once #pragma once
#include "etl/ETLHelpers.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp" #include "etl/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp" #include "util/Mutex.hpp"
#include "util/Retry.hpp" #include "util/Retry.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/requests/Types.hpp" #include "util/requests/Types.hpp"
#include "util/requests/WsConnection.hpp" #include "util/requests/WsConnection.hpp"
@@ -37,6 +38,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
#include <functional>
#include <future> #include <future>
#include <memory> #include <memory>
#include <optional> #include <optional>
@@ -71,6 +73,8 @@ private:
boost::asio::strand<boost::asio::io_context::executor_type> strand_; boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::chrono::steady_clock::duration wsTimeout_;
util::Retry retry_; util::Retry retry_;
OnConnectHook onConnect_; OnConnectHook onConnect_;
@@ -83,9 +87,11 @@ private:
util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_; util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_;
std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
std::future<void> runFuture_; std::future<void> runFuture_;
static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30}; static constexpr std::chrono::seconds WS_TIMEOUT{30};
static constexpr std::chrono::seconds RETRY_MAX_DELAY{30}; static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
static constexpr std::chrono::seconds RETRY_DELAY{1}; static constexpr std::chrono::seconds RETRY_DELAY{1};
@@ -103,7 +109,7 @@ public:
* @param onNewLedger The onNewLedger hook. Called when a new ledger is received * @param onNewLedger The onNewLedger hook. Called when a new ledger is received
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is * @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
* forwarding * forwarding
* @param connectionTimeout The connection timeout. Defaults to 30 seconds * @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
* @param retryDelay The retry delay. Defaults to 1 second * @param retryDelay The retry delay. Defaults to 1 second
*/ */
SubscriptionSource( SubscriptionSource(
@@ -115,7 +121,7 @@ public:
OnConnectHook onConnect, OnConnectHook onConnect,
OnDisconnectHook onDisconnect, OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed, OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT, std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
); );

View File

@@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() { boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
notified_.clear(); notified_.clear();
signal_.emit(pubMsg); signal_.emit(pubMsg);
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts // Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
// However, if the same connection subscribe both stream and account, it will still receive the message twice. // However, if the same connection subscribe both stream and account, it will still receive the message twice.
// notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled // notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled
// acts like this. // acts like this.
notified_.clear(); notified_.clear();
for (auto const& account : affectedAccounts) for (auto const& account : affectedAccounts)
accountSignal_.emit(account, pubMsg); accountSignal_.emit(account, pubMsg);
++pubCount_.get();
}); });
} }

View File

@@ -24,6 +24,7 @@
#include "feed/impl/TrackableSignalMap.hpp" #include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp" #include "feed/impl/Util.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp" #include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
@@ -54,6 +55,7 @@ class ProposedTransactionFeed {
boost::asio::strand<boost::asio::io_context::executor_type> strand_; boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_; std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_; std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_; TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_;
TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_; TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_;
@@ -67,7 +69,7 @@ public:
: strand_(boost::asio::make_strand(ioContext)) : strand_(boost::asio::make_strand(ioContext))
, subAllCount_(getSubscriptionsGaugeInt("tx_proposed")) , subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
, subAccountCount_(getSubscriptionsGaugeInt("account_proposed")) , subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))
, pubCount_(getPublishedMessagesCounterInt("tx_proposed"))
{ {
} }

View File

@@ -36,7 +36,10 @@
namespace feed::impl { namespace feed::impl {
SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name) SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
: strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name) : strand_(boost::asio::make_strand(ioContext))
, subCount_(getSubscriptionsGaugeInt(name))
, pubCount_(getPublishedMessagesCounterInt(name))
, name_(name)
{ {
} }
@@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const
boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable { boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
auto const msgPtr = std::make_shared<std::string>(std::move(msg)); auto const msgPtr = std::make_shared<std::string>(std::move(msg));
signal_.emit(msgPtr); signal_.emit(msgPtr);
++pubCount_.get();
}); });
} }

View File

@@ -22,6 +22,7 @@
#include "feed/Types.hpp" #include "feed/Types.hpp"
#include "feed/impl/TrackableSignal.hpp" #include "feed/impl/TrackableSignal.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp" #include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
@@ -40,6 +41,7 @@ namespace feed::impl {
class SingleFeedBase { class SingleFeedBase {
boost::asio::strand<boost::asio::io_context::executor_type> strand_; boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subCount_; std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_; TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
util::Logger logger_{"Subscriptions"}; util::Logger logger_{"Subscriptions"};
std::string name_; std::string name_;

View File

@@ -284,23 +284,29 @@ TransactionFeed::pub(
affectedBooks = std::move(affectedBooks)]() { affectedBooks = std::move(affectedBooks)]() {
notified_.clear(); notified_.clear();
signal_.emit(allVersionsMsgs); signal_.emit(allVersionsMsgs);
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions, // clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
// rippled SENDS the same message twice // rippled SENDS the same message twice
notified_.clear(); notified_.clear();
txProposedsignal_.emit(allVersionsMsgs); txProposedsignal_.emit(allVersionsMsgs);
notified_.clear(); notified_.clear();
// check duplicate for account and proposed_account, this prevents sending the same message multiple times // check duplicate for account and proposed_account, this prevents sending the same message multiple times
// if it affects multiple accounts watched by the same connection // if it affects multiple accounts watched by the same connection
for (auto const& account : affectedAccounts) { for (auto const& account : affectedAccounts) {
accountSignal_.emit(account, allVersionsMsgs); accountSignal_.emit(account, allVersionsMsgs);
accountProposedSignal_.emit(account, allVersionsMsgs); accountProposedSignal_.emit(account, allVersionsMsgs);
} }
notified_.clear(); notified_.clear();
// check duplicate for books, this prevents sending the same message multiple times if it affects multiple // check duplicate for books, this prevents sending the same message multiple times if it affects multiple
// books watched by the same connection // books watched by the same connection
for (auto const& book : affectedBooks) { for (auto const& book : affectedBooks) {
bookSignal_.emit(book, allVersionsMsgs); bookSignal_.emit(book, allVersionsMsgs);
} }
++pubCount_.get();
} }
); );
} }

View File

@@ -26,6 +26,7 @@
#include "feed/impl/TrackableSignalMap.hpp" #include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp" #include "feed/impl/Util.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp" #include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
@@ -67,6 +68,7 @@ class TransactionFeed {
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_; std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_; std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_; std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_; TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_;
TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_; TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
@@ -89,6 +91,7 @@ public:
, subAllCount_(getSubscriptionsGaugeInt("tx")) , subAllCount_(getSubscriptionsGaugeInt("tx"))
, subAccountCount_(getSubscriptionsGaugeInt("account")) , subAccountCount_(getSubscriptionsGaugeInt("account"))
, subBookCount_(getSubscriptionsGaugeInt("book")) , subBookCount_(getSubscriptionsGaugeInt("book"))
, pubCount_(getPublishedMessagesCounterInt("tx"))
{ {
} }

View File

@@ -19,6 +19,7 @@
#pragma once #pragma once
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp" #include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp" #include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp" #include "util/prometheus/Prometheus.hpp"
@@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName)
fmt::format("Current subscribers number on the {} stream", counterName) fmt::format("Current subscribers number on the {} stream", counterName)
); );
} }
inline util::prometheus::CounterInt&
getPublishedMessagesCounterInt(std::string const& counterName)
{
return PrometheusService::counterInt(
"subscriptions_published_count",
util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}),
fmt::format("Total published messages on the {} stream", counterName)
);
}
} // namespace feed::impl } // namespace feed::impl

View File

@@ -22,6 +22,7 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "rpc/Counters.hpp" #include "rpc/Counters.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
#include "rpc/RPCHelpers.hpp"
#include "rpc/WorkQueue.hpp" #include "rpc/WorkQueue.hpp"
#include "rpc/common/HandlerProvider.hpp" #include "rpc/common/HandlerProvider.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
@@ -134,8 +135,13 @@ public:
Result Result
buildResponse(web::Context const& ctx) buildResponse(web::Context const& ctx)
{ {
if (forwardingProxy_.shouldForward(ctx)) if (forwardingProxy_.shouldForward(ctx)) {
// Disallow forwarding of the admin api, only user api is allowed for security reasons.
if (isAdminCmd(ctx.method, ctx.params))
return Result{Status{RippledError::rpcNO_PERMISSION}};
return forwardingProxy_.forward(ctx); return forwardingProxy_.forward(ctx);
}
if (backend_->isTooBusy()) { if (backend_->isTooBusy()) {
LOG(log_.error()) << "Database is too busy. Rejecting request"; LOG(log_.error()) << "Database is too busy. Rejecting request";

View File

@@ -24,6 +24,7 @@
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
#include "rpc/JS.hpp" #include "rpc/JS.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "util/AccountUtils.hpp"
#include "util/Profiler.hpp" #include "util/Profiler.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "web/Context.hpp" #include "web/Context.hpp"
@@ -35,6 +36,7 @@
#include <boost/json/array.hpp> #include <boost/json/array.hpp>
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
#include <boost/json/parse.hpp> #include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/string.hpp> #include <boost/json/string.hpp>
#include <boost/json/value.hpp> #include <boost/json/value.hpp>
#include <boost/json/value_to.hpp> #include <boost/json/value_to.hpp>
@@ -48,6 +50,7 @@
#include <ripple/basics/chrono.h> #include <ripple/basics/chrono.h>
#include <ripple/basics/strHex.h> #include <ripple/basics/strHex.h>
#include <ripple/beast/utility/Zero.h> #include <ripple/beast/utility/Zero.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_value.h> #include <ripple/json/json_value.h>
#include <ripple/protocol/AccountID.h> #include <ripple/protocol/AccountID.h>
#include <ripple/protocol/Book.h> #include <ripple/protocol/Book.h>
@@ -186,14 +189,14 @@ accountFromStringStrict(std::string const& account)
if (blob && ripple::publicKeyType(ripple::makeSlice(*blob))) { if (blob && ripple::publicKeyType(ripple::makeSlice(*blob))) {
publicKey = ripple::PublicKey(ripple::Slice{blob->data(), blob->size()}); publicKey = ripple::PublicKey(ripple::Slice{blob->data(), blob->size()});
} else { } else {
publicKey = ripple::parseBase58<ripple::PublicKey>(ripple::TokenType::AccountPublic, account); publicKey = util::parseBase58Wrapper<ripple::PublicKey>(ripple::TokenType::AccountPublic, account);
} }
std::optional<ripple::AccountID> result; std::optional<ripple::AccountID> result;
if (publicKey) { if (publicKey) {
result = ripple::calcAccountID(*publicKey); result = ripple::calcAccountID(*publicKey);
} else { } else {
result = ripple::parseBase58<ripple::AccountID>(account); result = util::parseBase58Wrapper<ripple::AccountID>(account);
} }
return result; return result;
@@ -799,7 +802,7 @@ getAccountsFromTransaction(boost::json::object const& transaction)
auto inObject = getAccountsFromTransaction(value.as_object()); auto inObject = getAccountsFromTransaction(value.as_object());
accounts.insert(accounts.end(), inObject.begin(), inObject.end()); accounts.insert(accounts.end(), inObject.begin(), inObject.end());
} else if (value.is_string()) { } else if (value.is_string()) {
auto const account = ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(value)); auto const account = util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(value));
if (account) { if (account) {
accounts.push_back(*account); accounts.push_back(*account);
} }
@@ -1272,6 +1275,31 @@ specifiesCurrentOrClosedLedger(boost::json::object const& request)
return false; return false;
} }
bool
isAdminCmd(std::string const& method, boost::json::object const& request)
{
if (method == JS(ledger)) {
auto const requestStr = boost::json::serialize(request);
Json::Value jv;
Json::Reader{}.parse(requestStr, jv);
// rippled considers string/non-zero int/non-empty array/ non-empty json as true.
// Use rippled's API asBool to get the same result.
// https://github.com/XRPLF/rippled/issues/5119
auto const isFieldSet = [&jv](auto const field) { return jv.isMember(field) and jv[field].asBool(); };
// According to doc
// https://xrpl.org/docs/references/http-websocket-apis/public-api-methods/ledger-methods/ledger,
// full/accounts/type are admin only, but type only works when full/accounts are set, so we don't need to check
// type.
if (isFieldSet(JS(full)) or isFieldSet(JS(accounts)))
return true;
}
if (method == JS(feature) and request.contains(JS(vetoed)))
return true;
return false;
}
std::variant<ripple::uint256, Status> std::variant<ripple::uint256, Status>
getNFTID(boost::json::object const& request) getNFTID(boost::json::object const& request)
{ {

View File

@@ -557,6 +557,16 @@ parseIssue(boost::json::object const& issue);
bool bool
specifiesCurrentOrClosedLedger(boost::json::object const& request); specifiesCurrentOrClosedLedger(boost::json::object const& request);
/**
* @brief Check whether a request requires administrative privileges on rippled side.
*
* @param method The method name to check
* @param request The request to check
* @return true if the request requires ADMIN role
*/
bool
isAdminCmd(std::string const& method, boost::json::object const& request);
/** /**
* @brief Get the NFTID from the request * @brief Get the NFTID from the request
* *

View File

@@ -94,7 +94,7 @@ struct ReturnType {
* @param warnings The warnings generated by the RPC call * @param warnings The warnings generated by the RPC call
*/ */
ReturnType(std::expected<boost::json::value, Status> result, boost::json::array warnings = {}) ReturnType(std::expected<boost::json::value, Status> result, boost::json::array warnings = {})
: result{std::move(result)}, warnings{std::move(warnings)} : result{std::move(result)}, warnings(std::move(warnings))
{ {
} }

View File

@@ -22,6 +22,7 @@
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
#include "rpc/RPCHelpers.hpp" #include "rpc/RPCHelpers.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "util/AccountUtils.hpp"
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
#include <boost/json/value.hpp> #include <boost/json/value.hpp>
@@ -29,6 +30,7 @@
#include <fmt/core.h> #include <fmt/core.h>
#include <ripple/basics/base_uint.h> #include <ripple/basics/base_uint.h>
#include <ripple/protocol/AccountID.h> #include <ripple/protocol/AccountID.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/UintTypes.h> #include <ripple/protocol/UintTypes.h>
#include <ripple/protocol/tokens.h> #include <ripple/protocol/tokens.h>
@@ -113,7 +115,7 @@ CustomValidator AccountBase58Validator =
if (!value.is_string()) if (!value.is_string())
return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotString"}}; return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotString"}};
auto const account = ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(value)); auto const account = util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(value));
if (!account || account->isZero()) if (!account || account->isZero())
return Error{Status{ClioError::rpcMALFORMED_ADDRESS}}; return Error{Status{ClioError::rpcMALFORMED_ADDRESS}};
@@ -140,8 +142,12 @@ CustomValidator CurrencyValidator =
if (!value.is_string()) if (!value.is_string())
return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotString"}}; return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotString"}};
auto const currencyStr = boost::json::value_to<std::string>(value);
if (currencyStr.empty())
return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "IsEmpty"}};
ripple::Currency currency; ripple::Currency currency;
if (!ripple::to_currency(currency, boost::json::value_to<std::string>(value))) if (!ripple::to_currency(currency, currencyStr))
return Error{Status{ClioError::rpcMALFORMED_CURRENCY, "malformedCurrency"}}; return Error{Status{ClioError::rpcMALFORMED_CURRENCY, "malformedCurrency"}};
return MaybeError{}; return MaybeError{};

View File

@@ -39,7 +39,6 @@
#include <ripple/protocol/jss.h> #include <ripple/protocol/jss.h>
#include <cstdint> #include <cstdint>
#include <limits>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <string> #include <string>
@@ -57,8 +56,8 @@ class AccountTxHandler {
std::shared_ptr<BackendInterface> sharedPtrBackend_; std::shared_ptr<BackendInterface> sharedPtrBackend_;
public: public:
// no max limit
static auto constexpr LIMIT_MIN = 1; static auto constexpr LIMIT_MIN = 1;
static auto constexpr LIMIT_MAX = 1000;
static auto constexpr LIMIT_DEFAULT = 200; static auto constexpr LIMIT_DEFAULT = 200;
/** /**
@@ -133,7 +132,7 @@ public:
{JS(limit), {JS(limit),
validation::Type<uint32_t>{}, validation::Type<uint32_t>{},
validation::Min(1u), validation::Min(1u),
modifiers::Clamp<int32_t>{LIMIT_MIN, std::numeric_limits<int32_t>::max()}}, modifiers::Clamp<int32_t>{LIMIT_MIN, LIMIT_MAX}},
{JS(marker), {JS(marker),
meta::WithCustomError{ meta::WithCustomError{
validation::Type<boost::json::object>{}, validation::Type<boost::json::object>{},

View File

@@ -25,6 +25,7 @@
#include "rpc/common/Specs.hpp" #include "rpc/common/Specs.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "rpc/common/Validators.hpp" #include "rpc/common/Validators.hpp"
#include "util/AccountUtils.hpp"
#include <boost/json/array.hpp> #include <boost/json/array.hpp>
#include <boost/json/conversion.hpp> #include <boost/json/conversion.hpp>
@@ -116,14 +117,14 @@ public:
auto const wallets = value.is_array() ? value.as_array() : boost::json::array{value}; auto const wallets = value.is_array() ? value.as_array() : boost::json::array{value};
auto const getAccountID = [](auto const& j) -> std::optional<ripple::AccountID> { auto const getAccountID = [](auto const& j) -> std::optional<ripple::AccountID> {
if (j.is_string()) { if (j.is_string()) {
auto const pk = ripple::parseBase58<ripple::PublicKey>( auto const pk = util::parseBase58Wrapper<ripple::PublicKey>(
ripple::TokenType::AccountPublic, boost::json::value_to<std::string>(j) ripple::TokenType::AccountPublic, boost::json::value_to<std::string>(j)
); );
if (pk) if (pk)
return ripple::calcAccountID(*pk); return ripple::calcAccountID(*pk);
return ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(j)); return util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(j));
} }
return {}; return {};

View File

@@ -23,6 +23,7 @@
#include "rpc/JS.hpp" #include "rpc/JS.hpp"
#include "rpc/RPCHelpers.hpp" #include "rpc/RPCHelpers.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "util/AccountUtils.hpp"
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/bimap/bimap.hpp> #include <boost/bimap/bimap.hpp>
@@ -263,7 +264,7 @@ tag_invoke(boost::json::value_to_tag<GetAggregatePriceHandler::Input>, boost::js
for (auto const& oracle : jsonObject.at(JS(oracles)).as_array()) { for (auto const& oracle : jsonObject.at(JS(oracles)).as_array()) {
input.oracles.push_back(GetAggregatePriceHandler::Oracle{ input.oracles.push_back(GetAggregatePriceHandler::Oracle{
.documentId = boost::json::value_to<std::uint64_t>(oracle.as_object().at(JS(oracle_document_id))), .documentId = boost::json::value_to<std::uint64_t>(oracle.as_object().at(JS(oracle_document_id))),
.account = *ripple::parseBase58<ripple::AccountID>( .account = *util::parseBase58Wrapper<ripple::AccountID>(
boost::json::value_to<std::string>(oracle.as_object().at(JS(account))) boost::json::value_to<std::string>(oracle.as_object().at(JS(account)))
) )
}); });

View File

@@ -70,9 +70,8 @@ public:
* - ledger * - ledger
* - type * - type
* *
* Clio will throw an error when `queue` is set to `true` * Clio will throw an error when `queue`, `full` or `accounts` is set to `true`.
* or if `full` or `accounts` are used. * @see https://github.com/XRPLF/clio/issues/603 and https://github.com/XRPLF/clio/issues/1537
* @see https://github.com/XRPLF/clio/issues/603
*/ */
struct Input { struct Input {
std::optional<std::string> ledgerHash; std::optional<std::string> ledgerHash;
@@ -105,9 +104,9 @@ public:
spec([[maybe_unused]] uint32_t apiVersion) spec([[maybe_unused]] uint32_t apiVersion)
{ {
static auto const rpcSpec = RpcSpec{ static auto const rpcSpec = RpcSpec{
{JS(full), validation::NotSupported{}}, {JS(full), validation::Type<bool>{}, validation::NotSupported{true}},
{JS(full), check::Deprecated{}}, {JS(full), check::Deprecated{}},
{JS(accounts), validation::NotSupported{}}, {JS(accounts), validation::Type<bool>{}, validation::NotSupported{true}},
{JS(accounts), check::Deprecated{}}, {JS(accounts), check::Deprecated{}},
{JS(owner_funds), validation::Type<bool>{}}, {JS(owner_funds), validation::Type<bool>{}},
{JS(queue), validation::Type<bool>{}, validation::NotSupported{true}}, {JS(queue), validation::Type<bool>{}, validation::NotSupported{true}},

View File

@@ -23,6 +23,7 @@
#include "rpc/JS.hpp" #include "rpc/JS.hpp"
#include "rpc/RPCHelpers.hpp" #include "rpc/RPCHelpers.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "util/AccountUtils.hpp"
#include <boost/json/conversion.hpp> #include <boost/json/conversion.hpp>
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
@@ -62,9 +63,9 @@ LedgerEntryHandler::process(LedgerEntryHandler::Input input, Context const& ctx)
if (input.index) { if (input.index) {
key = ripple::uint256{std::string_view(*(input.index))}; key = ripple::uint256{std::string_view(*(input.index))};
} else if (input.accountRoot) { } else if (input.accountRoot) {
key = ripple::keylet::account(*ripple::parseBase58<ripple::AccountID>(*(input.accountRoot))).key; key = ripple::keylet::account(*util::parseBase58Wrapper<ripple::AccountID>(*(input.accountRoot))).key;
} else if (input.did) { } else if (input.did) {
key = ripple::keylet::did(*ripple::parseBase58<ripple::AccountID>(*(input.did))).key; key = ripple::keylet::did(*util::parseBase58Wrapper<ripple::AccountID>(*(input.did))).key;
} else if (input.directory) { } else if (input.directory) {
auto const keyOrStatus = composeKeyFromDirectory(*input.directory); auto const keyOrStatus = composeKeyFromDirectory(*input.directory);
if (auto const status = std::get_if<Status>(&keyOrStatus)) if (auto const status = std::get_if<Status>(&keyOrStatus))
@@ -73,13 +74,14 @@ LedgerEntryHandler::process(LedgerEntryHandler::Input input, Context const& ctx)
key = std::get<ripple::uint256>(keyOrStatus); key = std::get<ripple::uint256>(keyOrStatus);
} else if (input.offer) { } else if (input.offer) {
auto const id = auto const id =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(input.offer->at(JS(account)))); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(input.offer->at(JS(account)))
);
key = ripple::keylet::offer(*id, boost::json::value_to<std::uint32_t>(input.offer->at(JS(seq)))).key; key = ripple::keylet::offer(*id, boost::json::value_to<std::uint32_t>(input.offer->at(JS(seq)))).key;
} else if (input.rippleStateAccount) { } else if (input.rippleStateAccount) {
auto const id1 = ripple::parseBase58<ripple::AccountID>( auto const id1 = util::parseBase58Wrapper<ripple::AccountID>(
boost::json::value_to<std::string>(input.rippleStateAccount->at(JS(accounts)).as_array().at(0)) boost::json::value_to<std::string>(input.rippleStateAccount->at(JS(accounts)).as_array().at(0))
); );
auto const id2 = ripple::parseBase58<ripple::AccountID>( auto const id2 = util::parseBase58Wrapper<ripple::AccountID>(
boost::json::value_to<std::string>(input.rippleStateAccount->at(JS(accounts)).as_array().at(1)) boost::json::value_to<std::string>(input.rippleStateAccount->at(JS(accounts)).as_array().at(1))
); );
auto const currency = auto const currency =
@@ -88,20 +90,22 @@ LedgerEntryHandler::process(LedgerEntryHandler::Input input, Context const& ctx)
key = ripple::keylet::line(*id1, *id2, currency).key; key = ripple::keylet::line(*id1, *id2, currency).key;
} else if (input.escrow) { } else if (input.escrow) {
auto const id = auto const id =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(input.escrow->at(JS(owner)))); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(input.escrow->at(JS(owner)))
);
key = ripple::keylet::escrow(*id, input.escrow->at(JS(seq)).as_int64()).key; key = ripple::keylet::escrow(*id, input.escrow->at(JS(seq)).as_int64()).key;
} else if (input.depositPreauth) { } else if (input.depositPreauth) {
auto const owner = ripple::parseBase58<ripple::AccountID>( auto const owner = util::parseBase58Wrapper<ripple::AccountID>(
boost::json::value_to<std::string>(input.depositPreauth->at(JS(owner))) boost::json::value_to<std::string>(input.depositPreauth->at(JS(owner)))
); );
auto const authorized = ripple::parseBase58<ripple::AccountID>( auto const authorized = util::parseBase58Wrapper<ripple::AccountID>(
boost::json::value_to<std::string>(input.depositPreauth->at(JS(authorized))) boost::json::value_to<std::string>(input.depositPreauth->at(JS(authorized)))
); );
key = ripple::keylet::depositPreauth(*owner, *authorized).key; key = ripple::keylet::depositPreauth(*owner, *authorized).key;
} else if (input.ticket) { } else if (input.ticket) {
auto const id = auto const id =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(input.ticket->at(JS(account)))); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(input.ticket->at(JS(account))
));
key = ripple::getTicketIndex(*id, input.ticket->at(JS(ticket_seq)).as_int64()); key = ripple::getTicketIndex(*id, input.ticket->at(JS(ticket_seq)).as_int64());
} else if (input.amm) { } else if (input.amm) {
@@ -112,7 +116,8 @@ LedgerEntryHandler::process(LedgerEntryHandler::Input input, Context const& ctx)
return ripple::xrpIssue(); return ripple::xrpIssue();
} }
auto const issuer = auto const issuer =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(assetJson.at(JS(issuer)))); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(assetJson.at(JS(issuer)))
);
return ripple::Issue{currency, *issuer}; return ripple::Issue{currency, *issuer};
}; };
@@ -125,7 +130,7 @@ LedgerEntryHandler::process(LedgerEntryHandler::Input input, Context const& ctx)
return Error{Status{ClioError::rpcMALFORMED_REQUEST}}; return Error{Status{ClioError::rpcMALFORMED_REQUEST}};
if (input.bridgeAccount) { if (input.bridgeAccount) {
auto const bridgeAccount = ripple::parseBase58<ripple::AccountID>(*(input.bridgeAccount)); auto const bridgeAccount = util::parseBase58Wrapper<ripple::AccountID>(*(input.bridgeAccount));
auto const chainType = ripple::STXChainBridge::srcChain(bridgeAccount == input.bridge->lockingChainDoor()); auto const chainType = ripple::STXChainBridge::srcChain(bridgeAccount == input.bridge->lockingChainDoor());
if (bridgeAccount != input.bridge->door(chainType)) if (bridgeAccount != input.bridge->door(chainType))
@@ -201,7 +206,7 @@ LedgerEntryHandler::composeKeyFromDirectory(boost::json::object const& directory
} }
auto const ownerID = auto const ownerID =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(directory.at(JS(owner)))); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(directory.at(JS(owner))));
return ripple::keylet::page(ripple::keylet::ownerDir(*ownerID), subIndex).key; return ripple::keylet::page(ripple::keylet::ownerDir(*ownerID), subIndex).key;
} }
@@ -262,10 +267,10 @@ tag_invoke(boost::json::value_to_tag<LedgerEntryHandler::Input>, boost::json::va
}; };
auto const parseBridgeFromJson = [](boost::json::value const& bridgeJson) { auto const parseBridgeFromJson = [](boost::json::value const& bridgeJson) {
auto const lockingDoor = *ripple::parseBase58<ripple::AccountID>( auto const lockingDoor = *util::parseBase58Wrapper<ripple::AccountID>(
boost::json::value_to<std::string>(bridgeJson.at(ripple::sfLockingChainDoor.getJsonName().c_str())) boost::json::value_to<std::string>(bridgeJson.at(ripple::sfLockingChainDoor.getJsonName().c_str()))
); );
auto const issuingDoor = *ripple::parseBase58<ripple::AccountID>( auto const issuingDoor = *util::parseBase58Wrapper<ripple::AccountID>(
boost::json::value_to<std::string>(bridgeJson.at(ripple::sfIssuingChainDoor.getJsonName().c_str())) boost::json::value_to<std::string>(bridgeJson.at(ripple::sfIssuingChainDoor.getJsonName().c_str()))
); );
auto const lockingIssue = auto const lockingIssue =
@@ -278,7 +283,7 @@ tag_invoke(boost::json::value_to_tag<LedgerEntryHandler::Input>, boost::json::va
auto const parseOracleFromJson = [](boost::json::value const& json) { auto const parseOracleFromJson = [](boost::json::value const& json) {
auto const account = auto const account =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(json.at(JS(account)))); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(json.at(JS(account))));
auto const documentId = boost::json::value_to<uint32_t>(json.at(JS(oracle_document_id))); auto const documentId = boost::json::value_to<uint32_t>(json.at(JS(oracle_document_id)));
return ripple::keylet::oracle(*account, documentId).key; return ripple::keylet::oracle(*account, documentId).key;

View File

@@ -28,6 +28,7 @@
#include "rpc/common/Specs.hpp" #include "rpc/common/Specs.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "rpc/common/Validators.hpp" #include "rpc/common/Validators.hpp"
#include "util/AccountUtils.hpp"
#include <boost/json/conversion.hpp> #include <boost/json/conversion.hpp>
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
@@ -136,9 +137,11 @@ public:
} }
auto const id1 = auto const id1 =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(value.as_array()[0])); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(value.as_array()[0])
);
auto const id2 = auto const id2 =
ripple::parseBase58<ripple::AccountID>(boost::json::value_to<std::string>(value.as_array()[1])); util::parseBase58Wrapper<ripple::AccountID>(boost::json::value_to<std::string>(value.as_array()[1])
);
if (!id1 || !id2) if (!id1 || !id2)
return Error{Status{ClioError::rpcMALFORMED_ADDRESS, "malformedAddresses"}}; return Error{Status{ClioError::rpcMALFORMED_ADDRESS, "malformedAddresses"}};

67
src/util/AccountUtils.hpp Normal file
View File

@@ -0,0 +1,67 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <ripple/protocol/tokens.h>
#include <cctype>
#include <optional>
#include <string>
namespace util {
/**
* @brief A wrapper of parseBase58 function. It adds the check if all characters in the input string are alphanumeric.
* If not, it returns an empty optional, instead of calling the parseBase58 function.
*
* @tparam T The type of the value to parse to.
* @param str The string to parse.
* @return An optional with the parsed value, or an empty optional if the parse fails.
*/
template <class T>
[[nodiscard]] std::optional<T>
parseBase58Wrapper(std::string const& str)
{
if (!std::all_of(std::begin(str), std::end(str), [](unsigned char c) { return std::isalnum(c); }))
return std::nullopt;
return ripple::parseBase58<T>(str);
}
/**
* @brief A wrapper of parseBase58 function. It add the check if all characters in the input string are alphanumeric. If
* not, it returns an empty optional, instead of calling the parseBase58 function.
*
* @tparam T The type of the value to parse to.
* @param type The type of the token to parse.
* @param str The string to parse.
* @return An optional with the parsed value, or an empty optional if the parse fails.
*/
template <class T>
[[nodiscard]] std::optional<T>
parseBase58Wrapper(ripple::TokenType type, std::string const& str)
{
if (!std::all_of(std::begin(str), std::end(str), [](unsigned char c) { return std::isalnum(c); }))
return std::nullopt;
return ripple::parseBase58<T>(type, str);
}
} // namespace util

View File

@@ -39,8 +39,10 @@
#include <boost/beast/websocket/stream_base.hpp> #include <boost/beast/websocket/stream_base.hpp>
#include <boost/system/errc.hpp> #include <boost/system/errc.hpp>
#include <atomic>
#include <chrono> #include <chrono>
#include <expected> #include <expected>
#include <memory>
#include <optional> #include <optional>
#include <string> #include <string>
#include <utility> #include <utility>
@@ -123,15 +125,20 @@ private:
static void static void
withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout) withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
{ {
auto isCompleted = std::make_shared<bool>(false);
boost::asio::cancellation_signal cancellationSignal; boost::asio::cancellation_signal cancellationSignal;
auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield); auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);
boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout}; boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};
timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) {
if (!errorCode) // The timer below can be called with no error code even if the operation is completed before the timeout, so we
// need an additional flag here
timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) {
if (!errorCode and not *isCompleted)
cancellationSignal.emit(boost::asio::cancellation_type::terminal); cancellationSignal.emit(boost::asio::cancellation_type::terminal);
}); });
operation(cyield); operation(cyield);
*isCompleted = true;
} }
static boost::system::error_code static boost::system::error_code

View File

@@ -21,6 +21,7 @@
#include "data/DBHelpers.hpp" #include "data/DBHelpers.hpp"
#include "data/Types.hpp" #include "data/Types.hpp"
#include "util/AccountUtils.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include <ripple/basics/Blob.h> #include <ripple/basics/Blob.h>
@@ -60,7 +61,7 @@ constexpr static auto INDEX1 = "1B8590C01B0006EDFA9ED60296DD052DC5E90F99659B2501
ripple::AccountID ripple::AccountID
GetAccountIDWithString(std::string_view id) GetAccountIDWithString(std::string_view id)
{ {
return ripple::parseBase58<ripple::AccountID>(std::string(id)).value(); return util::parseBase58Wrapper<ripple::AccountID>(std::string(id)).value();
} }
ripple::uint256 ripple::uint256
@@ -154,11 +155,11 @@ CreatePaymentTransactionObject(
{ {
ripple::STObject obj(ripple::sfTransaction); ripple::STObject obj(ripple::sfTransaction);
obj.setFieldU16(ripple::sfTransactionType, ripple::ttPAYMENT); obj.setFieldU16(ripple::sfTransactionType, ripple::ttPAYMENT);
auto account = ripple::parseBase58<ripple::AccountID>(std::string(accountId1)); auto account = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId1));
obj.setAccountID(ripple::sfAccount, account.value()); obj.setAccountID(ripple::sfAccount, account.value());
obj.setFieldAmount(ripple::sfAmount, ripple::STAmount(amount, false)); obj.setFieldAmount(ripple::sfAmount, ripple::STAmount(amount, false));
obj.setFieldAmount(ripple::sfFee, ripple::STAmount(fee, false)); obj.setFieldAmount(ripple::sfFee, ripple::STAmount(fee, false));
auto account2 = ripple::parseBase58<ripple::AccountID>(std::string(accountId2)); auto account2 = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId2));
obj.setAccountID(ripple::sfDestination, account2.value()); obj.setAccountID(ripple::sfDestination, account2.value());
obj.setFieldU32(ripple::sfSequence, seq); obj.setFieldU32(ripple::sfSequence, seq);
char const* key = "test"; char const* key = "test";
@@ -258,14 +259,14 @@ CreateCreateOfferTransactionObject(
{ {
ripple::STObject obj(ripple::sfTransaction); ripple::STObject obj(ripple::sfTransaction);
obj.setFieldU16(ripple::sfTransactionType, ripple::ttOFFER_CREATE); obj.setFieldU16(ripple::sfTransactionType, ripple::ttOFFER_CREATE);
auto account = ripple::parseBase58<ripple::AccountID>(std::string(accountId)); auto account = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId));
obj.setAccountID(ripple::sfAccount, account.value()); obj.setAccountID(ripple::sfAccount, account.value());
auto amount = ripple::STAmount(fee, false); auto amount = ripple::STAmount(fee, false);
obj.setFieldAmount(ripple::sfFee, amount); obj.setFieldAmount(ripple::sfFee, amount);
obj.setFieldU32(ripple::sfSequence, seq); obj.setFieldU32(ripple::sfSequence, seq);
// add amount // add amount
ripple::Issue const issue1( ripple::Issue const issue1(
ripple::Currency{currency}, ripple::parseBase58<ripple::AccountID>(std::string(issuer)).value() ripple::Currency{currency}, util::parseBase58Wrapper<ripple::AccountID>(std::string(issuer)).value()
); );
if (reverse) { if (reverse) {
obj.setFieldAmount(ripple::sfTakerPays, ripple::STAmount(issue1, takerGets)); obj.setFieldAmount(ripple::sfTakerPays, ripple::STAmount(issue1, takerGets));
@@ -288,11 +289,11 @@ GetIssue(std::string_view currency, std::string_view issuerId)
if (currency.size() == 3) { if (currency.size() == 3) {
return ripple::Issue( return ripple::Issue(
ripple::to_currency(std::string(currency)), ripple::to_currency(std::string(currency)),
ripple::parseBase58<ripple::AccountID>(std::string(issuerId)).value() util::parseBase58Wrapper<ripple::AccountID>(std::string(issuerId)).value()
); );
} }
return ripple::Issue( return ripple::Issue(
ripple::Currency{currency}, ripple::parseBase58<ripple::AccountID>(std::string(issuerId)).value() ripple::Currency{currency}, util::parseBase58Wrapper<ripple::AccountID>(std::string(issuerId)).value()
); );
} }
@@ -636,7 +637,7 @@ CreateMintNFTTxWithMetadata(
// tx // tx
ripple::STObject tx(ripple::sfTransaction); ripple::STObject tx(ripple::sfTransaction);
tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_MINT); tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_MINT);
auto account = ripple::parseBase58<ripple::AccountID>(std::string(accountId)); auto account = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId));
tx.setAccountID(ripple::sfAccount, account.value()); tx.setAccountID(ripple::sfAccount, account.value());
auto amount = ripple::STAmount(fee, false); auto amount = ripple::STAmount(fee, false);
tx.setFieldAmount(ripple::sfFee, amount); tx.setFieldAmount(ripple::sfFee, amount);
@@ -693,7 +694,7 @@ CreateAcceptNFTOfferTxWithMetadata(std::string_view accountId, uint32_t seq, uin
// tx // tx
ripple::STObject tx(ripple::sfTransaction); ripple::STObject tx(ripple::sfTransaction);
tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_ACCEPT_OFFER); tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_ACCEPT_OFFER);
auto account = ripple::parseBase58<ripple::AccountID>(std::string(accountId)); auto account = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId));
tx.setAccountID(ripple::sfAccount, account.value()); tx.setAccountID(ripple::sfAccount, account.value());
auto amount = ripple::STAmount(fee, false); auto amount = ripple::STAmount(fee, false);
tx.setFieldAmount(ripple::sfFee, amount); tx.setFieldAmount(ripple::sfFee, amount);
@@ -737,7 +738,7 @@ CreateCancelNFTOffersTxWithMetadata(
// tx // tx
ripple::STObject tx(ripple::sfTransaction); ripple::STObject tx(ripple::sfTransaction);
tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_CANCEL_OFFER); tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_CANCEL_OFFER);
auto account = ripple::parseBase58<ripple::AccountID>(std::string(accountId)); auto account = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId));
tx.setAccountID(ripple::sfAccount, account.value()); tx.setAccountID(ripple::sfAccount, account.value());
auto amount = ripple::STAmount(fee, false); auto amount = ripple::STAmount(fee, false);
tx.setFieldAmount(ripple::sfFee, amount); tx.setFieldAmount(ripple::sfFee, amount);
@@ -791,7 +792,7 @@ CreateCreateNFTOfferTxWithMetadata(
// tx // tx
ripple::STObject tx(ripple::sfTransaction); ripple::STObject tx(ripple::sfTransaction);
tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_CREATE_OFFER); tx.setFieldU16(ripple::sfTransactionType, ripple::ttNFTOKEN_CREATE_OFFER);
auto account = ripple::parseBase58<ripple::AccountID>(std::string(accountId)); auto account = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId));
tx.setAccountID(ripple::sfAccount, account.value()); tx.setAccountID(ripple::sfAccount, account.value());
auto amount = ripple::STAmount(fee, false); auto amount = ripple::STAmount(fee, false);
tx.setFieldAmount(ripple::sfFee, amount); tx.setFieldAmount(ripple::sfFee, amount);
@@ -839,7 +840,7 @@ CreateOracleSetTxWithMetadata(
// tx // tx
ripple::STObject tx(ripple::sfTransaction); ripple::STObject tx(ripple::sfTransaction);
tx.setFieldU16(ripple::sfTransactionType, ripple::ttORACLE_SET); tx.setFieldU16(ripple::sfTransactionType, ripple::ttORACLE_SET);
auto account = ripple::parseBase58<ripple::AccountID>(std::string(accountId)); auto account = util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId));
tx.setAccountID(ripple::sfAccount, account.value()); tx.setAccountID(ripple::sfAccount, account.value());
auto amount = ripple::STAmount(fee, false); auto amount = ripple::STAmount(fee, false);
tx.setFieldAmount(ripple::sfFee, amount); tx.setFieldAmount(ripple::sfFee, amount);
@@ -909,7 +910,7 @@ CreateAMMObject(
amm.setFieldIssue(ripple::sfAsset2, ripple::STIssue{ripple::sfAsset2, GetIssue(asset2Currency, asset2Issuer)}); amm.setFieldIssue(ripple::sfAsset2, ripple::STIssue{ripple::sfAsset2, GetIssue(asset2Currency, asset2Issuer)});
ripple::Issue const issue1( ripple::Issue const issue1(
ripple::Currency{lpTokenBalanceIssueCurrency}, ripple::Currency{lpTokenBalanceIssueCurrency},
ripple::parseBase58<ripple::AccountID>(std::string(accountId)).value() util::parseBase58Wrapper<ripple::AccountID>(std::string(accountId)).value()
); );
amm.setFieldAmount(ripple::sfLPTokenBalance, ripple::STAmount(issue1, lpTokenBalanceIssueAmount)); amm.setFieldAmount(ripple::sfLPTokenBalance, ripple::STAmount(issue1, lpTokenBalanceIssueAmount));
amm.setFieldU32(ripple::sfFlags, 0); amm.setFieldU32(ripple::sfFlags, 0);

View File

@@ -92,6 +92,7 @@ target_sources(
rpc/JsonBoolTests.cpp rpc/JsonBoolTests.cpp
rpc/RPCHelpersTests.cpp rpc/RPCHelpersTests.cpp
rpc/WorkQueueTests.cpp rpc/WorkQueueTests.cpp
util/AccountUtilsTests.cpp
util/AssertTests.cpp util/AssertTests.cpp
# Async framework # Async framework
util/async/AnyExecutionContextTests.cpp util/async/AnyExecutionContextTests.cpp

View File

@@ -271,15 +271,12 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0Disconnects)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)); EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true)); EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect(); sourceFactory_.callbacksAt(0).onDisconnect(true);
} }
TEST_F(LoadBalancerOnDisconnectHookTests, source1Disconnects) TEST_F(LoadBalancerOnDisconnectHookTests, source1Disconnects)
{ {
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true)); sourceFactory_.callbacksAt(1).onDisconnect(false);
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();
} }
TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack) TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
@@ -288,29 +285,25 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)); EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true)); EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect(); sourceFactory_.callbacksAt(0).onDisconnect(true);
sourceFactory_.callbacksAt(0).onConnect(); sourceFactory_.callbacksAt(0).onConnect();
} }
TEST_F(LoadBalancerOnDisconnectHookTests, source1DisconnectsAndConnectsBack) TEST_F(LoadBalancerOnDisconnectHookTests, source1DisconnectsAndConnectsBack)
{ {
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true)); sourceFactory_.callbacksAt(1).onDisconnect(false);
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();
sourceFactory_.callbacksAt(1).onConnect(); sourceFactory_.callbacksAt(1).onConnect();
} }
TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack) TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack)
{ {
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).Times(2).WillRepeatedly(Return(false)); EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)).Times(2); EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).Times(2).WillRepeatedly(Return(false)); EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)).Times(2); EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(0).onDisconnect(); sourceFactory_.callbacksAt(0).onDisconnect(true);
sourceFactory_.callbacksAt(1).onDisconnect(); sourceFactory_.callbacksAt(1).onDisconnect(false);
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true)); EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
@@ -353,12 +346,7 @@ TEST_F(LoadBalancer3SourcesTests, forwardingUpdate)
sourceFactory_.callbacksAt(1).onConnect(); sourceFactory_.callbacksAt(1).onConnect();
// Source 0 got disconnected // Source 0 got disconnected
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false)); sourceFactory_.callbacksAt(0).onDisconnect(false);
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(2), setForwarding(false)); // only source 1 must be forwarding
sourceFactory_.callbacksAt(0).onDisconnect();
} }
struct LoadBalancerLoadInitialLedgerTests : LoadBalancerOnConnectHookTests { struct LoadBalancerLoadInitialLedgerTests : LoadBalancerOnConnectHookTests {

View File

@@ -18,31 +18,35 @@
//============================================================================== //==============================================================================
#include "etl/impl/SubscriptionSource.hpp" #include "etl/impl/SubscriptionSource.hpp"
#include "util/AssignRandomPort.hpp"
#include "util/Fixtures.hpp" #include "util/Fixtures.hpp"
#include "util/MockNetworkValidatedLedgers.hpp" #include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp" #include "util/MockSubscriptionManager.hpp"
#include "util/TestWsServer.hpp" #include "util/TestWsServer.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
#include <boost/json/serialize.hpp> #include <boost/json/serialize.hpp>
#include <fmt/core.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
#include <cstdlib>
#include <optional> #include <optional>
#include <string> #include <string>
#include <thread>
#include <utility> #include <utility>
using namespace etl::impl; using namespace etl::impl;
using testing::MockFunction; using testing::MockFunction;
using testing::StrictMock; using testing::StrictMock;
struct SubscriptionSourceConnectionTests : public NoLoggerFixture { struct SubscriptionSourceConnectionTestsBase : public NoLoggerFixture {
SubscriptionSourceConnectionTests() SubscriptionSourceConnectionTestsBase()
{ {
subscriptionSource_.run(); subscriptionSource_.run();
} }
@@ -54,7 +58,7 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
StrictMockSubscriptionManagerSharedPtr subscriptionManager_; StrictMockSubscriptionManagerSharedPtr subscriptionManager_;
StrictMock<MockFunction<void()>> onConnectHook_; StrictMock<MockFunction<void()>> onConnectHook_;
StrictMock<MockFunction<void()>> onDisconnectHook_; StrictMock<MockFunction<void(bool)>> onDisconnectHook_;
StrictMock<MockFunction<void()>> onLedgerClosedHook_; StrictMock<MockFunction<void()>> onLedgerClosedHook_;
SubscriptionSource subscriptionSource_{ SubscriptionSource subscriptionSource_{
@@ -66,8 +70,8 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
onConnectHook_.AsStdFunction(), onConnectHook_.AsStdFunction(),
onDisconnectHook_.AsStdFunction(), onDisconnectHook_.AsStdFunction(),
onLedgerClosedHook_.AsStdFunction(), onLedgerClosedHook_.AsStdFunction(),
std::chrono::milliseconds(1), std::chrono::milliseconds(5),
std::chrono::milliseconds(1) std::chrono::milliseconds(5)
}; };
[[maybe_unused]] TestWsConnection [[maybe_unused]] TestWsConnection
@@ -92,15 +96,17 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
} }
}; };
struct SubscriptionSourceConnectionTests : util::prometheus::WithPrometheus, SubscriptionSourceConnectionTestsBase {};
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed) TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed)
{ {
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed) TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed)
{ {
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -112,7 +118,19 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
}
TEST_F(SubscriptionSourceConnectionTests, ReadTimeout)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
std::this_thread::sleep_for(std::chrono::milliseconds{10});
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -126,7 +144,7 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect)
}); });
EXPECT_CALL(onConnectHook_, Call()).Times(2); EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -139,14 +157,14 @@ TEST_F(SubscriptionSourceConnectionTests, IsConnected)
}); });
EXPECT_CALL(onConnectHook_, Call()).WillOnce([this]() { EXPECT_TRUE(subscriptionSource_.isConnected()); }); EXPECT_CALL(onConnectHook_, Call()).WillOnce([this]() { EXPECT_TRUE(subscriptionSource_.isConnected()); });
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() {
EXPECT_FALSE(subscriptionSource_.isConnected()); EXPECT_FALSE(subscriptionSource_.isConnected());
subscriptionSource_.stop(); subscriptionSource_.stop();
}); });
ioContext_.run(); ioContext_.run();
} }
struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests { struct SubscriptionSourceReadTestsBase : public SubscriptionSourceConnectionTestsBase {
[[maybe_unused]] TestWsConnection [[maybe_unused]] TestWsConnection
connectAndSendMessage(std::string const message, boost::asio::yield_context yield) connectAndSendMessage(std::string const message, boost::asio::yield_context yield)
{ {
@@ -157,6 +175,8 @@ struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests {
} }
}; };
struct SubscriptionSourceReadTests : util::prometheus::WithPrometheus, SubscriptionSourceReadTestsBase {};
TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect) TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
{ {
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) { boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
@@ -167,7 +187,7 @@ TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
}); });
EXPECT_CALL(onConnectHook_, Call()).Times(2); EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -179,7 +199,7 @@ TEST_F(SubscriptionSourceReadTests, GotResult)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -191,7 +211,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndex)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123)); EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run(); ioContext_.run();
} }
@@ -206,7 +226,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect)
}); });
EXPECT_CALL(onConnectHook_, Call()).Times(2); EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -220,7 +240,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconn
}); });
EXPECT_CALL(onConnectHook_, Call()).Times(2); EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -242,7 +262,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
EXPECT_TRUE(subscriptionSource_.hasLedger(123)); EXPECT_TRUE(subscriptionSource_.hasLedger(123));
@@ -268,7 +288,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reco
}); });
EXPECT_CALL(onConnectHook_, Call()).Times(2); EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -286,7 +306,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123)); EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run(); ioContext_.run();
@@ -306,7 +326,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosed)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -321,7 +341,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onLedgerClosedHook_, Call()); EXPECT_CALL(onLedgerClosedHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() {
EXPECT_FALSE(subscriptionSource_.isForwarding()); EXPECT_FALSE(subscriptionSource_.isForwarding());
subscriptionSource_.stop(); subscriptionSource_.stop();
}); });
@@ -336,7 +356,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123)); EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run(); ioContext_.run();
} }
@@ -351,7 +371,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Recon
}); });
EXPECT_CALL(onConnectHook_, Call()).Times(2); EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -365,7 +385,7 @@ TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_
}); });
EXPECT_CALL(onConnectHook_, Call()).Times(2); EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -382,7 +402,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
EXPECT_FALSE(subscriptionSource_.hasLedger(0)); EXPECT_FALSE(subscriptionSource_.hasLedger(0));
@@ -406,7 +426,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLe
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123)); EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run(); ioContext_.run();
@@ -425,7 +445,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingFalse)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -440,7 +460,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)); EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message));
ioContext_.run(); ioContext_.run();
} }
@@ -456,7 +476,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0); EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0);
ioContext_.run(); ioContext_.run();
} }
@@ -469,7 +489,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -484,7 +504,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardValidation(message)); EXPECT_CALL(*subscriptionManager_, forwardValidation(message));
ioContext_.run(); ioContext_.run();
} }
@@ -497,7 +517,7 @@ TEST_F(SubscriptionSourceReadTests, GotManiefstReceivedIsForwardingFalse)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
} }
@@ -512,7 +532,7 @@ TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(*subscriptionManager_, forwardManifest(message)); EXPECT_CALL(*subscriptionManager_, forwardManifest(message));
ioContext_.run(); ioContext_.run();
} }
@@ -525,7 +545,7 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime)
}); });
EXPECT_CALL(onConnectHook_, Call()); EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); }); EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run(); ioContext_.run();
auto const actualLastTimeMessage = subscriptionSource_.lastMessageTime(); auto const actualLastTimeMessage = subscriptionSource_.lastMessageTime();
@@ -533,3 +553,27 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime)
auto const diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - actualLastTimeMessage); auto const diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - actualLastTimeMessage);
EXPECT_LT(diff, std::chrono::milliseconds(100)); EXPECT_LT(diff, std::chrono::milliseconds(100));
} }
struct SubscriptionSourcePrometheusCounterTests : util::prometheus::WithMockPrometheus,
SubscriptionSourceReadTestsBase {};
TEST_F(SubscriptionSourcePrometheusCounterTests, LastMessageTime)
{
auto& lastMessageTimeMock = makeMock<util::prometheus::GaugeInt>(
"subscription_source_last_message_time", fmt::format("{{source=\"127.0.0.1:{}\"}}", wsServer_.port())
);
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage("some_message", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(lastMessageTimeMock, set).WillOnce([](int64_t value) {
auto const now =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
EXPECT_LE(now - value, 1);
});
ioContext_.run();
}

View File

@@ -426,6 +426,9 @@ TEST_F(RPCBaseTest, AccountValidator)
failingInput = json::parse(R"({ "account": "02000000000000000000000000000000000000000000000000000000000000000" })"); failingInput = json::parse(R"({ "account": "02000000000000000000000000000000000000000000000000000000000000000" })");
ASSERT_FALSE(spec.process(failingInput)); ASSERT_FALSE(spec.process(failingInput));
failingInput = json::parse(R"({ "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jp?" })");
ASSERT_FALSE(spec.process(failingInput));
auto passingInput = json::parse(R"({ "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn" })"); auto passingInput = json::parse(R"({ "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn" })");
ASSERT_TRUE(spec.process(passingInput)); ASSERT_TRUE(spec.process(passingInput));
@@ -434,6 +437,28 @@ TEST_F(RPCBaseTest, AccountValidator)
ASSERT_TRUE(spec.process(passingInput)); ASSERT_TRUE(spec.process(passingInput));
} }
TEST_F(RPCBaseTest, AccountBase58Validator)
{
auto spec = RpcSpec{
{"account", validation::AccountBase58Validator},
};
auto failingInput = json::parse(R"({ "account": 256 })");
ASSERT_FALSE(spec.process(failingInput));
failingInput = json::parse(R"({ "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jp" })");
ASSERT_FALSE(spec.process(failingInput));
failingInput =
json::parse(R"({ "account": "020000000000000000000000000000000000000000000000000000000000000000" })");
ASSERT_FALSE(spec.process(failingInput));
failingInput = json::parse(R"({ "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jp?" })");
ASSERT_FALSE(spec.process(failingInput));
auto passingInput = json::parse(R"({ "account": "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn" })");
ASSERT_TRUE(spec.process(passingInput));
}
TEST_F(RPCBaseTest, AccountMarkerValidator) TEST_F(RPCBaseTest, AccountMarkerValidator)
{ {
auto spec = RpcSpec{ auto spec = RpcSpec{

View File

@@ -24,6 +24,7 @@
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include "util/Fixtures.hpp" #include "util/Fixtures.hpp"
#include "util/MockPrometheus.hpp" #include "util/MockPrometheus.hpp"
#include "util/NameGenerator.hpp"
#include "util/TestObject.hpp" #include "util/TestObject.hpp"
#include <boost/asio/impl/spawn.hpp> #include <boost/asio/impl/spawn.hpp>
@@ -538,3 +539,67 @@ TEST_F(RPCHelpersTest, ParseIssue)
std::runtime_error std::runtime_error
); );
} }
struct IsAdminCmdParamTestCaseBundle {
std::string testName;
std::string method;
std::string testJson;
bool expected;
};
struct IsAdminCmdParameterTest : public TestWithParam<IsAdminCmdParamTestCaseBundle> {};
static auto
generateTestValuesForParametersTest()
{
return std::vector<IsAdminCmdParamTestCaseBundle>{
{"ledgerEntry", "ledger_entry", R"({"type": false})", false},
{"featureVetoedTrue", "feature", R"({"vetoed": true, "feature": "foo"})", true},
{"featureVetoedFalse", "feature", R"({"vetoed": false, "feature": "foo"})", true},
{"featureVetoedIsStr", "feature", R"({"vetoed": "String"})", true},
{"ledger", "ledger", R"({})", false},
{"ledgerWithType", "ledger", R"({"type": "fee"})", false},
{"ledgerFullTrue", "ledger", R"({"full": true})", true},
{"ledgerFullFalse", "ledger", R"({"full": false})", false},
{"ledgerFullIsStr", "ledger", R"({"full": "String"})", true},
{"ledgerFullIsEmptyStr", "ledger", R"({"full": ""})", false},
{"ledgerFullIsNumber1", "ledger", R"({"full": 1})", true},
{"ledgerFullIsNumber0", "ledger", R"({"full": 0})", false},
{"ledgerFullIsNull", "ledger", R"({"full": null})", false},
{"ledgerFullIsFloat0", "ledger", R"({"full": 0.0})", false},
{"ledgerFullIsFloat1", "ledger", R"({"full": 0.1})", true},
{"ledgerFullIsArray", "ledger", R"({"full": [1]})", true},
{"ledgerFullIsEmptyArray", "ledger", R"({"full": []})", false},
{"ledgerFullIsObject", "ledger", R"({"full": {"key": 1}})", true},
{"ledgerFullIsEmptyObject", "ledger", R"({"full": {}})", false},
{"ledgerAccountsTrue", "ledger", R"({"accounts": true})", true},
{"ledgerAccountsFalse", "ledger", R"({"accounts": false})", false},
{"ledgerAccountsIsStr", "ledger", R"({"accounts": "String"})", true},
{"ledgerAccountsIsEmptyStr", "ledger", R"({"accounts": ""})", false},
{"ledgerAccountsIsNumber1", "ledger", R"({"accounts": 1})", true},
{"ledgerAccountsIsNumber0", "ledger", R"({"accounts": 0})", false},
{"ledgerAccountsIsNull", "ledger", R"({"accounts": null})", false},
{"ledgerAccountsIsFloat0", "ledger", R"({"accounts": 0.0})", false},
{"ledgerAccountsIsFloat1", "ledger", R"({"accounts": 0.1})", true},
{"ledgerAccountsIsArray", "ledger", R"({"accounts": [1]})", true},
{"ledgerAccountsIsEmptyArray", "ledger", R"({"accounts": []})", false},
{"ledgerAccountsIsObject", "ledger", R"({"accounts": {"key": 1}})", true},
{"ledgerAccountsIsEmptyObject", "ledger", R"({"accounts": {}})", false},
};
}
INSTANTIATE_TEST_CASE_P(
IsAdminCmdTest,
IsAdminCmdParameterTest,
ValuesIn(generateTestValuesForParametersTest()),
tests::util::NameGenerator
);
TEST_P(IsAdminCmdParameterTest, Test)
{
auto const testBundle = GetParam();
EXPECT_EQ(isAdminCmd(testBundle.method, boost::json::parse(testBundle.testJson).as_object()), testBundle.expected);
}

View File

@@ -20,6 +20,8 @@
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
#include "rpc/common/Types.hpp" #include "rpc/common/Types.hpp"
#include <boost/json/array.hpp>
#include <boost/json/value.hpp>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <expected> #include <expected>
@@ -34,3 +36,46 @@ TEST(MaybeErrorTest, OperatorEquals)
EXPECT_EQ(MaybeError{std::unexpected{Status{"Error"}}}, MaybeError{std::unexpected{Status{"Error"}}}); EXPECT_EQ(MaybeError{std::unexpected{Status{"Error"}}}, MaybeError{std::unexpected{Status{"Error"}}});
EXPECT_NE(MaybeError{std::unexpected{Status{"Error"}}}, MaybeError{std::unexpected{Status{"Another_error"}}}); EXPECT_NE(MaybeError{std::unexpected{Status{"Error"}}}, MaybeError{std::unexpected{Status{"Another_error"}}});
} }
TEST(ReturnTypeTests, Constructor)
{
boost::json::value const value{42};
{
ReturnType const r{value};
ASSERT_TRUE(r.result);
EXPECT_EQ(r.result.value(), value);
EXPECT_EQ(r.warnings, boost::json::array{});
}
{
boost::json::array const warnings{1, 2, 3};
ReturnType const r{value, warnings};
ASSERT_TRUE(r.result);
EXPECT_EQ(r.result.value(), value);
EXPECT_EQ(r.warnings, warnings);
}
{
Status const status{"Error"};
ReturnType const r{std::unexpected{status}};
ASSERT_FALSE(r.result);
EXPECT_EQ(r.result.error(), status);
EXPECT_EQ(r.warnings, boost::json::array{});
}
}
TEST(ReturnTypeTests, operatorBool)
{
{
boost::json::value const value{42};
ReturnType const r{value};
EXPECT_TRUE(r);
}
{
Status const status{"Error"};
ReturnType const r{std::unexpected{status}};
EXPECT_FALSE(r);
}
}

View File

@@ -769,14 +769,13 @@ TEST_F(RPCAccountTxHandlerTest, LimitAndMarker)
auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1); auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}}; auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
ON_CALL(*backend, fetchAccountTransactions).WillByDefault(Return(transCursor));
EXPECT_CALL( EXPECT_CALL(
*backend, *backend,
fetchAccountTransactions( fetchAccountTransactions(
testing::_, testing::_, false, testing::Optional(testing::Eq(TransactionsCursor{10, 11})), testing::_ testing::_, testing::_, false, testing::Optional(testing::Eq(TransactionsCursor{10, 11})), testing::_
) )
) )
.Times(1); .WillOnce(Return(transCursor));
runSpawn([&, this](auto yield) { runSpawn([&, this](auto yield) {
auto const handler = AnyHandler{AccountTxHandler{backend}}; auto const handler = AnyHandler{AccountTxHandler{backend}};
@@ -804,6 +803,73 @@ TEST_F(RPCAccountTxHandlerTest, LimitAndMarker)
}); });
} }
TEST_F(RPCAccountTxHandlerTest, LimitIsCapped)
{
backend->setRange(MINSEQ, MAXSEQ);
auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
EXPECT_CALL(*backend, fetchAccountTransactions(testing::_, testing::_, false, testing::_, testing::_))
.WillOnce(Return(transCursor));
runSpawn([&, this](auto yield) {
auto const handler = AnyHandler{AccountTxHandler{backend}};
auto static const input = json::parse(fmt::format(
R"({{
"account": "{}",
"ledger_index_min": {},
"ledger_index_max": {},
"limit": 100000,
"forward": false
}})",
ACCOUNT,
-1,
-1
));
auto const output = handler.process(input, Context{yield});
ASSERT_TRUE(output);
EXPECT_EQ(output.result->at("account").as_string(), ACCOUNT);
EXPECT_EQ(output.result->at("ledger_index_min").as_uint64(), MINSEQ);
EXPECT_EQ(output.result->at("ledger_index_max").as_uint64(), MAXSEQ);
EXPECT_EQ(output.result->at("limit").as_uint64(), AccountTxHandler::LIMIT_MAX);
EXPECT_EQ(output.result->at("transactions").as_array().size(), 2);
});
}
TEST_F(RPCAccountTxHandlerTest, LimitAllowedUpToCap)
{
backend->setRange(MINSEQ, MAXSEQ);
auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
EXPECT_CALL(*backend, fetchAccountTransactions(testing::_, testing::_, false, testing::_, testing::_))
.WillOnce(Return(transCursor));
runSpawn([&, this](auto yield) {
auto const handler = AnyHandler{AccountTxHandler{backend}};
auto static const input = json::parse(fmt::format(
R"({{
"account": "{}",
"ledger_index_min": {},
"ledger_index_max": {},
"limit": {},
"forward": false
}})",
ACCOUNT,
-1,
-1,
AccountTxHandler::LIMIT_MAX - 1
));
auto const output = handler.process(input, Context{yield});
ASSERT_TRUE(output);
EXPECT_EQ(output.result->at("account").as_string(), ACCOUNT);
EXPECT_EQ(output.result->at("ledger_index_min").as_uint64(), MINSEQ);
EXPECT_EQ(output.result->at("ledger_index_max").as_uint64(), MAXSEQ);
EXPECT_EQ(output.result->at("limit").as_uint64(), AccountTxHandler::LIMIT_MAX - 1);
EXPECT_EQ(output.result->at("transactions").as_array().size(), 2);
});
}
TEST_F(RPCAccountTxHandlerTest, SpecificLedgerIndex) TEST_F(RPCAccountTxHandlerTest, SpecificLedgerIndex)
{ {
backend->setRange(MINSEQ, MAXSEQ); backend->setRange(MINSEQ, MAXSEQ);

View File

@@ -160,6 +160,22 @@ generateTestValuesForParametersTest()
"invalidParams", "invalidParams",
"Invalid parameters." "Invalid parameters."
}, },
GetAggregatePriceParamTestCaseBundle{
"emtpy_base_asset",
R"({
"quote_asset" : "USD",
"base_asset": "",
"oracles":
[
{
"account": "rGh1VZCRBJY6rJiaFpD4LZtyHiuCkC8aeD",
"oracle_document_id": 2
}
]
})",
"invalidParams",
"Invalid parameters."
},
GetAggregatePriceParamTestCaseBundle{ GetAggregatePriceParamTestCaseBundle{
"invalid_base_asset2", "invalid_base_asset2",
R"({ R"({
@@ -207,6 +223,22 @@ generateTestValuesForParametersTest()
"invalidParams", "invalidParams",
"Invalid parameters." "Invalid parameters."
}, },
GetAggregatePriceParamTestCaseBundle{
"empty_quote_asset",
R"({
"quote_asset" : "",
"base_asset": "USD",
"oracles":
[
{
"account": "rGh1VZCRBJY6rJiaFpD4LZtyHiuCkC8aeD",
"oracle_document_id": 2
}
]
})",
"invalidParams",
"Invalid parameters."
},
GetAggregatePriceParamTestCaseBundle{ GetAggregatePriceParamTestCaseBundle{
"invalid_quote_asset2", "invalid_quote_asset2",
R"({ R"({

View File

@@ -79,25 +79,25 @@ generateTestValuesForParametersTest()
"AccountsInvalidBool", "AccountsInvalidBool",
R"({"accounts": true})", R"({"accounts": true})",
"notSupported", "notSupported",
"Not supported field 'accounts'", "Not supported field 'accounts's value 'true'",
}, },
{ {
"AccountsInvalidInt", "AccountsInvalidInt",
R"({"accounts": 123})", R"({"accounts": 123})",
"notSupported", "invalidParams",
"Not supported field 'accounts'", "Invalid parameters.",
}, },
{ {
"FullInvalidBool", "FullInvalidBool",
R"({"full": true})", R"({"full": true})",
"notSupported", "notSupported",
"Not supported field 'full'", "Not supported field 'full's value 'true'",
}, },
{ {
"FullInvalidInt", "FullInvalidInt",
R"({"full": 123})", R"({"full": 123})",
"notSupported", "invalidParams",
"Not supported field 'full'", "Invalid parameters.",
}, },
{ {
"QueueExist", "QueueExist",
@@ -304,6 +304,8 @@ TEST_F(RPCLedgerHandlerTest, ConditionallyNotSupportedFieldsDefaultValue)
auto const handler = AnyHandler{LedgerHandler{backend}}; auto const handler = AnyHandler{LedgerHandler{backend}};
auto const req = json::parse( auto const req = json::parse(
R"({ R"({
"full": false,
"accounts": false,
"queue": false "queue": false
})" })"
); );

View File

@@ -0,0 +1,40 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/AccountUtils.hpp"
#include <gtest/gtest.h>
#include <ripple/protocol/AccountID.h>
#include <ripple/protocol/SecretKey.h>
#include <ripple/protocol/tokens.h>
constexpr static auto ACCOUNT = "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn";
TEST(AccountUtils, parseBase58Wrapper)
{
EXPECT_FALSE(util::parseBase58Wrapper<ripple::AccountID>("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jp!"));
EXPECT_TRUE(util::parseBase58Wrapper<ripple::AccountID>(ACCOUNT));
EXPECT_TRUE(util::parseBase58Wrapper<ripple::SecretKey>(
ripple::TokenType::NodePrivate, "paQmjZ37pKKPMrgadBLsuf9ab7Y7EUNzh27LQrZqoexpAs31nJi"
));
EXPECT_FALSE(util::parseBase58Wrapper<ripple::SecretKey>(
ripple::TokenType::NodePrivate, "??paQmjZ37pKKPMrgadBLsuf9ab7Y7EUNzh27LQrZqoexpAs31n"
));
}