mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 11:55:51 +00:00 
			
		
		
		
	Database read throttle (#242)
Track current outstanding read requests to the database. When the configured limit is exceeded, reject new RPCs and return rpcTOO_BUSY
This commit is contained in:
		@@ -9,7 +9,8 @@
 | 
			
		||||
            "keyspace":"clio",
 | 
			
		||||
            "replication_factor":1,
 | 
			
		||||
            "table_prefix":"",
 | 
			
		||||
            "max_requests_outstanding":25000,
 | 
			
		||||
            "max_write_requests_outstanding":25000,
 | 
			
		||||
            "max_read_requests_outstanding":30000,
 | 
			
		||||
            "threads":8
 | 
			
		||||
        }
 | 
			
		||||
    },
 | 
			
		||||
 
 | 
			
		||||
@@ -7,10 +7,12 @@
 | 
			
		||||
#include <backend/Types.h>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <type_traits>
 | 
			
		||||
 | 
			
		||||
namespace Backend {
 | 
			
		||||
 | 
			
		||||
class DatabaseTimeout : public std::exception
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
    const char*
 | 
			
		||||
    what() const throw() override
 | 
			
		||||
    {
 | 
			
		||||
@@ -30,9 +32,10 @@ retryOnTimeout(F func, size_t waitMs = 500)
 | 
			
		||||
        }
 | 
			
		||||
        catch (DatabaseTimeout& t)
 | 
			
