mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 11:55:51 +00:00 
			
		
		
		
	feat: ETLng task manager (#1843)
This commit is contained in:
		@@ -2,7 +2,7 @@ add_library(clio_etlng)
 | 
			
		||||
 | 
			
		||||
target_sources(
 | 
			
		||||
  clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
 | 
			
		||||
                     impl/Loading.cpp
 | 
			
		||||
                     impl/Loading.cpp impl/TaskManager.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
target_link_libraries(clio_etlng PUBLIC clio_data)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										49
									
								
								src/etlng/LedgerPublisherInterface.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								src/etlng/LedgerPublisherInterface.hpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,49 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of clio: https://github.com/XRPLF/clio
 | 
			
		||||
    Copyright (c) 2025, the clio developers.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and distribute this software for any
 | 
			
		||||
    purpose with or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL,  DIRECT,  INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <optional>
 | 
			
		||||
 | 
			
		||||
namespace etlng {
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief The interface of a scheduler for the extraction proccess
 | 
			
		||||
 */
 | 
			
		||||
struct LedgerPublisherInterface {
 | 
			
		||||
    virtual ~LedgerPublisherInterface() = default;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Publish the ledger by its sequence number
 | 
			
		||||
     *
 | 
			
		||||
     * @param seq The sequence number of the ledger
 | 
			
		||||
     * @param maxAttempts The maximum number of attempts to publish the ledger; no limit if nullopt
 | 
			
		||||
     * @param attemptsDelay The delay between attempts
 | 
			
		||||
     */
 | 
			
		||||
    virtual void
 | 
			
		||||
    publish(
 | 
			
		||||
        uint32_t seq,
 | 
			
		||||
        std::optional<uint32_t> maxAttempts,
 | 
			
		||||
        std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
 | 
			
		||||
    ) = 0;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // namespace etlng
 | 
			
		||||
							
								
								
									
										143
									
								
								src/etlng/impl/TaskManager.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										143
									
								
								src/etlng/impl/TaskManager.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,143 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of clio: https://github.com/XRPLF/clio
 | 
			
		||||
    Copyright (c) 2025, the clio developers.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and distribute this software for any
 | 
			
		||||
    purpose with or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL,  DIRECT,  INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#include "etlng/impl/TaskManager.hpp"
 | 
			
		||||
 | 
			
		||||
#include "etlng/ExtractorInterface.hpp"
 | 
			
		||||
#include "etlng/LoaderInterface.hpp"
 | 
			
		||||
#include "etlng/Models.hpp"
 | 
			
		||||
#include "etlng/SchedulerInterface.hpp"
 | 
			
		||||
#include "util/async/AnyExecutionContext.hpp"
 | 
			
		||||
#include "util/async/AnyOperation.hpp"
 | 
			
		||||
#include "util/async/AnyStrand.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
 | 
			
		||||
#include <xrpl/protocol/TxFormats.h>
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstddef>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <ranges>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <utility>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
namespace etlng::impl {
 | 
			
		||||
 | 
			
		||||
TaskManager::TaskManager(
 | 
			
		||||
    util::async::AnyExecutionContext&& ctx,
 | 
			
		||||
    std::reference_wrapper<SchedulerInterface> scheduler,
 | 
			
		||||
    std::reference_wrapper<ExtractorInterface> extractor,
 | 
			
		||||
    std::reference_wrapper<LoaderInterface> loader
 | 
			
		||||
)
 | 
			
		||||
    : ctx_(std::move(ctx)), schedulers_(scheduler), extractor_(extractor), loader_(loader)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TaskManager::~TaskManager()
 | 
			
		||||
{
 | 
			
		||||
    stop();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
TaskManager::run(Settings settings)
 | 
			
		||||
{
 | 
			
		||||
    static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz;
 | 
			
		||||
 | 
			
		||||
    auto schedulingStrand = ctx_.makeStrand();
 | 
			
		||||
    PriorityQueue queue(ctx_.makeStrand(), kQUEUE_SIZE_LIMIT);
 | 
			
		||||
 | 
			
		||||
    LOG(log_.debug()) << "Starting task manager...\n";
 | 
			
		||||
 | 
			
		||||
    extractors_.reserve(settings.numExtractors);
 | 
			
		||||
    for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numExtractors))
 | 
			
		||||
        extractors_.push_back(spawnExtractor(schedulingStrand, queue));
 | 
			
		||||
 | 
			
		||||
    loaders_.reserve(settings.numLoaders);
 | 
			
		||||
    for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numLoaders))
 | 
			
		||||
        loaders_.push_back(spawnLoader(queue));
 | 
			
		||||
 | 
			
		||||
    wait();
 | 
			
		||||
    LOG(log_.debug()) << "All finished in task manager..\n";
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
util::async::AnyOperation<void>
 | 
			
		||||
TaskManager::spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue)
 | 
			
		||||
{
 | 
			
		||||
    // TODO: these values may be extracted to config later and/or need to be fine-tuned on a realistic system
 | 
			
		||||
    static constexpr auto kDELAY_BETWEEN_ATTEMPTS = std::chrono::milliseconds{100u};
 | 
			
		||||
    static constexpr auto kDELAY_BETWEEN_ENQUEUE_ATTEMPTS = std::chrono::milliseconds{1u};
 | 
			
		||||
 | 
			
		||||
    return strand.execute([this, &queue](auto stopRequested) {
 | 
			
		||||
        while (not stopRequested) {
 | 
			
		||||
            if (auto task = schedulers_.get().next(); task.has_value()) {
 | 
			
		||||
                if (auto maybeBatch = extractor_.get().extractLedgerWithDiff(task->seq); maybeBatch.has_value()) {
 | 
			
		||||
                    LOG(log_.debug()) << "Adding data after extracting diff";
 | 
			
		||||
                    while (not queue.enqueue(*maybeBatch)) {
 | 
			
		||||
                        // TODO (https://github.com/XRPLF/clio/issues/1852)
 | 
			
		||||
                        std::this_thread::sleep_for(kDELAY_BETWEEN_ENQUEUE_ATTEMPTS);
 | 
			
		||||
 | 
			
		||||
                        if (stopRequested)
 | 
			
		||||
                            break;
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    // TODO: how do we signal to the loaders that it's time to shutdown? some special task?
 | 
			
		||||
                    break;  // TODO: handle server shutdown or other node took over ETL
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                // TODO (https://github.com/XRPLF/clio/issues/1852)
 | 
			
		||||
                std::this_thread::sleep_for(kDELAY_BETWEEN_ATTEMPTS);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
util::async::AnyOperation<void>
 | 
			
		||||
TaskManager::spawnLoader(PriorityQueue& queue)
 | 
			
		||||
{
 | 
			
		||||
    return ctx_.execute([this, &queue](auto stopRequested) {
 | 
			
		||||
        while (not stopRequested) {
 | 
			
		||||
            // TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not
 | 
			
		||||
            if (auto data = queue.dequeue(); data.has_value())
 | 
			
		||||
                loader_.get().load(*data);
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
TaskManager::wait()
 | 
			
		||||
{
 | 
			
		||||
    for (auto& extractor : extractors_)
 | 
			
		||||
        extractor.wait();
 | 
			
		||||
    for (auto& loader : loaders_)
 | 
			
		||||
        loader.wait();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
TaskManager::stop()
 | 
			
		||||
{
 | 
			
		||||
    for (auto& extractor : extractors_)
 | 
			
		||||
        extractor.abort();
 | 
			
		||||
    for (auto& loader : loaders_)
 | 
			
		||||
        loader.abort();
 | 
			
		||||
 | 
			
		||||
    wait();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // namespace etlng::impl
 | 
			
		||||
							
								
								
									
										94
									
								
								src/etlng/impl/TaskManager.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										94
									
								
								src/etlng/impl/TaskManager.hpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,94 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of clio: https://github.com/XRPLF/clio
 | 
			
		||||
    Copyright (c) 2025, the clio developers.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and distribute this software for any
 | 
			
		||||
    purpose with or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL,  DIRECT,  INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "etlng/ExtractorInterface.hpp"
 | 
			
		||||
#include "etlng/LoaderInterface.hpp"
 | 
			
		||||
#include "etlng/Models.hpp"
 | 
			
		||||
#include "etlng/SchedulerInterface.hpp"
 | 
			
		||||
#include "util/StrandedPriorityQueue.hpp"
 | 
			
		||||
#include "util/async/AnyExecutionContext.hpp"
 | 
			
		||||
#include "util/async/AnyOperation.hpp"
 | 
			
		||||
#include "util/async/AnyStrand.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
 | 
			
		||||
#include <xrpl/protocol/TxFormats.h>
 | 
			
		||||
 | 
			
		||||
#include <cstddef>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
namespace etlng::impl {
 | 
			
		||||
 | 
			
		||||
class TaskManager {
 | 
			
		||||
    util::async::AnyExecutionContext ctx_;
 | 
			
		||||
    std::reference_wrapper<SchedulerInterface> schedulers_;
 | 
			
		||||
    std::reference_wrapper<ExtractorInterface> extractor_;
 | 
			
		||||
    std::reference_wrapper<LoaderInterface> loader_;
 | 
			
		||||
 | 
			
		||||
    std::vector<util::async::AnyOperation<void>> extractors_;
 | 
			
		||||
    std::vector<util::async::AnyOperation<void>> loaders_;
 | 
			
		||||
 | 
			
		||||
    util::Logger log_{"ETL"};
 | 
			
		||||
 | 
			
		||||
    struct ReverseOrderComparator {
 | 
			
		||||
        [[nodiscard]] bool
 | 
			
		||||
        operator()(model::LedgerData const& lhs, model::LedgerData const& rhs) const noexcept
 | 
			
		||||
        {
 | 
			
		||||
            return lhs.seq > rhs.seq;
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    struct Settings {
 | 
			
		||||
        size_t numExtractors; /**< number of extraction tasks */
 | 
			
		||||
        size_t numLoaders;    /**< number of loading tasks */
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    // reverse order loading is needed (i.e. start with oldest seq in forward fill buffer)
 | 
			
		||||
    using PriorityQueue = util::StrandedPriorityQueue<model::LedgerData, ReverseOrderComparator>;
 | 
			
		||||
 | 
			
		||||
    TaskManager(
 | 
			
		||||
        util::async::AnyExecutionContext&& ctx,
 | 
			
		||||
        std::reference_wrapper<SchedulerInterface> scheduler,
 | 
			
		||||
        std::reference_wrapper<ExtractorInterface> extractor,
 | 
			
		||||
        std::reference_wrapper<LoaderInterface> loader
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    ~TaskManager();
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    run(Settings settings);
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    stop();
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    void
 | 
			
		||||
    wait();
 | 
			
		||||
 | 
			
		||||
    [[nodiscard]] util::async::AnyOperation<void>
 | 
			
		||||
    spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue);
 | 
			
		||||
 | 
			
		||||
    [[nodiscard]] util::async::AnyOperation<void>
 | 
			
		||||
    spawnLoader(PriorityQueue& queue);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // namespace etlng::impl
 | 
			
		||||
@@ -50,7 +50,7 @@ void
 | 
			
		||||
SubscriptionManager::pubBookChanges(
 | 
			
		||||
    ripple::LedgerHeader const& lgrInfo,
 | 
			
		||||
    std::vector<data::TransactionAndMetadata> const& transactions
 | 
			
		||||
) const
 | 
			
		||||
)
 | 
			
		||||
{
 | 
			
		||||
    bookChangesFeed_.pub(lgrInfo, transactions);
 | 
			
		||||
}
 | 
			
		||||
@@ -111,7 +111,7 @@ SubscriptionManager::pubLedger(
 | 
			
		||||
    ripple::Fees const& fees,
 | 
			
		||||
    std::string const& ledgerRange,
 | 
			
		||||
    std::uint32_t const txnCount
 | 
			
		||||
) const
 | 
			
		||||
)
 | 
			
		||||
{
 | 
			
		||||
    ledgerFeed_.pub(lgrInfo, fees, ledgerRange, txnCount);
 | 
			
		||||
}
 | 
			
		||||
@@ -129,7 +129,7 @@ SubscriptionManager::unsubManifest(SubscriberSharedPtr const& subscriber)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
SubscriptionManager::forwardManifest(boost::json::object const& manifestJson) const
 | 
			
		||||
SubscriptionManager::forwardManifest(boost::json::object const& manifestJson)
 | 
			
		||||
{
 | 
			
		||||
    manifestFeed_.pub(manifestJson);
 | 
			
		||||
}
 | 
			
		||||
@@ -147,7 +147,7 @@ SubscriptionManager::unsubValidation(SubscriberSharedPtr const& subscriber)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
SubscriptionManager::forwardValidation(boost::json::object const& validationJson) const
 | 
			
		||||
SubscriptionManager::forwardValidation(boost::json::object const& validationJson)
 | 
			
		||||
{
 | 
			
		||||
    validationsFeed_.pub(validationJson);
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -150,7 +150,7 @@ public:
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
 | 
			
		||||
        const final;
 | 
			
		||||
        final;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the proposed transactions feed.
 | 
			
		||||
@@ -218,7 +218,7 @@ public:
 | 
			
		||||
        ripple::Fees const& fees,
 | 
			
		||||
        std::string const& ledgerRange,
 | 
			
		||||
        std::uint32_t txnCount
 | 
			
		||||
    ) const final;
 | 
			
		||||
    ) final;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the manifest feed.
 | 
			
		||||
@@ -239,7 +239,7 @@ public:
 | 
			
		||||
     * @param manifestJson The manifest json to forward.
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    forwardManifest(boost::json::object const& manifestJson) const final;
 | 
			
		||||
    forwardManifest(boost::json::object const& manifestJson) final;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the validation feed.
 | 
			
		||||
@@ -260,7 +260,7 @@ public:
 | 
			
		||||
     * @param validationJson The validation feed json to forward.
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    forwardValidation(boost::json::object const& validationJson) const final;
 | 
			
		||||
    forwardValidation(boost::json::object const& validationJson) final;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the transactions feed.
 | 
			
		||||
 
 | 
			
		||||
@@ -71,8 +71,10 @@ public:
 | 
			
		||||
     * @param transactions The transactions in the current ledger.
 | 
			
		||||
     */
 | 
			
		||||
    virtual void
 | 
			
		||||
    pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
 | 
			
		||||
        const = 0;
 | 
			
		||||
    pubBookChanges(
 | 
			
		||||
        ripple::LedgerHeader const& lgrInfo,
 | 
			
		||||
        std::vector<data::TransactionAndMetadata> const& transactions
 | 
			
		||||
    ) = 0;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the proposed transactions feed.
 | 
			
		||||
@@ -141,7 +143,7 @@ public:
 | 
			
		||||
        ripple::Fees const& fees,
 | 
			
		||||
        std::string const& ledgerRange,
 | 
			
		||||
        std::uint32_t txnCount
 | 
			
		||||
    ) const = 0;
 | 
			
		||||
    ) = 0;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the manifest feed.
 | 
			
		||||
@@ -162,7 +164,7 @@ public:
 | 
			
		||||
     * @param manifestJson The manifest json to forward.
 | 
			
		||||
     */
 | 
			
		||||
    virtual void
 | 
			
		||||
    forwardManifest(boost::json::object const& manifestJson) const = 0;
 | 
			
		||||
    forwardManifest(boost::json::object const& manifestJson) = 0;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the validation feed.
 | 
			
		||||
@@ -183,7 +185,7 @@ public:
 | 
			
		||||
     * @param validationJson The validation feed json to forward.
 | 
			
		||||
     */
 | 
			
		||||
    virtual void
 | 
			
		||||
    forwardValidation(boost::json::object const& validationJson) const = 0;
 | 
			
		||||
    forwardValidation(boost::json::object const& validationJson) = 0;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Subscribe to the transactions feed.
 | 
			
		||||
 
 | 
			
		||||
@@ -48,7 +48,7 @@ struct BookChangesFeed : public SingleFeedBase {
 | 
			
		||||
     * @param transactions The transactions that were included in the ledger.
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    pub(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions) const
 | 
			
		||||
    pub(ripple::LedgerHeader const& lgrInfo, std::vector<data::TransactionAndMetadata> const& transactions)
 | 
			
		||||
    {
 | 
			
		||||
        SingleFeedBase::pub(boost::json::serialize(rpc::computeBookChanges(lgrInfo, transactions)));
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -36,7 +36,7 @@ struct ForwardFeed : public SingleFeedBase {
 | 
			
		||||
     * @brief Publishes the json object.
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    pub(boost::json::object const& json) const
 | 
			
		||||
    pub(boost::json::object const& json)
 | 
			
		||||
    {
 | 
			
		||||
        SingleFeedBase::pub(boost::json::serialize(json));
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -94,7 +94,7 @@ LedgerFeed::pub(
 | 
			
		||||
    ripple::Fees const& fees,
 | 
			
		||||
    std::string const& ledgerRange,
 | 
			
		||||
    std::uint32_t const txnCount
 | 
			
		||||
) const
 | 
			
		||||
)
 | 
			
		||||
{
 | 
			
		||||
    SingleFeedBase::pub(boost::json::serialize(makeLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)));
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -76,7 +76,7 @@ public:
 | 
			
		||||
    pub(ripple::LedgerHeader const& lgrInfo,
 | 
			
		||||
        ripple::Fees const& fees,
 | 
			
		||||
        std::string const& ledgerRange,
 | 
			
		||||
        std::uint32_t txnCount) const;
 | 
			
		||||
        std::uint32_t txnCount);
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    static boost::json::object
 | 
			
		||||
 
 | 
			
		||||
@@ -62,7 +62,7 @@ SingleFeedBase::unsub(SubscriberSharedPtr const& subscriber)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
SingleFeedBase::pub(std::string msg) const
 | 
			
		||||
SingleFeedBase::pub(std::string msg)
 | 
			
		||||
{
 | 
			
		||||
    [[maybe_unused]] auto task = strand_.execute([this, msg = std::move(msg)]() {
 | 
			
		||||
        auto const msgPtr = std::make_shared<std::string>(msg);
 | 
			
		||||
 
 | 
			
		||||
@@ -73,7 +73,7 @@ public:
 | 
			
		||||
     * @param msg The message.
 | 
			
		||||
     */
 | 
			
		||||
    void
 | 
			
		||||
    pub(std::string msg) const;
 | 
			
		||||
    pub(std::string msg);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Get the count of subscribers.
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										116
									
								
								src/util/StrandedPriorityQueue.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										116
									
								
								src/util/StrandedPriorityQueue.hpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,116 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of clio: https://github.com/XRPLF/clio
 | 
			
		||||
    Copyright (c) 2025, the clio developers.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and distribute this software for any
 | 
			
		||||
    purpose with or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL,  DIRECT,  INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "util/async/AnyStrand.hpp"
 | 
			
		||||
 | 
			
		||||
#include <cstddef>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <queue>
 | 
			
		||||
#include <type_traits>
 | 
			
		||||
#include <utility>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
namespace util {
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief A wrapper for std::priority_queue that serialises operations using a strand
 | 
			
		||||
 * @note This may be a candidate for future improvements if performance proves to be poor (e.g. use a lock free queue)
 | 
			
		||||
 */
 | 
			
		||||
template <typename T, typename Compare = std::less<T>>
 | 
			
		||||
class StrandedPriorityQueue {
 | 
			
		||||
    util::async::AnyStrand strand_;
 | 
			
		||||
    std::size_t limit_;
 | 
			
		||||
    std::priority_queue<T, std::vector<T>, Compare> queue_;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Construct a new priority queue on a strand
 | 
			
		||||
     * @param strand The strand to use
 | 
			
		||||
     * @param limit The limit of items allowed simultaniously in the queue
 | 
			
		||||
     */
 | 
			
		||||
    StrandedPriorityQueue(util::async::AnyStrand&& strand, std::optional<std::size_t> limit = std::nullopt)
 | 
			
		||||
        : strand_(std::move(strand)), limit_(limit.value_or(0uz))
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Enqueue a new item onto the queue if space is available
 | 
			
		||||
     * @note This function blocks until the item is attempted to be added to the queue
 | 
			
		||||
     *
 | 
			
		||||
     * @tparam I Type of the item to add
 | 
			
		||||
     * @param item The item to add
 | 
			
		||||
     * @return true if item added to the queue; false otherwise
 | 
			
		||||
     */
 | 
			
		||||
    template <typename I>
 | 
			
		||||
    [[nodiscard]] bool
 | 
			
		||||
    enqueue(I&& item)
 | 
			
		||||
        requires std::is_same_v<std::decay_t<I>, T>
 | 
			
		||||
    {
 | 
			
		||||
        return strand_
 | 
			
		||||
            .execute([&item, this] {
 | 
			
		||||
                if (limit_ == 0uz or queue_.size() < limit_) {
 | 
			
		||||
                    queue_.push(std::forward<I>(item));
 | 
			
		||||
                    return true;
 | 
			
		||||
                }
 | 
			
		||||
                return false;
 | 
			
		||||
            })
 | 
			
		||||
            .get()
 | 
			
		||||
            .value_or(false);  // if some exception happens - failed to add
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Dequeue the next available item out of the queue
 | 
			
		||||
     * @note This function blocks until the item is taken off the queue
 | 
			
		||||
     * @return An item if available; nullopt otherwise
 | 
			
		||||
     */
 | 
			
		||||
    [[nodiscard]] std::optional<T>
 | 
			
		||||
    dequeue()
 | 
			
		||||
    {
 | 
			
		||||
        return strand_
 | 
			
		||||
            .execute([this] -> std::optional<T> {
 | 
			
		||||
                std::optional<T> out;
 | 
			
		||||
 | 
			
		||||
                if (not queue_.empty()) {
 | 
			
		||||
                    out.emplace(queue_.top());
 | 
			
		||||
                    queue_.pop();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                return out;
 | 
			
		||||
            })
 | 
			
		||||
            .get()
 | 
			
		||||
            .value_or(std::nullopt);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief Check if the queue is empty
 | 
			
		||||
     * @note This function blocks until the queue is checked
 | 
			
		||||
     *
 | 
			
		||||
     * @return true if the queue is empty; false otherwise
 | 
			
		||||
     */
 | 
			
		||||
    [[nodiscard]] bool
 | 
			
		||||
    empty()
 | 
			
		||||
    {
 | 
			
		||||
        return strand_.execute([this] { return queue_.empty(); }).get().value();
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // namespace util
 | 
			
		||||
@@ -19,6 +19,7 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "util/async/AnyOperation.hpp"
 | 
			
		||||
#include "util/async/AnyStopToken.hpp"
 | 
			
		||||
#include "util/async/Concepts.hpp"
 | 
			
		||||
#include "util/async/impl/ErasedOperation.hpp"
 | 
			
		||||
@@ -61,7 +62,7 @@ public:
 | 
			
		||||
     * @return The type-erased operation
 | 
			
		||||
     */
 | 
			
		||||
    [[nodiscard]] auto
 | 
			
		||||
    execute(SomeHandlerWithoutStopToken auto&& fn) const
 | 
			
		||||
    execute(SomeHandlerWithoutStopToken auto&& fn)
 | 
			
		||||
    {
 | 
			
		||||
        using RetType = std::decay_t<decltype(fn())>;
 | 
			
		||||
        static_assert(not std::is_same_v<RetType, std::any>);
 | 
			
		||||
@@ -85,7 +86,7 @@ public:
 | 
			
		||||
     * @return The type-erased operation
 | 
			
		||||
     */
 | 
			
		||||
    [[nodiscard]] auto
 | 
			
		||||
    execute(SomeHandlerWith<AnyStopToken> auto&& fn) const
 | 
			
		||||
    execute(SomeHandlerWith<AnyStopToken> auto&& fn)
 | 
			
		||||
    {
 | 
			
		||||
        using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
 | 
			
		||||
        static_assert(not std::is_same_v<RetType, std::any>);
 | 
			
		||||
@@ -110,7 +111,7 @@ public:
 | 
			
		||||
     * @return The type-erased operation
 | 
			
		||||
     */
 | 
			
		||||
    [[nodiscard]] auto
 | 
			
		||||
    execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout) const
 | 
			
		||||
    execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
 | 
			
		||||
    {
 | 
			
		||||
        using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
 | 
			
		||||
        static_assert(not std::is_same_v<RetType, std::any>);
 | 
			
		||||
@@ -137,7 +138,7 @@ private:
 | 
			
		||||
        [[nodiscard]] virtual impl::ErasedOperation
 | 
			
		||||
        execute(std::function<std::any(AnyStopToken)>, std::optional<std::chrono::milliseconds> timeout = std::nullopt)
 | 
			
		||||
            const = 0;
 | 
			
		||||
        [[nodiscard]] virtual impl::ErasedOperation execute(std::function<std::any()>) const = 0;
 | 
			
		||||
        [[nodiscard]] virtual impl::ErasedOperation execute(std::function<std::any()>) = 0;
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    template <typename StrandType>
 | 
			
		||||
@@ -158,7 +159,7 @@ private:
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        [[nodiscard]] impl::ErasedOperation
 | 
			
		||||
        execute(std::function<std::any()> fn) const override
 | 
			
		||||
        execute(std::function<std::any()> fn) override
 | 
			
		||||
        {
 | 
			
		||||
            return strand.execute(std::move(fn));
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -49,14 +49,14 @@ struct MockSubscriptionManager : feed::SubscriptionManagerInterface {
 | 
			
		||||
        void,
 | 
			
		||||
        pubLedger,
 | 
			
		||||
        (ripple::LedgerHeader const&, ripple::Fees const&, std::string const&, std::uint32_t),
 | 
			
		||||
        (const, override)
 | 
			
		||||
        (override)
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    MOCK_METHOD(
 | 
			
		||||
        void,
 | 
			
		||||
        pubBookChanges,
 | 
			
		||||
        (ripple::LedgerHeader const&, std::vector<data::TransactionAndMetadata> const&),
 | 
			
		||||
        (const, override)
 | 
			
		||||
        (override)
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    MOCK_METHOD(void, unsubLedger, (feed::SubscriberSharedPtr const&), (override));
 | 
			
		||||
@@ -89,9 +89,9 @@ struct MockSubscriptionManager : feed::SubscriptionManagerInterface {
 | 
			
		||||
 | 
			
		||||
    MOCK_METHOD(void, forwardProposedTransaction, (boost::json::object const&), (override));
 | 
			
		||||
 | 
			
		||||
    MOCK_METHOD(void, forwardManifest, (boost::json::object const&), (const, override));
 | 
			
		||||
    MOCK_METHOD(void, forwardManifest, (boost::json::object const&), (override));
 | 
			
		||||
 | 
			
		||||
    MOCK_METHOD(void, forwardValidation, (boost::json::object const&), (const, override));
 | 
			
		||||
    MOCK_METHOD(void, forwardValidation, (boost::json::object const&), (override));
 | 
			
		||||
 | 
			
		||||
    MOCK_METHOD(void, subProposedAccount, (ripple::AccountID const&, feed::SubscriberSharedPtr const&), (override));
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -40,6 +40,7 @@ target_sources(
 | 
			
		||||
          etlng/GrpcSourceTests.cpp
 | 
			
		||||
          etlng/RegistryTests.cpp
 | 
			
		||||
          etlng/SchedulingTests.cpp
 | 
			
		||||
          etlng/TaskManagerTests.cpp
 | 
			
		||||
          etlng/LoadingTests.cpp
 | 
			
		||||
          # Feed
 | 
			
		||||
          util/BytesConverterTests.cpp
 | 
			
		||||
@@ -129,6 +130,7 @@ target_sources(
 | 
			
		||||
          util/ConceptsTests.cpp
 | 
			
		||||
          util/CoroutineGroupTests.cpp
 | 
			
		||||
          util/LedgerUtilsTests.cpp
 | 
			
		||||
          util/StrandedPriorityQueueTests.cpp
 | 
			
		||||
          # Prometheus support
 | 
			
		||||
          util/prometheus/BoolTests.cpp
 | 
			
		||||
          util/prometheus/CounterTests.cpp
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										137
									
								
								tests/unit/etlng/TaskManagerTests.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										137
									
								
								tests/unit/etlng/TaskManagerTests.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,137 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of clio: https://github.com/XRPLF/clio
 | 
			
		||||
    Copyright (c) 2025, the clio developers.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and distribute this software for any
 | 
			
		||||
    purpose with or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL,  DIRECT,  INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#include "etlng/ExtractorInterface.hpp"
 | 
			
		||||
#include "etlng/LoaderInterface.hpp"
 | 
			
		||||
#include "etlng/Models.hpp"
 | 
			
		||||
#include "etlng/SchedulerInterface.hpp"
 | 
			
		||||
#include "etlng/impl/Loading.hpp"
 | 
			
		||||
#include "etlng/impl/TaskManager.hpp"
 | 
			
		||||
#include "util/BinaryTestObject.hpp"
 | 
			
		||||
#include "util/LoggerFixtures.hpp"
 | 
			
		||||
#include "util/TestObject.hpp"
 | 
			
		||||
#include "util/async/AnyExecutionContext.hpp"
 | 
			
		||||
#include "util/async/context/BasicExecutionContext.hpp"
 | 
			
		||||
 | 
			
		||||
#include <gmock/gmock.h>
 | 
			
		||||
#include <gtest/gtest.h>
 | 
			
		||||
#include <xrpl/protocol/LedgerHeader.h>
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <cstddef>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <semaphore>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
using namespace etlng::model;
 | 
			
		||||
using namespace etlng::impl;
 | 
			
		||||
 | 
			
		||||
namespace {
 | 
			
		||||
 | 
			
		||||
constinit auto const kSEQ = 30;
 | 
			
		||||
constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
 | 
			
		||||
 | 
			
		||||
struct MockScheduler : etlng::SchedulerInterface {
 | 
			
		||||
    MOCK_METHOD(std::optional<Task>, next, (), (override));
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct MockExtractor : etlng::ExtractorInterface {
 | 
			
		||||
    MOCK_METHOD(std::optional<LedgerData>, extractLedgerWithDiff, (uint32_t), (override));
 | 
			
		||||
    MOCK_METHOD(std::optional<LedgerData>, extractLedgerOnly, (uint32_t), (override));
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct MockLoader : etlng::LoaderInterface {
 | 
			
		||||
    MOCK_METHOD(void, load, (LedgerData const&), (override));
 | 
			
		||||
    MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (LedgerData const&), (override));
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct TaskManagerTests : NoLoggerFixture {
 | 
			
		||||
    using MockSchedulerType = testing::NiceMock<MockScheduler>;
 | 
			
		||||
    using MockExtractorType = testing::NiceMock<MockExtractor>;
 | 
			
		||||
    using MockLoaderType = testing::NiceMock<MockLoader>;
 | 
			
		||||
 | 
			
		||||
protected:
 | 
			
		||||
    util::async::CoroExecutionContext ctx_{2};
 | 
			
		||||
    std::shared_ptr<MockSchedulerType> mockSchedulerPtr_ = std::make_shared<MockSchedulerType>();
 | 
			
		||||
    std::shared_ptr<MockExtractorType> mockExtractorPtr_ = std::make_shared<MockExtractorType>();
 | 
			
		||||
    std::shared_ptr<MockLoaderType> mockLoaderPtr_ = std::make_shared<MockLoaderType>();
 | 
			
		||||
 | 
			
		||||
    TaskManager taskManager_{ctx_, *mockSchedulerPtr_, *mockExtractorPtr_, *mockLoaderPtr_};
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
auto
 | 
			
		||||
createTestData(uint32_t seq)
 | 
			
		||||
{
 | 
			
		||||
    auto const header = createLedgerHeader(kLEDGER_HASH, seq);
 | 
			
		||||
    return LedgerData{
 | 
			
		||||
        .transactions = {},
 | 
			
		||||
        .objects = {util::createObject(), util::createObject(), util::createObject()},
 | 
			
		||||
        .successors = {},
 | 
			
		||||
        .edgeKeys = {},
 | 
			
		||||
        .header = header,
 | 
			
		||||
        .rawHeader = {},
 | 
			
		||||
        .seq = seq
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // namespace
 | 
			
		||||
 | 
			
		||||
TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted)
 | 
			
		||||
{
 | 
			
		||||
    static constexpr auto kTOTAL = 64uz;
 | 
			
		||||
    static constexpr auto kEXTRACTORS = 5uz;
 | 
			
		||||
    static constexpr auto kLOADERS = 1uz;
 | 
			
		||||
 | 
			
		||||
    std::atomic_uint32_t seq = kSEQ;
 | 
			
		||||
    std::vector<uint32_t> loaded;
 | 
			
		||||
    std::binary_semaphore done{0};
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(*mockSchedulerPtr_, next()).WillRepeatedly([&]() {
 | 
			
		||||
        return Task{.priority = Task::Priority::Higher, .seq = seq++};
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(*mockExtractorPtr_, extractLedgerWithDiff(testing::_))
 | 
			
		||||
        .WillRepeatedly([](uint32_t seq) -> std::optional<LedgerData> {
 | 
			
		||||
            if (seq > kSEQ + kTOTAL - 1)
 | 
			
		||||
                return std::nullopt;
 | 
			
		||||
 | 
			
		||||
            return createTestData(seq);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(*mockLoaderPtr_, load(testing::_)).Times(kTOTAL).WillRepeatedly([&](LedgerData data) {
 | 
			
		||||
        loaded.push_back(data.seq);
 | 
			
		||||
 | 
			
		||||
        if (loaded.size() == kTOTAL) {
 | 
			
		||||
            done.release();
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    auto loop = ctx_.execute([&] { taskManager_.run({.numExtractors = kEXTRACTORS, .numLoaders = kLOADERS}); });
 | 
			
		||||
    done.acquire();
 | 
			
		||||
 | 
			
		||||
    taskManager_.stop();
 | 
			
		||||
    loop.wait();
 | 
			
		||||
 | 
			
		||||
    EXPECT_EQ(loaded.size(), kTOTAL);
 | 
			
		||||
    for (std::size_t i = 0; i < loaded.size(); ++i) {
 | 
			
		||||
        EXPECT_EQ(loaded[i], kSEQ + i);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										196
									
								
								tests/unit/util/StrandedPriorityQueueTests.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										196
									
								
								tests/unit/util/StrandedPriorityQueueTests.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,196 @@
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
/*
 | 
			
		||||
    This file is part of clio: https://github.com/XRPLF/clio
 | 
			
		||||
    Copyright (c) 2025, the clio developers.
 | 
			
		||||
 | 
			
		||||
    Permission to use, copy, modify, and distribute this software for any
 | 
			
		||||
    purpose with or without fee is hereby granted, provided that the above
 | 
			
		||||
    copyright notice and this permission notice appear in all copies.
 | 
			
		||||
 | 
			
		||||
    THE  SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 | 
			
		||||
    WITH  REGARD  TO  THIS  SOFTWARE  INCLUDING  ALL  IMPLIED  WARRANTIES  OF
 | 
			
		||||
    MERCHANTABILITY  AND  FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 | 
			
		||||
    ANY  SPECIAL,  DIRECT,  INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 | 
			
		||||
    WHATSOEVER  RESULTING  FROM  LOSS  OF USE, DATA OR PROFITS, WHETHER IN AN
 | 
			
		||||
    ACTION  OF  CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 | 
			
		||||
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
			
		||||
*/
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#include "util/StrandedPriorityQueue.hpp"
 | 
			
		||||
#include "util/async/AnyExecutionContext.hpp"
 | 
			
		||||
#include "util/async/AnyOperation.hpp"
 | 
			
		||||
#include "util/async/context/BasicExecutionContext.hpp"
 | 
			
		||||
 | 
			
		||||
#include <gmock/gmock.h>
 | 
			
		||||
#include <gtest/gtest.h>
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <unordered_set>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
using namespace util;
 | 
			
		||||
 | 
			
		||||
namespace {
 | 
			
		||||
 | 
			
		||||
struct TestData {
 | 
			
		||||
    uint32_t seq;
 | 
			
		||||
 | 
			
		||||
    auto
 | 
			
		||||
    operator<=>(TestData const&) const = default;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // namespace
 | 
			
		||||
 | 
			
		||||
TEST(StrandedPriorityQueueTests, DefaultPriority)
 | 
			
		||||
{
 | 
			
		||||
    util::async::CoroExecutionContext ctx;
 | 
			
		||||
    StrandedPriorityQueue<TestData> queue{ctx.makeStrand()};
 | 
			
		||||
 | 
			
		||||
    for (auto i = 0u; i < 100u; ++i) {
 | 
			
		||||
        EXPECT_TRUE(queue.enqueue(TestData{.seq = i}));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    EXPECT_FALSE(queue.empty());
 | 
			
		||||
 | 
			
		||||
    auto next = 99u;
 | 
			
		||||
    while (auto maybeValue = queue.dequeue()) {
 | 
			
		||||
        EXPECT_EQ(maybeValue->seq, next--);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(queue.empty());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST(StrandedPriorityQueueTests, CustomPriority)
 | 
			
		||||
{
 | 
			
		||||
    struct Comp {
 | 
			
		||||
        [[nodiscard]] bool
 | 
			
		||||
        operator()(TestData const& lhs, TestData const& rhs) const noexcept
 | 
			
		||||
        {
 | 
			
		||||
            return lhs.seq > rhs.seq;
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    util::async::CoroExecutionContext ctx;
 | 
			
		||||
    StrandedPriorityQueue<TestData, Comp> queue{ctx.makeStrand()};
 | 
			
		||||
 | 
			
		||||
    for (auto i = 0u; i < 100u; ++i) {
 | 
			
		||||
        EXPECT_TRUE(queue.enqueue(TestData{.seq = i}));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    EXPECT_FALSE(queue.empty());
 | 
			
		||||
 | 
			
		||||
    auto next = 0u;
 | 
			
		||||
    while (auto maybeValue = queue.dequeue()) {
 | 
			
		||||
        EXPECT_EQ(maybeValue->seq, next++);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(queue.empty());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST(StrandedPriorityQueueTests, MultipleThreadsUnlimitedQueue)
 | 
			
		||||
{
 | 
			
		||||
    async::CoroExecutionContext realCtx{6};
 | 
			
		||||
    async::AnyExecutionContext ctx{realCtx};
 | 
			
		||||
    StrandedPriorityQueue<TestData> queue{ctx.makeStrand()};
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(queue.empty());
 | 
			
		||||
    static constexpr auto kTOTAL_THREADS = 5u;
 | 
			
		||||
    static constexpr auto kTOTAL_ITEMS_PER_THREAD = 100u;
 | 
			
		||||
 | 
			
		||||
    std::atomic_size_t totalEnqueued = 0uz;
 | 
			
		||||
    std::vector<async::AnyOperation<void>> tasks;
 | 
			
		||||
    tasks.reserve(kTOTAL_THREADS);
 | 
			
		||||
 | 
			
		||||
    for (auto batchIdx = 0u; batchIdx < kTOTAL_THREADS; ++batchIdx) {
 | 
			
		||||
        // enqueue batches tasks running on multiple threads
 | 
			
		||||
        tasks.push_back(ctx.execute([&queue, batchIdx, &totalEnqueued] {
 | 
			
		||||
            for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) {
 | 
			
		||||
                if (queue.enqueue(TestData{.seq = (batchIdx * kTOTAL_ITEMS_PER_THREAD) + i}))
 | 
			
		||||
                    ++totalEnqueued;
 | 
			
		||||
            }
 | 
			
		||||
        }));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for (auto& task : tasks)
 | 
			
		||||
        task.wait();
 | 
			
		||||
 | 
			
		||||
    auto next = (kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS) - 1;
 | 
			
		||||
    while (auto maybeValue = queue.dequeue()) {
 | 
			
		||||
        EXPECT_EQ(maybeValue->seq, next--);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(queue.empty());
 | 
			
		||||
    EXPECT_EQ(totalEnqueued, kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST(StrandedPriorityQueueTests, MultipleThreadsLimitedQueue)
 | 
			
		||||
{
 | 
			
		||||
    static constexpr auto kQUEUE_SIZE_LIMIT = 32uz;
 | 
			
		||||
    static constexpr auto kTOTAL_THREADS = 5u;
 | 
			
		||||
    static constexpr auto kTOTAL_ITEMS_PER_THREAD = 100u;
 | 
			
		||||
 | 
			
		||||
    async::CoroExecutionContext realCtx{8};
 | 
			
		||||
    async::AnyExecutionContext ctx{realCtx};
 | 
			
		||||
    StrandedPriorityQueue<TestData> queue{ctx.makeStrand(), kQUEUE_SIZE_LIMIT};
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(queue.empty());
 | 
			
		||||
 | 
			
		||||
    std::atomic_size_t totalEnqueued = 0uz;
 | 
			
		||||
    std::atomic_size_t totalSleepCycles = 0uz;
 | 
			
		||||
    std::vector<async::AnyOperation<void>> tasks;
 | 
			
		||||
    tasks.reserve(kTOTAL_THREADS);
 | 
			
		||||
 | 
			
		||||
    std::unordered_set<uint32_t> expectedSequences;
 | 
			
		||||
 | 
			
		||||
    for (auto batchIdx = 0u; batchIdx < kTOTAL_THREADS; ++batchIdx) {
 | 
			
		||||
        for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) {
 | 
			
		||||
            expectedSequences.insert((batchIdx * kTOTAL_ITEMS_PER_THREAD) + i);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // enqueue batches tasks running on multiple threads
 | 
			
		||||
        tasks.push_back(ctx.execute([&queue, batchIdx, &totalEnqueued, &totalSleepCycles] {
 | 
			
		||||
            for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) {
 | 
			
		||||
                auto data = TestData{.seq = (batchIdx * kTOTAL_ITEMS_PER_THREAD) + i};
 | 
			
		||||
                while (not queue.enqueue(data)) {
 | 
			
		||||
                    std::this_thread::sleep_for(std::chrono::nanoseconds{1});
 | 
			
		||||
                    ++totalSleepCycles;
 | 
			
		||||
                }
 | 
			
		||||
                ++totalEnqueued;
 | 
			
		||||
            }
 | 
			
		||||
        }));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    EXPECT_FALSE(expectedSequences.empty());
 | 
			
		||||
 | 
			
		||||
    auto loader = ctx.execute([&queue, &expectedSequences] {
 | 
			
		||||
        while (not expectedSequences.empty()) {
 | 
			
		||||
            while (auto maybeValue = queue.dequeue()) {
 | 
			
		||||
                EXPECT_TRUE(expectedSequences.contains(maybeValue->seq));
 | 
			
		||||
                expectedSequences.erase(maybeValue->seq);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    for (auto& task : tasks)
 | 
			
		||||
        task.wait();
 | 
			
		||||
    loader.wait();
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(queue.empty());
 | 
			
		||||
    EXPECT_TRUE(expectedSequences.empty());
 | 
			
		||||
    EXPECT_EQ(totalEnqueued, kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS);
 | 
			
		||||
    EXPECT_GE(totalSleepCycles, 1uz);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST(StrandedPriorityQueueTests, ReturnsNulloptIfQueueEmpty)
 | 
			
		||||
{
 | 
			
		||||
    async::CoroExecutionContext realCtx;
 | 
			
		||||
    StrandedPriorityQueue<TestData> queue{realCtx.makeStrand()};
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(queue.empty());
 | 
			
		||||
    auto maybeValue = queue.dequeue();
 | 
			
		||||
    EXPECT_FALSE(maybeValue.has_value());
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user