		||||
        {
 | 
			
		||||
            std::this_thread::sleep_for(std::chrono::milliseconds(waitMs));
 | 
			
		||||
            BOOST_LOG_TRIVIAL(error)
 | 
			
		||||
                << __func__ << " function timed out. Retrying ... ";
 | 
			
		||||
                << __func__
 | 
			
		||||
                << " Database request timed out. Sleeping and retrying ... ";
 | 
			
		||||
            std::this_thread::sleep_for(std::chrono::milliseconds(waitMs));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -342,6 +345,9 @@ public:
 | 
			
		||||
    virtual void
 | 
			
		||||
    close(){};
 | 
			
		||||
 | 
			
		||||
    virtual bool
 | 
			
		||||
    isTooBusy() const = 0;
 | 
			
		||||
 | 
			
		||||
    // *** private helper methods
 | 
			
		||||
private:
 | 
			
		||||
    virtual void
 | 
			
		||||
 
 | 
			
		||||
@@ -486,6 +486,7 @@ CassandraBackend::fetchTransactions(
 | 
			
		||||
{
 | 
			
		||||
    if (hashes.size() == 0)
 | 
			
		||||
        return {};
 | 
			
		||||
    numReadRequestsOutstanding_ += hashes.size();
 | 
			
		||||
 | 
			
		||||
    handler_type handler(std::forward<decltype(yield)>(yield));
 | 
			
		||||
    result_type result(handler);
 | 
			
		||||
@@ -518,6 +519,7 @@ CassandraBackend::fetchTransactions(
 | 
			
		||||
 | 
			
		||||
    // suspend the coroutine until completion handler is called.
 | 
			
		||||
    result.get();
 | 
			
		||||
    numReadRequestsOutstanding_ -= hashes.size();
 | 
			
		||||
 | 
			
		||||
    auto end = std::chrono::system_clock::now();
 | 
			
		||||
    for (auto const& cb : cbs)
 | 
			
		||||
@@ -812,6 +814,8 @@ CassandraBackend::doFetchLedgerObjects(
 | 
			
		||||
    if (keys.size() == 0)
 | 
			
		||||
        return {};
 | 
			
		||||
 | 
			
		||||
    numReadRequestsOutstanding_ += keys.size();
 | 
			
		||||
 | 
			
		||||
    handler_type handler(std::forward<decltype(yield)>(yield));
 | 
			
		||||
    result_type result(handler);
 | 
			
		||||
 | 
			
		||||
@@ -838,6 +842,7 @@ CassandraBackend::doFetchLedgerObjects(
 | 
			
		||||
 | 
			
		||||
    // suspend the coroutine until completion handler is called.
 | 
			
		||||
    result.get();
 | 
			
		||||
    numReadRequestsOutstanding_ -= keys.size();
 | 
			
		||||
 | 
			
		||||
    for (auto const& cb : cbs)
 | 
			
		||||
    {
 | 
			
		||||
@@ -965,6 +970,12 @@ CassandraBackend::doOnlineDelete(
 | 
			
		||||
    return true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool
 | 
			
		||||
CassandraBackend::isTooBusy() const
 | 
			
		||||
{
 | 
			
		||||
    return numReadRequestsOutstanding_ >= maxReadRequestsOutstanding;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
CassandraBackend::open(bool readOnly)
 | 
			
		||||
{
 | 
			
		||||
@@ -1077,21 +1088,26 @@ CassandraBackend::open(bool readOnly)
 | 
			
		||||
           << ", result: " << rc << ", " << cass_error_desc(rc);
 | 
			
		||||
        throw std::runtime_error(ss.str());
 | 
			
		||||
    }
 | 
			
		||||
    if (getInt("max_requests_outstanding"))
 | 
			
		||||
        maxRequestsOutstanding = *getInt("max_requests_outstanding");
 | 
			
		||||
    if (getInt("max_write_requests_outstanding"))
 | 
			
		||||
        maxWriteRequestsOutstanding = *getInt("max_write_requests_outstanding");
 | 
			
		||||
 | 
			
		||||
    if (getInt("max_read_requests_outstanding"))
 | 
			
		||||
        maxReadRequestsOutstanding = *getInt("max_read_requests_outstanding");
 | 
			
		||||
 | 
			
		||||
    if (getInt("sync_interval"))
 | 
			
		||||
        syncInterval_ = *getInt("sync_interval");
 | 
			
		||||
    BOOST_LOG_TRIVIAL(info)
 | 
			
		||||
        << __func__ << " sync interval is " << syncInterval_
 | 
			
		||||
        << ". max requests outstanding is " << maxRequestsOutstanding;
 | 
			
		||||
        << ". max write requests outstanding is " << maxWriteRequestsOutstanding
 | 
			
		||||
        << ". max read requests outstanding is " << maxReadRequestsOutstanding;
 | 
			
		||||
 | 
			
		||||
    cass_cluster_set_request_timeout(cluster, 10000);
 | 
			
		||||
 | 
			
		||||
    rc = cass_cluster_set_queue_size_io(
 | 
			
		||||
        cluster,
 | 
			
		||||
        maxRequestsOutstanding);  // This number needs to scale w/ the
 | 
			
		||||
                                  // number of request per sec
 | 
			
		||||
        maxWriteRequestsOutstanding +
 | 
			
		||||
            maxReadRequestsOutstanding);  // This number needs to scale w/ the
 | 
			
		||||
                                          // number of request per sec
 | 
			
		||||
    if (rc != CASS_OK)
 | 
			
		||||
    {
 | 
			
		||||
        std::stringstream ss;
 | 
			
		||||
 
 | 
			
		||||
@@ -645,13 +645,18 @@ private:
 | 
			
		||||
    uint32_t syncInterval_ = 1;
 | 
			
		||||
    uint32_t lastSync_ = 0;
 | 
			
		||||
 | 
			
		||||
    // maximum number of concurrent in flight requests. New requests will wait
 | 
			
		||||
    // for earlier requests to finish if this limit is exceeded
 | 
			
		||||
    std::uint32_t maxRequestsOutstanding = 10000;
 | 
			
		||||
    mutable std::atomic_uint32_t numRequestsOutstanding_ = 0;
 | 
			
		||||
    // maximum number of concurrent in flight write requests. New requests will
 | 
			
		||||
    // wait for earlier requests to finish if this limit is exceeded
 | 
			
		||||
    std::uint32_t maxWriteRequestsOutstanding = 10000;
 | 
			
		||||
    mutable std::atomic_uint32_t numWriteRequestsOutstanding_ = 0;
 | 
			
		||||
 | 
			
		||||
    // maximum number of concurrent in flight read requests. isTooBusy() will
 | 
			
		||||
    // return true if the number of in flight read requests exceeds this limit
 | 
			
		||||
    std::uint32_t maxReadRequestsOutstanding = 100000;
 | 
			
		||||
    mutable std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
 | 
			
		||||
 | 
			
		||||
    // mutex and condition_variable to limit the number of concurrent in flight
 | 
			
		||||
    // requests
 | 
			
		||||
    // write requests
 | 
			
		||||
    mutable std::mutex throttleMutex_;
 | 
			
		||||
    mutable std::condition_variable throttleCv_;
 | 
			
		||||
 | 
			
		||||
@@ -1017,6 +1022,9 @@ public:
 | 
			
		||||
        std::uint32_t const numLedgersToKeep,
 | 
			
		||||
        boost::asio::yield_context& yield) const override;
 | 
			
		||||
 | 
			
		||||
    bool
 | 
			
		||||
    isTooBusy() const override;
 | 
			
		||||
 | 
			
		||||
    inline void
 | 
			
		||||
    incremementOutstandingRequestCount() const
 | 
			
		||||
    {
 | 
			
		||||
@@ -1031,19 +1039,19 @@ public:
 | 
			
		||||
                throttleCv_.wait(lck, [this]() { return canAddRequest(); });
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        ++numRequestsOutstanding_;
 | 
			
		||||
        ++numWriteRequestsOutstanding_;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    inline void
 | 
			
		||||
    decrementOutstandingRequestCount() const
 | 
			
		||||
    {
 | 
			
		||||
        // sanity check
 | 
			
		||||
        if (numRequestsOutstanding_ == 0)
 | 
			
		||||
        if (numWriteRequestsOutstanding_ == 0)
 | 
			
		||||
        {
 | 
			
		||||
            assert(false);
 | 
			
		||||
            throw std::runtime_error("decrementing num outstanding below 0");
 | 
			
		||||
        }
 | 
			
		||||
        size_t cur = (--numRequestsOutstanding_);
 | 
			
		||||
        size_t cur = (--numWriteRequestsOutstanding_);
 | 
			
		||||
        {
 | 
			
		||||
            // mutex lock required to prevent race condition around spurious
 | 
			
		||||
            // wakeup
 | 
			
		||||
@@ -1062,13 +1070,13 @@ public:
 | 
			
		||||
    inline bool
 | 
			
		||||
    canAddRequest() const
 | 
			
		||||
    {
 | 
			
		||||
        return numRequestsOutstanding_ < maxRequestsOutstanding;
 | 
			
		||||
        return numWriteRequestsOutstanding_ < maxWriteRequestsOutstanding;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    inline bool
 | 
			
		||||
    finishedAllRequests() const
 | 
			
		||||
    {
 | 
			
		||||
        return numRequestsOutstanding_ == 0;
 | 
			
		||||
        return numWriteRequestsOutstanding_ == 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
@@ -1209,10 +1217,12 @@ public:
 | 
			
		||||
        CassError rc;
 | 
			
		||||
        do
 | 
			
		||||
        {
 | 
			
		||||
            ++numReadRequestsOutstanding_;
 | 
			
		||||
            fut = cass_session_execute(session_.get(), statement.get());
 | 
			
		||||
 | 
			
		||||
            boost::system::error_code ec;
 | 
			
		||||
            rc = cass_future_error_code(fut, yield[ec]);
 | 
			
		||||
            --numReadRequestsOutstanding_;
 | 
			
		||||
 | 
			
		||||
            if (ec)
 | 
			
		||||
            {
 | 
			
		||||
 
 | 
			
		||||
@@ -156,6 +156,12 @@ public:
 | 
			
		||||
    bool
 | 
			
		||||
    doFinishWrites() override;
 | 
			
		||||
 | 
			
		||||
    bool
 | 
			
		||||
    isTooBusy() const override
 | 
			
		||||
    {
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    bool
 | 
			
		||||
    doOnlineDelete(
 | 
			
		||||
        std::uint32_t const numLedgersToKeep,
 | 
			
		||||
 
 | 
			
		||||
@@ -342,6 +342,13 @@ buildResponse(Context const& ctx)
 | 
			
		||||
    if (ctx.method == "ping")
 | 
			
		||||
        return boost::json::object{};
 | 
			
		||||
 | 
			
		||||
    if (ctx.backend->isTooBusy())
 | 
			
		||||
    {
 | 
			
		||||
        BOOST_LOG_TRIVIAL(error)
 | 
			
		||||
            << __func__ << " Database is too busy. Rejecting request";
 | 
			
		||||
        return Status{Error::rpcTOO_BUSY};
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    auto method = handlerTable.getHandler(ctx.method);
 | 
			
		||||
 | 
			
		||||
    if (!method)
 | 
			
		||||
@@ -364,6 +371,11 @@ buildResponse(Context const& ctx)
 | 
			
		||||
    {
 | 
			
		||||
        return Status{Error::rpcACT_NOT_FOUND, err.what()};
 | 
			
		||||
    }
 | 
			
		||||
    catch (Backend::DatabaseTimeout const& t)
 | 
			
		||||
    {
 | 
			
		||||
        BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout";
 | 
			
		||||
        return Status{Error::rpcTOO_BUSY};
 | 
			
		||||
    }
 | 
			
		||||
    catch (std::exception const& err)
 | 
			
		||||
    {
 | 
			
		||||
        BOOST_LOG_TRIVIAL(error)
 | 
			
		||||
 
 | 
			
		||||
@@ -266,59 +266,51 @@ public:
 | 
			
		||||
        try
 | 
			
		||||
        {
 | 
			
		||||
            BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
 | 
			
		||||
            try
 | 
			
		||||
            {
 | 
			
		||||
                auto range = backend_->fetchLedgerRange();
 | 
			
		||||
                if (!range)
 | 
			
		||||
                    return sendError(RPC::Error::rpcNOT_READY);
 | 
			
		||||
 | 
			
		||||
                std::optional<RPC::Context> context = RPC::make_WsContext(
 | 
			
		||||
                    yield,
 | 
			
		||||
                    request,
 | 
			
		||||
                    backend_,
 | 
			
		||||
                    subscriptions_.lock(),
 | 
			
		||||
                    balancer_,
 | 
			
		||||
                    etl_,
 | 
			
		||||
                    shared_from_this(),
 | 
			
		||||
                    *range,
 | 
			
		||||
                    counters_,
 | 
			
		||||
                    *ip);
 | 
			
		||||
 | 
			
		||||
                if (!context)
 | 
			
		||||
                    return sendError(RPC::Error::rpcBAD_SYNTAX);
 | 
			
		||||
 | 
			
		||||
                response = getDefaultWsResponse(id);
 | 
			
		||||
 | 
			
		||||
                auto start = std::chrono::system_clock::now();
 | 
			
		||||
                auto v = RPC::buildResponse(*context);
 | 
			
		||||
                auto end = std::chrono::system_clock::now();
 | 
			
		||||
                auto us = std::chrono::duration_cast<std::chrono::microseconds>(
 | 
			
		||||
                    end - start);
 | 
			
		||||
                logDuration(*context, us);
 | 
			
		||||
 | 
			
		||||
                if (auto status = std::get_if<RPC::Status>(&v))
 | 
			
		||||
                {
 | 
			
		||||
                    counters_.rpcErrored(context->method);
 | 
			
		||||
 | 
			
		||||
                    auto error = RPC::make_error(*status);
 | 
			
		||||
 | 
			
		||||
                    if (!id.is_null())
 | 
			
		||||
                        error["id"] = id;
 | 
			
		||||
 | 
			
		||||
                    error["request"] = request;
 | 
			
		||||
                    response = error;
 | 
			
		||||
                }
 | 
			
		||||
                else
 | 
			
		||||
                {
 | 
			
		||||
                    counters_.rpcComplete(context->method, us);
 | 
			
		||||
 | 
			
		||||
                    response["result"] = std::get<boost::json::object>(v);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            catch (Backend::DatabaseTimeout const& t)
 | 
			
		||||
            {
 | 
			
		||||
                BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout";
 | 
			
		||||
            auto range = backend_->fetchLedgerRange();
 | 
			
		||||
            if (!range)
 | 
			
		||||
                return sendError(RPC::Error::rpcNOT_READY);
 | 
			
		||||
 | 
			
		||||
            std::optional<RPC::Context> context = RPC::make_WsContext(
 | 
			
		||||
                yield,
 | 
			
		||||
                request,
 | 
			
		||||
                backend_,
 | 
			
		||||
                subscriptions_.lock(),
 | 
			
		||||
                balancer_,
 | 
			
		||||
                etl_,
 | 
			
		||||
                shared_from_this(),
 | 
			
		||||
                *range,
 | 
			
		||||
                counters_,
 | 
			
		||||
                *ip);
 | 
			
		||||
 | 
			
		||||
            if (!context)
 | 
			
		||||
                return sendError(RPC::Error::rpcBAD_SYNTAX);
 | 
			
		||||
 | 
			
		||||
            response = getDefaultWsResponse(id);
 | 
			
		||||
 | 
			
		||||
            auto start = std::chrono::system_clock::now();
 | 
			
		||||
            auto v = RPC::buildResponse(*context);
 | 
			
		||||
            auto end = std::chrono::system_clock::now();
 | 
			
		||||
            auto us = std::chrono::duration_cast<std::chrono::microseconds>(
 | 
			
		||||
                end - start);
 | 
			
		||||
            logDuration(*context, us);
 | 
			
		||||
 | 
			
		||||
            if (auto status = std::get_if<RPC::Status>(&v))
 | 
			
		||||
            {
 | 
			
		||||
                counters_.rpcErrored(context->method);
 | 
			
		||||
 | 
			
		||||
                auto error = RPC::make_error(*status);
 | 
			
		||||
 | 
			
		||||
                if (!id.is_null())
 | 
			
		||||
                    error["id"] = id;
 | 
			
		||||
 | 
			
		||||
                error["request"] = request;
 | 
			
		||||
                response = error;
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
            {
 | 
			
		||||
                counters_.rpcComplete(context->method, us);
 | 
			
		||||
 | 
			
		||||
                response["result"] = std::get<boost::json::object>(v);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        catch (std::exception const& e)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user